"""
Classes for "lexical" reasoning.
The motivation here is to be able to provide the object with a unique identifier
in the context of other lexical objects. Each object may have at most one parent,
while lexical parents may have an arbitrary number of children, and each child's name
must be unique in the scope of that parent. In this way, when lexical parents are also
themselves lexical, we can build a path from the parent-most object to any child that
is completely unique. The typical filesystem on a computer is an excellent
example and fulfills our requirements, the only reason we depart from it is so that
we are free to have objects stored in different locations (possibly even on totally
different drives or machines) belong to the same lexical group.
"""
from __future__ import annotations
import contextlib
from abc import ABC, abstractmethod
from difflib import get_close_matches
from pathlib import Path
from typing import ClassVar, Generic, TypeVar
from bidict import bidict
from pyiron_workflow.logging import logger
from pyiron_workflow.mixin.has_interface_mixins import HasLabel, UsesState
ParentType = TypeVar("ParentType", bound="LexicalParent")
[docs]
class Lexical(UsesState, HasLabel, Generic[ParentType], ABC):
"""
An object with a unique lexical path.
The lexical parent object (if any), and the parent-most object are both easily
accessible.
"""
lexical_delimiter: ClassVar[str] = "/"
def __init__(
self,
*args,
label: str | None = None,
parent: ParentType | None = None,
**kwargs,
):
self._label = ""
self._parent: ParentType | None = None
self._detached_parent_path = None
self.label = self.__class__.__name__ if label is None else label
self.parent = parent
super().__init__(*args, **kwargs)
def _check_label(self, new_label: str) -> None:
super()._check_label(new_label)
if self.lexical_delimiter in new_label:
raise ValueError(
f"Lexical delimiter {self.lexical_delimiter} cannot be in new label "
f"{new_label}"
)
@property
def parent(self) -> ParentType | None:
return self._parent
@parent.setter
def parent(self, new_parent: ParentType | None) -> None:
self._set_parent(new_parent)
def _set_parent(self, new_parent: ParentType | None):
"""
mypy is uncooperative with super calls for setters, so we pull the behaviour
out.
"""
if new_parent is self._parent:
# Exit early if nothing is changing
return
self._detached_parent_path = None
if self.parent is not None and self in self.parent.children.values():
old_parent = self.parent
self._parent = None
old_parent.remove_child(self)
_ensure_path_is_not_cyclic(new_parent, self)
self._parent = new_parent
if new_parent is not None and self not in new_parent.children.values():
new_parent.add_child(self)
@property
def lexical_path(self) -> str:
"""
The path of node labels from the graph root (parent-most node) down to this
node.
"""
prefix: str
if self.parent is None and self.detached_parent_path is None:
prefix = ""
elif self.parent is None and self.detached_parent_path is not None:
prefix = self.detached_parent_path
elif self.parent is not None and self.detached_parent_path is None:
if isinstance(self.parent, Lexical):
prefix = self.parent.lexical_path
else:
prefix = self.lexical_delimiter + self.parent.label
else:
raise ValueError(
f"The parent and detached path should not be able to take non-None "
f"values simultaneously, but got {self.parent} and "
f"{self.detached_parent_path}, respectively. Please raise an issue on "
f"GitHub outlining how your reached this state."
)
return prefix + self.lexical_delimiter + self.label
@property
def detached_parent_path(self) -> str | None:
"""
The get/set state cycle of :class:`Lexical` de-parents objects, but we may
still be interested in the lexical path -- e.g. if we `pickle` dump and load
the object we will lose parent information, but this will still hold what the
path _was_ before the orphaning process.
The detached path will get cleared if a new parent is set, but is otherwise
used as the root for the purposes of finding the lexical path.
"""
return self._detached_parent_path
@property
def full_label(self) -> str:
"""
A shortcut that combines the lexical path and label into a single string.
"""
return self.lexical_path
@property
def lexical_root(self) -> Lexical:
"""The parent-most object in this lexical path; may be self."""
if isinstance(self.parent, Lexical):
return self.parent.lexical_root
else:
return self
[docs]
def as_path(self, root: Path | str | None = None, mkdir: bool = False) -> Path:
"""
The lexical path as a :class:`pathlib.Path`, with a filesystem :param:`root`
(default is the current working directory).
"""
path = (Path.cwd() if root is None else Path(root)).joinpath(
*self.lexical_path.split(self.lexical_delimiter)
)
if mkdir:
path.mkdir(parents=True, exist_ok=True)
return path
[docs]
def clean_path(
self,
root: Path | str | None = None,
clean_parents: bool = True,
remove_files: bool = False,
) -> None:
"""
Recursively remove this object's directory and optionally its ancestors.
Unless :param:`remove_files` is True, this will only remove empty directories.
If non-lexical directories are present, they will always block removal.
If this object is itself a lexical parent, it will run cleaning recursively on
its children.
Args:
root: Base path from which to resolve the lexical path.
clean_parents: If True, also attempt to remove parent directories.
remove_files: If True, delete files within the directory before removal.
"""
directory = self.as_path(root)
if isinstance(self, LexicalParent):
for child in self.children.values():
child.clean_path(
root=root, clean_parents=False, remove_files=remove_files
)
if remove_files and directory.is_dir():
for item in directory.iterdir():
if item.is_file():
item.unlink()
with contextlib.suppress(OSError): # If it's not empty just move on
directory.rmdir()
if (
clean_parents
and self.parent is not None
and isinstance(self.parent, Lexical)
):
self.parent.clean_path(
root=root, clean_parents=clean_parents, remove_files=remove_files
)
def __getstate__(self):
state = super().__getstate__()
if self.parent is not None:
state["_detached_parent_path"] = self.parent.lexical_path
state["_parent"] = None
# Regarding removing parent from state:
# Basically we want to avoid recursion during (de)serialization; when the
# parent object is deserializing itself, _it_ should know who its children are
# and inform them of this.
# In the case the object gets passed to another process using __getstate__,
# this also avoids dragging our whole lexical parent graph along with us.
return state
[docs]
class CyclicPathError(ValueError):
"""
To be raised when adding a child would result in a cyclic lexical path.
"""
[docs]
class AlreadyHasParentError(ValueError):
"""
To be raised when parenting an already-parented child.
"""
ChildType = TypeVar("ChildType", bound=Lexical)
[docs]
class LexicalParent(HasLabel, Generic[ChildType], ABC):
"""
A labeled object with a collection of uniquely-named lexical children.
Children should be added or removed via the :meth:`add_child` and
:meth:`remove_child` methods and _not_ by direct manipulation of the
:attr:`children` container.
Children are dot-accessible and appear in :meth:`__dir__` for tab-completion.
Iterating over the parent yields the children, and the length of the parent is
the number of children.
When adding children or assigning parents, a check is performed on the lexical
path to forbid cyclic paths.
"""
def __init__(
self,
*args,
strict_naming: bool = True,
**kwargs,
):
self._children: bidict[str, ChildType] = bidict()
self.strict_naming = strict_naming
super().__init__(*args, **kwargs)
[docs]
@classmethod
@abstractmethod
def child_type(cls) -> type[ChildType]:
# Dev note: In principle, this could be a regular attribute
# However, in other situations this is precluded (e.g. in channels)
# since it would result in circular references.
# Here we favour consistency over brevity,
# and maintain the X_type() class method pattern
pass
@property
def children(self) -> bidict[str, ChildType]:
return self._children
@property
def child_labels(self) -> tuple[str]:
return tuple(child.label for child in self)
def _check_label(self, new_label: str) -> None:
super()._check_label(new_label)
if self.child_type().lexical_delimiter in new_label:
raise ValueError(
f"Child type ({self.child_type()}) lexical delimiter "
f"{self.child_type().lexical_delimiter} cannot be in new label "
f"{new_label}"
)
def __getattr__(self, key) -> ChildType:
try:
return self._children[key]
except KeyError as key_error:
# Raise an attribute error from getattr to make sure hasattr works well!
msg = f"Could not find attribute '{key}' on {self.label} "
msg += f"({self.__class__.__name__}) or among its children "
msg += f"({self._children.keys()})."
matches = get_close_matches(key, self._children.keys(), cutoff=0.8)
if len(matches) > 0:
msg += f" Did you mean '{matches[0]}' and not '{key}'?"
raise AttributeError(msg) from key_error
def __iter__(self):
return self.children.values().__iter__()
def __len__(self) -> int:
return len(self.children)
def __dir__(self):
return set(super().__dir__() + list(self.children.keys()))
[docs]
def add_child(
self,
child: ChildType,
label: str | None = None,
strict_naming: bool | None = None,
) -> ChildType:
"""
Add a child, optionally assigning it a new label in the process.
Args:
child (ChildType): The child to add.
label (str|None): A (potentially) new label to assign the child. (Default
is None, leave the child's label alone.)
strict_naming (bool|None): Whether to append a suffix to the label if
another child is already held with the same label. (Default is None,
use the class-level flag.)
Returns:
(ChildType): The child being added.
Raises:
TypeError: When the child is not of an allowed class.
ValueError: When the child has a different parent already.
AttributeError: When the label is already an attribute (but not a child).
AttributeError: When the label conflicts with another child and
`strict_naming` is true.
"""
if not isinstance(child, self.child_type()):
raise TypeError(
f"{self.label} expected a new child of type {self.child_type()} "
f"but got {child}"
)
self._ensure_child_has_no_other_parent(child)
_ensure_path_is_not_cyclic(self, child)
label = child.label if label is None else label
strict_naming = self.strict_naming if strict_naming is None else strict_naming
if self._this_child_is_already_at_this_label(child, label):
pass
else:
label = self._get_unique_label(label, strict_naming)
if self._this_child_is_already_at_a_different_label(child, label):
self.children.inv.pop(child)
# Finally, update label and reflexively form the parent-child relationship
child.label = label
self.children[child.label] = child
child.parent = self
return child
def _ensure_child_has_no_other_parent(self, child: Lexical) -> None:
if child.parent is not None and child.parent is not self:
raise AlreadyHasParentError(
f"The child ({child.label}) already belongs to the parent "
f"{child.parent.label}. Please remove it there before trying to "
f"add it to this parent ({self.label})."
)
def _this_child_is_already_at_this_label(self, child: Lexical, label: str) -> bool:
return (
label == child.label
and label in self.child_labels
and self.children[label] is child
)
def _this_child_is_already_at_a_different_label(self, child, label) -> bool:
return child.parent is self and label != child.label
def _get_unique_label(self, label: str, strict_naming: bool) -> str:
if label in self.__dir__():
if label in self.child_labels:
if strict_naming:
raise AttributeError(
f"{label} is already the label for a child. Please remove it "
f"before assigning another child to this label."
)
else:
label = self._add_suffix_to_label(label)
else:
raise AttributeError(
f"{label} is an attribute or method of the {self.__class__} class, "
f"and cannot be used as a child label."
)
return label
def _add_suffix_to_label(self, label: str) -> str:
i = 0
new_label = label
while new_label in self.__dir__():
# We search dir and not just the child_labels for the edge case that
# someone has a very label-like attribute
new_label = f"{label}{i}"
i += 1
if new_label != label:
logger.info(
f"{label} is already a node; appending an index to the "
f"node label instead: {new_label}"
)
return new_label
[docs]
def remove_child(self, child: ChildType | str) -> ChildType:
if isinstance(child, str):
child_instance = self.children.pop(child)
elif isinstance(child, self.child_type()):
self.children.inv.pop(child)
child_instance = child
else:
raise TypeError(
f"{self.label} expected to remove a child of type str or "
f"{self.child_type()} but got {child}"
)
child_instance.parent = None
return child_instance
def __getstate__(self):
state = super().__getstate__()
# Remove the children from the state and store each element right in the state
# -- the labels are guaranteed to not be attributes already so this is safe,
# and it makes sure that the state path matches the lexical path
del state["_children"]
state["child_labels"] = self.child_labels
for child in self:
state[child.label] = child
return state
def __setstate__(self, state):
# Reconstruct children from state
# Remove them from the state as you go, so they don't hang around in the
# __dict__ after we set state -- they were only there to start with to guarantee
# that the state path and the lexical path matched (i.e. without ".children."
# in between)
state["_children"] = bidict(
{label: state.pop(label) for label in state.pop("child_labels")}
)
super().__setstate__(state)
self._children = bidict(self._children)
# Children purge their parent information in their __getstate__. This avoids
# recursion, so we don't need to ship an entire graph off to a second process,
# but rather can send just the requested object and its scope (lexical
# children). So, now return their parent to them:
for child in self:
child.parent = self
def _ensure_path_is_not_cyclic(parent, child: Lexical) -> None:
if isinstance(parent, Lexical) and parent.lexical_path.startswith(
child.lexical_path + child.lexical_delimiter
):
raise CyclicPathError(
f"{parent.label} cannot be the parent of {child.label}, because its "
f"lexical path is already in {child.label}'s path and cyclic paths "
f"are not allowed. (i.e. {child.lexical_path} is in "
f"{parent.lexical_path})"
)