"""
A base class for objects that can form nodes in the graph representation of a
computational workflow.
The workhorse class for the entire concept.
"""
from __future__ import annotations
import contextlib
import pathlib
import shutil
from abc import ABC, abstractmethod
from concurrent.futures import Future
from importlib import import_module
from typing import TYPE_CHECKING, Any, Literal, cast
from pyiron_snippets.colors import SeabornColors
from pyiron_snippets.dotdict import DotDict
from pyiron_workflow import overloading
from pyiron_workflow.channels import (
AccumulatingInputSignal,
Channel,
InputLockedError,
InputSignal,
OutputSignal,
)
from pyiron_workflow.draw import Node as GraphvizNode
from pyiron_workflow.executors.wrapped_executorlib import CacheOverride
from pyiron_workflow.io import IO, Inputs, Signals
from pyiron_workflow.logging import logger
from pyiron_workflow.mixin.display_state import HasStateDisplay
from pyiron_workflow.mixin.injection import (
InjectsOnChannel,
OutputDataWithInjection,
OutputsWithInjection,
)
from pyiron_workflow.mixin.lexical import Lexical
from pyiron_workflow.mixin.run import ReadinessError, Runnable
from pyiron_workflow.storage import (
BackendIdentifier,
StorageInterface,
available_backends,
)
from pyiron_workflow.topology import (
get_nodes_in_data_tree,
set_run_connections_according_to_linear_dag,
)
if TYPE_CHECKING:
from pathlib import Path
import graphviz
from pyiron_workflow.nodes.composite import Composite
[docs]
class WaitingForFutureError(ValueError):
    """Raised when a running node that already has a future attached is asked to rerun."""
[docs]
class AmbiguousOutputError(ValueError):
    """
    Raised when exactly one output channel is required, but the node has a
    different number of outputs (none, or several).
    """
[docs]
class ConnectionCopyError(ValueError):
    """Raised when IO is being copied but its connections cannot be carried over."""
[docs]
class ValueCopyError(ValueError):
    """Raised when IO is being copied but its values cannot be carried over."""
[docs]
class Node(
HasStateDisplay,
Lexical["Composite"],
Runnable,
InjectsOnChannel,
ABC,
):
"""
Nodes are elements of a computational graph.
They have inputs and outputs to interface with the wider world, and perform some
operation.
By connecting multiple nodes' inputs and outputs together, computational graphs can
be formed.
These can be collected under a parent, such that new graphs can be composed of
one or more sub-graphs.
This is an abstract class.
Children *must* define how :attr:`inputs` and :attr:`outputs` are constructed,
what will happen :meth:`_on_run`, the :attr:`run_args` that will get passed to
:meth:`_on_run`, and how to :meth:`process_run_result` once :meth:`_on_run` finishes.
They may optionally add additional signal channels to the signals IO.
Attributes:
failed (bool): Whether the node raised an error calling :meth:`run`. (Default
is False.)
future (concurrent.futures.Future | None): A futures object, if the node is
currently running or has already run using an executor.
label (str): A name for the node.
parent (pyiron_workflow.nodes.composite.Composite | None): The parent object
owning this, if any.
recovery (BackendIdentifier | StorageInterface | None): The storage
backend to use for saving a "recovery" file if the node execution crashes
and this is the parent-most node. Default is `"pickle"`; setting `None`
will prevent any file from being saved.
running (bool): Whether the node has called :meth:`run` and has not yet
received output from this call. (Default is False.)
checkpoint (BackendIdentifier | StorageInterface | None): Whether to trigger a
save of the entire graph after each run of the node, and if so what storage
back end to use. (Default is None, don't do any checkpoint saving.)
use_cache (bool): Whether or not to cache the inputs and, when the current
inputs match the cached input (by `==` comparison), to bypass running the
node and simply continue using the existing outputs. Note that you may be
able to trigger a false cache hit in some special case of non-idempotent
nodes working on mutable data.
"""
# Class-wide default for input caching (documented as `use_cache` in the class
# docstring). It is read as `self.use_cache`, so subclasses or individual
# instances can override it.
use_cache = True
def __init__(
    self,
    *args,
    label: str | None = None,
    parent: Composite | None = None,
    delete_existing_savefiles: bool = False,
    autoload: BackendIdentifier | StorageInterface | None = None,
    autorun: bool = False,
    checkpoint: BackendIdentifier | StorageInterface | None = None,
    **kwargs,
):
    """
    A parent class for objects that can form nodes in the graph representation of a
    computational workflow.

    Initialization ends with a routine :meth:`_after_node_setup` that may,
    depending on instantiation arguments, try to actually execute the node. Since
    child classes may need to get things done before this point, we want to make
    sure that this happens _after_ all the other setup. This can be accomplished
    by children (a) sticking stuff that is independent of `super().__init__` calls
    before the super call, and (b) overriding :meth:`_setup_node` to do any
    remaining, parameter-free setup. This latter function gets called prior to any
    execution.

    Initialization will also try to parse any outstanding `args` and `kwargs` as
    input to the node's input channels. For node class developers, that means it's
    also important that `Node` parentage appear to the right-most of the
    inheritance set in the class definition, so that its invocation of `__init__`
    appears as late as possible with the minimal set of args and kwargs.

    Args:
        *args: Interpreted as node input data, in order of input channels.
        label (str | None): A name for this node.
        parent (Composite | None): The composite node that owns this as a child.
        delete_existing_savefiles (bool): Whether to look for and delete any
            matching save files at instantiation. Uses all default storage
            back ends and anything passed to :param:`autoload`. (Default is False,
            leave those files alone!)
        autoload (BackendIdentifier | StorageInterface | None): The back end
            to use for checking whether node data can be loaded from file. A None
            value indicates no auto-loading. (Default is None, don't auto-load.)
        autorun (bool): Whether to run at the end of initialization. A run that
            is refused for lack of readiness is silently skipped. (Default is
            False.)
        checkpoint (BackendIdentifier | StorageInterface | None): The storage
            back end to use for saving the overall graph at the end of this node's
            run. (Default is None, don't do checkpoint saves.)
        **kwargs: Interpreted as node input data, with keys corresponding to
            channel labels.
    """
    super().__init__(label=label, parent=parent)
    self._validate_ontologies = True  # Back-door to turn off the alpha feature
    self._signals = Signals()
    self._signals.input.run = InputSignal("run", self, self.run)
    self._signals.input.accumulate_and_run = AccumulatingInputSignal(
        "accumulate_and_run", self, self.run
    )
    self._signals.output.ran = OutputSignal("ran", self)
    self._signals.output.failed = OutputSignal("failed", self)
    self.checkpoint = checkpoint
    self.recovery: BackendIdentifier | StorageInterface | None = "pickle"
    self._remove_executorlib_cache: bool = True  # Power-user override for cleaning
    # up temporary serialized results from runs with executorlib; intended to be
    # used for testing
    self._cached_inputs: dict[str, Any] | None = None
    # A place for power-users to bypass node-injection
    self._user_data: dict[str, Any] = {}
    self._setup_node()
    self._after_node_setup(
        *args,
        delete_existing_savefiles=delete_existing_savefiles,
        autoload=autoload,
        autorun=autorun,
        **kwargs,
    )
