Source code for pyiron_workflow.mixin.run

"""
A module to encapsulate complex run mechanics, such as status, executor
interaction, etc.
"""

from __future__ import annotations

import contextlib
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable
from concurrent.futures import Executor as StdLibExecutor
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from time import sleep
from typing import Any, TypeAlias

from pyiron_workflow.mixin.has_interface_mixins import HasLabel, HasRun, UsesState

# Either a live standard-library executor, or instructions for building one:
# a callable returning an executor, a tuple of positional arguments for that
# callable, and a dict of keyword arguments for it.
InterpretableAsExecutor: TypeAlias = (
    StdLibExecutor | tuple[Callable[..., StdLibExecutor], tuple, dict]
)


class ReadinessError(ValueError):
    """
    To be raised when :class:`Runnable` calls run and requests a readiness check,
    but isn't :attr:`ready`.

    The raiser is expected to attach :attr:`readiness_dict` to the instance after
    construction; it is only declared here, not initialized.
    """

    # Detailed information on why it is not ready
    readiness_dict: dict[str, bool]
class NotInterpretableAsExecutorError(TypeError):
    """
    To be raised when a value can be interpreted neither as an executor instance
    nor as instructions (callable, args tuple, kwargs dict) for creating one.
    """
class Runnable(UsesState, HasLabel, HasRun, ABC):
    """
    An abstract class for interfacing with executors, etc.

    Child classes must define :meth:`on_run` and :attr:`.Runnable.run_args`, then the
    :meth:`run` will invoke `self.on_run(*run_args[0], **run_args[1])`. The
    :class:`Runnable` class then handles the status of the run, passing the call off
    for remote execution, handling any returned futures object, etc.

    Child classes can optionally override :meth:`process_run_result` to do something
    with the returned value of :meth:`on_run`, but by default the returned value just
    passes cleanly through the function.

    The `run` cycle is broken down into sub-steps:

    - `_before_run`: prior to the `running` status being set to `True`
    - `_run`: after the `running` status has been set to `True`
    - `_finish_run`: what is done to the results of running, and when `running` is
        set to `False`
    - `_run_exception`: What to do if an exception is encountered
    - `_run_finally`: What to do after _every_ run, regardless of whether an exception
        was encountered

    Child classes can extend the behavior of these sub-steps, including introducing
    new keyword arguments.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.running: bool = False
        self.failed: bool = False
        # We call it an executor, but it can also be instructions on making one
        self._executor: InterpretableAsExecutor | None = None
        self.future: None | Future = None
        # Pause applied after `on_run` when running on a thread pool
        self._thread_pool_sleep_time: float = 1e-6

    @abstractmethod
    def on_run(self, *args, **kwargs) -> Any:  # callable[..., Any | tuple]:
        """
        What the :meth:`run` method actually does!
        """

    @property
    @abstractmethod
    def run_args(self) -> tuple[tuple, dict]:
        """
        Any data needed for :meth:`on_run`, will be passed as (*args, **kwargs).
        """

    @property
    def executor(self) -> InterpretableAsExecutor | None:
        """The executor, instructions for creating one, or `None` to run locally."""
        return self._executor

    @executor.setter
    def executor(self, executor: InterpretableAsExecutor | None):
        """
        Validate and store the executor.

        Raises:
            (NotInterpretableAsExecutorError): If the value is neither `None`, an
                executor instance, nor a (callable, tuple, dict) triple.
        """
        if not isinstance(executor, StdLibExecutor | type(None)):
            try:
                creator, creator_args, creator_kwargs = (
                    executor[0],
                    executor[1],
                    executor[2],
                )
            except (TypeError, IndexError, KeyError):
                # Not subscriptable, or too short to hold all three pieces; report
                # it with the dedicated error instead of leaking the lookup failure
                interpretable = False
            else:
                interpretable = (
                    callable(creator)
                    and isinstance(creator_args, tuple)
                    and isinstance(creator_kwargs, dict)
                )
            if not interpretable:
                raise NotInterpretableAsExecutorError(
                    f"Expected an instance of {StdLibExecutor}, or a tuple of such a "
                    f"class, a tuple of args, and a dict of kwargs -- but got {executor}."
                )
        self._executor = executor

    def process_run_result(self, run_output: Any) -> Any:
        """
        What to _do_ with the results of :meth:`on_run` once you have them.

        By extracting this as a separate method, we allow the runnable to pass the
        actual execution off to another entity and release the python process to do
        other things. In such a case, this function should be registered as a callback
        so that the runnable can process the result of that process.

        Args:
            run_output: The results of a `self.on_run(self.run_args)` call.

        Returns:
            (Any): The input, unmodified (child classes may override this).
        """
        return run_output

    @property
    def ready(self) -> bool:
        """Neither running nor failed"""
        return not (self.running or self.failed)

    @property
    def _readiness_dict(self) -> dict[str, bool]:
        """The individual flags that feed the readiness report."""
        return {
            "ready": self.ready,
            "running": self.running,
            "failed": self.failed,
        }

    @property
    def readiness_report(self) -> str:
        """A human-readable summary of the readiness to run."""
        header = f"{self.label} readiness report:\n"
        return header + "".join(
            f"{k}: {v}\n" for k, v in self._readiness_dict.items()
        )

    def executor_shutdown(self, wait=True, *, cancel_futures=False):
        """Invoke shutdown on the executor (if present)."""
        # `None` and executor-creation instructions have no `shutdown`; only skip
        # in that case, so that errors raised _inside_ a real shutdown still surface
        shutdown = getattr(self.executor, "shutdown", None)
        if shutdown is not None:
            shutdown(wait=wait, cancel_futures=cancel_futures)

    def run(
        self,
        check_readiness: bool = True,
        raise_run_exceptions: bool = True,
        rerun: bool = False,
        before_run_kwargs: dict | None = None,
        run_kwargs: dict | None = None,
        run_exception_kwargs: dict | None = None,
        run_finally_kwargs: dict | None = None,
        finish_run_kwargs: dict | None = None,
    ) -> Any | tuple | Future:
        """
        Checks that the runnable is :attr:`ready` (if requested), then executes the
        functionality defined in :meth:`on_run` by passing it whatever is returned by
        :meth:`run_args`.

        Can stop early if :meth:`_before_run` called here returns `True` as its first
        return value.

        Handles the status of the runnable, communicating with any remote
        computing resources, and processing the result.

        Args:
            check_readiness (bool): Whether to raise a `ReadinessError` if not
                :attr:`ready`. (Default is True.)
            raise_run_exceptions (bool): Whether to raise exceptions encountered while
                :attr:`running`. (Default is True.)
            rerun (bool): Whether to proceed even if the :attr:`running` or
                :attr:`failed` state is encountered before running. (Default is
                False.)
            before_run_kwargs (dict | None): Extra keyword arguments for
                :meth:`_before_run`. (Default is None, no extra arguments.)
            run_kwargs (dict | None): Extra keyword arguments for :meth:`_run`.
                (Default is None, no extra arguments.)
            run_exception_kwargs (dict | None): Keyword arguments for
                :meth:`_run_exception`. (Default is None, no arguments.)
            run_finally_kwargs (dict | None): Keyword arguments for
                :meth:`_run_finally`. (Default is None, no arguments.)
            finish_run_kwargs (dict | None): Extra keyword arguments for
                :meth:`_finish_run`. (Default is None, no extra arguments.)

        Returns:
            (Any | tuple | Future): The early-exit value from :meth:`_before_run`, or
                otherwise whatever :meth:`_run` returns.
        """

        def _none_to_dict(inp: dict | None) -> dict:
            return {} if inp is None else inp

        before_run_kwargs = _none_to_dict(before_run_kwargs)
        run_kwargs = _none_to_dict(run_kwargs)
        run_exception_kwargs = _none_to_dict(run_exception_kwargs)
        run_finally_kwargs = _none_to_dict(run_finally_kwargs)
        finish_run_kwargs = _none_to_dict(finish_run_kwargs)

        stop_early, result = self._before_run(
            check_readiness=check_readiness, rerun=rerun, **before_run_kwargs
        )
        if stop_early:
            return result

        self.running = True
        return self._run(
            raise_run_exceptions=raise_run_exceptions,
            run_exception_kwargs=run_exception_kwargs,
            run_finally_kwargs=run_finally_kwargs,
            finish_run_kwargs=finish_run_kwargs,
            **run_kwargs,
        )

    def _before_run(
        self, /, check_readiness: bool, rerun: bool, *args, **kwargs
    ) -> tuple[bool, Any]:
        """
        Things to do _before_ running.

        Args:
            check_readiness (bool): Whether to raise a `ReadinessError` if not
                :attr:`ready`.
            rerun (bool): Whether to proceed even if the :attr:`running` or
                :attr:`failed` state is encountered.
            **kwargs: Keyword arguments used by child classes in overriding this
                function.

        Returns:
            (bool): Whether to exit the parent run call early.
            (Any): What to return on an early-exit.

        Raises:
            (ReadinessError): If :param:`check_readiness` but not :attr:`ready`.
        """
        if rerun:
            # Clear the status flags so the readiness check below can pass
            self.running = False
            self.failed = False

        if check_readiness and not self.ready:
            readiness_error = ReadinessError(self._readiness_error_message)
            readiness_error.readiness_dict = self._readiness_dict
            raise readiness_error
        return False, None

    def _run(
        self,
        /,
        raise_run_exceptions: bool,
        run_exception_kwargs: dict,
        run_finally_kwargs: dict,
        finish_run_kwargs: dict,
        *args,
        **kwargs,
    ) -> Any | tuple | Future:
        """
        What happens while the status is :attr:`running`, namely invoking
        :meth:`self.on_run` using :attr:`self.run_args`, either locally or on an
        executor.

        Args:
            raise_run_exceptions (bool): Whether to raise encountered exceptions.
            run_exception_kwargs (dict): Keyword arguments for
                :meth:`_run_exception`.
            run_finally_kwargs (dict): Keyword arguments for :meth:`_run_finally`.
            finish_run_kwargs (dict): Extra keyword arguments for
                :meth:`_finish_run`.

        Returns:
            (Any | Future): The result of :meth:`_finish_run` when run locally, or a
                futures object from the executor.

        Raises:
            (ValueError): If the run kwargs contain the key 'self'.
            (TypeError): If executor-creation instructions produce something that is
                not a standard-library executor.
        """
        on_run_args, on_run_kwargs = self.run_args
        if "self" in on_run_kwargs:
            raise ValueError(
                f"{self.label} got 'self' as a run kwarg, but self is already the "
                f"first positional argument passed to :meth:`on_run`."
            )

        if self.executor is None:
            try:
                run_output = self.on_run(*on_run_args, **on_run_kwargs)
            except (Exception, KeyboardInterrupt):
                self._run_exception(**run_exception_kwargs)
                self._run_finally(**run_finally_kwargs)
                if raise_run_exceptions:
                    raise
                # NOTE(review): with the exception swallowed, execution continues
                # into `_finish_run` below with `None`, so `_run_finally` is invoked
                # a second time there -- confirm this is intended
                run_output = None
            return self._finish_run(
                run_output,
                raise_run_exceptions=raise_run_exceptions,
                run_exception_kwargs=run_exception_kwargs,
                run_finally_kwargs=run_finally_kwargs,
                unique_executor=None,
                **finish_run_kwargs,
            )

        if isinstance(self.executor, StdLibExecutor):
            executor = self.executor
            unique_executor = False
        else:
            # Instructions for building an executor just for this run
            creator, creator_args, creator_kwargs = self.executor
            executor = creator(*creator_args, **creator_kwargs)
            if not isinstance(executor, StdLibExecutor):
                raise TypeError(
                    f"Expected an instance of {StdLibExecutor}, but got "
                    f"{type(executor)} from executor creation instructions "
                    f"{self.executor}."
                )
            unique_executor = True

        submit_function = (
            self._thread_pool_run
            if isinstance(executor, ThreadPoolExecutor)
            else self.on_run
        )
        self.future = executor.submit(
            submit_function, *on_run_args, **on_run_kwargs
        )
        # The future passes itself to the callback as the first positional argument,
        # i.e. as `run_output` of `_finish_run`
        self.future.add_done_callback(
            partial(
                self._finish_run,
                raise_run_exceptions=raise_run_exceptions,
                run_exception_kwargs=run_exception_kwargs,
                run_finally_kwargs=run_finally_kwargs,
                unique_executor=executor if unique_executor else None,
                **finish_run_kwargs,
            )
        )
        return self.future

    def _run_exception(self, /, *args, **kwargs):
        """
        What to do if an exception is encountered inside :meth:`_run` or
        :meth:`_finish_run`.
        """
        self.running = False
        self.failed = True

    def _run_finally(self, /, *args, **kwargs):
        """
        What to do after :meth:`_finish_run` (whether an exception is encountered or
        not), or in :meth:`_run` after an exception is encountered.
        """

    def _finish_run(
        self,
        run_output: tuple | Future,
        /,
        raise_run_exceptions: bool,
        run_exception_kwargs: dict,
        run_finally_kwargs: dict,
        unique_executor: StdLibExecutor | None,
        **kwargs,
    ) -> Any | tuple | None:
        """
        Switch the status, then process and return the run result.

        Args:
            run_output (tuple | Future): The output of :meth:`on_run`, or a future
                holding it.
            raise_run_exceptions (bool): Whether to raise encountered exceptions.
            run_exception_kwargs (dict): Keyword arguments for
                :meth:`_run_exception`.
            run_finally_kwargs (dict): Keyword arguments for :meth:`_run_finally`.
            unique_executor (concurrent.futures.Executor | None): An executor
                created only for this run, to be shut down here.
            **kwargs: Keyword arguments used by child classes in overriding this
                function.

        Returns:
            (Any | tuple | None): The return of :meth:`process_run_result`, or `None`
                if an exception was encountered but not raised.
        """
        self.running = False
        try:
            if isinstance(run_output, Future):
                run_output = run_output.result()
                self.future = None
            # NOTE(review): if `.result()` raises above, this shutdown is skipped --
            # confirm whether the unique executor should be cleaned up in that case
            if unique_executor:
                # executorlib-1.8.2+ cancels futures on `shutdown`
                # To avoid recursively invoking this callback, delay shutdown
                # until the callback is complete.
                threading.Thread(
                    target=unique_executor.shutdown,
                    kwargs={"wait": True},
                    daemon=True,
                ).start()
            return self.process_run_result(run_output)
        except Exception:
            self._run_exception(**run_exception_kwargs)
            if raise_run_exceptions:
                raise
            return None
        finally:
            self._run_finally(**run_finally_kwargs)

    def _thread_pool_run(self, *args, **kwargs):
        """
        A poor attempt at avoiding (probably) thread races
        """
        result = self.on_run(*args, **kwargs)
        sleep(self._thread_pool_sleep_time)
        return result

    @property
    def _readiness_error_message(self) -> str:
        """The message used for a :class:`ReadinessError` raised by this object."""
        return (
            f"{self.label} received a run command but is not ready. The runnable "
            f"should be neither running nor failed.\n" + self.readiness_report
        )

    def __getstate__(self):
        """Return the parent state, stripped of the future and any live executor."""
        state = super().__getstate__()
        state["future"] = None
        # Don't pass the future -- with the future in the state things work fine for
        # the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for
        # the more complex executorlib.Executor we're getting:
        # TypeError: cannot pickle '_thread.RLock' object
        if isinstance(self._executor, StdLibExecutor):
            state["_executor"] = None
        # Don't pass actual executors, they have an unserializable thread lock on them
        # _but_ if the user is just passing instructions on how to _build_ an executor,
        # we'll trust that those serialize OK (this way we can, hopefully, eventually
        # support nesting executors!)
        return state