pyiron_workflow.executors.cloudpickleprocesspool module

class pyiron_workflow.executors.cloudpickleprocesspool.CloudLoadsFuture

Bases: Future

result(timeout=None)

Return the result of the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the result if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The result of the call that the future represents.

Raises:
  • CancelledError – If the future was cancelled.

  • TimeoutError – If the future didn’t finish executing before the given timeout.

  • Exception – If the call raised then that exception will be raised.
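
The outcomes listed above can be sketched with bare standard-library Future objects. This assumes CloudLoadsFuture inherits the timeout and error semantics of its base class unchanged; describe_outcome is a hypothetical helper introduced only for illustration.

```python
from concurrent.futures import CancelledError, Future
from concurrent.futures import TimeoutError as FutureTimeoutError


def describe_outcome(future, timeout=None):
    """Hypothetical helper: report how a call to result() ended."""
    try:
        return ("ok", future.result(timeout=timeout))
    except CancelledError:
        return ("cancelled", None)
    except FutureTimeoutError:
        return ("timeout", None)
    except Exception as exc:  # the call behind the future raised
        return ("raised", exc)


pending = Future()  # never completed, so result() can only time out

cancelled = Future()
cancelled.cancel()  # a future that has not started can be cancelled

failed = Future()
failed.set_exception(ValueError("boom"))

finished = Future()
finished.set_result(42)
```

Importing TimeoutError from concurrent.futures keeps the sketch valid on Python versions where it is not yet an alias of the built-in TimeoutError.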

class pyiron_workflow.executors.cloudpickleprocesspool.CloudpickleProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), *, max_tasks_per_child=None)

Bases: ProcessPoolExecutor

This class wraps concurrent.futures.ProcessPoolExecutor such that the submitted callable, its arguments, and its return value are all pickled using cloudpickle. In this way, the executor extends support to all objects which are cloud-pickleable, e.g. dynamically defined or decorated classes.

To accomplish this, the underlying concurrent.futures.Future class used is replaced with our CloudLoadsFuture, which is identical except that calls to result() will first try to cloudpickle.loads() any bytes results found.
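
The idea of a future that deserialises bytes results on access might look roughly like the sketch below. It is not the library's implementation: LoadsOnResultFuture is a hypothetical name, the fallback to returning the raw bytes when loading fails is an assumption, and the standard pickle module stands in when cloudpickle is not installed.

```python
import pickle
from concurrent.futures import Future

try:
    import cloudpickle as pickler
except ImportError:  # assumption: stdlib pickle as a stand-in for the sketch
    pickler = pickle


class LoadsOnResultFuture(Future):
    """Hypothetical future that deserialises bytes results on access."""

    def result(self, timeout=None):
        raw = super().result(timeout=timeout)
        if isinstance(raw, bytes):
            try:
                return pickler.loads(raw)
            except Exception:
                # Assumed fallback: the bytes were not a serialised object,
                # so hand them back untouched.
                return raw
        return raw


serialised = LoadsOnResultFuture()
serialised.set_result(pickler.dumps({"answer": 42}))

plain = LoadsOnResultFuture()
plain.set_result("already an object")
```

Only result() is overridden, so cancellation, callbacks and exception handling stay exactly as in the base Future.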

Examples

Consider a class created dynamically from a function with a decorator. Such classes are not normally pickleable, so in this example we show how this executor lets us submit a method from such a class that both takes an instance of such an unpickleable class as an argument and returns one. Actions such as registering callbacks and waiting for results behave as they normally do.

>>> from functools import partialmethod
>>>
>>> from pyiron_workflow.executors.cloudpickleprocesspool import (
...     CloudpickleProcessPoolExecutor
... )
>>>
>>> class Foo:
...     '''
...     A base class to be dynamically modified for testing our executor.
...     '''
...     def __init__(self, fnc: callable):
...         self.fnc = fnc
...         self.result = None
...
...     @property
...     def run(self):
...         return self.fnc
...
...     def process_result(self, future):
...         self.result = future.result()
>>>
>>>
>>> def dynamic_foo():
...     '''
...     A decorator for dynamically modifying the Foo class.
...
...     Overrides the `fnc` input of `Foo` with the decorated function.
...     '''
...     def as_dynamic_foo(fnc: callable):
...         return type(
...             "DynamicFoo",
...             (Foo,),  # Define parentage
...             {
...                 "__init__": partialmethod(
...                     Foo.__init__,
...                     fnc
...                 )
...             },
...         )
...
...     return as_dynamic_foo
>>>
>>> @dynamic_foo()
... def UnpicklableCallable(unpicklable_arg):
...     unpicklable_arg.result = "This was an arg"
...     return unpicklable_arg
>>>
>>>
>>> instance = UnpicklableCallable()
>>> arg = UnpicklableCallable()
>>> executor = CloudpickleProcessPoolExecutor()
>>> fs = executor.submit(instance.run, arg)
>>> fs.add_done_callback(instance.process_result)
>>> print(fs.done())
False
>>> print(fs.result().__class__.__name__)
DynamicFoo
>>> print(fs.done())
True
>>> import time
>>> time.sleep(1)  # Debugging doctest on github CI for python3.10
>>> print(instance.result.result)
This was an arg

submit(fn, /, *args, **kwargs)

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Returns:

A Future representing the given call.
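
One way to picture a submit that serialises the callable, its arguments and its return value is the sketch below. It is an illustration under stated assumptions, not the library's code: apply_payload and submit_serialised are hypothetical names, a ThreadPoolExecutor stands in for the process pool so the snippet runs without a main-module guard, and the standard pickle module is used when cloudpickle is not installed.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

try:
    import cloudpickle as pickler
except ImportError:  # assumption: stdlib pickle as a stand-in for the sketch
    pickler = pickle


def apply_payload(payload):
    """Hypothetical worker-side helper: unpack, call, and re-serialise."""
    fn, args, kwargs = pickler.loads(payload)
    return pickler.dumps(fn(*args, **kwargs))


def submit_serialised(executor, fn, /, *args, **kwargs):
    """Hypothetical submit wrapper: send a single bytes payload to the pool."""
    payload = pickler.dumps((fn, args, kwargs))
    return executor.submit(apply_payload, payload)


# ThreadPoolExecutor is only a stand-in for a process pool here.
with ThreadPoolExecutor(max_workers=1) as pool:
    future = submit_serialised(pool, int, "ff", base=16)
    # The pool returns bytes; the caller deserialises them.
    value = pickler.loads(future.result())
```

In this sketch the caller deserialises the bytes by hand; a future whose result() performs that step would make the round trip invisible to the caller.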