def _setup_node(self) -> None:
    """
    Hook for parameter-free setup in child classes.

    :meth:`Node.__init__` invokes it after the default signal channels exist, but
    before :meth:`Node._after_node_setup` (which may load saved data, set input
    values, and run). The base implementation does nothing.
    """
    return None
def _after_node_setup(
    self,
    *args,
    delete_existing_savefiles: bool = False,
    autoload: BackendIdentifier | StorageInterface | None = None,
    autorun: bool = False,
    **kwargs,
):
    """
    Finish initialization: optionally clear or restore saved state, apply the
    initial input values, and optionally run.

    Args:
        *args: Positional input values, forwarded to :meth:`set_input_values`.
        delete_existing_savefiles (bool): First call :meth:`delete_storage`, with
            :param:`autoload` as the back end.
        autoload (BackendIdentifier | StorageInterface | None): Back end used to
            probe for existing saved content; `None` skips loading entirely.
        autorun (bool): Attempt a :meth:`run` at the end; a
            :class:`ReadinessError` is silently ignored.
        **kwargs: Keyword input values, forwarded to :meth:`set_input_values`.
    """
    if delete_existing_savefiles:
        self.delete_storage(backend=autoload)

    if autoload is not None:
        # Stop probing at the first back end that reports saved content
        backend_with_content = next(
            (
                candidate
                for candidate in available_backends(backend=autoload)
                if candidate.has_saved_content(self)
            ),
            None,
        )
        if backend_with_content is not None:
            logger.info(
                f"A saved file was found for the node {self.full_label} -- "
                f"attempting to load it...(To delete the saved file instead, "
                f"use `delete_existing_savefiles=True`) "
            )
            self.load(backend=autoload)

    self.set_input_values(*args, **kwargs)

    if not autorun:
        return
    try:
        self.run()
    except ReadinessError:
        pass
@property
def channel(self) -> OutputDataWithInjection:
    """
    This node's sole output channel.

    Lets the node stand in for its only output channel (fulfilling the
    :class:`HasChannel` interface), e.g. when forming connections or using
    injection.

    Returns:
        (OutputDataWithInjection): The one and only output channel.

    Raises:
        AmbiguousOutputError: Unless there is exactly one output channel.
    """
    outputs = self.outputs
    if len(outputs) == 1:
        sole_label = outputs.labels[0]
        return outputs[sole_label]
    raise AmbiguousOutputError(
        f"Tried to access the channel value of {self.label}, but this is only "
        f"possible when there is a single output channel -- {self.label} has: "
        f"{outputs.labels}. Access probably occurred attempting to use "
        f"this object like an output channel, e.g. with injection or to form a "
        f"connection. Either make sure it has exactly one output channel, or "
        f"use the particular channel you want directly."
    )
@property
def graph_path(self) -> str:
    """
    The path of node labels from the graph root (see :attr:`graph_root`) down to
    this node, with each label preceded by :attr:`lexical_delimiter`.

    The recursion goes through the parent's :attr:`graph_path`, not its full
    :attr:`lexical_path`. This makes the path start at the same node that
    :attr:`graph_root` reports: any non-:class:`Node` lexical ancestors above the
    graph root are excluded. When every ancestor is a :class:`Node`, the two
    paths coincide.
    """
    prefix = self.parent.graph_path if isinstance(self.parent, Node) else ""
    return prefix + self.lexical_delimiter + self.label
@property
def graph_root(self) -> Node:
    """
    The parent-most node in this lexical path: walk up through parents for as
    long as they are :class:`Node` instances.
    """
    parent = self.parent
    if isinstance(parent, Node):
        return parent.graph_root
    return self
@property
@abstractmethod
def inputs(self) -> Inputs:
    """The data input channels of this node; concrete subclasses must provide them."""
@property
@abstractmethod
def outputs(self) -> OutputsWithInjection:
    """The data output channels of this node; concrete subclasses must provide them."""
@property
def signals(self) -> Signals:
    """
    A container for input and output signals, which are channels for controlling
    execution flow.

    By default it holds:

    - :attr:`signals.input.run`, which calls :meth:`run` whenever _any_ of its
      connections sends a signal to it;
    - :attr:`signals.input.accumulate_and_run`, which also calls :meth:`run`,
      but only after _all_ its connections have sent at least one signal to it;
    - :attr:`signals.output.ran`, which is fired when a :meth:`run` finishes;
    - :attr:`signals.output.failed`, which is fired instead of `ran` when the
      node has failed (see :attr:`emitting_channels`).

    Derived classes can add signal channels to :attr:`signals.input` and
    :attr:`signals.output` once :meth:`Node.__init__` has created these
    defaults, e.g. in :meth:`_setup_node`.
    """
    return self._signals
@property
def connected(self) -> bool:
    """True as soon as any input, output, or signal channel has a connection."""
    return any(
        getattr(self, panel).connected for panel in ("inputs", "outputs", "signals")
    )
@property
def fully_connected(self) -> bool:
    """True only when every input, output, and signal channel is connected."""
    return all(
        getattr(self, panel).fully_connected
        for panel in ("inputs", "outputs", "signals")
    )
[docs]
def disconnect(self) -> list[tuple[Channel, Channel]]:
    """
    Break every connection held by the input, output, and signal channels.

    Returns:
        [list[tuple[Channel, Channel]]]: The channel pairs whose connection was
            removed, inputs first, then outputs, then signals.
    """
    return [
        *self.inputs.disconnect(),
        *self.outputs.disconnect(),
        *self.signals.disconnect(),
    ]
[docs]
def activate_strict_hints(self) -> None:
    """Turn on type-hint checking for every data input and output channel."""
    for data_panel in (self.inputs, self.outputs):
        data_panel.activate_strict_hints()
[docs]
def deactivate_strict_hints(self) -> None:
    """Turn off type-hint checking for every data input and output channel."""
    for data_panel in (self.inputs, self.outputs):
        data_panel.deactivate_strict_hints()
