{ "cells": [ { "metadata": {}, "cell_type": "markdown", "source": [ "# Ontological validation in `pyiron_workflow`\n", "\n", "In `pyiron_workflow`, we leverage pyiron's ontological validation of workflow graphs found in `semantikon`. We strive to support every syntactic aspect of `semantikon`, which you can read more about on [the GitHub repository](https://github.com/pyiron/semantikon).\n", "\n", "This is accomplished practically by wrapping function IO (or dataclass nodes) with the `semantikon.metadata.u` call. Here, we demonstrate a variety of use-cases, and show how this functionality can be taken further to get data type- and ontologically-valid suggestions for new connections in your workflow, or new nodes to add to it.\n", "\n", "Note that ontological typing _only_ works when there's a parent object around (a macro or a workflow)." ], "id": "78e57715fcc8beea" }, { "metadata": {}, "cell_type": "markdown", "source": [ "# Ontological connection checking\n", "\n", "Some success and failure cases in the presence or absence of ontological hints. A key takeaway is that these hints function very much like `pyiron_workflow`'s regular type hinting: if hints are present on both sides of a new connection, they _must_ be valid, but if one or both sides are missing the hint we skip the validation." 
], "id": "8b0b5e89a462c4f" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.510945Z", "start_time": "2026-04-30T16:35:34.079168Z" } }, "cell_type": "code", "source": [ "import rdflib\n", "\n", "from semantikon.metadata import u, SemantikonURI\n", "\n", "import pyiron_workflow as pwf\n", "from pyiron_workflow import suggest\n", "from pyiron_workflow.channels import ChannelConnectionError\n", "from pyiron_workflow.knowledge import validate_workflow\n", "from pyiron_workflow.nodes.composite import FailedChildError\n", "\n", "\n", "EX = rdflib.Namespace(\"http://www.example.org/\")\n", "\n", "class Meal: ...\n", "\n", "class Garbage: ...\n", "\n", "@pwf.as_function_node(\"pizza\")\n", "def prepare_pizza() -> u(Meal, uri=EX.Pizza):\n", " return Meal()\n", "\n", "@pwf.as_function_node(\"unidentified_meal\")\n", "def prepare_non_ontological_meal() -> Meal:\n", " return Meal()\n", "\n", "@pwf.as_function_node(\"rice\")\n", "def prepare_rice() -> u(Meal, uri=EX.Rice):\n", " return Meal()\n", "\n", "@pwf.as_function_node(\"garbage\")\n", "def prepare_garbage() -> u(Garbage, uri=EX.Garbage):\n", " return Garbage()\n", "\n", "@pwf.as_function_node(\"garbage\")\n", "def prepare_unhinted_garbage():\n", " return Garbage()\n", "\n", "@pwf.as_function_node(\"verdict\")\n", "def eat(meal: u(Meal, uri=EX.Meal)) -> str:\n", " return f\"Yummy {meal.__class__.__name__} meal\"\n", "\n", "@pwf.as_function_node(\"verdict\")\n", "def eat_pizza(meal: u(Meal, uri=EX.Pizza)) -> str:\n", " return f\"Yummy {meal.__class__.__name__} pizza\"" ], "id": "996ce3d6152f1beb", "outputs": [], "execution_count": 1 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Both fully hinted\n", "\n", "Works fine" ], "id": "cf1e5e273013acb1" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.747151Z", "start_time": "2026-04-30T16:35:35.512041Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"ontoflow\")\n", "wf.make = prepare_pizza()\n", "wf.eat = 
eat_pizza(wf.make)\n", "wf()" ], "id": "807c93a2f4436b4f", "outputs": [ { "data": { "text/plain": [ "{'eat__verdict': 'Yummy Meal pizza'}" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 2 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Upstream type hint is missing\n", "\n", "Standard `pyiron_workflow` typing behaviour: we are allowed to form the connection (since the source has no hint), but at runtime, we will fail when we try to actually assign the value" ], "id": "b8e91ece9a45bf7f" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.772357Z", "start_time": "2026-04-30T16:35:35.748437Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"no_type\")\n", "wf.make = prepare_unhinted_garbage()\n", "wf.eat = eat_pizza(wf.make)\n", "try:\n", " wf.recovery = None\n", " wf()\n", "except FailedChildError as e:\n", " print(e)" ], "id": "3fde7d3acb113d85", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/no_type encountered error in child: {'/no_type/eat.accumulate_and_run': TypeError(\"The channel /no_type/eat.meal cannot take the value `<__main__.Garbage object at 0x12a6ae600>` () because it is not compliant with the type hint typing.Annotated[__main__.Meal, ('uri', rdflib.term.URIRef('http://www.example.org/Pizza'))]\")}\n" ] } ], "execution_count": 3 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Upstream type hint is wrong\n", "\n", "Standard `pyiron_workflow` typing behaviour: we're not even allowed to form the connection -- the recipe would be invalid" ], "id": "9af06c9ae9a7512e" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.788350Z", "start_time": "2026-04-30T16:35:35.773560Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"no_type\")\n", "wf.make = prepare_garbage()\n", "try:\n", " wf.eat = eat_pizza(wf.make)\n", "except ChannelConnectionError as e:\n", " print(e)" ], "id": "a530586921463886", "outputs": [ 
{ "name": "stdout", "output_type": "stream", "text": [ "The upstream channel /no_type/make.garbage cannot connect to the downstream channel /no_type/eat_pizza.meal because the upstream type hint (typing.Annotated[__main__.Garbage, ('uri', rdflib.term.URIRef('http://www.example.org/Garbage'))]) is not as or more specific than the downstream type hint (typing.Annotated[__main__.Meal, ('uri', rdflib.term.URIRef('http://www.example.org/Pizza'))]).\n" ] } ], "execution_count": 4 }, { "metadata": {}, "cell_type": "markdown", "source": "So far, so good: `u` decoration has no negative impact on the existing type hint checking procedures", "id": "5995ff2b2e009aee" }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Upstream ontological hint is missing\n", "\n", "New ontological behaviour: As with type hints, if one side is missing we just let things pass. Unlike type hints, we can also _execute_ the workflow, because the ontologies only impact the recipe-level behaviour, not the instance behaviour!" ], "id": "ca34d53309189d96" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.806595Z", "start_time": "2026-04-30T16:35:35.795881Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"no_ontology\")\n", "wf.make = prepare_non_ontological_meal()\n", "wf.eat = eat_pizza(wf.make)\n", "wf()" ], "id": "d6d324aff20c4e63", "outputs": [ { "data": { "text/plain": [ "{'eat__verdict': 'Yummy Meal pizza'}" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 5 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Upstream ontological hint is WRONG\n", "\n", "New ontological behaviour: new ontological type checking now prevents us from even forming the ontologically invalid connection!" 
], "id": "87f8ae15183930fc" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.912040Z", "start_time": "2026-04-30T16:35:35.816564Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"failed_ontology\")\n", "wf.make = prepare_rice()\n", "try:\n", " wf.eat = eat_pizza(wf.make)\n", "except ChannelConnectionError as e:\n", " print(e)" ], "id": "316fea01813d2c64", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The upstream channel /failed_ontology/make.rice cannot connect to the downstream channel /failed_ontology/eat_pizza.meal because the upstream type hint (typing.Annotated[__main__.Meal, ('uri', rdflib.term.URIRef('http://www.example.org/Rice'))]) and downstream type hint (typing.Annotated[__main__.Meal, ('uri', rdflib.term.URIRef('http://www.example.org/Pizza'))]) produce a non-empty ontological validation report:\n", "(False, )>, 'Validation Report\\nConforms: False\\nResults (1):\\nConstraint Violation in ClassConstraintComponent (http://www.w3.org/ns/shacl#ClassConstraintComponent):\\n\\tSeverity: sh:Violation\\n\\tSource Shape: [ rdf:type sh:PropertyShape ; sh:class ; sh:path ]\\n\\tFocus Node: sns:c30f63af454c39d79ea4a3b85ab5da83_failed_ontology-make-outputs-rice_data\\n\\tValue Node: sns:c30f63af454c39d79ea4a3b85ab5da83_failed_ontology-make-outputs-rice_data_uri\\n\\tResult Path: \\n\\tMessage: Value does not have class \\n')\n" ] } ], "execution_count": 6 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Downstream ontological hint is less specific\n", "\n", "This should work fine..." 
], "id": "5b883e1acf6aa7c4" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:35.999221Z", "start_time": "2026-04-30T16:35:35.913281Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"relaxed_ontology\")\n", "wf.make = prepare_rice()\n", "try:\n", " wf.eat = eat(wf.make)\n", "except ChannelConnectionError as e:\n", " print(e)" ], "id": "230dba9264a053e9", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The upstream channel /relaxed_ontology/make.rice cannot connect to the downstream channel /relaxed_ontology/eat.meal because the upstream type hint (typing.Annotated[__main__.Meal, ('uri', rdflib.term.URIRef('http://www.example.org/Rice'))]) and downstream type hint (typing.Annotated[__main__.Meal, ('uri', rdflib.term.URIRef('http://www.example.org/Meal'))]) produce a non-empty ontological validation report:\n", "(False, )>, 'Validation Report\\nConforms: False\\nResults (1):\\nConstraint Violation in ClassConstraintComponent (http://www.w3.org/ns/shacl#ClassConstraintComponent):\\n\\tSeverity: sh:Violation\\n\\tSource Shape: [ rdf:type sh:PropertyShape ; sh:class ; sh:path ]\\n\\tFocus Node: sns:1f1e624b8164164e2ecce02fdf5532b5_relaxed_ontology-make-outputs-rice_data\\n\\tValue Node: sns:1f1e624b8164164e2ecce02fdf5532b5_relaxed_ontology-make-outputs-rice_data_uri\\n\\tResult Path: \\n\\tMessage: Value does not have class \\n')\n" ] } ], "execution_count": 7 }, { "metadata": {}, "cell_type": "markdown", "source": [ "But! We forgot something! This form of failure is known from the `semantikon` notebook whence these demonstration workflows spring: we never informed the ontology that \"rice\" is a subclass of \"meal\"!\n", "\n", "We let the ontology know this by adding the corresponding triple to our `rdflib.Graph`. In `pyiron_workflow` we can manage this by pre-populating a `knowledge: rdflib.Graph` property on the graph root (i.e. 
top-most object) as follows:" ], "id": "9a332e6a04dc2bd2" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:36.203879Z", "start_time": "2026-04-30T16:35:36.008471Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"relaxed_ontology\")\n", "\n", "wf.knowledge = rdflib.Graph()\n", "wf.knowledge.add((EX.Rice, rdflib.RDFS.subClassOf, EX.Meal))\n", "\n", "wf.make = prepare_rice()\n", "wf.eat = eat(wf.make)\n", "wf()" ], "id": "cb2a13e6144e6378", "outputs": [ { "data": { "text/plain": [ "{'eat__verdict': 'Yummy Meal meal'}" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 8 }, { "metadata": {}, "cell_type": "markdown", "source": [ "# Ontological triples\n", "\n", "Alright, for our simple pizza example things are working beautifully. Let's try it with the clothes example. For output triples, we leverage the dual A-/T-box `SemantikonURI` wrapper." ], "id": "574ac2f3813504ca" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:36.224677Z", "start_time": "2026-04-30T16:35:36.205827Z" } }, "cell_type": "code", "source": [ "EX = rdflib.Namespace(\"http://www.example.org/\")\n", "\n", "uri_cleaned = SemantikonURI(EX.cleaned)\n", "uri_color = SemantikonURI(EX.color)\n", "\n", "class Clothes:\n", " pass\n", "\n", "@pwf.as_function_node\n", "def wash(clothes: u(Clothes, uri=EX.Clothes)) -> u(\n", " Clothes,\n", " uri=EX.Clothes,\n", " triples=(EX.hasProperty, uri_cleaned),\n", " derived_from=\"inputs.clothes\"\n", "):\n", " ...\n", " return clothes\n", "\n", "@pwf.as_function_node\n", "def dye(clothes: u(Clothes, uri=EX.Clothes), color=\"blue\") -> u(\n", " Clothes,\n", " uri=EX.Clothes,\n", " triples=(EX.hasProperty, uri_color),\n", " derived_from=\"inputs.clothes\",\n", "):\n", " ...\n", " return clothes\n", "\n", "@pwf.as_function_node\n", "def sell(\n", " clothes: u(\n", " Clothes,\n", " uri=EX.Clothes,\n", " restrictions=(\n", " ((rdflib.OWL.onProperty, EX.hasProperty), 
(rdflib.OWL.someValuesFrom, EX.cleaned)),\n", " ((rdflib.OWL.onProperty, EX.hasProperty), (rdflib.OWL.someValuesFrom, EX.color)),\n", " )\n", " )\n", ") -> int:\n", " price = 10\n", " return price" ], "id": "dceaab6f57226f07", "outputs": [], "execution_count": 9 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Now with `restrictions`\n", "\n", "In the base case, everything works fine. The restrictions are correctly parsed.\n", "\n", "Note that unlike the `semantikon` notebook, here we had to make sure that all the node inputs are also `u` annotated (even if it's just to trivially link the type to its ontology counterpart). This is because type checking only occurs in `pyiron_workflow` when _both_ sides of the connection are typed! We follow this rule for both standard data types and ontological types." ], "id": "e9bb559e718b4f66" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:36.695788Z", "start_time": "2026-04-30T16:35:36.232827Z" } }, "cell_type": "code", "source": [ "my_correct_wf = pwf.Workflow(\"my_correct_workflow\")\n", "my_correct_wf.dyed_clothes = dye(Clothes())\n", "my_correct_wf.washed_clothes = wash(my_correct_wf.dyed_clothes)\n", "my_correct_wf.money = sell(my_correct_wf.washed_clothes)\n", "my_correct_wf()" ], "id": "c163d51cd2c676c2", "outputs": [ { "data": { "text/plain": [ "{'money__price': 10}" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 10 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## As a macro\n", "\n", "This also works fine! Be careful though, here we've only demonstrated that it _can_ work for macros, and have not yet guaranteed it works for _all_ macros." 
], "id": "8dd38830d50f1b73" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:37.293445Z", "start_time": "2026-04-30T16:35:36.697024Z" } }, "cell_type": "code", "source": [ "@pwf.as_macro_node\n", "def my_correct_macro(self, clothes: Clothes):\n", " self.dyed_clothes = dye(clothes)\n", " self.washed_clothes = wash(self.dyed_clothes)\n", " self.money = sell(self.washed_clothes)\n", " return self.money\n", "\n", "correct_m = my_correct_macro(Clothes())\n", "correct_m()" ], "id": "f6b77dff8a1e5b84", "outputs": [ { "data": { "text/plain": [ "{'money': 10}" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 11 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Trivial failure\n", "\n", "If we skip a step, our `sell` `restrictions` are not fulfilled, and we sensibly fail." ], "id": "c71478fc498bdb30" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:37.425655Z", "start_time": "2026-04-30T16:35:37.311603Z" } }, "cell_type": "code", "source": [ "my_wrong_wf = pwf.Workflow(\"my_wrong_workflow\")\n", "my_wrong_wf.washed_clothes = wash(Clothes())\n", "try:\n", " my_wrong_wf.money = sell(my_wrong_wf.washed_clothes)\n", "except ChannelConnectionError as e:\n", " print(e)" ], "id": "ef51d73b476d6a0b", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The upstream channel /my_wrong_workflow/washed_clothes.clothes cannot connect to the downstream channel /my_wrong_workflow/sell.clothes because the upstream type hint (typing.Annotated[__main__.Clothes, ('uri', rdflib.term.URIRef('http://www.example.org/Clothes'), 'triples', (rdflib.term.URIRef('http://www.example.org/hasProperty'), SemantikonURI('http://www.example.org/cleaned')), 'derived_from', 'inputs.clothes')]) and downstream type hint (typing.Annotated[__main__.Clothes, ('uri', rdflib.term.URIRef('http://www.example.org/Clothes'), 'restrictions', (((rdflib.term.URIRef('http://www.w3.org/2002/07/owl#onProperty'), 
rdflib.term.URIRef('http://www.example.org/hasProperty')), (rdflib.term.URIRef('http://www.w3.org/2002/07/owl#someValuesFrom'), rdflib.term.URIRef('http://www.example.org/cleaned'))), ((rdflib.term.URIRef('http://www.w3.org/2002/07/owl#onProperty'), rdflib.term.URIRef('http://www.example.org/hasProperty')), (rdflib.term.URIRef('http://www.w3.org/2002/07/owl#someValuesFrom'), rdflib.term.URIRef('http://www.example.org/color')))))]) produce a non-empty ontological validation report:\n", "(False, )>, 'Validation Report\\nConforms: False\\nResults (1):\\nConstraint Violation in QualifiedValueShapeConstraintComponent (http://www.w3.org/ns/shacl#QualifiedMinCountConstraintComponent):\\n\\tSeverity: sh:Violation\\n\\tSource Shape: [ rdf:type sh:PropertyShape ; sh:path ; sh:qualifiedMinCount Literal(\"1\", datatype=xsd:integer) ; sh:qualifiedValueShape [ sh:class ] ]\\n\\tFocus Node: sns:57c60cb96739908152ad395c29fb76e4_my_wrong_workflow-washed_clothes-outputs-clothes_data\\n\\tResult Path: \\n\\tMessage: Focus node does not conform to shape MinCount 1: [ sh:class ]\\n')\n" ] } ], "execution_count": 12 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Macro failure\n", "\n", "When we wrap the failing code as a macro, we don't fail until we try to instantiate that macro -- that is the first time the recipe code is evaluated and ontologically validated, at which point we fail at the connection formation just like in the workflow example.\n", "\n", "In the future, if we move to `pyiron_workflow` decorators first producing (and validating) `flowrep` recipes and _then_ using these to create `pyiron_workflow` node classes, we'd be able to nicely fail at macro definition time instead!" 
], "id": "9b3d6a2838caaf72" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:37.603855Z", "start_time": "2026-04-30T16:35:37.427083Z" } }, "cell_type": "code", "source": [ "@pwf.as_macro_node\n", "def my_wrong_macro(self, clothes: Clothes):\n", " self.washed_clothes = wash(clothes)\n", " self.money = sell(self.washed_clothes)\n", " return self.money\n", "\n", "try:\n", " my_wrong_macro()\n", "except ChannelConnectionError as e:\n", " print(e)" ], "id": "1520f248cef9296e", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The upstream channel /my_wrong_macro/washed_clothes.clothes cannot connect to the downstream channel /my_wrong_macro/sell.clothes because the upstream type hint (typing.Annotated[__main__.Clothes, ('uri', rdflib.term.URIRef('http://www.example.org/Clothes'), 'triples', (rdflib.term.URIRef('http://www.example.org/hasProperty'), SemantikonURI('http://www.example.org/cleaned')), 'derived_from', 'inputs.clothes')]) and downstream type hint (typing.Annotated[__main__.Clothes, ('uri', rdflib.term.URIRef('http://www.example.org/Clothes'), 'restrictions', (((rdflib.term.URIRef('http://www.w3.org/2002/07/owl#onProperty'), rdflib.term.URIRef('http://www.example.org/hasProperty')), (rdflib.term.URIRef('http://www.w3.org/2002/07/owl#someValuesFrom'), rdflib.term.URIRef('http://www.example.org/cleaned'))), ((rdflib.term.URIRef('http://www.w3.org/2002/07/owl#onProperty'), rdflib.term.URIRef('http://www.example.org/hasProperty')), (rdflib.term.URIRef('http://www.w3.org/2002/07/owl#someValuesFrom'), rdflib.term.URIRef('http://www.example.org/color')))))]) produce a non-empty ontological validation report:\n", "(False, )>, 'Validation Report\\nConforms: False\\nResults (1):\\nConstraint Violation in QualifiedValueShapeConstraintComponent (http://www.w3.org/ns/shacl#QualifiedMinCountConstraintComponent):\\n\\tSeverity: sh:Violation\\n\\tSource Shape: [ rdf:type sh:PropertyShape ; sh:path ; sh:qualifiedMinCount Literal(\"1\", 
datatype=xsd:integer) ; sh:qualifiedValueShape [ sh:class ] ]\\n\\tFocus Node: sns:8ff9ae536d3445386e46abebb053590c_my_wrong_macro-washed_clothes-outputs-clothes_data\\n\\tResult Path: \\n\\tMessage: Focus node does not conform to shape MinCount 1: [ sh:class ]\\n')\n" ] } ], "execution_count": 13 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Ruling things out with knowledge\n", "\n", "In our \"meal\" example, we saw that peer-to-peer edges must have commensurate URIs, and we used the workflow's `knowledge` attribute to indicate a subclass relationship.\n", "\n", "For parent-child edges negotiating subgraph IO, `semantikon` instead uses an open-world reasoning approach, such that URIs are not required to have a pre-existing relationship.\n", "For example, we can write a macro with our `wash` node, which both expects and returns a `uri=EX.Clothes` object, and interface with it using different URIs.\n", "This validates fine:" ], "id": "f134479c38defa25" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:37.926184Z", "start_time": "2026-04-30T16:35:37.605828Z" } }, "cell_type": "code", "source": [ "@pwf.as_macro_node\n", "def wash_something_macro(self, something_to_wash: u(Clothes, uri=EX.MaybeClothesMaybeNot)) -> u(Clothes, EX.SomethingGotWashed):\n", " self.washed_something = wash(something_to_wash)\n", " return self.washed_something\n", "\n", "wf = wash_something_macro()\n", "print(\"Valid: \", validate_workflow(wf)[0])" ], "id": "4fbf303662e530ea", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Valid: True\n" ] } ], "execution_count": 14 }, { "metadata": {}, "cell_type": "markdown", "source": [ "If we want, we can preclude such a graph from validating by providing explicit disjointness as contextual knowledge.\n", "This works for both the input and output edges independently, and is accomplished by the OWL `disjointWith` predicate:" ], "id": "938a54b1684bfdd9" }, { "metadata": { "ExecuteTime": { "end_time": 
"2026-04-30T16:35:38.120450Z", "start_time": "2026-04-30T16:35:37.927573Z" } }, "cell_type": "code", "source": [ "external_knowledge = rdflib.Graph()\n", "external_knowledge.add((EX.MaybeClothesMaybeNot, rdflib.OWL.disjointWith, EX.Clothes))\n", "external_knowledge.add((EX.MaybeClothesMaybeNot, rdflib.RDF.type, rdflib.OWL.Class))\n", "external_knowledge.add((EX.Clothes, rdflib.RDF.type, rdflib.OWL.Class))\n", "\n", "print(\"Valid with input disjointness: \", validate_workflow(wf, knowledge=external_knowledge)[0])\n", "\n", "external_knowledge = rdflib.Graph()\n", "external_knowledge.add((EX.SomethingGotWashed, rdflib.OWL.disjointWith, EX.Clothes))\n", "external_knowledge.add((EX.SomethingGotWashed, rdflib.RDF.type, rdflib.OWL.Class))\n", "external_knowledge.add((EX.Clothes, rdflib.RDF.type, rdflib.OWL.Class))\n", "\n", "print(\"Valid with output disjointness: \", validate_workflow(wf, knowledge=external_knowledge)[0])" ], "id": "24fa83719557a263", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Valid with input disjointness: False\n", "Valid with output disjointness: False\n" ] } ], "execution_count": 15 }, { "metadata": {}, "cell_type": "markdown", "source": [ "# Node suggestions\n", "\n", "One of the advantages of graph-based workflows with hinted IO channels is facilitating guided workflow creation. 
Given a hinted channel instance in the context of some workflow, we can ask for suggestions of other channels with which to form a connection in the same, sibling graph context:" ], "id": "c612162f836d4c76" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:38.212180Z", "start_time": "2026-04-30T16:35:38.121126Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"ontoflow\")\n", "wf.make = prepare_pizza()\n", "wf.eat = eat_pizza()\n", "suggestions = suggest.suggest_connections(wf.eat.inputs.meal)\n", "for (node, channel) in suggestions:\n", " print(node.full_label, channel.label)" ], "id": "7e01728625255664", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/ontoflow/make pizza\n" ] } ], "execution_count": 16 }, { "metadata": {}, "cell_type": "markdown", "source": "Similarly, given a corpus of node classes, we can ask which nodes have at least one commensurate input/output with which our channel might connect. After adding such a node to our graph, we can leverage the connection suggester to see which channel(s) are appropriate.", "id": "c22ef9a507bb2230" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:38.315458Z", "start_time": "2026-04-30T16:35:38.213418Z" } }, "cell_type": "code", "source": "suggest.suggest_nodes(wf.eat.inputs.meal, pwf.std.UserInput, prepare_pizza, wash)", "id": "4233ff3f954aa8ba", "outputs": [ { "data": { "text/plain": [ "[__main__.prepare_pizza]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 17 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Suggestion limitations\n", "\n", "When searching for new upstream nodes to add, the current implementation only looks at the immediate node, and not possible trees of upstream nodes. 
Returning to our clothes example, we can see that there is no _single_ suggestion for the `sell` node, because it requires clothes that are both washed _and_ dyed, but our other nodes only provide one of these at a time!" ], "id": "afac4e0b565c801e" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:38.339572Z", "start_time": "2026-04-30T16:35:38.317592Z" } }, "cell_type": "code", "source": [ "clothing_nodes = wash, dye, sell\n", "\n", "wf = pwf.Workflow(\"working_backwards\")\n", "wf.money = sell()\n", "suggest.suggest_nodes(wf.money, *clothing_nodes)" ], "id": "df8ba4cf778b75d3", "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 18 }, { "metadata": {}, "cell_type": "markdown", "source": "Of course, working backwards a single step still works fine for lots of nodes, e.g. for `dye` we will take _anything_ that gives us clothes!", "id": "a32732a594de4540" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:38.678797Z", "start_time": "2026-04-30T16:35:38.340872Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"single_step_back\")\n", "wf.dyed_clothes = dye()\n", "suggest.suggest_nodes(wf.dyed_clothes, *clothing_nodes)" ], "id": "eb0517eb5d999901", "outputs": [ { "data": { "text/plain": [ "[__main__.wash, __main__.dye]" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 19 }, { "metadata": {}, "cell_type": "markdown", "source": "And when we look _downstream_ we have the advantage of knowing the entire upstream graph concretely, so there we are able to see options for fulfilling these more complex demands.", "id": "68c785add89b7f53" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:39.270340Z", "start_time": "2026-04-30T16:35:38.680104Z" } }, "cell_type": "code", "source": [ "wf = pwf.Workflow(\"downstream\")\n", "wf.dyed_clothes = dye(Clothes())\n", "wf.washed_clothes = 
wash(wf.dyed_clothes)\n", "suggestions = suggest.suggest_nodes(wf.washed_clothes, *clothing_nodes)\n", "assert sell in suggestions\n", "print(suggestions)" ], "id": "9e0064e26ce9e554", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[, , ]\n" ] } ], "execution_count": 20 }, { "metadata": {}, "cell_type": "markdown", "source": "When checking whether requirements are fulfilled, an input only cares that its _own_ requirements are fulfilled; we may suggest upstream sources that wind up adding new restrictions on our terminal input. For instance, below `TakesDownstream` only cares that `GivesAndTakes` promises to supply something with the \"Downstream\" characteristic -- it doesn't mind that the `GivesAndTakes` input from which this output derives now further demands an \"Upstream\" characteristic:", "id": "38b026f5e536acdb" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:39.463899Z", "start_time": "2026-04-30T16:35:39.280299Z" } }, "cell_type": "code", "source": [ "uri_Upstream = SemantikonURI(EX.Upstream)\n", "uri_Downstream = SemantikonURI(EX.Downstream)\n", "\n", "@pwf.as_function_node\n", "def GivesUpstreamNeed(x) -> u(str, uri=EX.Data, triples=(EX.has, uri_Upstream)):\n", " return str(x)\n", "\n", "@pwf.as_function_node\n", "def GivesAndTakes(\n", " y: u(\n", " str,\n", " uri=EX.Data,\n", " restrictions=(\n", " (rdflib.OWL.onProperty, EX.has),\n", " (rdflib.OWL.someValuesFrom, EX.Upstream),\n", " ),\n", " ),\n", ") -> u(\n", " str,\n", " uri=EX.Data,\n", " derived_from=\"inputs.y\",\n", " triples=(EX.has, uri_Downstream)\n", "):\n", " return y\n", "\n", "@pwf.as_function_node\n", "def TakesDownstream(\n", " z: u(\n", " str,\n", " uri=EX.Data,\n", " restrictions=(\n", " (rdflib.OWL.onProperty, EX.has),\n", " (rdflib.OWL.someValuesFrom, EX.Downstream),\n", " ),\n", " ),\n", ") -> str:\n", " return z\n", "\n", "wf = pwf.Workflow(\"derived_restrictions\")\n", "wf.up = GivesUpstreamNeed()\n", "wf.middle = GivesAndTakes()\n", 
"wf.down = TakesDownstream()\n", "\n", "suggest.suggest_connections(wf.middle.outputs.y)" ], "id": "42fbb12d36c415d9", "outputs": [ { "data": { "text/plain": [ "[(<__main__.TakesDownstream at 0x13b3d6ff0>,\n", " )]" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "execution_count": 21 }, { "metadata": {}, "cell_type": "markdown", "source": [ "## Complex workflows\n", "\n", "Ontological validation is still a new feature, and you may find an edge case we haven't found and tested yet. In general, atomic function nodes and workflows/macro nodes should play nicely. Flow control nodes (for, while, etc.) and other node types are not uniformly supported and your milage may vary." ], "id": "9c3c3de6d21f80bd" }, { "metadata": {}, "cell_type": "markdown", "source": [ "# Units\n", "\n", "`semantikon` annotations also allow us to specify physical units. When present, these are included in the ontological validation just like the other ontological terms.\n", "\n", "As such, we have no problem making same-unit connections:" ], "id": "7cf5e95f5ef33175" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:39.657067Z", "start_time": "2026-04-30T16:35:39.465796Z" } }, "cell_type": "code", "source": [ "@pwf.as_function_node\n", "def Distance(x: u(float, units=\"meter\")) -> u(float, derived_from=\"inputs.x\"):\n", " return x\n", "\n", "@pwf.as_function_node\n", "def Speed(\n", " dx: u(float, units=\"meter\"), dt: u(float, units=\"second\")\n", ") -> u(float, units=\"meter/second\"):\n", " s = dx/dt\n", " return s\n", "\n", "wf = pwf.Workflow(\"speedometer\")\n", "wf.dx = Distance(100)\n", "wf.speed = Speed(dx=wf.dx)" ], "id": "7fa30c5bb66141ee", "outputs": [], "execution_count": 22 }, { "metadata": {}, "cell_type": "markdown", "source": "With incompatible units, we get an exception at connection time, just like with other ontological failures:", "id": "3c1f02dfc7a719a1" }, { "metadata": { "ExecuteTime": { "end_time": 
"2026-04-30T16:35:39.775892Z", "start_time": "2026-04-30T16:35:39.658039Z" } }, "cell_type": "code", "source": [ "@pwf.as_function_node\n", "def NanoTime(t: u(float, units=\"nanosecond\")) -> u(float, units=\"nanosecond\"):\n", " return t\n", "\n", "wf.dt = NanoTime(10)\n", "try:\n", " wf.speed.inputs.dt = wf.dt\n", "except ChannelConnectionError as e:\n", " print(e)\n", " wf.remove_child(wf.dt)" ], "id": "bad9905df584134c", "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The upstream channel /speedometer/dt.t cannot connect to the downstream channel /speedometer/speed.dt because the upstream type hint (typing.Annotated[float, ('units', 'nanosecond')]) and downstream type hint (typing.Annotated[float, ('units', 'second')]) produce a non-empty ontological validation report:\n", "(False, )>, \"Validation Report\\nConforms: False\\nResults (1):\\nConstraint Violation in HasValueConstraintComponent (http://www.w3.org/ns/shacl#HasValueConstraintComponent):\\n\\tSeverity: sh:Violation\\n\\tSource Shape: [ rdf:type sh:PropertyShape ; sh:hasValue unit:SEC ; sh:path qudt:hasUnit ]\\n\\tFocus Node: sns:f997e651f109385915d4a770a7207d30_speedometer-dt-outputs-t_data\\n\\tResult Path: qudt:hasUnit\\n\\tMessage: Node sns:f997e651f109385915d4a770a7207d30_speedometer-dt-outputs-t_data->qudt:hasUnit does not contain a value in the set: ['unit:SEC']\\n\")\n" ] } ], "execution_count": 23 }, { "metadata": {}, "cell_type": "markdown", "source": "With correct units, it works fine", "id": "1960f91b859f1516" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:40.047274Z", "start_time": "2026-04-30T16:35:39.777084Z" } }, "cell_type": "code", "source": [ "@pwf.as_function_node\n", "def Time(t: u(float, units=\"second\")) -> u(float, units=\"second\"):\n", " return t\n", "\n", "wf.dt = Time(10)\n", "wf.speed.inputs.dt = wf.dt\n", "wf()" ], "id": "a918e2e7d80b8376", "outputs": [ { "data": { "text/plain": [ "{'speed__s': 10.0}" ] }, "execution_count": 24, 
"metadata": {}, "output_type": "execute_result" } ], "execution_count": 24 }, { "metadata": {}, "cell_type": "markdown", "source": "(Note that units are _NOT_ inherited using the `derived_from=` flag -- if two nodes use different units, you'll need to add an explicit unit conversion node into your graph.)", "id": "dafffcc1823b711f" }, { "metadata": { "ExecuteTime": { "end_time": "2026-04-30T16:35:40.058543Z", "start_time": "2026-04-30T16:35:40.048615Z" } }, "cell_type": "code", "source": "", "id": "67561c4f632db5a4", "outputs": [], "execution_count": 24 } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.7" } }, "nbformat": 4, "nbformat_minor": 5 }