pyiron_workflow.topology module

A submodule for getting our node classes talking nicely with an external tool for topological analysis. Such analyses are useful for automating execution flows based on data flow dependencies.

exception pyiron_workflow.topology.CircularDataFlowError

Bases: ValueError

pyiron_workflow.topology.get_nodes_in_data_tree(node: Node) → set[Node]

Get a set of all nodes from this one and upstream through data connections.
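As a rough illustration of what "this node and everything upstream of it" means, the sketch below walks a plain label-keyed dependency dictionary. It does not use pyiron_workflow objects: the function name `upstream_closure` and the node labels are hypothetical, and the real function operates on Node instances and their data connections rather than on strings.

```python
def upstream_closure(start: str, depends_on: dict[str, set[str]]) -> set[str]:
    """Collect `start` plus every label reachable by following dependencies upstream."""
    seen: set[str] = set()
    stack = [start]
    while stack:
        label = stack.pop()
        if label in seen:
            continue  # already visited; also keeps the walk finite on cyclic input
        seen.add(label)
        stack.extend(depends_on.get(label, set()))
    return seen


# Hypothetical labels: each key depends on the labels in its value set.
depends_on = {
    "load": set(),
    "clean": {"load"},
    "fit": {"clean"},
    "unrelated": set(),
}

tree = upstream_closure("fit", depends_on)
```

Here `tree` holds the starting label and its upstream labels; `"unrelated"` is left out because no data path leads from it to `"fit"`.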

pyiron_workflow.topology.nodes_to_data_digraph(nodes: dict[str, Node]) → dict[str, set[str]]

Maps a collection of nodes to a digraph of their data dependencies: each key is a node label, and each value is the set of labels of that node's upstream nodes.

Parameters:

nodes (dict[str, Node]) – A label-keyed dictionary of nodes to convert into a string-based dictionary of digraph connections based on data flow.

Returns:

A dictionary of nodes and the nodes they depend on for data.

Return type:

dict[str, set[str]]

Raises:
  • CircularDataFlowError – When a node appears in its own input.

  • ValueError – If the nodes do not all have the same parent.

  • ValueError – If one of the nodes has an upstream data connection whose node has a different parent.
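To make the returned format concrete, here is a minimal sketch of a digraph in the `dict[str, set[str]]` shape described above, fed to the standard library's `graphlib` as a stand-in topological sorter. The labels and helper names are hypothetical, and `graphlib` is an assumption chosen for self-containment; it is not necessarily the external tool this module wraps.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical labels: each key maps to the set of labels it depends on for data.
digraph = {
    "load": set(),
    "clean": {"load"},
    "fit": {"clean"},
    "plot": {"clean", "fit"},
}


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return labels ordered so that every node follows its upstream dependencies."""
    return list(TopologicalSorter(graph).static_order())


def has_cycle(graph: dict[str, set[str]]) -> bool:
    """Report whether the data flow is circular (the stand-in sorter raises CycleError)."""
    try:
        execution_order(graph)
    except CycleError:
        return True
    return False


order = execution_order(digraph)
cyclic = has_cycle({"a": {"b"}, "b": {"a"}})
```

A circular graph such as the two-node loop above is the kind of input for which the module raises `CircularDataFlowError`.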

pyiron_workflow.topology.set_run_connections_according_to_dag(nodes: dict[str, Node]) → tuple[list[tuple[InputSignal, OutputSignal]], list[Node]]

Given a set of nodes that all have the same parent, have no upstream data connections outside the nodes provided, and have acyclic data flow, disconnects all their run and ran signals, then sets these signals so that each node has its accumulating and_run signals connected to all of its up-data-stream dependencies. Returns the upstream-most nodes that have no data dependencies.

In the event an exception is encountered, any disconnected connections are repaired before it is raised.

Parameters:

nodes (dict[str, Node]) – A label-keyed dictionary of the nodes whose connections will be set according to data flow.

Returns:

(list[tuple[SignalChannel, SignalChannel]]): Any run/ran pairs that were disconnected.

(list[Node]): The upstream-most nodes, i.e. those that have no dependencies.

Return type:

tuple[list[tuple[InputSignal, OutputSignal]], list[Node]]
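The scheduling idea behind this function (a node may run only once all of its upstream data dependencies have run, and the nodes with no dependencies start the flow) can be sketched on plain labels. The example below uses the standard library's `graphlib` and hypothetical labels; it illustrates the semantics only and does not touch real run/ran signal channels.

```python
from graphlib import TopologicalSorter


def ready_batches(depends_on: dict[str, set[str]]) -> list[set[str]]:
    """Group labels into successive batches whose dependencies are all satisfied."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError if the data flow is cyclic
    batches: list[set[str]] = []
    while sorter.is_active():
        ready = set(sorter.get_ready())  # everything whose upstream nodes are done
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Hypothetical labels: "merge" must wait for both loaders to finish.
depends_on = {
    "load_a": set(),
    "load_b": set(),
    "merge": {"load_a", "load_b"},
    "report": {"merge"},
}

batches = ready_batches(depends_on)
starting_nodes = batches[0]  # analogous to the returned upstream-most nodes
```

In this picture, `"merge"` plays the role of a node whose accumulating signal waits on every upstream node, while `starting_nodes` corresponds to the second element of the returned tuple.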

pyiron_workflow.topology.set_run_connections_according_to_linear_dag(nodes: dict[str, Node]) → tuple[list[tuple[InputSignal, OutputSignal]], list[Node]]

Given a set of nodes that all have the same parent, have no upstream data connections outside the nodes provided, and have acyclic data flow, disconnects all their run and ran signals, then sets these signals to a linear execution that guarantees downstream nodes are always executed after upstream nodes. Returns one of the upstream-most nodes.

In the event an exception is encountered, any disconnected connections are repaired before it is raised.

Parameters:

nodes (dict[str, Node]) – A label-keyed dictionary of the nodes whose connections will be set according to data flow.

Returns:

(list[tuple[SignalChannel, SignalChannel]]): Any run/ran pairs that were disconnected.

(list[Node]): The 0th node in the execution order, i.e. one that has no dependencies, wrapped in a list.

Return type:

tuple[list[tuple[InputSignal, OutputSignal]], list[Node]]
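A linear execution can be pictured as flattening the dependency graph into one topological order and chaining each entry to the next. The sketch below does this on hypothetical labels with the standard library's `graphlib`; the name `linear_chain` and the returned (previous, next) label pairs are illustrative assumptions, not the library's signal objects.

```python
from graphlib import TopologicalSorter


def linear_chain(
    depends_on: dict[str, set[str]],
) -> tuple[list[str], list[tuple[str, str]]]:
    """Return a single starting label (in a list) and the (previous, next) run links."""
    order = list(TopologicalSorter(depends_on).static_order())
    links = list(zip(order, order[1:]))  # each node runs right after its predecessor
    return order[:1], links


# Hypothetical labels: two independent loaders feed one merge step.
depends_on = {
    "load_a": set(),
    "load_b": set(),
    "merge": {"load_a", "load_b"},
    "report": {"merge"},
}

starters, links = linear_chain(depends_on)
order = starters + [nxt for _, nxt in links]
```

Even though both loaders are free of dependencies, only one of them ends up in `starters`; the other is simply placed somewhere ahead of `"merge"` in the chain, which is why the function documented above returns a single node wrapped in a list.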