def _connect_output_signal(self, signal: OutputSignal) -> None:
    """Hook up an upstream output signal so that it triggers this node's `run` input."""
    run_input = self.signals.input.run
    run_input.connect(signal)
def __rshift__(self, other: InputSignal | Node) -> InputSignal | Node:
    """
    Chain execution with `first >> second`: this node's `ran` signal is wired to
    `other`, and `other` is returned so that chains like `a >> b >> c` work.
    """
    ran_signal = self.signals.output.ran
    other._connect_output_signal(ran_signal)
    return other
def _connect_accumulating_input_signal(
    self, signal: AccumulatingInputSignal
) -> None:
    """Wire this node's `ran` output signal into the given accumulating input signal."""
    ran_output = self.signals.output.ran
    ran_output.connect(signal)
def __lshift__(self, others: tuple[OutputSignal | Node, ...]) -> None:
    """
    Connect one or more `ran` signals to this node's `accumulate_and_run` signal.

    Usage:
        `this << (some_object, another_object, or_by_channel.signals.output.ran)`

    Note:
        `<<` binds more tightly than the tuple-forming comma, so multiple sources
        must be parenthesized. An unparenthesized `this << a, b` evaluates as
        `(this << a), b`, and only `a` is handed to the signal.

    Note:
        This returns None, so the result cannot be chained into further `<<`
        expressions.
    """
    self.signals.input.accumulate_and_run << others
def _get_complete_input(self, *args, **kwargs) -> dict[str, Any]:
if len(args) > len(self.inputs.labels):
raise ValueError(
f"Received {len(args)} args, but only have {len(self.inputs.labels)} "
f"input channels available"
)
keyed_args = dict(zip(self.inputs.labels, args, strict=False))
if len(set(keyed_args.keys()).intersection(kwargs.keys())) > 0:
raise ValueError(
f"n args are interpreted using the first n input channels "
f"({self.inputs.labels}), but this conflicted with received kwargs "
f"({list(kwargs.keys())}) -- perhaps the input was ordered differently "
f"than expected? Got args {args} and kwargs {kwargs}."
)
kwargs.update(keyed_args)
self._ensure_all_input_keys_present(kwargs.keys(), self.inputs.labels)
return kwargs
@staticmethod
def _ensure_all_input_keys_present(used_keys, available_keys):
diff = set(used_keys).difference(available_keys)
if len(diff) > 0:
raise ValueError(
f"{diff} not found among available inputs: {available_keys}"
)
@property
def _owned_io_panels(self) -> list[IO]:
    """All IO panels belonging to this node: data inputs, data outputs, input signals, output signals."""
    signals = self.signals
    return [self.inputs, self.outputs, signals.input, signals.output]
@property
def _readiness_dict(self) -> dict[str, bool]:
    """
    Readiness report entries: the inherited ones, plus one `"inputs.<label>"`
    entry per input channel holding that channel's `ready` flag.
    """
    # Named `readiness` rather than `dict` to avoid shadowing the builtin
    readiness = super()._readiness_dict
    for label, input_channel in self.inputs.items():
        readiness[f"inputs.{label}"] = input_channel.ready
    return readiness
@property
def _readiness_error_message(self) -> str:
    """Explanation used when a run is refused, followed by the full readiness report."""
    header = (
        f"{self.label} received a run command but is not ready. The node "
        f"should be neither running nor failed, and all input values should"
        f" conform to type hints.\n"
    )
    return header + self.readiness_report
def _is_using_wrapped_excutorlib_executor(self) -> bool:
    """
    Whether :attr:`executor` is, or is specified as, a :class:`CacheOverride`.

    Recognizes a ready-made ``CacheOverride`` instance as well as a tuple whose
    first element is a ``CacheOverride`` subclass. (The "excutorlib" spelling in
    the name is kept because other code calls it by this name.)
    """
    executor = self.executor
    if executor is None:
        return False
    if isinstance(executor, CacheOverride):
        return True
    if not isinstance(executor, tuple):
        return False
    executor_class = executor[0]
    return isinstance(executor_class, type) and issubclass(
        executor_class, CacheOverride
    )
def _clean_wrapped_executorlib_executor_cache(self) -> None:
    """
    Remove temporary files left behind by a wrapped executorlib run.

    Deletes the serialized-result file (:attr:`_wrapped_executorlib_cache_file`)
    and, if present, the executorlib cache sub-directory under this node's
    :meth:`as_path`. Then calls :meth:`clean_path`.
    """
    # This is invoked from `_run_finally`; tolerate an already-missing result file
    # so that cleanup does not itself raise and mask the outcome of the run.
    self._wrapped_executorlib_cache_file.unlink(missing_ok=True)
    cache_subdir = self.as_path() / CacheOverride.override_cache_file_name
    if pathlib.Path(cache_subdir).is_dir():
        shutil.rmtree(cache_subdir)
    self.clean_path()
@property
def _wrapped_executorlib_cache_file(self) -> Path:
    """Where executorlib leaves this node's serialized result (used only for internal cleanup)."""
    # The "_o.h5" suffix mirrors executorlib implementation details, which are not
    # covered by its semantic-versioning guarantees.
    result_name = CacheOverride.override_cache_file_name + "_o.h5"
    working_dir = self.as_path()
    return working_dir / result_name
[docs]
def on_run(self, *args, **kwargs) -> Any:
    """Public entry point to the node's core computation; simply delegates to :meth:`_on_run`."""
    result = self._on_run(*args, **kwargs)
    return result
@abstractmethod
def _on_run(self, *args, **kwargs) -> Any:
    """The node's actual computation; every concrete node class must implement it."""
