Source code for pyiron_workflow.executors.cloudpickleprocesspool

import warnings
from collections.abc import Callable
from concurrent.futures import Future, ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool, _global_shutdown, _WorkItem
from sys import version_info

import cloudpickle


[docs] class CloudLoadsFuture(Future):
[docs] def result(self, timeout=None): result = super().result(timeout=timeout) if isinstance(result, bytes): result = cloudpickle.loads(result) return result
class _CloudPickledCallable: def __init__(self, fnc: Callable): self.fnc_serial = cloudpickle.dumps(fnc) def __call__(self, /, dumped_args, dumped_kwargs): fnc = cloudpickle.loads(self.fnc_serial) args = cloudpickle.loads(dumped_args) kwargs = cloudpickle.loads(dumped_kwargs) return cloudpickle.dumps(fnc(*args, **kwargs)) @classmethod def dumps(cls, stuff): return cloudpickle.dumps(stuff)
[docs] class CloudpickleProcessPoolExecutor(ProcessPoolExecutor): """ This class wraps :class:`concurrent.futures.ProcessPoolExecutor` such that the submitted callable, its arguments, and its return value are all pickled using :mod:`cloudpickle`. In this way, the executor extends support to all objects which are cloud-pickleable, e.g. dynamically defined or decorated classes. To accomplish this, the underlying :class:`concurrent.futures.Future` class used is replaced with our :class:`CloudLoadsFuture`, which is identical except that calls to :meth:`result()` will first try to :func:`cloudpickle.loads` and `bytes` results found. Examples: Consider a class created from a function dynamically with a decorator. These are not normally pickleable, so in this example we should how this class allows us to submit a method from such a class, that both takes as an argument and returns such an unpickleable class. Actions such as registering callbacks and waiting for results behave just like normal. >>> from functools import partialmethod >>> >>> from pyiron_workflow.executors.cloudpickleprocesspool import ( ... CloudpickleProcessPoolExecutor ... ) >>> >>> class Foo: ... ''' ... A base class to be dynamically modified for testing our executor. ... ''' ... def __init__(self, fnc: callable): ... self.fnc = fnc ... self.result = None ... ... @property ... def run(self): ... return self.fnc ... ... def process_result(self, future): ... self.result = future.result() >>> >>> >>> def dynamic_foo(): ... ''' ... A decorator for dynamically modifying the Foo class. ... ... Overrides the `fnc` input of `Foo` with the decorated function. ... ''' ... def as_dynamic_foo(fnc: callable): ... return type( ... "DynamicFoo", ... (Foo,), # Define parentage ... { ... "__init__": partialmethod( ... Foo.__init__, ... fnc ... ) ... }, ... ) ... ... return as_dynamic_foo >>> >>> @dynamic_foo() ... def UnpicklableCallable(unpicklable_arg): ... unpicklable_arg.result = "This was an arg" ... return unpicklable_arg >>> >>> >>> instance = UnpicklableCallable() >>> arg = UnpicklableCallable() >>> executor = CloudpickleProcessPoolExecutor() >>> fs = executor.submit(instance.run, arg) >>> fs.add_done_callback(instance.process_result) >>> print(fs.done()) False >>> print(fs.result().__class__.__name__) DynamicFoo >>> print(fs.done()) True >>> import time >>> time.sleep(1) # Debugging doctest on github CI for python3.10 >>> print(instance.result.result) This was an arg """
[docs] def submit(self, fn, /, *args, **kwargs): return self._submit( _CloudPickledCallable(fn), _CloudPickledCallable.dumps(args), _CloudPickledCallable.dumps(kwargs), )
submit.__doc__ = ProcessPoolExecutor.submit.__doc__ def _submit(self, fn, /, *args, **kwargs): """ We override the regular `concurrent.futures.ProcessPoolExecutor` to use our custom future that unpacks cloudpickled results. This approach is simple, but the brute-force nature of it means we manually accommodate different implementations of `ProcessPoolExecutor` in different python versions. """ if version_info.major != 3: # noqa: UP036 raise RuntimeError( f"{self.__class__} is only built for python3, but got " f"{version_info.major}" ) if version_info.minor == 8: return self._submit_3_8(fn, *args, **kwargs) elif version_info.minor >= 9: return self._submit_3_gt9(fn, *args, **kwargs) else: raise RuntimeError( f"{self.__class__} is only built for python 3.8+, but got " f"{version_info.major}.{version_info.minor}." ) def _submit_3_gt9(self, fn, /, *args, **kwargs): with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) if self._shutdown_thread: raise RuntimeError("cannot schedule new futures after shutdown") if _global_shutdown: raise RuntimeError( "cannot schedule new futures after " "interpreter shutdown" ) f = CloudLoadsFuture() w = _WorkItem(f, fn, args, kwargs) self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() if self._safe_to_dynamically_spawn_children: self._adjust_process_count() self._start_executor_manager_thread() return f def _submit_3_8(*args, **kwargs): if len(args) >= 2: self, fn, *args = args elif not args: raise TypeError( "descriptor 'submit' of 'ProcessPoolExecutor' object " "needs an argument" ) elif "fn" in kwargs: fn = kwargs.pop("fn") self, *args = args warnings.warn( "Passing 'fn' as keyword argument is deprecated", DeprecationWarning, stacklevel=2, ) else: raise TypeError( "submit expected at least 1 positional argument, got {len(args) - 1}" ) with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) if self._shutdown_thread: raise RuntimeError("cannot schedule new futures after shutdown") if _global_shutdown: raise RuntimeError( "cannot schedule new futures after " "interpreter shutdown" ) f = CloudLoadsFuture() w = _WorkItem(f, fn, args, kwargs) self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread self._queue_management_thread_wakeup.wakeup() self._start_queue_management_thread() return f