pyiron_workflow.executors.cloudpickleprocesspool module
- class pyiron_workflow.executors.cloudpickleprocesspool.CloudLoadsFuture[source]
Bases:
Future
- result(timeout=None)[source]
Return the result of the call that the future represents.
- Parameters:
timeout – The number of seconds to wait for the result if the future isn’t done. If None, then there is no limit on the wait time.
- Returns:
The result of the call that the future represents.
- Raises:
CancelledError – If the future was cancelled.
TimeoutError – If the future didn’t finish executing before the given timeout.
Exception – If the call raised then that exception will be raised.
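The outcomes listed above are those of the standard-library `concurrent.futures.Future.result()`. The following is a minimal sketch that exercises each of them on plain `Future` objects, with no executor involved. The helper name `describe` is hypothetical and used only for illustration. `CloudLoadsFuture` is listed as a subclass of `Future`, so it should follow the same contract, but that is an assumption rather than something verified here.

```python
from concurrent.futures import CancelledError, Future
from concurrent.futures import TimeoutError as FutureTimeoutError


def describe(future, timeout=None):
    """Hypothetical helper: label what ``future.result`` does."""
    try:
        return ("ok", future.result(timeout=timeout))
    except CancelledError:
        return ("cancelled", None)
    except FutureTimeoutError:
        return ("timeout", None)
    except Exception as exc:  # the exception raised by the call itself
        return ("raised", type(exc).__name__)


# A future that already holds a value returns it immediately.
done = Future()
done.set_result(42)

# A future that never finishes times out once the limit has passed.
pending = Future()

# A future whose call raised re-raises that exception from result().
failed = Future()
failed.set_exception(ValueError("boom"))

# A pending future can be cancelled; result() then raises CancelledError.
cancelled = Future()
cancelled.cancel()

outcomes = {
    "done": describe(done),
    "pending": describe(pending, timeout=0.01),
    "failed": describe(failed),
    "cancelled": describe(cancelled),
}
```

With `timeout=None` the call on `pending` would block indefinitely, which is why the sketch passes a short timeout for that case.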
- class pyiron_workflow.executors.cloudpickleprocesspool.CloudpickleProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), *, max_tasks_per_child=None)[source]
Bases:
ProcessPoolExecutor
This class wraps concurrent.futures.ProcessPoolExecutor such that the submitted callable, its arguments, and its return value are all pickled using cloudpickle. In this way, the executor extends support to all objects which are cloud-pickleable, e.g. dynamically defined or decorated classes.
To accomplish this, the underlying concurrent.futures.Future class used is replaced with our CloudLoadsFuture, which is identical except that calls to result() will first try to cloudpickle.loads() any bytes results found.
Examples
Consider a class created dynamically from a function with a decorator. Such classes are not normally pickleable, so in this example we show how this executor allows us to submit a method from such a class, where the method both takes an instance of such an unpickleable class as an argument and returns one. Actions such as registering callbacks and waiting for results behave as usual.
>>> from functools import partialmethod
>>>
>>> from pyiron_workflow.executors.cloudpickleprocesspool import (
...     CloudpickleProcessPoolExecutor
... )
>>>
>>> class Foo:
...     '''
...     A base class to be dynamically modified for testing our executor.
...     '''
...     def __init__(self, fnc: callable):
...         self.fnc = fnc
...         self.result = None
...
...     @property
...     def run(self):
...         return self.fnc
...
...     def process_result(self, future):
...         self.result = future.result()
>>>
>>>
>>> def dynamic_foo():
...     '''
...     A decorator for dynamically modifying the Foo class.
...
...     Overrides the `fnc` input of `Foo` with the decorated function.
...     '''
...     def as_dynamic_foo(fnc: callable):
...         return type(
...             "DynamicFoo",
...             (Foo,),  # Define parentage
...             {
...                 "__init__": partialmethod(
...                     Foo.__init__,
...                     fnc
...                 )
...             },
...         )
...
...     return as_dynamic_foo
>>>
>>> @dynamic_foo()
... def UnpicklableCallable(unpicklable_arg):
...     unpicklable_arg.result = "This was an arg"
...     return unpicklable_arg
>>>
>>>
>>> instance = UnpicklableCallable()
>>> arg = UnpicklableCallable()
>>> executor = CloudpickleProcessPoolExecutor()
>>> fs = executor.submit(instance.run, arg)
>>> fs.add_done_callback(instance.process_result)
>>> print(fs.done())
False
>>> print(fs.result().__class__.__name__)
DynamicFoo
>>> print(fs.done())
True
>>> import time
>>> time.sleep(1)  # Debugging doctest on github CI for python3.10
>>> print(instance.result.result)
This was an arg
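The class description says that `result()` first tries to deserialize any bytes it finds. The following is a rough sketch of that idea only, not the library's implementation. It uses the standard-library `pickle` as a stand-in for `cloudpickle` so that it runs without extra dependencies. The class name `LoadsFuture` is hypothetical, and the fallback of returning raw bytes when decoding fails is an assumption.

```python
import pickle
from concurrent.futures import Future


class LoadsFuture(Future):
    """Hypothetical stand-in: decode bytes results when they are read."""

    def result(self, timeout=None):
        raw = super().result(timeout=timeout)
        if isinstance(raw, bytes):
            try:
                # Stand-in for cloudpickle.loads; stdlib pickle is used here.
                return pickle.loads(raw)
            except Exception:
                # Assumed fallback: hand back the bytes unchanged.
                return raw
        return raw


# A worker would normally produce the serialized payload; here it is set
# by hand to keep the sketch self-contained.
serialized = LoadsFuture()
serialized.set_result(pickle.dumps({"answer": 42}))
decoded = serialized.result()

# Non-bytes results pass through untouched.
plain = LoadsFuture()
plain.set_result(7)
passthrough = plain.result()

# Bytes that are not a valid pickle stream are returned as they are.
opaque = LoadsFuture()
opaque.set_result(b"\xff")
undecoded = opaque.result()
```

Decoding on read means the payload can cross the process boundary as plain bytes, while callers of `result()` still see the original object.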