[docs]
def run(
    self,
    *args,
    run_data_tree: bool = False,
    run_parent_trees_too: bool = False,
    fetch_input: bool = True,
    check_readiness: bool = True,
    raise_run_exceptions: bool = True,
    rerun: bool = False,
    emit_ran_signal: bool = True,
    **kwargs,
):
    """
    Run the node: the general-purpose entry point behind every run shortcut.

    With the default flags, this:

    1. fetches whatever data is currently available upstream;
    2. runs the node (perhaps on an executor) if its input conforms to type hints;
    3. emits the `ran` signal so that downstream nodes can fire.

    When executor information is set, the work happens on that executor. A
    callback is registered and a futures object is returned.

    Input values given here (positionally or by keyword) are applied _first_.
    Any input changes produced by the computation graph afterwards therefore take
    precedence. To execute with exactly a particular set of input, set it
    manually and use `execute` (or `run` with carefully chosen flags).

    Args:
        *args: Input values, matched to input channels in order.
        run_data_tree (bool): First run all upstream nodes in the data graph.
            (Default is False.)
        run_parent_trees_too (bool): Also (recursively) run the data tree in
            parent nodes, if any. (Default is False.)
        fetch_input (bool): First update inputs with the highest-priority
            connections holding data, i.e. the first valid connection. The most
            recently formed connections appear first unless the connections list
            has been manually tampered with. (Default is True.)
        check_readiness (bool): Raise an exception if the node is not
            :attr:`ready` after fetching new input. (Default is True.)
        raise_run_exceptions (bool): Raise exceptions encountered during the run
            instead of ignoring them. (Default is True.)
        rerun (bool): Force-set :attr:`running` and :attr:`failed` to `False`
            before running. (Default is False.)
        emit_ran_signal (bool): Fire the output `ran` signal afterwards.
            (Default is True.)
        **kwargs: Values keyed by input channel label, used to update input
            before anything runs.

    Returns:
        (Any | Future): The result of running the node, or a futures object when
            running on an executor.

    Note:
        Running data trees is a pull-based paradigm. It only works for graphs
        whose data forms a directed acyclic graph (DAG).

    Note:
        Input updates from kwargs happen _first_, and are overwritten by any
        subsequent graph-based data manipulation.
    """
    before_run_kwargs = {
        "run_data_tree": run_data_tree,
        "run_parent_trees_too": run_parent_trees_too,
        "fetch_input": fetch_input,
        "emit_ran_signal": emit_ran_signal,
        "input_args": args,
        "input_kwargs": kwargs,
    }
    run_finally_kwargs = {
        "emit_ran_signal": emit_ran_signal,
        "raise_run_exceptions": raise_run_exceptions,
    }
    return super().run(
        check_readiness=check_readiness,
        raise_run_exceptions=raise_run_exceptions,
        rerun=rerun,
        before_run_kwargs=before_run_kwargs,
        run_finally_kwargs=run_finally_kwargs,
    )
def _before_run(
    self,
    /,
    check_readiness: bool,
    rerun: bool,
    run_data_tree: bool,
    run_parent_trees_too: bool,
    fetch_input: bool,
    emit_ran_signal: bool,
    input_args: tuple[Any, ...],
    input_kwargs: dict[str, Any],
) -> tuple[bool, Any]:
    """
    Pre-run handling: guard against re-entry, prepare input, and consult the cache.

    Steps, in order:

    1. Handle an already-running node.
    2. Refuse a failed node, unless `rerun` is set or readiness checks are off.
    3. Optionally run the upstream data tree.
    4. Apply `input_args` and `input_kwargs`.
    5. Optionally fetch input from connections.
    6. On a cache hit, skip execution and return the current outputs.
    7. Otherwise, (re)write the input cache if caching is enabled, and defer to
       the inherited `_before_run`.

    Args:
        check_readiness (bool): Forwarded from :meth:`run`.
        rerun (bool): Forwarded from :meth:`run`.
        run_data_tree (bool): Call :meth:`run_data_tree` before setting input.
        run_parent_trees_too (bool): Forwarded to :meth:`run_data_tree`.
        fetch_input (bool): Call `self.inputs.fetch()` after applying input.
        emit_ran_signal (bool): On a cache hit, whether to emit (or register the
            emission of) the output signal(s).
        input_args (tuple): Positional values for :meth:`set_input_values`.
        input_kwargs (dict): Keyword values for :meth:`set_input_values`.

    Returns:
        tuple[bool, Any]: The possible results are:

        - `(True, value)` when this method supplies the result itself: the
          pending future, or the cached outputs.
        - `(False, None)` to let an in-flight wrapped-executorlib run proceed.
        - Otherwise, whatever the inherited `_before_run` returns.

        NOTE(review): the boolean presumably means "stop early and return
        `value`" as interpreted by `Runnable.run`; confirm there.

    Raises:
        InputLockedError: The node is running and new input was supplied.
        WaitingForFutureError: The node is running with a future attached and
            `rerun` was requested.
        ReadinessError: The node is running (without a wrapped executorlib
            executor) and `rerun` is False. Also raised if the node has failed,
            `check_readiness` is True, and `rerun` is False.
    """
    if self.running:
        # Input must not change underneath a run that is in progress
        if len(input_args) > 0 or len(input_kwargs) > 0:
            raise InputLockedError(
                f"Node {self.label} is running. Input values are not allowed to be "
                f"updated in this state, but got args {input_args} and kwargs "
                f"{input_kwargs}."
            )
        if self.future is not None:
            if rerun:
                raise WaitingForFutureError(
                    f"Node {self.label} is running and has a future attached to "
                    f"it. It cannot be rerun in this state."
                )
            else:
                # Hand back the existing future instead of starting another run
                return True, self.future
        # NOTE(review): presumably a wrapped-executorlib run that is still in
        # flight (e.g. after reloading); returning False lets the run proceed so
        # the executor can resolve it -- confirm.
        if self._is_using_wrapped_excutorlib_executor():
            return False, None  # Let it cook
        elif not rerun:
            raise ReadinessError(self._readiness_error_message)
    # Checked here explicitly because the cache-hit path below returns before
    # the inherited `_before_run` is reached
    if self.failed and check_readiness and not rerun:
        raise ReadinessError(self._readiness_error_message)
    if run_data_tree:
        self.run_data_tree(run_parent_trees_too=run_parent_trees_too)
    self.set_input_values(*input_args, **input_kwargs)
    if fetch_input:
        self.inputs.fetch()
    if self.use_cache and self.cache_hit:  # Read and use cache
        self._on_cache_hit()
        # Reproduce the signalling of a real run without executing anything.
        # Outside a running parent, emit directly. Inside one, report this child
        # as started and finished (and emitting, if requested) to the parent.
        if (self.parent is None or not self.parent.running) and emit_ran_signal:
            self.emit()
        elif self.parent is not None and self.parent.running:
            self.parent.register_child_starting(self)
            self.parent.register_child_finished(self)
            if emit_ran_signal:
                self.parent.register_child_emitting(self)
        return True, self._outputs_to_run_return()
    else:
        self._on_cache_miss()
        if self.use_cache:  # Write cache and continue
            self._cache_inputs()
    return super()._before_run(check_readiness=check_readiness, rerun=rerun)
def _on_cache_hit(self) -> None:
    """
    Hook called when :meth:`_before_run` skips execution because the current
    inputs match the cached ones. The base implementation does nothing.
    """
def _on_cache_miss(self) -> None:
    """
    Hook called when a run proceeds without using the input cache.

    :meth:`_before_run` calls it whenever it does not take the cache-hit path:
    either caching is disabled or the inputs differ from the cached ones. The
    base implementation does nothing.
    """
