Source code for pyiron_workflow.workflow

"""
Provides the main workhorse class for creating and running workflows.

This class is intended as the single point of entry for users making an import.
"""

from __future__ import annotations

from concurrent import futures
from typing import TYPE_CHECKING, Any, Literal

from bidict import bidict

from pyiron_workflow.io import Inputs
from pyiron_workflow.mixin.injection import OutputsWithInjection
from pyiron_workflow.mixin.run import InterpretableAsExecutor
from pyiron_workflow.node import Node
from pyiron_workflow.nodes.composite import Composite

if TYPE_CHECKING:
    from pyiron_workflow.io import IO
    from pyiron_workflow.storage import BackendIdentifier, StorageInterface


class ParentMostError(TypeError):
    """
    To be raised when assigning a parent to a parent-most object.
    """


class NoArgsError(TypeError):
    """
    To be raised when *args can't be processed but are received.
    """


class Workflow(Composite):
    """
    Workflows are a dynamic composite node -- i.e. they hold and run a collection of
    nodes (a subgraph) which can be dynamically modified (adding and removing nodes,
    and modifying their connections).

    Nodes can be added to the workflow at instantiation or with dot-assignment later
    on. They are then accessible either under the :attr:`children` dot-dictionary, or
    just directly by dot-access on the workflow object itself.

    Using the :attr:`inputs` and :attr:`outputs` attributes, the workflow gives
    by-reference access to all the IO channels among its nodes which are currently
    unconnected.

    The :class:`Workflow` class acts as a single-point-of-import for us; directly
    from the class we can use the :meth:`create` method to instantiate workflow
    objects. When called from a workflow _instance_, any created nodes get their
    parent set to the workflow instance being used.

    Workflows are "living" -- i.e. their IO is always by reference to their owned
    nodes and you are meant to add and remove nodes as children -- and "parent-most"
    -- i.e. they sit at the top of any data dependency tree and may never have a
    parent of their own. They are flexible and great for development, but once you
    have a setup you like, you should consider reformulating it as a :class:`Macro`,
    which operates somewhat more efficiently.

    Because they are parent-most objects, and are thus never instantiated inside
    other (macro) nodes, they break the default behaviour of their parent class and
    _do_ attempt to auto-load saved content at instantiation.

    Attributes:
        inputs/outputs_map (bidict|None): Maps in the form
            `{"node_label__channel_label": "some_better_name"}` that expose
            canonically named channels of child nodes under a new name. This can be
            used both for renaming regular IO (i.e. unconnected child channels), as
            well as forcing the exposure of irregular IO (i.e. child channels that are
            already internally connected to some other child channel). Mapping a
            channel to `None` instead suppresses it. When not `None`, the maps
            provided at input can be in regular dictionary form, but get re-cast as a
            clean bidict to ensure the bijective nature of the maps (i.e. there is a
            1:1 connection between any IO exposed at the :class:`Composite` level and
            the underlying channels).
        children (bidict.bidict[pyiron_workflow.node.Node]): The owned nodes that
            form the composite subgraph.

    Examples:
        We allow adding nodes to workflows in four equivalent ways:

        >>> from pyiron_workflow.workflow import Workflow
        >>>
        >>> @Workflow.wrap.as_function_node
        ... def fnc(x=0):
        ...     return x + 1
        >>>
        >>> # (1) As *args at instantiation
        >>> n1 = fnc(label="n1")
        >>> wf = Workflow("my_workflow", n1)
        >>>
        >>> # (2) Being passed to the `add_child` method
        >>> n2 = wf.add_child(fnc(label="n2"))
        >>>
        >>> # (3) By attribute assignment
        >>> wf.n3 = fnc(label="anyhow_n3_gets_used")
        >>>
        >>> # (4) By creating the node while specifying the parent kwarg
        >>> n4 = fnc(label="n4", parent=wf)

        By default, the node naming scheme is strict, so if you try to add a node to
        a label that already exists, you will get an error. This behaviour can be
        changed at instantiation with the :attr:`strict_naming` kwarg, or afterwards
        by assigning a bool to this property. When deactivated, repeated assignments
        to the same label just get appended with an index:

        >>> wf.strict_naming = False
        >>> wf.my_node = fnc(x=0)
        >>> wf.my_node = fnc(x=1)
        >>> wf.my_node = fnc(x=2)
        >>> print(wf.my_node.inputs.x, wf.my_node0.inputs.x, wf.my_node1.inputs.x)
        0 1 2

        The :class:`Workflow` class is designed as a single point of entry for
        workflows, so you can also access decorators to define new node classes right
        from the workflow (cf. the :class:`Node` docs for more detail on the node
        types). Let's use these to explore a workflow's input and output, which are
        dynamically generated from the unconnected IO of its nodes:

        >>> @Workflow.wrap.as_function_node("y")
        ... def plus_one(x: int = 0):
        ...     return x + 1
        >>>
        >>> wf = Workflow("io_workflow")
        >>> wf.first = plus_one()
        >>> wf.second = plus_one()
        >>> print(len(wf.inputs), len(wf.outputs))
        2 2

        If we connect the output of one node to the input of the other, there are
        fewer dangling channels for the workflow IO to find:

        >>> wf.second.inputs.x = wf.first.outputs.y
        >>> print(len(wf.inputs), len(wf.outputs))
        1 1

        Then we just run the workflow:

        >>> out = wf.run()

        The workflow joins node labels and channel labels with a double underscore
        (`__`) to provide direct access to the output:

        >>> print(wf.outputs.second__y.value)
        2

        Input channels are labelled the same way, and these keys can be used when
        calling the workflow to update the input. In our example, the nodes update
        automatically when their input gets updated, so all we need to do to see
        updated workflow output is update the input:

        >>> out = wf(first__x=10)
        >>> out
        {'second__y': 12}

        Note: this _looks_ like a dictionary, but has the extra convenience that we
        can dot-access data:

        >>> out.second__y
        12

        We can give more convenient names to IO, and even access IO that would
        normally be hidden (because it's connected) by specifying an
        :attr:`inputs_map` and/or :attr:`outputs_map`:

        >>> wf.inputs_map = {"first__x": "x"}
        >>> wf.outputs_map = {
        ...     "first__y": "intermediate",
        ...     "second__y": "y"
        ... }
        >>> wf(x=0)
        {'intermediate': 1, 'y': 2}

        Workflows can be visualized in the notebook using graphviz:

        >>> graphviz_graph = wf.draw()

        The resulting object can be saved as an image, e.g.

        >>> wf.draw().render(filename="demo", format="png")
        'demo.png'

        Let's clean up after ourselves (for when the CI runs the docstrings):

        >>> from os import remove
        >>> remove("demo")
        >>> remove("demo.png")

        When your workflow's data follows a directed-acyclic pattern, it will
        determine the execution flow automatically. If you want or need more control,
        you can set the `automate_execution` flag to `False` and manually specify an
        execution flow.

    TODO: Once you're satisfied with how a workflow is structured, you can export it
        as a macro node for use in other workflows. (Maybe we should allow for nested
        workflows without exporting to a node? I was concerned about what then
        happens to the nesting abstraction if, instead of accessing IO through the
        workflow's IO flags, a user manually connects IO from individual nodes from
        two different, nested or sibling workflows when those connections were
        _previously internal to their own workflow_. This seems very unsafe. Maybe
        there is something like a lock we can apply that falls short of a full
        export, but still guarantees the internal integrity of workflows when they're
        used somewhere else?)
    """

    def __init__(
        self,
        label: str,
        *nodes: Node,
        delete_existing_savefiles: bool = False,
        autoload: BackendIdentifier | StorageInterface | None = "pickle",
        autorun: bool = False,
        checkpoint: BackendIdentifier | StorageInterface | None = None,
        strict_naming: bool = True,
        inputs_map: dict | bidict | None = None,
        outputs_map: dict | bidict | None = None,
        automate_execution: bool = True,
        **kwargs,
    ):
        self._inputs_map = self._sanitize_map(inputs_map)
        self._outputs_map = self._sanitize_map(outputs_map)
        self._inputs = None
        self._outputs = None
        self.automate_execution: bool = automate_execution

        super().__init__(
            *nodes,
            label=label,
            parent=None,
            delete_existing_savefiles=delete_existing_savefiles,
            autoload=autoload,
            autorun=autorun,
            checkpoint=checkpoint,
            strict_naming=strict_naming,
            **kwargs,
        )

    def _after_node_setup(
        self,
        *args,
        delete_existing_savefiles: bool = False,
        autoload: BackendIdentifier | StorageInterface | None = None,
        autorun: bool = False,
        **kwargs,
    ):
        for node in args:
            self.add_child(node)
        super()._after_node_setup(
            autoload=autoload,
            delete_existing_savefiles=delete_existing_savefiles,
            autorun=autorun,
            **kwargs,
        )

    @property
    def inputs_map(self) -> bidict | None:
        if self._inputs_map is not None:
            self._deduplicate_nones(self._inputs_map)
        return self._inputs_map

    @inputs_map.setter
    def inputs_map(self, new_map: dict | bidict | None):
        self._inputs_map = self._sanitize_map(new_map)

    @property
    def outputs_map(self) -> bidict | None:
        if self._outputs_map is not None:
            self._deduplicate_nones(self._outputs_map)
        return self._outputs_map

    @outputs_map.setter
    def outputs_map(self, new_map: dict | bidict | None):
        self._outputs_map = self._sanitize_map(new_map)

    def _sanitize_map(self, new_map: dict | bidict | None) -> bidict | None:
        if new_map is not None:
            if isinstance(new_map, dict):
                self._deduplicate_nones(new_map)
            new_map = bidict(new_map)
        return new_map

    @staticmethod
    def _deduplicate_nones(some_map: dict | bidict):
        for k, v in some_map.items():
            if v is None:
                some_map[k] = (None, f"{k} disabled")

    @property
    def inputs(self) -> Inputs:
        return self._build_inputs()

    def _build_inputs(self):
        return self._build_io("inputs", self.inputs_map)

    @property
    def outputs(self) -> OutputsWithInjection:
        return self._build_outputs()

    def _build_outputs(self):
        return self._build_io("outputs", self.outputs_map)

    def _build_io(
        self,
        i_or_o: Literal["inputs", "outputs"],
        key_map: dict[str, str | None] | None,
    ) -> Inputs | OutputsWithInjection:
        """
        Build an IO panel for exposing child node IO to the outside world at the level
        of the composite node's IO.

        Args:
            i_or_o [Literal["inputs", "outputs"]]: Whether this is I or O.
            key_map [dict[str, str]|None]: A map between the default convention for
                mapping child IO to composite IO (`"{node.label}__{channel.label}"`)
                and whatever label you actually want to expose to the composite user.
                Also allows non-standard channel exposure, i.e. exposing
                internally-connected channels (which would not normally be exposed)
                by providing a string-to-string map, or suppressing unconnected
                channels (which normally would be exposed) by providing a
                string-to-None map.

        Returns:
            (Inputs|OutputsWithInjection): The populated panel.
        """
        key_map = {} if key_map is None else key_map
        io = Inputs() if i_or_o == "inputs" else OutputsWithInjection()
        for node in self.children.values():
            panel = getattr(node, i_or_o)
            for channel in panel:
                try:
                    io_panel_key = key_map[channel.scoped_label]
                    if isinstance(io_panel_key, str):
                        # Otherwise it's a None-str tuple, indicating that the
                        # channel has been deactivated
                        # This is a necessary misdirection to keep the bidict working,
                        # as we can't simply map _multiple_ keys to `None`
                        io[io_panel_key] = channel
                except KeyError:
                    if not channel.connected:
                        io[channel.scoped_label] = channel
        return io

    def _before_run(
        self,
        /,
        check_readiness: bool,
        rerun: bool,
        run_data_tree: bool,
        run_parent_trees_too: bool,
        fetch_input: bool,
        emit_ran_signal: bool,
        input_args: tuple[Any, ...],
        input_kwargs: dict[str, Any],
    ) -> tuple[bool, Any]:
        if self.automate_execution:
            self.set_run_signals_to_dag_execution()
        return super()._before_run(
            check_readiness=check_readiness,
            rerun=rerun,
            run_data_tree=run_data_tree,
            run_parent_trees_too=run_parent_trees_too,
            fetch_input=fetch_input,
            emit_ran_signal=emit_ran_signal,
            input_args=input_args,
            input_kwargs=input_kwargs,
        )

    def run(
        self,
        *args,
        check_readiness: bool = True,
        rerun: bool = False,
        **kwargs,
    ):
        # Note: Workflows may have neither parents nor siblings, so we don't need to
        # worry about running their data trees first, fetching their input, nor firing
        # their `ran` signal, hence the change in signature from Node.run
        if len(args) > 0:
            raise NoArgsError(
                f"{self.__class__} does not know how to process *args on run, but "
                f"received {args}"
            )

        return super().run(
            run_data_tree=False,
            run_parent_trees_too=False,
            fetch_input=False,
            check_readiness=check_readiness,
            rerun=rerun,
            emit_ran_signal=False,
            **kwargs,
        )

    def run_in_thread(
        self, *args, check_readiness: bool = True, rerun: bool = False, **kwargs
    ) -> futures.Future | dict[str, Any]:
        executor_is_empty = self.executor is None
        executor_is_threadpool = self._is_interpretable_as_threadpoolexecuctor(
            self.executor
        )
        if executor_is_empty:
            self.executor = (futures.ThreadPoolExecutor, (), {})
        elif not executor_is_threadpool:
            raise ValueError(
                f"Workflow {self.label} already has an executor set. Running in a "
                f"thread would override this."
            )
        f = self.run(*args, check_readiness=check_readiness, rerun=rerun, **kwargs)
        if executor_is_empty:
            if isinstance(f, futures.Future):
                f.add_done_callback(lambda _: setattr(self, "executor", None))
            else:
                # We're hitting a cached result
                self.executor = None
        return f

    @staticmethod
    def _is_interpretable_as_threadpoolexecuctor(
        executor: InterpretableAsExecutor | None,
    ) -> bool:
        return isinstance(executor, futures.ThreadPoolExecutor) or (
            isinstance(executor, tuple)
            and isinstance(executor[0], type)
            and issubclass(executor[0], futures.ThreadPoolExecutor)
        )

    def pull(self, run_parent_trees_too=False, **kwargs):
        """Workflows are parent-most objects, so this simply runs without pulling."""
        return self.run(**kwargs)

    def to_node(self):
        """
        Export the workflow to a macro node, with the currently exposed IO mapped to
        new IO channels, and the workflow mapped into the node_function.
        """
        raise NotImplementedError

    @property
    def _data_connections(self) -> list[tuple[tuple[str, str], tuple[str, str]]]:
        """
        A string-tuple representation of all connections between the data channels of
        child nodes.

        Intended for internal use during storage, so that connections can be
        represented in plain strings, and stored on an attribute to guarantee that the
        name does not conflict with a child node label.

        Returns:
            (list): Nested-pair tuples of (node label, channel label) data for
                (input, output) channels of data connections between children.
        """
        data_connections = []
        for node in self:
            for inp_label, inp in node.inputs.items():
                for conn in inp.connections:
                    data_connections.append(
                        ((node.label, inp_label), (conn.owner.label, conn.label))
                    )
        return data_connections

    @property
    def _signal_connections(self) -> list[tuple[tuple[str, str], tuple[str, str]]]:
        """
        A string-tuple representation of all connections between the signal channels
        of child nodes.

        Intended for internal use during storage, so that connections can be
        represented in plain strings, and stored on an attribute to guarantee that the
        name does not conflict with a child node label.

        Returns:
            (list): Nested-pair tuples of (node label, channel label) data for
                (input, output) channels of signal connections between children.
        """
        signal_connections = []
        for node in self:
            for inp_label, inp in node.signals.input.items():
                for conn in inp.connections:
                    signal_connections.append(
                        ((node.label, inp_label), (conn.owner.label, conn.label))
                    )
        return signal_connections

    @property
    def _owned_io_panels(self) -> list[IO]:
        # Workflow data IO is just pointers to child IO, not actually owned directly
        # by the workflow; this is used in re-parenting channels, and we don't want to
        # override the real parent with this workflow!
        return [
            self.signals.input,
            self.signals.output,
        ]

    def push_child(self, child: Node | str, *args, **kwargs):
        if self.automate_execution:
            self.set_run_signals_to_dag_execution()
        return super().push_child(child, *args, **kwargs)

    @property
    def parent(self) -> None:
        return None

    @parent.setter
    def parent(self, new_parent: None):
        if new_parent is not None:
            raise ParentMostError(
                f"{self.label} is a {self.__class__} and may only take None as a "
                f"parent but got {type(new_parent)}"
            )

    def run_data_tree_for_child(self, node: Node) -> None:
        """
        Override of Composite.run_data_tree_for_child that handles workflow-specific
        logic.

        This method temporarily disables automate_execution to prevent the workflow
        from automating execution during the data tree run.

        Args:
            node (Node): The child node that initiated the data tree run.
        """
        # Workflow parents will attempt to automate execution on run,
        # undoing all our careful execution
        # This heinous hack breaks in and stops that behaviour
        # I recognize this is dirty, but let's be pragmatic about getting
        # the features playing together. Workflows and pull are anyhow
        # already both very annoying on their own...
        automated = self.automate_execution
        self.automate_execution = False
        try:
            super().run_data_tree_for_child(node)
        finally:
            # And revert our workflow hack
            self.automate_execution = automated
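The `_deduplicate_nones` helper used by `Workflow._sanitize_map` exists because a bidict must stay invertible, so two channels cannot both be mapped to `None`. The sketch below is stand-alone and is not part of pyiron_workflow. It uses a plain `dict` in place of the third-party `bidict` package, plus a hypothetical `is_bijective` helper that stands in for bidict's uniqueness check. It shows why replacing each `None` with a `(None, "<key> disabled")` tuple restores uniqueness, and how a string-only filter like the one in `_build_io` then skips the disabled entries.

```python
def deduplicate_nones(some_map: dict) -> dict:
    """Replace each None value with a unique (None, "<key> disabled") tuple, in place."""
    for k, v in some_map.items():
        if v is None:
            # Reassigning an existing key is safe while iterating a dict
            some_map[k] = (None, f"{k} disabled")
    return some_map


def is_bijective(some_map: dict) -> bool:
    """Hypothetical helper: a map is invertible only if no two keys share a value."""
    return len(set(some_map.values())) == len(some_map)


raw = {"first__x": "x", "first__y": None, "second__y": None}
print(is_bijective(raw))  # False: two keys map to None
clean = deduplicate_nones(dict(raw))
print(is_bijective(clean))  # True: each disabled entry now has its own tuple
# Only string values are exposed; the tuples mark suppressed channels
print({k: v for k, v in clean.items() if isinstance(v, str)})  # {'first__x': 'x'}
```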
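`Workflow._build_io` applies three rules to every child channel:

- By default, only unconnected channels are exposed, under their scoped `"{node.label}__{channel.label}"` label.
- A string entry in the key map renames a channel, or forces exposure of a connected one.
- A disabled (non-string) entry suppresses the channel.

The sketch below is a minimal, dependency-free mirror of those rules. It assumes a hypothetical `Channel` dataclass carrying only the two fields the logic reads, and it returns a plain dict rather than an `Inputs`/`OutputsWithInjection` panel. The channel setup follows the docstring's `first`/`second` example, where `first__y` feeds `second__x`.

```python
from dataclasses import dataclass


@dataclass
class Channel:
    """Hypothetical stand-in for a child channel; only the fields the logic reads."""

    scoped_label: str  # e.g. "first__y", i.e. "{node.label}__{channel.label}"
    connected: bool


def build_io(channels, key_map=None):
    """Mirror of the `_build_io` rules, returning a plain dict of exposed channels."""
    key_map = {} if key_map is None else key_map
    io = {}
    for channel in channels:
        try:
            io_panel_key = key_map[channel.scoped_label]
            if isinstance(io_panel_key, str):
                # Renamed channel, or a connected channel being force-exposed
                io[io_panel_key] = channel
            # A non-str value (the (None, "... disabled") tuple) suppresses it
        except KeyError:
            if not channel.connected:
                # Default rule: expose unconnected channels under their scoped label
                io[channel.scoped_label] = channel
    return io


outputs = [Channel("first__y", connected=True), Channel("second__y", connected=False)]
print(list(build_io(outputs)))  # ['second__y']
print(list(build_io(outputs, {"first__y": "intermediate", "second__y": "y"})))
# ['intermediate', 'y']
print(list(build_io(outputs, {"second__y": (None, "second__y disabled")})))  # []
```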
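`Workflow.run_in_thread` accepts either a live `ThreadPoolExecutor` or a `(class, args, kwargs)` tuple describing one. When no executor is set, it installs such a tuple temporarily and clears it once the returned future resolves. The sketch below reproduces that pattern with only `concurrent.futures`; it is not pyiron_workflow code.

- `Runner` is a hypothetical stand-in for a workflow.
- Its `run` method only adds one to its argument.
- The `len(executor) > 0` guard is an extra safety check that the module itself does not have.

Because leaving the `with` block waits for the task, the future here is already done when `add_done_callback` is attached, so the callback fires immediately. In a real workflow, the reset may happen later.

```python
from concurrent import futures


def is_threadpool_like(executor) -> bool:
    """Accept a ThreadPoolExecutor instance or a (class, args, kwargs) tuple for one."""
    return isinstance(executor, futures.ThreadPoolExecutor) or (
        isinstance(executor, tuple)
        and len(executor) > 0  # extra guard, not present in the module
        and isinstance(executor[0], type)
        and issubclass(executor[0], futures.ThreadPoolExecutor)
    )


class Runner:
    """Hypothetical stand-in for a workflow that owns an `executor` attribute."""

    def __init__(self):
        self.executor = None

    def run(self, x):
        if self.executor is None:
            return x + 1  # no executor: compute synchronously
        if isinstance(self.executor, tuple):
            cls, args, kwargs = self.executor
            # Leaving the with-block waits for the task, so the future is done on return
            with cls(*args, **kwargs) as pool:
                return pool.submit(lambda: x + 1)
        return self.executor.submit(lambda: x + 1)

    def run_in_thread(self, x):
        executor_is_empty = self.executor is None
        if executor_is_empty:
            # Temporarily install a thread-pool "recipe" rather than a live pool
            self.executor = (futures.ThreadPoolExecutor, (), {})
        elif not is_threadpool_like(self.executor):
            raise ValueError("A non-thread executor is already set")
        f = self.run(x)
        if executor_is_empty:
            if isinstance(f, futures.Future):
                # Clear the temporary executor once the future resolves
                f.add_done_callback(lambda _: setattr(self, "executor", None))
            else:
                self.executor = None
        return f


runner = Runner()
future = runner.run_in_thread(1)
print(future.result())  # 2
print(runner.executor)  # None
```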
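`Workflow.run_data_tree_for_child` switches `automate_execution` off and restores it in a `finally` clause, so the flag comes back even if the data-tree run raises. The same save/set/restore pattern can be packaged as a context manager. This is a swapped-in alternative built on `contextlib.contextmanager`, not what the module does. The helper `temporarily_set` and the `FakeWorkflow` stand-in below are both hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def temporarily_set(obj, attr: str, value):
    """Hypothetical helper: set `obj.attr` to `value` inside a with-block, then restore it."""
    original = getattr(obj, attr)
    setattr(obj, attr, value)
    try:
        yield obj
    finally:
        # Runs on normal exit and when the block raises, like the module's try/finally
        setattr(obj, attr, original)


class FakeWorkflow:
    """Hypothetical stand-in carrying only the flag that gets toggled."""

    def __init__(self):
        self.automate_execution = True


wf = FakeWorkflow()
with temporarily_set(wf, "automate_execution", False):
    print(wf.automate_execution)  # False
print(wf.automate_execution)  # True
```

The explicit `try`/`finally` in the module keeps the toggle visible at the call site. A context manager becomes attractive only if the same hack is needed in several places.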