def _cache_inputs(self):
    """Snapshot the current input values as the reference for later cache-hit checks."""
    snapshot = self.inputs.to_value_dict()
    self._cached_inputs = snapshot
[docs]
def clear_cache(self):
    """Discard the stored input snapshot that :attr:`cache_hit` compares against."""
    self._cached_inputs = None
def _run(
    self,
    raise_run_exceptions: bool,
    run_exception_kwargs: dict,
    run_finally_kwargs: dict,
    finish_run_kwargs: dict,
) -> Any | tuple | Future:
    """
    Tell a running parent that this child is starting, then defer to the
    inherited `_run` with all arguments unchanged.
    """
    parent = self.parent
    if parent is not None and parent.running:
        parent.register_child_starting(self)
    return super()._run(
        raise_run_exceptions=raise_run_exceptions,
        run_exception_kwargs=run_exception_kwargs,
        run_finally_kwargs=run_finally_kwargs,
        finish_run_kwargs=finish_run_kwargs,
    )
def _run_finally(self, /, emit_ran_signal: bool, raise_run_exceptions: bool):
    """
    Post-run bookkeeping, performed after the inherited `_run_finally` hook.

    In order:

    1. Notify a running parent that this child finished.
    2. Save a checkpoint of the graph if :attr:`checkpoint` is set.
    3. Emit the output signal(s) directly, or, inside a running parent, register
       the emission with the parent.
    4. If this node failed, is the graph root, has a :attr:`recovery` back end,
       and `raise_run_exceptions` is set, save a recovery file.
    5. Clean up wrapped-executorlib temporary files, unless that cleanup has been
       disabled via `_remove_executorlib_cache`.

    Args:
        emit_ran_signal (bool): Whether to emit (or register the emission of) the
            node's output signal(s).
        raise_run_exceptions (bool): A recovery file is only saved for a failed
            root node when this is True.
    """
    super()._run_finally()
    if self.parent is not None and self.parent.running:
        self.parent.register_child_finished(self)
    if self.checkpoint is not None:
        self.save_checkpoint(self.checkpoint)
    if emit_ran_signal:
        if self.parent is None or not self.parent.running:
            self.emit()
        else:
            # Inside a running parent, the parent is told about the emission instead
            self.parent.register_child_emitting(self)
    if (
        self.failed
        and raise_run_exceptions
        and self.recovery is not None
        and self.graph_root is self
    ):
        # Only the parent-most node writes a recovery file, named "recovery"
        # under its own `as_path()`
        self.save(
            backend=self.recovery, filename=self.as_path().joinpath("recovery")
        )
    if (
        self._remove_executorlib_cache
        and self._is_using_wrapped_excutorlib_executor()
    ):
        self._clean_wrapped_executorlib_executor_cache()
[docs]
def run_data_tree(self, run_parent_trees_too=False) -> None:
    """
    Use topological analysis to build a tree of all upstream dependencies and run
    them.

    If this node has a parent, execution is delegated to the parent's
    `run_data_tree_for_child`. Otherwise, the upstream data tree is temporarily
    rewired into a linear run-signal DAG and pushed from its starting node(s).
    The original labels and run connections are always restored afterwards.

    Args:
        run_parent_trees_too (bool): First, call the same method on this node's
            parent (if one exists), and recursively up the parentage tree. (Default
            is False, only run nodes in this scope, i.e. sharing the same parent.)

    Raises:
        ValueError: If this node has no parent and a node in its data tree has an
            executor. Running the data tree is pull-based and does not support
            executors.
    """
    if run_parent_trees_too and self.parent is not None:
        self.parent.run_data_tree(run_parent_trees_too=True)
        self.parent.inputs.fetch()

    data_tree_nodes = get_nodes_in_data_tree(self)

    # If we have a parent, delegate to it
    if self.parent is not None:
        self.parent.run_data_tree_for_child(self)
        return

    # The rest of this method handles the case when self.parent is None
    for node in data_tree_nodes:
        if node.executor is not None:
            raise ValueError(
                f"Running the data tree is pull-paradigm action, and is "
                f"incompatible with using executors. While running "
                f"{self.full_label}, an executor request was found on "
                f"{node.full_label}"
            )

    label_map: dict[str, str] = {}
    nodes: dict[str, Node] = {}
    try:
        for node in data_tree_nodes:
            # Ensure each node has a unique label. This is necessary when the
            # nodes do not have a workflow and may thus have arbitrary labels.
            # This is pretty ugly; it would be nice to not depend so heavily on
            # labels. Maybe we could switch a bunch of stuff to rely on the
            # unique ID?
            modified_label = node.label + str(id(node))
            label_map[modified_label] = node.label
            node.label = modified_label
            nodes[modified_label] = node
        disconnected_pairs, starters = set_run_connections_according_to_linear_dag(
            nodes
        )
    except Exception:
        # If the dag setup fails it will repair any connections it breaks before
        # raising the error, but we still need to repair our label changes. This
        # also covers a failure part-way through the relabeling above.
        for modified_label, node in nodes.items():
            node.label = label_map[modified_label]
        raise

    # Iterate `starters` in the order the topology helper returned them
    # (deduplicated), rather than through an intermediate set whose iteration
    # order depends on object ids and can therefore vary between runs.
    data_tree_starters = list(
        dict.fromkeys(node for node in starters if node in data_tree_nodes)
    )

    try:
        if len(data_tree_starters) > 1 or data_tree_starters[0] is not self:
            # Don't let anything upstream trigger _this_ node
            self.signals.disconnect_run()
            for starter in data_tree_starters:
                starter.run()  # Now push from the top
        # Otherwise the requested node is the only one in the data tree, so there's
        # nothing upstream to run.
    finally:
        # No matter what, restore the original connections and labels afterwards
        for modified_label, node in nodes.items():
            node.label = label_map[modified_label]
            node.signals.disconnect_run()
        for c1, c2 in disconnected_pairs:
            c1.connect(c2)
@property
def cache_hit(self) -> bool:
    """
    Whether the current input values equal the cached snapshot.

    Any exception raised while building or comparing the values counts as a
    miss, so arbitrary stored values can never break the check.
    """
    try:
        is_hit = self.inputs.to_value_dict() == self._cached_inputs
    except Exception:
        is_hit = False
    return is_hit
def _outputs_to_run_return(self):
    """Package the current output values, keyed by label, in a :class:`DotDict`."""
    output_values = self.outputs.to_value_dict()
    return DotDict(output_values)
@property
def emitting_channels(self) -> tuple[OutputSignal, ...]:
    """The output signal(s) to fire after a run: `failed` if the node failed, otherwise `ran`."""
    outputs = self.signals.output
    chosen = outputs.failed if self.failed else outputs.ran
    return (chosen,)
[docs]
def emit(self):
    """Fire each signal in :attr:`emitting_channels`, in order."""
    signals_to_fire = self.emitting_channels
    for output_signal in signals_to_fire:
        output_signal()
[docs]
def execute(self, *args, **kwargs):
    """
    A shortcut for :meth:`run` with particular flags.

    Runs the node with whatever input it currently has, updated by any `args`
    and `kwargs` given here. It does not run the upstream data tree, does not
    fetch input from connections, skips the readiness check, does not force a
    rerun, and does not emit the `ran` signal afterwards.

    Intended to be useful for debugging by just forcing the node to do its thing
    right here, right now, and as-is.

    Note:
        No executor-related flag is passed to :meth:`run`, so this does not
        override the node's :attr:`executor`. If an executor is configured, the
        work is presumably still dispatched to it rather than forced onto this
        Python process.

    Returns:
        Whatever :meth:`run` returns.
    """
    return self.run(
        *args,
        run_data_tree=False,
        run_parent_trees_too=False,
        fetch_input=False,
        check_readiness=False,
        rerun=False,
        emit_ran_signal=False,
        **kwargs,
    )
[docs]
def pull(self, *args, run_parent_trees_too=False, **kwargs):
    """
    A shortcut for :meth:`run` with particular flags.

    First runs the upstream data graph, then this node, without triggering any
    downstream runs. By default only sibling nodes are pulled. Optionally, the
    parent can be required to pull in its own upstream first (recursively, up to
    the parent-most object).

    Args:
        run_parent_trees_too (bool): Whether to (recursively) require the parent to
            first pull.
    """
    pull_flags = {
        "run_data_tree": True,
        "run_parent_trees_too": run_parent_trees_too,
        "fetch_input": True,
        "check_readiness": True,
        "rerun": False,
        "emit_ran_signal": False,
    }
    return self.run(*args, **pull_flags, **kwargs)
[docs]
def push(self, *args, **kwargs):
    """
    Exactly like :meth:`run` with all the same flags, _except_ it handles an edge
    case: directly running a child of a :class:`pyiron_workflow.workflow.Workflow`
    before that workflow has had any chance to configure its execution signals.

    _If_ the node has a parent, the parent is asked to handle the run (a workflow
    set up to automate execution flow does that configuration _first_). Otherwise
    this is a plain :meth:`run`.
    """
    # Alright, time for more egregious hacking.
    # Macros configure their (automated) run signals once, at instantiation.
    # Workflows are flexible and dynamic, so they normally build their execution
    # signals on the fly at each run invocation. That leaves a nasty edge case:
    # build a workflow, then immediately run one of its children directly,
    # naively expecting it to push downstream executions like it would in a
    # macro. It _doesn't_, because _no signal connections exist yet!_
    # Building the signals on _every_ run would be needlessly expensive, so this
    # method is a hacky but guaranteed way to get push-like run behaviour
    # regardless of the calling context.
    parent = self.parent
    if parent is None:
        return self.run(*args, **kwargs)
    return parent.push_child(self, *args, **kwargs)
def __call__(self, *args, **kwargs) -> None:
"""
A shortcut for :meth:`pull` that automatically runs the entire set of upstream data
dependencies all the way to the parent-most graph object.
"""
return self.pull(*args, run_parent_trees_too=True, **kwargs)
@property
def ready(self) -> bool:
    """
    Whether the node may run. This combines the inherited readiness (the node is
    neither already running nor already failed) with every input being ready.
    """
    if not super().ready:
        return False
    return self.inputs.ready
@property
def color(self) -> str:
    """Hex color code used when drawing this node; the base class uses `SeabornColors.white`."""
    default_color = SeabornColors.white
    return default_color
[docs]
def draw(
    self,
    depth: int = 1,
    rankdir: Literal["LR", "TB"] = "LR",
    size: tuple | None = None,
    save: bool = False,
    view: bool = False,
    directory: Path | str | None = None,
    filename: Path | str | None = None,
    format: str | None = None,
    cleanup: bool = True,
) -> graphviz.graphs.Digraph:
    """
    Draw the node structure and return it as a graphviz object.

    A selection of the :func:`graphviz.Graph.render` method options are exposed.
    If :param:`save` or :param:`view` is True, or a :param:`filename` is provided,
    the graph is rendered before being returned.

    The graph file and rendered image are stored in this node's working directory
    (:meth:`as_path`), unless a :param:`directory` is explicitly set.

    This is purely for convenience. Since a graphviz object is returned directly,
    you can use it to leverage the full power of graphviz.

    Args:
        depth (int): How deeply to decompose the representation of composite nodes
            to reveal their inner structure. (Default is 1, which will show owned
            nodes if _this_ is a composite node, but all children will be drawn
            at the level of showing their IO only.) A depth value greater than the
            max depth of the node will have no adverse side effects.
        rankdir ("LR" | "TB"): Use left-right or top-bottom graphviz `rankdir` to
            orient the flow of the graph.
        size (tuple[int | float, int | float] | None): The size of the diagram, in
            inches(?); respects ratio by scaling until at least one dimension
            matches the requested size. (Default is None, automatically size.)
        save (bool): Render the graph image. (Default is False. When True, all
            other defaults will yield a PDF in the node's working directory.)
        view (bool): `graphviz.Graph.render` argument, open the rendered result
            with the default application. (Default is False. When True, default
            values for the directory and filename are supplied by the node working
            directory and label.)
        directory (Path|str|None): `graphviz.Graph.render` argument, (sub)directory
            for source saving and rendering. (Default is None, which uses the
            node's working directory.)
        filename (Path|str|None): `graphviz.Graph.render` argument, filename for
            saving the source. (Default is None, which uses the node label +
            `"_graph"`.)
        format (str|None): `graphviz.Graph.render` argument, the output format used
            for rendering ('pdf', 'png', etc.).
        cleanup (bool): `graphviz.Graph.render` argument, delete the source file
            after successful rendering. (Default is True -- unlike graphviz.)

    Returns:
        (graphviz.graphs.Digraph): The resulting graph object.
    """
    size_str = f"{size[0]},{size[1]}" if size is not None else None
    graph = GraphvizNode(self, depth=depth, rankdir=rankdir, size=size_str).graph
    if save or view or filename is not None:
        # `Path` is only imported under TYPE_CHECKING, so it does not exist at
        # runtime; use the runtime `pathlib` module to convert user input.
        directory = (
            self.as_path() if directory is None else pathlib.Path(directory)
        )
        filename = self.label + "_graph" if filename is None else filename
        graph.render(
            view=view,
            directory=directory,
            filename=filename,
            format=format,
            cleanup=cleanup,
        )
    return graph
def __str__(self):
    """A header line with the label and class name, followed by the inputs, outputs, and signals."""
    header = f"{self.label} ({self.__class__.__name__}):"
    sections = [header, str(self.inputs), str(self.outputs), str(self.signals)]
    return "\n".join(sections)
# Cautionary notes about the limits of saving/loading. This text is appended to
# the docstring of `save` right after that method is defined
# (`save.__doc__ = ... + _save_load_warnings`).
_save_load_warnings = """
HERE BE DRAGONS!!!
Warning:
This almost certainly only fails for subclasses of :class:`Node` that don't
override `node_function` or `macro_creator` directly, as these are expected
to be part of the class itself (and thus already present on our instantiated
object) and are never stored. Nodes created using the provided decorators
should all work.
Warning:
If you modify a `Macro` class in any way (changing its IO maps, rewiring
internal connections, or replacing internal nodes), don't expect
saving/loading to work.
Warning:
If the underlying source code has changed since saving (i.e. the node doing
the loading does not use the same code as the node doing the saving, or the
nodes in some node package have been modified), then all bets are off.
"""
[docs]
def save(
    self,
    backend: BackendIdentifier | StorageInterface = "pickle",
    filename: str | Path | None = None,
    **kwargs,
):
    """
    Serialize this node to file with the requested storage back end.

    Only the requested back end is used (`only_requested=True`). Other available
    back ends are never tried as a fallback.

    Args:
        backend (str | StorageInterface): The storage interface that serializes
            the node. (Default is "pickle", the standard pickling back end.)
        filename (str | Path | None): Target file name, without extensions.
            (Default is None, which uses the node's lexical path.)
        **kwargs: Extra arguments understood by the chosen back end.
    """
    chosen_backends = available_backends(backend=backend, only_requested=True)
    for storage in chosen_backends:
        storage.save(node=self, filename=filename, **kwargs)

save.__doc__ = cast(str, save.__doc__) + _save_load_warnings
[docs]
def save_checkpoint(self, backend: BackendIdentifier | StorageInterface = "pickle"):
    """
    Save the whole graph by saving its parent-most node (:attr:`graph_root`).

    Args:
        backend (str | StorageInterface): The storage interface that serializes
            the graph. (Default is "pickle", the standard pickling back end.)
    """
    root = self.graph_root
    root.save(backend=backend)
@classmethod
def _new_instance_from_storage(
    cls,
    backend: BackendIdentifier | StorageInterface = "pickle",
    only_requested=False,
    filename: str | Path | None = None,
    _node: Node | None = None,
    **kwargs,
):
    """
    Load a node from file and return the loaded instance.

    Back ends are tried in the order given by `available_backends`. The first
    back end whose `load` returns something other than None wins.

    Args:
        backend (str | StorageInterface): The interface to use for deserializing
            the node. (Default is "pickle", which loads the standard pickling back
            end.)
        only_requested (bool): Whether to _only_ try loading from the specified
            backend, or to loop through all available backends. (Default is False,
            try to load whatever you can find.)
        filename (str | Path | None): The name of the file (without extensions)
            from which to load the node. (Default is None, which uses the node's
            lexical path.)
        _node (Node | None): Passed through to each back end's `load` as `node`.
            Internal use: callers pass an existing node here when no `filename`
            is given, presumably so the back end can locate its saved content.
        **kwargs: back end-specific arguments (only likely to work in combination
            with :param:`only_requested`, otherwise there's nothing to be specific
            _to_.)

    Returns:
        The first non-None object returned by a back end's `load`.

    Raises:
        FileNotFoundError: when nothing got loaded.
    """
    for selected_backend in available_backends(
        backend=backend, only_requested=only_requested
    ):
        inst = selected_backend.load(node=_node, filename=filename, **kwargs)
        if inst is not None:
            return inst
    raise FileNotFoundError(
        f"Could not find saved content at {filename} using backend={backend} "
        f"with only_requested={only_requested}."
    )
def _update_instance_from_storage(
    self,
    backend: BackendIdentifier | StorageInterface = "pickle",
    only_requested=False,
    filename: str | Path | None = None,
    **kwargs,
):
    """
    Load a saved node from file and adopt its state as this node's own.

    Args:
        backend (str | StorageInterface): The interface to use for serializing the
            node. (Default is "pickle", which loads the standard pickling back end.)
        only_requested (bool): Whether to _only_ try loading from the specified
            backend, or to loop through all available backends. (Default is False,
            try to load whatever you can find.)
        filename (str | Path | None): The name of the file (without extensions)
            from which to load the node. (Default is None, which uses the node's
            lexical path.)
        **kwargs: back end-specific arguments (only likely to work in combination
            with :param:`only_requested`, otherwise there's nothing to be specific
            _to_.)

    Raises:
        ValueError: when this node is currently running.
        FileNotFoundError: when nothing got loaded.
        TypeError: when the saved node is of a different class than this node.
    """
    if self.running:
        # The hint must tell the user to *clear* the running flag; setting it
        # to True would leave this very guard in place.
        raise ValueError(
            "Cannot load a node while it is running. If you are sure loading now "
            "is the correct thing to do, you can set `self.running=False` where "
            "`self` is this node object."
        )
    inst = self.__class__._new_instance_from_storage(
        backend=backend,
        only_requested=only_requested,
        filename=filename,
        # Without an explicit filename, pass this node along so its lexical
        # path can be used to find the save file.
        _node=self if filename is None else None,
        **kwargs,
    )
    if inst.__class__ != self.__class__:
        raise TypeError(
            f"{self.label} cannot load, as it has type "
            f"{self.__class__.__name__}, but the saved node has type "
            f"{inst.__class__.__name__}"
        )
    # Adopt the loaded node's state wholesale.
    self.__setstate__(inst.__getstate__())
[docs]
@overloading.overloaded_classmethod(class_method=_new_instance_from_storage)
def load(
    self,
    backend: BackendIdentifier | StorageInterface = "pickle",
    only_requested=False,
    filename: str | Path | None = None,
    **kwargs,
):
    """
    Load a node from storage, either as a new instance (when used as a class
    method) or by updating the current instance (when called as a regular instance
    method).

    Args:
        backend (str | StorageInterface): The interface to use for serializing the
            node. (Default is "pickle", which loads the standard pickling back end.)
        only_requested (bool): Whether to _only_ try loading from the specified
            backend, or to loop through all available backends. (Default is False,
            try to load whatever you can find.)
        filename (str | Path | None): The name of the file (without extensions)
            from which to load the node. (Default is None, which uses the node's
            lexical path.)
        **kwargs: back end-specific arguments (only likely to work in combination
            with :param:`only_requested`, otherwise there's nothing to be specific
            _to_.)

    Returns:
        The loaded node when used as a class method; None when called on an
        instance, whose state is updated in place.

    Raises:
        FileNotFoundError: when nothing got loaded.
        ValueError: when loading into an existing instance that is currently
            running.
        TypeError: when loading into an existing instance and the saved node is
            of a different class.
    """
    # Instance-level path. Class-level calls go to `_new_instance_from_storage`,
    # which is registered as the `class_method` in the decorator above.
    return self._update_instance_from_storage(
        backend=backend,
        only_requested=only_requested,
        filename=filename,
        **kwargs,
    )
load.__doc__ = cast(str, load.__doc__) + _save_load_warnings
[docs]
def delete_storage(
    self,
    backend: BackendIdentifier | StorageInterface | None = None,
    only_requested: bool = False,
    filename: str | Path | None = None,
    *,
    delete_even_if_not_empty: bool = False,
    **kwargs,
):
    """
    Remove save file(s).

    Args:
        backend (str | StorageInterface | None): The interface whose save files
            should be removed. (Default is None, in which case which back ends
            are used is left to `available_backends`.) NOTE(review): presumably
            None means no particular back end is requested -- confirm.
        only_requested (bool): Whether to _only_ search for files using the
            specified backend, or to loop through all available backends. (Default
            is False, try to remove whatever you can find.)
        filename (str | Path | None): The name of the file (without extensions) to
            remove. (Default is None, which uses the node's lexical path.)
        delete_even_if_not_empty (bool): Whether to delete the file even if it is
            not empty. (Default is False, which will only delete the file if it is
            empty, i.e. has no content in it.)
        **kwargs: back end-specific arguments (only likely to work in combination
            with :param:`only_requested`, otherwise there's nothing to be specific
            _to_.)
    """
    for selected_backend in available_backends(
        backend=backend, only_requested=only_requested
    ):
        selected_backend.delete(
            # The node itself is only handed over when no explicit filename is
            # given, so the back end can fall back on the node's lexical path.
            node=self if filename is None else None,
            filename=filename,
            delete_even_if_not_empty=delete_even_if_not_empty,
            **kwargs,
        )
[docs]
def has_saved_content(
    self,
    backend: BackendIdentifier | StorageInterface | None = None,
    only_requested: bool = False,
    filename: str | Path | None = None,
    **kwargs,
) -> bool:
    """
    Whether any save files can be found at the canonical location for this node.

    Args:
        backend (str | StorageInterface | None): The interface whose save files
            to look for. (Default is None, in which case which back ends are
            checked is left to `available_backends`.) NOTE(review): presumably
            None means no particular back end is requested -- confirm.
        only_requested (bool): Whether to _only_ search for files using the
            specified backend, or to loop through all available backends. (Default
            is False, try to find whatever you can find.)
        filename (str | Path | None): The name of the file (without extensions) to
            look for. (Default is None, which uses the node's lexical path.)
        **kwargs: back end-specific arguments (only likely to work in combination
            with :param:`only_requested`, otherwise there's nothing to be specific
            _to_.)

    Returns:
        bool: Whether any save files were found. Back ends are checked lazily, so
            the search stops at the first one that reports content.
    """
    return any(
        be.has_saved_content(
            # Hand over the node only when no explicit filename is given.
            node=self if filename is None else None,
            filename=filename,
            **kwargs,
        )
        for be in available_backends(backend=backend, only_requested=only_requested)
    )
@property
def import_ready(self) -> bool:
    """
    Whether this node's class can be found again through a plain import.

    The module recorded on the node's class is imported with `importlib`, and an
    attribute with the class's name is looked up on it. A warning is logged when
    that module is `__main__`.

    Returns:
        (bool): True only when the looked-up object is exactly this node's type;
            False when the module or the attribute cannot be found, or when the
            attribute is some other object.
    """
    try:
        module_name = self.__class__.__module__
        imported_module = import_module(module_name)
        found = getattr(imported_module, self.__class__.__name__)
        if module_name == "__main__":
            logger.warning(f"{self.label} is only defined in __main__")
    except (ModuleNotFoundError, AttributeError):
        return False
    else:
        return found is type(self)
@property
def import_readiness_report(self):
    """
    Print this node's import-readiness report.

    Accessing the property prints the text from `report_import_readiness()`;
    the property value itself is None.
    """
    report = self.report_import_readiness()
    print(report)
[docs]
def report_import_readiness(self, tabs=0, report_so_far=""):
    """
    Append a one-line import-readiness entry for this node to a report.

    Args:
        tabs (int): How many tab characters to indent the new entry by.
            (Default is 0.)
        report_so_far (str): Report text to extend. A newline is inserted before
            the new entry only when this is non-empty. (Default is "".)

    Returns:
        (str): `report_so_far` followed by "<label>: ok" or
            "<label>: NOT IMPORTABLE", depending on `import_ready`.
    """
    label = self.label
    status = "ok" if self.import_ready else "NOT IMPORTABLE"
    separator = "" if len(report_so_far) == 0 else "\n"
    indent = "\t" * tabs
    return report_so_far + f"{separator}{indent}{label}: {status}"
[docs]
def display_state(self, state=None, ignore_private=True):
    """
    Prepare this node's state for display, then defer to the parent class.

    Args:
        state (dict | None): The state to display. (Default is None, which uses
            `self.__getstate__()`.) A shallow copy is always taken, so a dict
            supplied by the caller is never modified.
        ignore_private (bool): Forwarded unchanged to the parent implementation.
            (Default is True.)

    Returns:
        Whatever the parent class's `display_state` returns.
    """
    # Work on a copy in both cases: the entries added/renamed below must not
    # leak back into a caller-supplied dict.
    state = dict(self.__getstate__() if state is None else state)
    if self.parent is not None:
        # Record the parent by its full label.
        state["parent"] = self.parent.full_label
    user_data = state.get("_user_data")
    if user_data is not None and len(user_data) > 0:
        # Non-empty user data is re-keyed under the public name "user_data"
        # (see `_make_entry_public`).
        self._make_entry_public(state, "_user_data", "user_data")
    return super().display_state(state=state, ignore_private=ignore_private)
@classmethod
def _extra_info(cls) -> str:
    """
    Hook for supplying extra information that is useful to users of this node
    class.

    The base implementation has nothing to add and returns an empty string;
    presumably subclasses override it when they have something to say.
    """
    no_extra_info = ""
    return no_extra_info