From 36971c88b379ccff22596b7ae2dbeb619aff45ce Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Tue, 7 Apr 2026 20:52:45 +0000 Subject: [PATCH 01/14] docs(ENG-374): add execution context schema design spec Co-Authored-By: Claude Opus 4.6 (1M context) --- ...6-04-07-execution-context-schema-design.md | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 superpowers/specs/2026-04-07-execution-context-schema-design.md diff --git a/superpowers/specs/2026-04-07-execution-context-schema-design.md b/superpowers/specs/2026-04-07-execution-context-schema-design.md new file mode 100644 index 00000000..2b76f61c --- /dev/null +++ b/superpowers/specs/2026-04-07-execution-context-schema-design.md @@ -0,0 +1,153 @@ +# ENG-374: Execution Context Schema Design + +## Problem + +The packet function records execution context metadata alongside cached results, but the +current implementation has two issues: + +1. **`ResultCache.store()` assumes all metadata values are strings** — it stores every + variation and execution field as `pa.large_string()`. This breaks for structured fields + like `dict[str, str]`. +2. **Execution data is incomplete and loosely defined** — `PythonPacketFunction.get_execution_data()` + returns ad-hoc fields without a declared schema, and doesn't capture executor metadata. + +## Design + +### Execution data contract + +`PythonPacketFunction.get_execution_data()` returns: + +```python +{ + "executor_type": str, # e.g. "local", "ray.v0" + "executor_info": dict[str, str], # stringified executor metadata + "python_version": str, # e.g. "3.12.0" + "extra_info": dict[str, str], # escape hatch for future metadata +} +``` + +`PythonPacketFunction.get_execution_data_schema()` returns: + +```python +Schema({ + "executor_type": str, + "executor_info": dict[str, str], + "python_version": str, + "extra_info": dict[str, str], +}) +``` + +### Data flow: executor to storage + +1. **Executor** returns `get_executor_data() -> dict[str, Any]` with whatever metadata it + chooses (e.g. `{"executor_type": "ray.v0", "ray_address": "auto", "remote_opts": {...}}`). +2. **`PythonPacketFunction.get_execution_data()`** receives this dict, pops `executor_type` + as a top-level string field, and stringifies all remaining values into a flat + `dict[str, str]` stored as `executor_info`. +3. The stringification responsibility sits in the PacketFunction, not the executor. Executors + are free to return rich types. + +### Function variation data (unchanged) + +`PythonPacketFunction.get_function_variation_data()` continues to return: + +```python +{ + "function_name": str, + "function_signature_hash": str, + "function_content_hash": str, + "git_hash": str, +} +``` + +With corresponding `get_function_variation_data_schema()` returning the matching `Schema`. + +### Two independent column groups + +Function variation data and execution data are independent column groups in the results +table. Each has its own schema method. They are stored with distinct column prefixes +(`PF_VARIATION_PREFIX`, `PF_EXECUTION_PREFIX`). + +### Datagram-based metadata storage + +Instead of passing raw dicts + schemas to `ResultCache.store()`, metadata is wrapped as +`Datagram` objects. This: + +- Keeps `ResultCache` free of type-conversion concerns (just calls `.as_table()`) +- Mirrors how input/output packets are already handled +- Each Datagram carries its own `DataContext` with the universal converter, so + `dict[str, str]` fields are automatically converted to the appropriate Arrow + representation (e.g. 
`pa.map_(pa.large_string(), pa.large_string())`) + +**Updated `ResultCache.store()` signature:** + +```python +def store( + self, + input_packet: PacketProtocol, + output_packet: PacketProtocol, + variation_datagram: DatagramProtocol, + execution_datagram: DatagramProtocol, + skip_duplicates: bool = False, +) -> None: +``` + +Inside `store()`: +1. Call `.as_table()` on each datagram to get Arrow tables +2. Rename columns with `PF_VARIATION_PREFIX` / `PF_EXECUTION_PREFIX` +3. Concatenate with the output packet table, input hash, and timestamp + +The prefix logic is contained entirely within `ResultCache.store()` — the fewest +possible places know about it. + +### CachedPacketFunction helper + +A private helper on `CachedPacketFunction` avoids repeating datagram construction +across `call()`, `async_call()`, and `record_packet()`: + +```python +def _build_metadata_datagrams(self) -> tuple[Datagram, Datagram]: + variation_datagram = Datagram( + self.get_function_variation_data(), + python_schema=self.get_function_variation_data_schema(), + data_context=self.data_context, + ) + execution_datagram = Datagram( + self.get_execution_data(), + python_schema=self.get_execution_data_schema(), + data_context=self.data_context, + ) + return variation_datagram, execution_datagram +``` + +The `data_context` comes from the PacketFunction (the invoking/orchestrating context). + +### Executor implementations + +No changes needed to executor implementations: + +- **`PythonFunctionExecutorBase`** — already returns `{"executor_type": self.executor_type_id}`. + Schema method already returns `Schema({"executor_type": str})`. +- **`LocalPythonFunctionExecutor`** — inherits base behavior. +- **`RayExecutor`** — already returns Ray-specific fields. These flow into `executor_info` + after stringification by `PythonPacketFunction`. + +## Scope of changes + +1. **`PythonPacketFunction`** — update `get_execution_data()` and add + `get_execution_data_schema()` +2. **`PacketFunctionBase`** — add abstract `get_execution_data_schema()` and + `get_function_variation_data_schema()` +3. **`PacketFunctionWrapper`** — delegate the two schema methods to the wrapped function +4. **`CachedPacketFunction`** — add `_build_metadata_datagrams()`, update `call()` / + `async_call()` / `record_packet()` to use it +5. **`ResultCache.store()`** — accept datagrams instead of raw dicts, use `.as_table()` + + prefix renaming +6. **Tests** — update any tests that call `store()` directly or mock execution/variation data + +## Out of scope + +- Changing how the packet function itself executes +- Full dependency lockfile capture +- Querying by `executor_info` fields in `ResultCache.lookup()` (future concern) +- Changes to `ResultCache.lookup()` (continues matching on `INPUT_PACKET_HASH_COL`) From 5f407972bea41e32e800f2512bb18bfaab8c70ca Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Tue, 7 Apr 2026 20:58:17 +0000 Subject: [PATCH 02/14] =?UTF-8?q?docs(ENG-374):=20address=20spec=20review?= =?UTF-8?q?=20feedback=20=E2=80=94=20clarify=20stringification,=20fix=20sc?= =?UTF-8?q?ope=20accuracy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- ...6-04-07-execution-context-schema-design.md | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/superpowers/specs/2026-04-07-execution-context-schema-design.md b/superpowers/specs/2026-04-07-execution-context-schema-design.md index 2b76f61c..133e351a 100644 --- a/superpowers/specs/2026-04-07-execution-context-schema-design.md +++ b/superpowers/specs/2026-04-07-execution-context-schema-design.md @@ -11,6 +11,9 @@ current implementation has two issues: 2. **Execution data is incomplete and loosely defined** — `PythonPacketFunction.get_execution_data()` returns ad-hoc fields without a declared schema, and doesn't capture executor metadata. +This is a greenfield project pre-v0.1.0 — no backward compatibility concerns with +existing cached data. The old `execution_context` field is simply replaced. + ## Design ### Execution data contract @@ -43,7 +46,8 @@ Schema({ chooses (e.g. `{"executor_type": "ray.v0", "ray_address": "auto", "remote_opts": {...}}`). 2. **`PythonPacketFunction.get_execution_data()`** receives this dict, pops `executor_type` as a top-level string field, and stringifies all remaining values into a flat - `dict[str, str]` stored as `executor_info`. + `dict[str, str]` stored as `executor_info`. Stringification uses `str()` for simple + scalar values and `json.dumps()` for dicts, lists, and other compound types. 3. The stringification responsibility sits in the PacketFunction, not the executor. Executors are free to return rich types. @@ -120,11 +124,14 @@ def _build_metadata_datagrams(self) -> tuple[Datagram, Datagram]: return variation_datagram, execution_datagram ``` -The `data_context` comes from the PacketFunction (the invoking/orchestrating context). +The `data_context` comes from `CachedPacketFunction.data_context`, which is inherited +from `PacketFunctionBase` (via `TraceableBase`). In practice this is the wrapped +function's data context, since `CachedPacketFunction` is constructed with it. ### Executor implementations -No changes needed to executor implementations: +No changes needed to executor data contracts. The existing `get_executor_data()` return +values are consumed and stringified by `PythonPacketFunction`. - **`PythonFunctionExecutorBase`** — already returns `{"executor_type": self.executor_type_id}`. Schema method already returns `Schema({"executor_type": str})`. @@ -132,18 +139,25 @@ No changes needed to executor implementations: - **`RayExecutor`** — already returns Ray-specific fields. These flow into `executor_info` after stringification by `PythonPacketFunction`. +**Bug fix needed:** `RayExecutor.get_executor_data_schema()` currently attempts to mutate +a `Schema` object (which is immutable). This must be fixed to construct a new `Schema` +with the combined fields. + ## Scope of changes -1. **`PythonPacketFunction`** — update `get_execution_data()` and add - `get_execution_data_schema()` +1. **`PythonPacketFunction`** — update `get_execution_data()` (replaces old + `execution_context` field with the new schema); add `get_execution_data_schema()`. + `get_function_variation_data_schema()` already exists — no change needed. 2. 
**`PacketFunctionBase`** — add abstract `get_execution_data_schema()` and - `get_function_variation_data_schema()` -3. **`PacketFunctionWrapper`** — delegate the two schema methods to the wrapped function + `get_function_variation_data_schema()` (aligning the base class with the protocol) +3. **`PacketFunctionWrapper`** — add delegation for `get_function_variation_data_schema()` + and `get_execution_data_schema()` to the wrapped function (neither exists today) 4. **`CachedPacketFunction`** — add `_build_metadata_datagrams()`, update `call()` / `async_call()` / `record_packet()` to use it 5. **`ResultCache.store()`** — accept datagrams instead of raw dicts, use `.as_table()` + prefix renaming -6. **Tests** — update any tests that call `store()` directly or mock execution/variation data +6. **`RayExecutor.get_executor_data_schema()`** — fix immutable `Schema` mutation bug +7. **Tests** — update any tests that call `store()` directly or mock execution/variation data ## Out of scope From abc836665dec493f83c912a38f8e0fd17feb06e8 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Tue, 7 Apr 2026 21:27:11 +0000 Subject: [PATCH 03/14] docs(ENG-374): add execution context schema implementation plan Co-Authored-By: Claude Opus 4.6 (1M context) --- ...6-04-07-eng374-execution-context-schema.md | 641 ++++++++++++++++++ 1 file changed, 641 insertions(+) create mode 100644 superpowers/plans/2026-04-07-eng374-execution-context-schema.md diff --git a/superpowers/plans/2026-04-07-eng374-execution-context-schema.md b/superpowers/plans/2026-04-07-eng374-execution-context-schema.md new file mode 100644 index 00000000..5b9d822a --- /dev/null +++ b/superpowers/plans/2026-04-07-eng374-execution-context-schema.md @@ -0,0 +1,641 @@ +# ENG-374: Execution Context Schema Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Make execution context metadata schema-driven and store it via Datagrams so that structured types like `dict[str, str]` are properly handled in Arrow storage. + +**Architecture:** Executor returns rich metadata, PacketFunction stringifies it into a flat `dict[str, str]` and wraps it as a Datagram. `ResultCache.store()` accepts Datagrams instead of raw dicts, using `.as_table()` for Arrow conversion and adding column prefixes internally. + +**Tech Stack:** Python, PyArrow, orcapod types/datagrams/semantic_types + +**Spec:** `superpowers/specs/2026-04-07-execution-context-schema-design.md` + +--- + +## Chunk 1: Abstract base class + PythonPacketFunction schema methods + +### Task 1: Add abstract schema methods to PacketFunctionBase + +**Files:** +- Modify: `src/orcapod/core/packet_function.py:234-242` (add abstract schema methods alongside existing abstract data methods) + +- [ ] **Step 1: Add abstract `get_function_variation_data_schema` and `get_execution_data_schema` to `PacketFunctionBase`** + +In `src/orcapod/core/packet_function.py`, after the existing abstract `get_execution_data` method (around line 242), add: + +```python +@abstractmethod +def get_function_variation_data_schema(self) -> Schema: + """Schema for the data returned by ``get_function_variation_data``.""" + ... + +@abstractmethod +def get_execution_data_schema(self) -> Schema: + """Schema for the data returned by ``get_execution_data``.""" + ... 
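# NOTE: once these land, every concrete packet function class
# (PythonPacketFunction, PacketFunctionWrapper, PacketFunctionProxy) must
# implement both methods; until then, instantiating one raises TypeError
# for the missing abstract methods — Step 2 below relies on that failure.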
+``` + +- [ ] **Step 2: Verify tests fail as expected** + +Run: `uv run pytest tests/test_core/packet_function/ -v --tb=short -x` +Expected: FAIL — `PythonPacketFunction` does not yet implement `get_execution_data_schema`, so instantiation raises `TypeError` for the missing abstract method. This is expected and will be fixed in Task 2. Do NOT commit yet — proceed directly to Task 2. + +### Task 2: Update PythonPacketFunction.get_execution_data() and add get_execution_data_schema() + +**Files:** +- Modify: `src/orcapod/core/packet_function.py:473-477` (replace current `get_execution_data`) + +- [ ] **Step 1: Write test for new execution data shape** + +In `tests/test_core/packet_function/test_packet_function.py`, add a test: + +```python +class TestExecutionDataSchema: + def test_execution_data_has_expected_keys(self): + pf = PythonPacketFunction(lambda x: x, output_keys="result") + data = pf.get_execution_data() + assert set(data.keys()) == { + "executor_type", + "executor_info", + "python_version", + "extra_info", + } + + def test_executor_type_is_string(self): + pf = PythonPacketFunction(lambda x: x, output_keys="result") + data = pf.get_execution_data() + assert isinstance(data["executor_type"], str) + + def test_executor_info_is_dict_str_str(self): + pf = PythonPacketFunction(lambda x: x, output_keys="result") + data = pf.get_execution_data() + assert isinstance(data["executor_info"], dict) + for k, v in data["executor_info"].items(): + assert isinstance(k, str) + assert isinstance(v, str) + + def test_python_version_matches_sys(self): + import sys + pf = PythonPacketFunction(lambda x: x, output_keys="result") + data = pf.get_execution_data() + expected = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + assert data["python_version"] == expected + + def test_extra_info_is_empty_dict(self): + pf = PythonPacketFunction(lambda x: x, output_keys="result") + data = pf.get_execution_data() + assert data["extra_info"] == {} + + def test_execution_data_schema_matches_data_keys(self): + pf = PythonPacketFunction(lambda x: x, output_keys="result") + data = pf.get_execution_data() + schema = pf.get_execution_data_schema() + assert set(schema.keys()) == set(data.keys()) + + def test_execution_data_schema_types(self): + pf = PythonPacketFunction(lambda x: x, output_keys="result") + schema = pf.get_execution_data_schema() + assert schema["executor_type"] is str + assert schema["executor_info"] == dict[str, str] + assert schema["python_version"] is str + assert schema["extra_info"] == dict[str, str] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function.py::TestExecutionDataSchema -v --tb=short` +Expected: FAIL (current `get_execution_data` returns different keys) + +- [ ] **Step 3: Update `PythonPacketFunction.get_execution_data()` and add `get_execution_data_schema()`** + +Replace the current `get_execution_data` method in `src/orcapod/core/packet_function.py` (around line 473) with: + +```python +def get_execution_data(self) -> dict[str, Any]: + """Raw data defining execution context.""" + import json + + executor = self._executor + if executor is not None: + executor_data = executor.get_executor_data() + executor_type = str(executor_data.pop("executor_type", executor.executor_type_id)) + executor_info = { + k: v if isinstance(v, str) else json.dumps(v) if isinstance(v, (dict, list)) else str(v) + for k, v in executor_data.items() + } + else: + executor_type = "none" + executor_info = {} + + 
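    # (Assumes the module-level `import sys` already present in
    # packet_function.py; only `json` needs the function-local import above.)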
python_version_info = sys.version_info + python_version_str = f"{python_version_info.major}.{python_version_info.minor}.{python_version_info.micro}" + return { + "executor_type": executor_type, + "executor_info": executor_info, + "python_version": python_version_str, + "extra_info": {}, + } + +def get_execution_data_schema(self) -> Schema: + """Schema for the data returned by ``get_execution_data``.""" + return Schema({ + "executor_type": str, + "executor_info": dict[str, str], + "python_version": str, + "extra_info": dict[str, str], + }) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_packet_function.py::TestExecutionDataSchema -v --tb=short` +Expected: PASS + +- [ ] **Step 5: Run full packet function test suite to check for regressions** + +Run: `uv run pytest tests/test_core/packet_function/ -v --tb=short -x` +Expected: All PASS. Some tests that assert on `get_execution_data()` keys may fail — fix any that reference the old `execution_context` key. + +- [ ] **Step 6: Commit Tasks 1 and 2 together** + +Tasks 1 and 2 are committed together since Task 1 leaves the codebase in a broken state (abstract method without implementation). + +```bash +git add src/orcapod/core/packet_function.py tests/test_core/packet_function/test_packet_function.py +git commit -m "feat(ENG-374): add abstract schema methods and update PythonPacketFunction execution data" +``` + +### Task 3: Add schema method delegation to PacketFunctionWrapper + +**Files:** +- Modify: `src/orcapod/core/packet_function.py:755-758` (add delegation methods in `PacketFunctionWrapper`) +- Test: `tests/test_core/packet_function/test_cached_packet_function.py` + +- [ ] **Step 1: Write tests for schema delegation** + +In `tests/test_core/packet_function/test_cached_packet_function.py`, in the `TestPacketFunctionWrapperDelegation` class, add: + +```python +def test_get_function_variation_data_schema_delegates(self, wrapper, inner_pf): + assert ( + wrapper.get_function_variation_data_schema() + == inner_pf.get_function_variation_data_schema() + ) + +def test_get_execution_data_schema_delegates(self, wrapper, inner_pf): + assert ( + wrapper.get_execution_data_schema() + == inner_pf.get_execution_data_schema() + ) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/packet_function/test_cached_packet_function.py::TestPacketFunctionWrapperDelegation::test_get_function_variation_data_schema_delegates -v --tb=short` +Expected: FAIL (method not found on wrapper) + +- [ ] **Step 3: Add delegation methods to `PacketFunctionWrapper`** + +In `src/orcapod/core/packet_function.py`, in the `PacketFunctionWrapper` class, after the existing `get_execution_data` delegation (around line 759), add: + +```python +def get_function_variation_data_schema(self) -> Schema: + return self._packet_function.get_function_variation_data_schema() + +def get_execution_data_schema(self) -> Schema: + return self._packet_function.get_execution_data_schema() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/packet_function/test_cached_packet_function.py::TestPacketFunctionWrapperDelegation -v --tb=short` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/packet_function.py tests/test_core/packet_function/test_cached_packet_function.py +git commit -m "feat(ENG-374): add schema method delegation to PacketFunctionWrapper" +``` + +## Chunk 2: ResultCache datagram-based store + 
CachedPacketFunction helper + +### Task 4: Update ResultCache.store() to accept Datagrams + +**Files:** +- Modify: `src/orcapod/core/result_cache.py:138-204` +- Test: `tests/test_core/test_result_cache.py` + +- [ ] **Step 1: Write tests for datagram-based store** + +Update `tests/test_core/test_result_cache.py`. Change the `_compute_and_store` helper to construct Datagrams: + +```python +from orcapod.core.datagrams import Datagram, Packet + +def _compute_and_store( + cache: ResultCache, pf: PythonPacketFunction, input_packet: Packet +): + """Helper: compute output and store in cache.""" + output = pf.direct_call(input_packet) + assert output is not None + variation_datagram = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + execution_datagram = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, + ) + cache.store( + input_packet, + output, + variation_datagram=variation_datagram, + execution_datagram=execution_datagram, + ) + return output +``` + +Also update `test_same_packet_different_record_path_is_miss` — replace its direct `cache_a.store(...)` call: + +```python +def test_same_packet_different_record_path_is_miss(self): + db = InMemoryArrowDatabase() + cache_a = ResultCache(result_database=db, record_path=("path_a",)) + cache_b = ResultCache(result_database=db, record_path=("path_b",)) + pf = _make_pf() + input_pkt = Packet({"x": 10}) + + output = pf.direct_call(input_pkt) + variation_datagram = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + execution_datagram = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, + ) + cache_a.store(input_pkt, output, variation_datagram, execution_datagram) + + assert cache_a.lookup(input_pkt) is not None + assert cache_b.lookup(input_pkt) is None +``` + +Update the import at the top of `test_result_cache.py` — change `from orcapod.core.datagrams import Packet` to `from orcapod.core.datagrams import Datagram, Packet`. 
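Since this Datagram-pair construction now repeats in several tests, it may be worth factoring into a small module-level test helper. A sketch — the name `_make_metadata_datagrams` is illustrative, not part of the plan:

```python
def _make_metadata_datagrams(
    pf: PythonPacketFunction,
) -> tuple[Datagram, Datagram]:
    """Build the (variation, execution) Datagram pair the new store() expects."""
    variation_datagram = Datagram(
        pf.get_function_variation_data(),
        python_schema=pf.get_function_variation_data_schema(),
        data_context=pf.data_context,
    )
    execution_datagram = Datagram(
        pf.get_execution_data(),
        python_schema=pf.get_execution_data_schema(),
        data_context=pf.data_context,
    )
    return variation_datagram, execution_datagram
```

Both `_compute_and_store` and `test_same_packet_different_record_path_is_miss` could then call this instead of repeating the two constructors.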
+ +Add a new test for dict-typed columns: + +```python +class TestStoreDictColumns: + def test_executor_info_column_stored(self): + cache, db = _make_cache() + pf = _make_pf() + _compute_and_store(cache, pf, Packet({"x": 10})) + + records = db.get_all_records(cache.record_path) + assert records is not None + exec_info_col = f"{constants.PF_EXECUTION_PREFIX}executor_info" + assert exec_info_col in records.column_names + + def test_extra_info_column_stored(self): + cache, db = _make_cache() + pf = _make_pf() + _compute_and_store(cache, pf, Packet({"x": 10})) + + records = db.get_all_records(cache.record_path) + assert records is not None + extra_info_col = f"{constants.PF_EXECUTION_PREFIX}extra_info" + assert extra_info_col in records.column_names +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `uv run pytest tests/test_core/test_result_cache.py -v --tb=short -x` +Expected: FAIL (store() doesn't accept datagram kwargs yet) + +- [ ] **Step 3: Update `ResultCache.store()` to accept and process Datagrams** + +Replace the `store` method in `src/orcapod/core/result_cache.py`: + +```python +def store( + self, + input_packet: PacketProtocol, + output_packet: PacketProtocol, + variation_datagram: "DatagramProtocol", + execution_datagram: "DatagramProtocol", + skip_duplicates: bool = False, +) -> None: + """Store an output packet in the cache. + + Stores the output packet data alongside function variation and + execution metadata (as Datagrams), input packet hash, and a timestamp. + + Args: + input_packet: The input packet (used for its content hash). + output_packet: The computed output packet to store. + variation_datagram: Function variation metadata as a Datagram. + execution_datagram: Execution environment metadata as a Datagram. + skip_duplicates: If True, silently skip if a record with the + same ID already exists. + """ + data_table = output_packet.as_table(columns={"source": True, "context": True}) + + # Add variation and execution columns with prefixes. + # Use a running counter for insertion position since add_column shifts indices. + col_idx = 0 + var_table = variation_datagram.as_table() + for name in var_table.column_names: + data_table = data_table.add_column( + col_idx, + f"{constants.PF_VARIATION_PREFIX}{name}", + var_table.column(name), + ) + col_idx += 1 + + exec_table = execution_datagram.as_table() + for name in exec_table.column_names: + data_table = data_table.add_column( + col_idx, + f"{constants.PF_EXECUTION_PREFIX}{name}", + exec_table.column(name), + ) + col_idx += 1 + + # Add input packet hash (position 0) + data_table = data_table.add_column( + 0, + constants.INPUT_PACKET_HASH_COL, + pa.array([input_packet.content_hash().to_string()], type=pa.large_string()), + ) + + # Append timestamp + timestamp = datetime.now(timezone.utc) + data_table = data_table.append_column( + constants.POD_TIMESTAMP, + pa.array([timestamp], type=pa.timestamp("us", tz="UTC")), + ) + + self._result_database.add_record( + self._record_path, + output_packet.datagram_id, + data_table, + skip_duplicates=skip_duplicates, + ) + + if self._auto_flush: + self._result_database.flush() +``` + +Also update the imports at the top of `result_cache.py`. 
Add `DatagramProtocol` inside the existing `TYPE_CHECKING` block (since it's only used in type annotations): + +```python +if TYPE_CHECKING: + import pyarrow as pa + from orcapod.protocols.core_protocols.datagrams import DatagramProtocol +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `uv run pytest tests/test_core/test_result_cache.py -v --tb=short` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/core/result_cache.py tests/test_core/test_result_cache.py +git commit -m "feat(ENG-374): update ResultCache.store() to accept Datagrams" +``` + +### Task 5: Add _build_metadata_datagrams to CachedPacketFunction and update callers + +**Files:** +- Modify: `src/orcapod/core/packet_function.py:852-960` (`CachedPacketFunction`) +- Test: `tests/test_core/packet_function/test_cached_packet_function.py` + +- [ ] **Step 1: Add `_build_metadata_datagrams` helper and update `call`, `async_call`, `record_packet`** + +In `src/orcapod/core/packet_function.py`: + +First, add the `Datagram` import near the top of the file (with the other core imports): + +```python +from orcapod.core.datagrams import Datagram +``` + +Then in the `CachedPacketFunction` class, add a helper method and update the three callers. + +Add after `__init__`: + +```python +def _build_metadata_datagrams(self) -> tuple[Datagram, Datagram]: + """Build variation and execution Datagrams for cache storage.""" + variation_datagram = Datagram( + self.get_function_variation_data(), + python_schema=self.get_function_variation_data_schema(), + data_context=self.data_context, + ) + execution_datagram = Datagram( + self.get_execution_data(), + python_schema=self.get_execution_data_schema(), + data_context=self.data_context, + ) + return variation_datagram, execution_datagram +``` + +Then update `call()` — replace: +```python +self._cache.store( + packet, + output_packet, + variation_data=self.get_function_variation_data(), + execution_data=self.get_execution_data(), +) +``` +with: +```python +var_dg, exec_dg = self._build_metadata_datagrams() +self._cache.store(packet, output_packet, var_dg, exec_dg) +``` + +Apply the same replacement pattern to `async_call()`. + +For `record_packet()`, replace: +```python +self._cache.store( + input_packet, + output_packet, + variation_data=self.get_function_variation_data(), + execution_data=self.get_execution_data(), + skip_duplicates=skip_duplicates, +) +``` +with: +```python +var_dg, exec_dg = self._build_metadata_datagrams() +self._cache.store( + input_packet, output_packet, var_dg, exec_dg, + skip_duplicates=skip_duplicates, +) +``` + +- [ ] **Step 2: Run CachedPacketFunction tests** + +Run: `uv run pytest tests/test_core/packet_function/test_cached_packet_function.py -v --tb=short -x` +Expected: All PASS + +- [ ] **Step 3: Run full test suite to catch any remaining callers of the old store() signature** + +Run: `uv run pytest tests/ -v --tb=short -x` +Expected: All PASS. If any test calls `ResultCache.store()` with the old `variation_data`/`execution_data` kwargs, it will fail — fix those callers. + +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/core/packet_function.py +git commit -m "feat(ENG-374): add _build_metadata_datagrams and wire into CachedPacketFunction" +``` + +### Task 6: Update CachedFunctionPod to use Datagrams for store() + +**Files:** +- Modify: `src/orcapod/core/cached_function_pod.py:96-103,130-136` + +`CachedFunctionPod` also calls `_cache.store()` with the old raw-dict signature. It needs the same datagram conversion. 
+ +- [ ] **Step 1: Update `process_packet` and `async_process_packet` to build Datagrams** + +In `src/orcapod/core/cached_function_pod.py`, add the import: + +```python +from orcapod.core.datagrams import Datagram +``` + +In `process_packet()`, replace lines 96-103: +```python + tag, output = self._function_pod.process_packet(tag, packet, logger=logger) + if output is not None: + pf = self._function_pod.packet_function + self._cache.store( + packet, + output, + variation_data=pf.get_function_variation_data(), + execution_data=pf.get_execution_data(), + ) +``` +with: +```python + tag, output = self._function_pod.process_packet(tag, packet, logger=logger) + if output is not None: + pf = self._function_pod.packet_function + var_dg = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + exec_dg = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, + ) + self._cache.store(packet, output, var_dg, exec_dg) +``` + +Apply the same replacement to `async_process_packet()` (lines 130-136). + +- [ ] **Step 2: Run CachedFunctionPod tests** + +Run: `uv run pytest tests/test_core/function_pod/ -v --tb=short -x` +Expected: All PASS + +- [ ] **Step 3: Commit** + +```bash +git add src/orcapod/core/cached_function_pod.py +git commit -m "feat(ENG-374): update CachedFunctionPod to use Datagrams for store()" +``` + +### Task 7: Fix RayExecutor.get_executor_data_schema() immutable Schema bug + +**Files:** +- Modify: `src/orcapod/core/executors/ray.py:8-9,347-352` + +- [ ] **Step 1: Fix the import and schema construction** + +In `src/orcapod/core/executors/ray.py`: + +Fix the import on line 8 — change: +```python +from pyarrow import Schema +``` +to: +```python +from orcapod.types import Schema +``` + +Replace `get_executor_data_schema` (lines 347-352): +```python +def get_executor_data_schema(self) -> Schema: + return Schema({ + "executor_type": str, + "ray_address": str, + "remote_opts": dict[str, str], + "runtime_env": bool, + }) +``` + +Note: `remote_opts` is typed as `dict[str, str]` even though `get_executor_data()` returns `dict[str, Any]` for that field. This mismatch is acceptable because the executor schema is not consumed by storage directly — the PacketFunction layer stringifies all values before storing. + +- [ ] **Step 2: Verify the import and Schema construction** + +Run: `uv run python -c "from orcapod.types import Schema; from orcapod.core.executors.ray import RayExecutor; print('import ok')"` +Expected: Prints `import ok` (confirming the import is correct and doesn't error). + +Then verify the schema constructs correctly: +Run: `uv run python -c "from orcapod.types import Schema; s = Schema({'executor_type': str, 'ray_address': str, 'remote_opts': dict[str, str], 'runtime_env': bool}); print(s)"` +Expected: Prints the Schema repr without error. 
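Optionally, the check above can be captured as a persistent regression test alongside the existing mocked-Ray tests in `tests/test_core/test_regression_fixes.py` (which already use `MagicMock` and `patch.dict`). A sketch — the test name is illustrative:

```python
def test_get_executor_data_schema_builds_new_schema(self):
    """get_executor_data_schema() constructs a fresh Schema (no mutation)."""
    mock_ray = MagicMock()
    with patch.dict("sys.modules", {"ray": mock_ray}):
        from orcapod.core.executors.ray import RayExecutor

        executor = RayExecutor(ray_address="ray://host:10001")
        schema = executor.get_executor_data_schema()

    assert set(schema.keys()) == {
        "executor_type",
        "ray_address",
        "remote_opts",
        "runtime_env",
    }
```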
+ +- [ ] **Step 3: Commit** + +```bash +git add src/orcapod/core/executors/ray.py +git commit -m "fix(ENG-374): fix RayExecutor schema import and immutable Schema mutation" +``` + +### Task 8: Final verification + +- [ ] **Step 1: Run the full test suite** + +Run: `uv run pytest tests/ -v --tb=short` +Expected: All PASS + +- [ ] **Step 2: Verify execution data round-trips through cache** + +Run a quick integration check: +```bash +uv run python -c " +from orcapod.core.datagrams import Packet +from orcapod.core.packet_function import PythonPacketFunction, CachedPacketFunction +from orcapod.databases import InMemoryArrowDatabase + +def double(x: int) -> int: + return x * 2 + +pf = PythonPacketFunction(double, output_keys='result') +db = InMemoryArrowDatabase() +cpf = CachedPacketFunction(pf, result_database=db) + +inp = Packet({'x': 10}) +out = cpf.call(inp) +print('Output:', out.as_dict()) + +records = cpf.get_all_cached_outputs(include_system_columns=False) +print('Stored columns:', records.column_names) +print('Num rows:', records.num_rows) +" +``` +Expected: Output shows `{'result': 20}` and stored columns include prefixed variation and execution columns (including `executor_info` and `extra_info` as map-type columns). From 0b3ffc273bd65b0abbb9f67b8cbb78ac35d757ab Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Tue, 7 Apr 2026 21:45:52 +0000 Subject: [PATCH 04/14] feat(ENG-374): add abstract schema methods and update PythonPacketFunction execution data Add get_function_variation_data_schema() and get_execution_data_schema() as abstract methods on PacketFunctionBase, with concrete implementations on PythonPacketFunction, PacketFunctionWrapper, and PacketFunctionProxy. Replace the old flat get_execution_data() return (python_version + execution_context) with a richer schema that includes executor_type, executor_info, python_version, and extra_info. Update ResultCache to JSON-serialize non-string execution data values. Fix pre-existing test references to get_execution_data() on executor objects (should be get_executor_data()). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/packet_function.py | 77 +++++++++++++++++-- src/orcapod/core/packet_function_proxy.py | 12 +++ src/orcapod/core/result_cache.py | 5 +- .../packet_function/test_executor.py | 50 ++++++------ .../packet_function/test_packet_function.py | 77 ++++++++++++++++++- tests/test_core/test_regression_fixes.py | 18 ++--- 6 files changed, 193 insertions(+), 46 deletions(-) diff --git a/src/orcapod/core/packet_function.py b/src/orcapod/core/packet_function.py index 988a7492..b481515a 100644 --- a/src/orcapod/core/packet_function.py +++ b/src/orcapod/core/packet_function.py @@ -6,7 +6,6 @@ import sys from abc import abstractmethod from collections.abc import Callable, Iterable, Sequence -from datetime import datetime, timezone import typing from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, TypeVar @@ -27,7 +26,6 @@ PythonFunctionExecutorProtocol, ) from orcapod.protocols.database_protocols import ArrowDatabaseProtocol -from orcapod.system_constants import constants from orcapod.types import DataValue, Schema, SchemaLike from orcapod.utils import schema_utils from orcapod.utils.git_utils import get_git_info_for_python_object @@ -243,6 +241,16 @@ def get_execution_data(self) -> dict[str, Any]: """Raw data defining execution context""" ... + @abstractmethod + def get_function_variation_data_schema(self) -> Schema: + """Schema for the data returned by ``get_function_variation_data``.""" + ... 
+ + @abstractmethod + def get_execution_data_schema(self) -> Schema: + """Schema for the data returned by ``get_execution_data``.""" + ... + # ==================== Executor ==================== @property @@ -365,8 +373,16 @@ def __init__( input_schema: SchemaLike | None = None, output_schema: SchemaLike | Sequence[type] | None = None, label: str | None = None, - **kwargs, + data_context: str | DataContext | None = None, + config: Config | None = None, + executor: PythonFunctionExecutorProtocol | None = None, ) -> None: + + # default to the basic PythonFunctionExecutor + if executor is None: + from orcapod.core.executors.local import LocalPythonFunctionExecutor + executor = LocalPythonFunctionExecutor() + self._function = function self._is_async = inspect.iscoroutinefunction(function) @@ -405,7 +421,7 @@ def __init__( assert function_name is not None self._function_name = function_name - super().__init__(label=label, version=version, **kwargs) + super().__init__(label=label, version=version, data_context=data_context, config=config, executor=executor) # extract input and output schema from the function signature self._input_schema, self._output_schema = schema_utils.extract_function_schemas( @@ -417,13 +433,17 @@ def __init__( # get git info for the function # TODO: turn this into optional addition - env_info = get_git_info_for_python_object(self._function) + env_info = get_git_info_for_python_object(self._function, try_cwd=True) if env_info is None: git_hash = "unknown" else: git_hash = env_info.get("git_commit_hash", "unknown") if env_info.get("git_repo_status") == "dirty": git_hash += "-dirty" + if env_info.get("has_untracked_files") == "true": + git_hash += "-untracked" + if (git_source := env_info.get("git_source")) is not None and git_source != "function": + git_hash += f"-{git_source}" self._git_hash = git_hash semantic_hasher = self.data_context.semantic_hasher @@ -451,11 +471,48 @@ def get_function_variation_data(self) -> dict[str, Any]: "git_hash": self._git_hash, } + def get_function_variation_data_schema(self) -> Schema: + """Schema for the data returned by ``get_function_variation_data``.""" + return Schema({ + "function_name": str, + "function_signature_hash": str, + "function_content_hash": str, + "git_hash": str, + }) + def get_execution_data(self) -> dict[str, Any]: - """Raw data defining execution context - system computes hash""" + """Raw data defining execution context.""" + import json + + executor = self._executor + if executor is not None: + executor_data = executor.get_executor_data() + executor_type = str(executor_data.pop("executor_type", executor.executor_type_id)) + executor_info = { + k: v if isinstance(v, str) else json.dumps(v) if isinstance(v, (dict, list)) else str(v) + for k, v in executor_data.items() + } + else: + executor_type = "none" + executor_info = {} + python_version_info = sys.version_info python_version_str = f"{python_version_info.major}.{python_version_info.minor}.{python_version_info.micro}" - return {"python_version": python_version_str, "execution_context": "local"} + return { + "executor_type": executor_type, + "executor_info": executor_info, + "python_version": python_version_str, + "extra_info": {}, + } + + def get_execution_data_schema(self) -> Schema: + """Schema for the data returned by ``get_execution_data``.""" + return Schema({ + "executor_type": str, + "executor_info": dict[str, str], + "python_version": str, + "extra_info": dict[str, str], + }) @property def input_packet_schema(self) -> Schema: @@ -736,9 +793,15 @@ def 
output_packet_schema(self) -> Schema: def get_function_variation_data(self) -> dict[str, Any]: return self._packet_function.get_function_variation_data() + def get_function_variation_data_schema(self) -> Schema: + return self._packet_function.get_function_variation_data_schema() + def get_execution_data(self) -> dict[str, Any]: return self._packet_function.get_execution_data() + def get_execution_data_schema(self) -> Schema: + return self._packet_function.get_execution_data_schema() + def to_config(self) -> dict[str, Any]: """Delegate serialization to the wrapped packet function.""" return self._packet_function.to_config() diff --git a/src/orcapod/core/packet_function_proxy.py b/src/orcapod/core/packet_function_proxy.py index cfa343c4..f63f5e09 100644 --- a/src/orcapod/core/packet_function_proxy.py +++ b/src/orcapod/core/packet_function_proxy.py @@ -178,12 +178,24 @@ def get_function_variation_data(self) -> dict[str, Any]: return self._bound_function.get_function_variation_data() return {} + def get_function_variation_data_schema(self) -> Schema: + """Return function variation data schema from bound function, or empty schema.""" + if self._bound_function is not None: + return self._bound_function.get_function_variation_data_schema() + return Schema({}) + def get_execution_data(self) -> dict[str, Any]: """Return execution data, or empty dict when unbound.""" if self._bound_function is not None: return self._bound_function.get_execution_data() return {} + def get_execution_data_schema(self) -> Schema: + """Return execution data schema from bound function, or empty schema.""" + if self._bound_function is not None: + return self._bound_function.get_execution_data_schema() + return Schema({}) + # ==================== Executor ==================== @property diff --git a/src/orcapod/core/result_cache.py b/src/orcapod/core/result_cache.py index 8afa8b8f..e767dadb 100644 --- a/src/orcapod/core/result_cache.py +++ b/src/orcapod/core/result_cache.py @@ -171,11 +171,14 @@ def store( i += 1 # Add execution data columns + import json + for k, v in execution_data.items(): + str_v = v if isinstance(v, str) else json.dumps(v) data_table = data_table.add_column( i, f"{constants.PF_EXECUTION_PREFIX}{k}", - pa.array([v], type=pa.large_string()), + pa.array([str_v], type=pa.large_string()), ) i += 1 diff --git a/tests/test_core/packet_function/test_executor.py b/tests/test_core/packet_function/test_executor.py index 3f79de40..182b224c 100644 --- a/tests/test_core/packet_function/test_executor.py +++ b/tests/test_core/packet_function/test_executor.py @@ -17,7 +17,7 @@ import pytest from orcapod.core.datagrams import Packet -from orcapod.core.executors import LocalExecutor, PythonFunctionExecutorBase +from orcapod.core.executors import LocalPythonFunctionExecutor, PythonFunctionExecutorBase from orcapod.core.packet_function import ( PacketFunctionWrapper, PythonPacketFunction, @@ -118,8 +118,8 @@ def spy_executor() -> SpyExecutor: @pytest.fixture -def local_executor() -> LocalExecutor: - return LocalExecutor() +def local_executor() -> LocalPythonFunctionExecutor: + return LocalPythonFunctionExecutor() # --------------------------------------------------------------------------- @@ -139,9 +139,9 @@ def test_supports_restricted_types(self): assert executor.supports("python.function.v0") assert not executor.supports("wasm.function.v0") - def test_get_execution_data_returns_type(self): + def test_get_executor_data_returns_type(self): executor = SpyExecutor() - data = executor.get_execution_data() + data = 
executor.get_executor_data() assert data["executor_type"] == "spy" def test_with_options_returns_new_instance(self): @@ -164,28 +164,28 @@ def test_with_options_preserves_state(self): class TestLocalExecutor: - def test_executor_type_id(self, local_executor: LocalExecutor): + def test_executor_type_id(self, local_executor: LocalPythonFunctionExecutor): assert local_executor.executor_type_id == "local" - def test_supports_all_types(self, local_executor: LocalExecutor): + def test_supports_all_types(self, local_executor: LocalPythonFunctionExecutor): assert local_executor.supports("python.function.v0") assert local_executor.supports("anything.v99") def test_execute_callable_runs_function( self, - local_executor: LocalExecutor, + local_executor: LocalPythonFunctionExecutor, ): result = local_executor.execute_callable(add, {"x": 1, "y": 2}) assert result == 3 - def test_get_execution_data(self, local_executor: LocalExecutor): - data = local_executor.get_execution_data() + def test_get_executor_data(self, local_executor: LocalPythonFunctionExecutor): + data = local_executor.get_executor_data() assert data["executor_type"] == "local" - def test_with_options_returns_new_instance(self, local_executor: LocalExecutor): + def test_with_options_returns_new_instance(self, local_executor: LocalPythonFunctionExecutor): new_executor = local_executor.with_options() assert new_executor is not local_executor - assert isinstance(new_executor, LocalExecutor) + assert isinstance(new_executor, LocalPythonFunctionExecutor) # --------------------------------------------------------------------------- @@ -194,8 +194,8 @@ def test_with_options_returns_new_instance(self, local_executor: LocalExecutor): class TestExecutorProperty: - def test_default_executor_is_none(self, add_pf: PythonPacketFunction): - assert add_pf.executor is None + def test_default_executor_is_local(self, add_pf: PythonPacketFunction): + assert isinstance(add_pf.executor, LocalPythonFunctionExecutor) def test_set_executor( self, add_pf: PythonPacketFunction, spy_executor: SpyExecutor @@ -345,7 +345,7 @@ class SimpleWrapper(PacketFunctionWrapper): class TestProtocolConformance: def test_local_executor_satisfies_protocol(self): - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() assert isinstance(executor, PacketFunctionExecutorProtocol) def test_spy_executor_satisfies_protocol(self): @@ -354,7 +354,7 @@ def test_spy_executor_satisfies_protocol(self): def test_packet_function_with_executor_satisfies_protocol(self): pf = PythonPacketFunction(add, output_keys="result") - pf.executor = LocalExecutor() + pf.executor = LocalPythonFunctionExecutor() assert isinstance(pf, PacketFunctionProtocol) @@ -539,14 +539,14 @@ def test_decorator_incompatible_executor_raises(self): def my_add(x: int, y: int) -> int: return x + y - def test_decorator_without_executor_defaults_to_none(self): + def test_decorator_without_executor_defaults_to_local(self): from orcapod.core.function_pod import function_pod @function_pod(output_keys="result") def my_add(x: int, y: int) -> int: return x + y - assert my_add.pod.executor is None + assert isinstance(my_add.pod.executor, LocalPythonFunctionExecutor) # --------------------------------------------------------------------------- @@ -729,7 +729,7 @@ def test_second_iteration_uses_cache(self): class TestPythonFunctionExecutorProtocol: def test_local_executor_satisfies_protocol(self): - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() assert isinstance(executor, 
PythonFunctionExecutorProtocol) def test_spy_executor_satisfies_protocol(self): @@ -737,12 +737,12 @@ def test_spy_executor_satisfies_protocol(self): assert isinstance(executor, PythonFunctionExecutorProtocol) def test_execute_callable_runs_function(self): - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() result = executor.execute_callable(add, {"x": 3, "y": 4}) assert result == 7 def test_execute_callable_with_executor_options(self): - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() result = executor.execute_callable( add, {"x": 1, "y": 2}, executor_options={"num_cpus": 1} ) @@ -782,7 +782,7 @@ def test_wrapper_does_not_resolve_protocol(self): def test_set_executor_accepts_compatible_protocol(self): pf = PythonPacketFunction(add, output_keys="result") - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() pf.set_executor(executor) assert pf.executor is executor @@ -846,7 +846,7 @@ def test_execute_callable_with_async_fn(self): async def async_add(x: int, y: int) -> int: return x + y - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() result = executor.execute_callable(async_add, {"x": 5, "y": 3}) assert result == 8 @@ -854,7 +854,7 @@ def test_async_execute_callable_with_sync_fn(self): """LocalExecutor.async_execute_callable handles sync fns via run_in_executor.""" import asyncio - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() result = asyncio.run(executor.async_execute_callable(add, {"x": 10, "y": 20})) assert result == 30 @@ -865,7 +865,7 @@ def test_async_execute_callable_with_async_fn(self): async def async_add(x: int, y: int) -> int: return x + y - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() result = asyncio.run( executor.async_execute_callable(async_add, {"x": 7, "y": 8}) ) diff --git a/tests/test_core/packet_function/test_packet_function.py b/tests/test_core/packet_function/test_packet_function.py index 636dd29b..c3ad99e3 100644 --- a/tests/test_core/packet_function/test_packet_function.py +++ b/tests/test_core/packet_function/test_packet_function.py @@ -382,16 +382,85 @@ def test_function_name_matches_canonical(self, add_pf): class TestGetExecutionData: def test_returns_expected_keys(self, add_pf): data = add_pf.get_execution_data() - assert "python_version" in data - assert "execution_context" in data + assert set(data.keys()) == { + "executor_type", + "executor_info", + "python_version", + "extra_info", + } def test_python_version_matches_runtime(self, add_pf): vi = sys.version_info expected = f"{vi.major}.{vi.minor}.{vi.micro}" assert add_pf.get_execution_data()["python_version"] == expected - def test_execution_context_is_local(self, add_pf): - assert add_pf.get_execution_data()["execution_context"] == "local" + def test_executor_type_is_string(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + assert isinstance(data["executor_type"], str) + + def test_executor_info_is_dict_str_str(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + assert isinstance(data["executor_info"], dict) + for k, v in data["executor_info"].items(): + assert isinstance(k, str) + assert isinstance(v, str) + + def test_extra_info_is_empty_dict(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + assert data["extra_info"] == {} + + +class TestExecutionDataSchema: + def test_execution_data_has_expected_keys(self): + pf = PythonPacketFunction(add, 
output_keys="result") + data = pf.get_execution_data() + assert set(data.keys()) == { + "executor_type", + "executor_info", + "python_version", + "extra_info", + } + + def test_executor_type_is_string(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + assert isinstance(data["executor_type"], str) + + def test_executor_info_is_dict_str_str(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + assert isinstance(data["executor_info"], dict) + for k, v in data["executor_info"].items(): + assert isinstance(k, str) + assert isinstance(v, str) + + def test_python_version_matches_sys(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + expected = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + assert data["python_version"] == expected + + def test_extra_info_is_empty_dict(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + assert data["extra_info"] == {} + + def test_execution_data_schema_matches_data_keys(self): + pf = PythonPacketFunction(add, output_keys="result") + data = pf.get_execution_data() + schema = pf.get_execution_data_schema() + assert set(schema.keys()) == set(data.keys()) + + def test_execution_data_schema_types(self): + pf = PythonPacketFunction(add, output_keys="result") + schema = pf.get_execution_data_schema() + assert schema["executor_type"] is str + assert schema["executor_info"] == dict[str, str] + assert schema["python_version"] is str + assert schema["extra_info"] == dict[str, str] # --------------------------------------------------------------------------- diff --git a/tests/test_core/test_regression_fixes.py b/tests/test_core/test_regression_fixes.py index ecf364a8..f5890ccf 100644 --- a/tests/test_core/test_regression_fixes.py +++ b/tests/test_core/test_regression_fixes.py @@ -22,7 +22,7 @@ from orcapod.channels import Channel from orcapod.core.datagrams import Packet -from orcapod.core.executors import LocalExecutor, PythonFunctionExecutorBase +from orcapod.core.executors import LocalPythonFunctionExecutor, PythonFunctionExecutorBase from orcapod.core.function_pod import FunctionPod, FunctionPodStream from orcapod.core.operators import SelectPacketColumns from orcapod.core.operators.static_output_pod import StaticOutputOperatorPod @@ -242,7 +242,7 @@ def double(x: int) -> int: pf = PythonPacketFunction(double, output_keys="result") # Attach an executor that reports concurrent support - executor = LocalExecutor() + executor = LocalPythonFunctionExecutor() pf.executor = executor pod = FunctionPod(pf) @@ -545,21 +545,21 @@ def test_with_options_allows_overriding_runtime_env(self): assert new_exec._runtime_env == {"pip": ["numpy"]} - def test_get_execution_data_without_runtime_env(self): - """get_execution_data() omits runtime_env when not set.""" + def test_get_executor_data_without_runtime_env(self): + """get_executor_data() reports runtime_env as False when not set.""" mock_ray = MagicMock() with patch.dict("sys.modules", {"ray": mock_ray}): from orcapod.core.executors.ray import RayExecutor executor = RayExecutor(ray_address="ray://host:10001") - data = executor.get_execution_data() + data = executor.get_executor_data() - assert "runtime_env" not in data + assert data["runtime_env"] is False assert data["ray_address"] == "ray://host:10001" - def test_get_execution_data_with_runtime_env(self): - """get_execution_data() flags runtime_env presence as True.""" + def 
test_get_executor_data_with_runtime_env(self): + """get_executor_data() flags runtime_env presence as True.""" mock_ray = MagicMock() with patch.dict("sys.modules", {"ray": mock_ray}): @@ -568,7 +568,7 @@ def test_get_execution_data_with_runtime_env(self): executor = RayExecutor( runtime_env={"py_modules": ["my_mod"]}, ) - data = executor.get_execution_data() + data = executor.get_executor_data() assert data["runtime_env"] is True assert data["ray_address"] == "auto" From 7ee125f72fd3df1cbcc11b9af1b5dbe04e986bb3 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Tue, 7 Apr 2026 22:15:16 +0000 Subject: [PATCH 05/14] feat(ENG-374): datagram-based ResultCache.store() and CachedPacketFunction helper Replace dict-based variation_data/execution_data parameters in ResultCache.store() with DatagramProtocol objects. The store method now calls .as_table() on each datagram and prefixes column names, preserving native Arrow types (including dict columns like executor_info) instead of stringifying everything to large_string. Add _build_metadata_datagrams() helper to CachedPacketFunction and update all three callers (call, async_call, record_packet). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/packet_function.py | 37 ++++++++------ src/orcapod/core/result_cache.py | 49 ++++++++++--------- tests/test_core/test_result_cache.py | 72 ++++++++++++++++++++++++---- 3 files changed, 108 insertions(+), 50 deletions(-) diff --git a/src/orcapod/core/packet_function.py b/src/orcapod/core/packet_function.py index b481515a..7e2b40bb 100644 --- a/src/orcapod/core/packet_function.py +++ b/src/orcapod/core/packet_function.py @@ -14,7 +14,7 @@ from orcapod.config import Config from orcapod.contexts import DataContext from orcapod.core.base import TraceableBase -from orcapod.core.datagrams import Packet +from orcapod.core.datagrams import Datagram, Packet from orcapod.hashing.hash_utils import ( get_function_components, get_function_signature, @@ -907,6 +907,20 @@ def __init__( auto_flush=True, ) + def _build_metadata_datagrams(self) -> tuple[Datagram, Datagram]: + """Build variation and execution Datagrams for cache storage.""" + variation_datagram = Datagram( + self.get_function_variation_data(), + python_schema=self.get_function_variation_data_schema(), + data_context=self.data_context, + ) + execution_datagram = Datagram( + self.get_execution_data(), + python_schema=self.get_execution_data_schema(), + data_context=self.data_context, + ) + return variation_datagram, execution_datagram + def set_auto_flush(self, on: bool = True) -> None: """Set auto-flush behavior. 
If True, the database flushes after each record.""" self._cache.set_auto_flush(on) @@ -934,12 +948,8 @@ def call( output_packet = self._packet_function.call(packet, logger=logger) if output_packet is not None: if not skip_cache_insert: - self._cache.store( - packet, - output_packet, - variation_data=self.get_function_variation_data(), - execution_data=self.get_execution_data(), - ) + var_dg, exec_dg = self._build_metadata_datagrams() + self._cache.store(packet, output_packet, var_dg, exec_dg) output_packet = output_packet.with_meta_columns( **{self.RESULT_COMPUTED_FLAG: True} ) @@ -964,12 +974,8 @@ async def async_call( output_packet = await self._packet_function.async_call(packet, logger=logger) if output_packet is not None: if not skip_cache_insert: - self._cache.store( - packet, - output_packet, - variation_data=self.get_function_variation_data(), - execution_data=self.get_execution_data(), - ) + var_dg, exec_dg = self._build_metadata_datagrams() + self._cache.store(packet, output_packet, var_dg, exec_dg) output_packet = output_packet.with_meta_columns( **{self.RESULT_COMPUTED_FLAG: True} ) @@ -994,11 +1000,12 @@ def record_packet( skip_duplicates: bool = False, ) -> PacketProtocol: """Record the output packet against the input packet in the result store.""" + var_dg, exec_dg = self._build_metadata_datagrams() self._cache.store( input_packet, output_packet, - variation_data=self.get_function_variation_data(), - execution_data=self.get_execution_data(), + var_dg, + exec_dg, skip_duplicates=skip_duplicates, ) return output_packet diff --git a/src/orcapod/core/result_cache.py b/src/orcapod/core/result_cache.py index e767dadb..e2de128e 100644 --- a/src/orcapod/core/result_cache.py +++ b/src/orcapod/core/result_cache.py @@ -9,7 +9,7 @@ import logging from datetime import datetime, timezone -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from orcapod.protocols.core_protocols import PacketProtocol from orcapod.protocols.database_protocols import ArrowDatabaseProtocol @@ -18,6 +18,8 @@ if TYPE_CHECKING: import pyarrow as pa + + from orcapod.protocols.core_protocols.datagrams import DatagramProtocol else: pa = LazyModule("pyarrow") @@ -139,48 +141,45 @@ def store( self, input_packet: PacketProtocol, output_packet: PacketProtocol, - variation_data: dict[str, Any], - execution_data: dict[str, Any], + variation_datagram: "DatagramProtocol", + execution_datagram: "DatagramProtocol", skip_duplicates: bool = False, ) -> None: """Store an output packet in the cache. - Stores the output packet data alongside function variation data, - execution data, input packet hash, and a timestamp. + Stores the output packet data alongside function variation and + execution metadata (as Datagrams), input packet hash, and a timestamp. Args: input_packet: The input packet (used for its content hash). output_packet: The computed output packet to store. - variation_data: Function variation metadata (e.g. function name, - signature hash, content hash, git hash). - execution_data: Execution environment metadata (e.g. python - version, execution context). + variation_datagram: Function variation metadata as a Datagram. + execution_datagram: Execution environment metadata as a Datagram. skip_duplicates: If True, silently skip if a record with the same ID already exists. """ data_table = output_packet.as_table(columns={"source": True, "context": True}) - # Add function variation data columns - i = 0 - for k, v in variation_data.items(): + # Add variation and execution columns with prefixes. 
+ # Use a running counter for insertion position since add_column shifts indices. + col_idx = 0 + var_table = variation_datagram.as_table() + for name in var_table.column_names: data_table = data_table.add_column( - i, - f"{constants.PF_VARIATION_PREFIX}{k}", - pa.array([v], type=pa.large_string()), + col_idx, + f"{constants.PF_VARIATION_PREFIX}{name}", + var_table.column(name), ) - i += 1 - - # Add execution data columns - import json + col_idx += 1 - for k, v in execution_data.items(): - str_v = v if isinstance(v, str) else json.dumps(v) + exec_table = execution_datagram.as_table() + for name in exec_table.column_names: data_table = data_table.add_column( - i, - f"{constants.PF_EXECUTION_PREFIX}{k}", - pa.array([str_v], type=pa.large_string()), + col_idx, + f"{constants.PF_EXECUTION_PREFIX}{name}", + exec_table.column(name), ) - i += 1 + col_idx += 1 # Add input packet hash (position 0) data_table = data_table.add_column( diff --git a/tests/test_core/test_result_cache.py b/tests/test_core/test_result_cache.py index cda78270..f465bf8b 100644 --- a/tests/test_core/test_result_cache.py +++ b/tests/test_core/test_result_cache.py @@ -15,7 +15,7 @@ import pyarrow as pa import pytest -from orcapod.core.datagrams import Packet +from orcapod.core.datagrams import Datagram, Packet from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.result_cache import ResultCache from orcapod.databases import InMemoryArrowDatabase @@ -50,11 +50,21 @@ def _compute_and_store( """Helper: compute output and store in cache.""" output = pf.direct_call(input_packet) assert output is not None + variation_datagram = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + execution_datagram = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, + ) cache.store( input_packet, output, - variation_data=pf.get_function_variation_data(), - execution_data=pf.get_execution_data(), + variation_datagram=variation_datagram, + execution_datagram=execution_datagram, ) return output @@ -106,12 +116,17 @@ def test_same_packet_different_record_path_is_miss(self): input_pkt = Packet({"x": 10}) output = pf.direct_call(input_pkt) - cache_a.store( - input_pkt, - output, - variation_data=pf.get_function_variation_data(), - execution_data=pf.get_execution_data(), + variation_datagram = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + execution_datagram = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, ) + cache_a.store(input_pkt, output, variation_datagram, execution_datagram) assert cache_a.lookup(input_pkt) is not None assert cache_b.lookup(input_pkt) is None @@ -131,11 +146,21 @@ def test_most_recent_wins(self): # Store a second result for the same input (simulating recomputation) output2 = pf.direct_call(input_pkt) + variation_datagram = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + execution_datagram = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, + ) cache.store( input_pkt, output2, - variation_data=pf.get_function_variation_data(), - execution_data=pf.get_execution_data(), + variation_datagram, + execution_datagram, ) # Lookup should return 
the most recent @@ -304,3 +329,30 @@ def test_exclude_system_columns_by_default(self): records = cache.get_all_records(include_system_columns=False) assert records is not None assert constants.PACKET_RECORD_ID not in records.column_names + + +# --------------------------------------------------------------------------- +# Dict-typed columns (executor_info, extra_info) +# --------------------------------------------------------------------------- + + +class TestStoreDictColumns: + def test_executor_info_column_stored(self): + cache, db = _make_cache() + pf = _make_pf() + _compute_and_store(cache, pf, Packet({"x": 10})) + + records = db.get_all_records(cache.record_path) + assert records is not None + exec_info_col = f"{constants.PF_EXECUTION_PREFIX}executor_info" + assert exec_info_col in records.column_names + + def test_extra_info_column_stored(self): + cache, db = _make_cache() + pf = _make_pf() + _compute_and_store(cache, pf, Packet({"x": 10})) + + records = db.get_all_records(cache.record_path) + assert records is not None + extra_info_col = f"{constants.PF_EXECUTION_PREFIX}extra_info" + assert extra_info_col in records.column_names From 23e2ff3e0fcf14564a66ec4df4f2639bda458e21 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Tue, 7 Apr 2026 22:27:05 +0000 Subject: [PATCH 06/14] feat(ENG-374): update CachedFunctionPod to use Datagrams for store() Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/cached_function_pod.py | 31 +++++++++++++++++-------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/orcapod/core/cached_function_pod.py b/src/orcapod/core/cached_function_pod.py index 7533f203..c5728361 100644 --- a/src/orcapod/core/cached_function_pod.py +++ b/src/orcapod/core/cached_function_pod.py @@ -5,6 +5,7 @@ import logging from typing import TYPE_CHECKING, Any +from orcapod.core.datagrams import Datagram from orcapod.core.function_pod import WrappedFunctionPod from orcapod.core.result_cache import ResultCache from orcapod.protocols.core_protocols import ( @@ -95,12 +96,17 @@ def process_packet( tag, output = self._function_pod.process_packet(tag, packet, logger=logger) if output is not None: pf = self._function_pod.packet_function - self._cache.store( - packet, - output, - variation_data=pf.get_function_variation_data(), - execution_data=pf.get_execution_data(), + var_dg = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, ) + exec_dg = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, + ) + self._cache.store(packet, output, var_dg, exec_dg) output = output.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True}) return tag, output @@ -128,12 +134,17 @@ async def async_process_packet( ) if output is not None: pf = self._function_pod.packet_function - self._cache.store( - packet, - output, - variation_data=pf.get_function_variation_data(), - execution_data=pf.get_execution_data(), + var_dg = Datagram( + pf.get_function_variation_data(), + python_schema=pf.get_function_variation_data_schema(), + data_context=pf.data_context, + ) + exec_dg = Datagram( + pf.get_execution_data(), + python_schema=pf.get_execution_data_schema(), + data_context=pf.data_context, ) + self._cache.store(packet, output, var_dg, exec_dg) output = output.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True}) return tag, output From f1b97ceabf9560faacbbf4f7a9fa3dfdafcaa40a Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Tue, 7 Apr 2026 22:44:22 +0000 Subject: [PATCH 07/14] fix(ENG-374): fix RayExecutor schema import and immutable Schema mutation Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/executors/ray.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/orcapod/core/executors/ray.py b/src/orcapod/core/executors/ray.py index 98da9e50..54fd9b4e 100644 --- a/src/orcapod/core/executors/ray.py +++ b/src/orcapod/core/executors/ray.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any from orcapod.core.executors.base import PythonFunctionExecutorBase +from orcapod.types import Schema from orcapod.core.executors.capture_wrapper import make_capture_wrapper if TYPE_CHECKING: @@ -333,12 +334,19 @@ def with_options(self, **opts: Any) -> RayExecutor: **merged, ) - def get_execution_data(self) -> dict[str, Any]: - data: dict[str, Any] = { - "executor_type": self.executor_type_id, + def get_executor_data(self) -> dict[str, Any]: + executor_data = super().get_executor_data() + executor_data.update({ "ray_address": self._ray_address or "auto", - **self._remote_opts, - } - if self._runtime_env is not None: - data["runtime_env"] = True # flag presence without dumping contents - return data + "remote_opts": self._remote_opts, + "runtime_env": self._runtime_env is not None, + }) + return executor_data + + def get_executor_data_schema(self) -> Schema: + return Schema({ + "executor_type": str, + "ray_address": str, + "remote_opts": dict[str, str], + "runtime_env": bool, + }) From e722b7e35875ef9f4371593c2a6be7e885fb6a9d Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Tue, 7 Apr 2026 23:39:03 +0000 Subject: [PATCH 08/14] fix(ENG-374): handle dict[str, str] Arrow map round-trip in meta columns Arrow stores dict[str, str] as list[struct[key, value]]. When read back via to_pylist(), the Python value is a list of dicts, not a dict. This broke schema inference and struct conversion in several paths: - Datagram._init_from_table: use arrow_schema_to_python_schema to preserve precise types; merge with inference for Any-typed fields - Datagram.with_meta_columns: preserve existing schema types for unchanged fields; keep materialized meta table - FunctionNode.as_table: derive Python schema from Arrow schema, excluding dict and Any-typed fields from struct conversion Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/datagrams/datagram.py | 57 +++++++++++++++++++++---- src/orcapod/core/nodes/function_node.py | 40 ++++++++++++++++- 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/src/orcapod/core/datagrams/datagram.py b/src/orcapod/core/datagrams/datagram.py index a82ee408..38925905 100644 --- a/src/orcapod/core/datagrams/datagram.py +++ b/src/orcapod/core/datagrams/datagram.py @@ -20,6 +20,7 @@ from __future__ import annotations import logging +import types from collections.abc import Collection, Iterator, Mapping from typing import TYPE_CHECKING, Any, Self, cast @@ -185,19 +186,40 @@ def _init_from_table( meta_table.select(keep), new_meta ) - # Store meta as dict (always); Arrow table is lazy. - # Derive schema via infer_python_schema_from_pylist_data (same as DictDatagram) - # to avoid typing.Any values that arrow_schema_to_python_schema may emit. + # Store meta as dict (always); keep the Arrow table when available. + # Use arrow_schema_to_python_schema to preserve precise generic types + # (e.g. dict[str, str] stored as map columns) that + # infer_python_schema_from_pylist_data cannot recover. 
For fields where + # the Arrow→Python conversion produces Any (e.g. timestamps), fall back + # to the inference result which gives concrete types like datetime. if meta_table is not None and meta_table.num_columns > 0: self._meta = meta_table.to_pylist()[0] - self._meta_python_schema = infer_python_schema_from_pylist_data( + arrow_derived = self.converter.arrow_schema_to_python_schema( + meta_table.schema + ) + inferred = infer_python_schema_from_pylist_data( [self._meta], default_type=str ) + # Merge: prefer arrow_derived (precise generics), but use inferred + # for fields that arrow_derived maps to Any. + import typing + merged = {} + for k in arrow_derived: + atype = arrow_derived[k] + # Unwrap Optional to check the inner type + origin = typing.get_origin(atype) + args = typing.get_args(atype) + inner = args[0] if origin is types.UnionType and type(None) in args and len(args) == 2 else atype + if inner is typing.Any: + merged[k] = inferred.get(k, atype) + else: + merged[k] = atype + self._meta_python_schema = Schema(merged) + self._meta_table = meta_table else: self._meta = {} self._meta_python_schema = Schema.empty() - - self._meta_table = None # built lazily + self._meta_table = None self._context_table = None # ------------------------------------------------------------------ @@ -530,9 +552,28 @@ def with_meta_columns(self, **meta_updates: DataValue) -> Self: } new_d = self.copy(include_cache=False) new_d._meta = {**self._meta, **prefixed} - new_d._meta_python_schema = infer_python_schema_from_pylist_data( - [new_d._meta], default_type=str + # Preserve existing schema types for unchanged fields (avoids losing + # precise generic types like dict[str, str] that inference cannot + # recover). Only infer types for newly added fields. + new_inferred = infer_python_schema_from_pylist_data( + [prefixed], default_type=str ) + merged = dict(self._meta_python_schema) + merged.update(new_inferred) + new_d._meta_python_schema = Schema(merged) + # If the existing meta table is materialized, append new columns to it + # rather than invalidating (avoids re-conversion of complex Arrow types + # like maps that don't round-trip cleanly through Python dicts). + if self._meta_table is not None: + new_cols_table = self.converter.python_dicts_to_arrow_table( + [prefixed], python_schema=Schema(new_inferred) + ) + existing = self._meta_table + # Drop columns being overwritten + keep = [c for c in existing.column_names if c not in new_cols_table.column_names] + new_d._meta_table = arrow_utils.hstack_tables( + existing.select(keep), new_cols_table + ) return new_d def drop_meta_columns(self, *keys: str, ignore_missing: bool = False) -> Self: diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index 650b1a79..2221f6cc 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -4,6 +4,7 @@ import asyncio import logging +import types from collections.abc import Iterator from typing import TYPE_CHECKING, Any, Literal, cast @@ -1206,12 +1207,49 @@ def as_table( converter = self.data_context.type_converter - struct_packets = converter.python_dicts_to_struct_dicts(all_packets) all_tags_as_tables: pa.Table = pa.Table.from_pylist( all_tags, schema=tag_schema ) if constants.CONTEXT_KEY in all_tags_as_tables.column_names: all_tags_as_tables = all_tags_as_tables.drop([constants.CONTEXT_KEY]) + + # Convert packet dicts using the converter's struct conversion. 
+ # When a precise Arrow schema is available, derive the Python schema + # from it to preserve generic types (e.g. dict[str, str] stored as + # Arrow maps) that inference cannot recover from round-tripped data. + # Exclude fields that don't round-trip cleanly: + # - Any-typed fields (e.g. timestamps) that the converter can't handle + # - dict-typed fields stored as Arrow maps (round-trip as list[struct]) + # These are handled directly by pa.Table.from_pylist via the schema. + if packet_schema is not None: + import typing as _typing + packet_python_schema = converter.arrow_schema_to_python_schema( + packet_schema + ) + + def _is_convertible(python_type: type) -> bool: + if python_type is _typing.Any: + return False + origin = _typing.get_origin(python_type) + args = _typing.get_args(python_type) + if origin is types.UnionType and type(None) in args: + non_none = [a for a in args if a is not type(None)] + if len(non_none) == 1: + return _is_convertible(non_none[0]) + if origin is dict: + return False + return True + + convertible_schema = Schema({ + k: v for k, v in packet_python_schema.items() + if _is_convertible(v) + }) + else: + convertible_schema = None + + struct_packets = converter.python_dicts_to_struct_dicts( + all_packets, python_schema=convertible_schema + ) all_packets_as_tables: pa.Table = pa.Table.from_pylist( struct_packets, schema=packet_schema ) From 2fcda3c12d678d21a9ceda6155413f28741fdc07 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Wed, 8 Apr 2026 00:14:03 +0000 Subject: [PATCH 09/14] =?UTF-8?q?fix(ENG-374):=20proper=20Arrow=E2=86=94Py?= =?UTF-8?q?thon=20round-trip=20for=20complex=20meta=20columns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four related fixes for dict[str, str] columns stored as Arrow maps: 1. Datagram._init_from_table: use universal converter (not raw to_pylist()) for Arrow→Python conversion so map columns become proper Python dicts. Derive meta schema from Arrow schema (not inference) to preserve precise types for empty containers. 2. Datagram.with_meta_columns/drop_meta_columns: preserve existing schema types for unchanged fields instead of re-inferring. 3. ResultCache.store: store POD_TIMESTAMP as ISO 8601 string instead of native pa.timestamp, since the universal converter does not yet support timestamp round-trip. (TODO comment marks this for revert.) 4. FunctionNode.as_table: derive Python schema from Arrow schema when available, avoiding re-inference of empty dicts as dict[Any, Any]. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/datagrams/datagram.py | 67 ++++++++----------------- src/orcapod/core/nodes/function_node.py | 51 +++++-------------- src/orcapod/core/result_cache.py | 10 ++-- 3 files changed, 41 insertions(+), 87 deletions(-) diff --git a/src/orcapod/core/datagrams/datagram.py b/src/orcapod/core/datagrams/datagram.py index 38925905..c12eebe6 100644 --- a/src/orcapod/core/datagrams/datagram.py +++ b/src/orcapod/core/datagrams/datagram.py @@ -20,7 +20,6 @@ from __future__ import annotations import logging -import types from collections.abc import Collection, Iterator, Mapping from typing import TYPE_CHECKING, Any, Self, cast @@ -187,34 +186,22 @@ def _init_from_table( ) # Store meta as dict (always); keep the Arrow table when available. - # Use arrow_schema_to_python_schema to preserve precise generic types - # (e.g. dict[str, str] stored as map columns) that - # infer_python_schema_from_pylist_data cannot recover.
For fields where - # the Arrow→Python conversion produces Any (e.g. timestamps), fall back - # to the inference result which gives concrete types like datetime. + # IMPORTANT: use the universal converter (not raw to_pylist()) to + # convert Arrow → Python so that complex types round-trip correctly. + # For example, Arrow map columns (used for dict[str, str]) become + # list[struct] via to_pylist() but proper Python dicts via the + # converter. Without the converter, downstream schema inference + # produces list[Any] which cannot be converted back to Arrow. + # + # Derive the Python schema from the Arrow schema (not from inference) + # to preserve precise types even for empty containers (e.g. an empty + # dict {} would infer as dict[Any, Any] but the Arrow schema knows + # it's dict[str, str]). if meta_table is not None and meta_table.num_columns > 0: - self._meta = meta_table.to_pylist()[0] - arrow_derived = self.converter.arrow_schema_to_python_schema( + self._meta = self.converter.arrow_table_to_python_dicts(meta_table)[0] + self._meta_python_schema = self.converter.arrow_schema_to_python_schema( meta_table.schema ) - inferred = infer_python_schema_from_pylist_data( - [self._meta], default_type=str - ) - # Merge: prefer arrow_derived (precise generics), but use inferred - # for fields that arrow_derived maps to Any. - import typing - merged = {} - for k in arrow_derived: - atype = arrow_derived[k] - # Unwrap Optional to check the inner type - origin = typing.get_origin(atype) - args = typing.get_args(atype) - inner = args[0] if origin is types.UnionType and type(None) in args and len(args) == 2 else atype - if inner is typing.Any: - merged[k] = inferred.get(k, atype) - else: - merged[k] = atype - self._meta_python_schema = Schema(merged) self._meta_table = meta_table else: self._meta = {} @@ -552,28 +539,16 @@ def with_meta_columns(self, **meta_updates: DataValue) -> Self: } new_d = self.copy(include_cache=False) new_d._meta = {**self._meta, **prefixed} - # Preserve existing schema types for unchanged fields (avoids losing - # precise generic types like dict[str, str] that inference cannot - # recover). Only infer types for newly added fields. + # Preserve existing schema types for unchanged fields. This avoids + # re-inference which can lose precise types (e.g. an empty dict {} + # infers as dict[Any, Any] instead of dict[str, str], and + # arrow-derived types like timestamps may use Any). new_inferred = infer_python_schema_from_pylist_data( [prefixed], default_type=str ) merged = dict(self._meta_python_schema) merged.update(new_inferred) new_d._meta_python_schema = Schema(merged) - # If the existing meta table is materialized, append new columns to it - # rather than invalidating (avoids re-conversion of complex Arrow types - # like maps that don't round-trip cleanly through Python dicts). 
- if self._meta_table is not None: - new_cols_table = self.converter.python_dicts_to_arrow_table( - [prefixed], python_schema=Schema(new_inferred) - ) - existing = self._meta_table - # Drop columns being overwritten - keep = [c for c in existing.column_names if c not in new_cols_table.column_names] - new_d._meta_table = arrow_utils.hstack_tables( - existing.select(keep), new_cols_table - ) return new_d def drop_meta_columns(self, *keys: str, ignore_missing: bool = False) -> Self: @@ -588,9 +563,11 @@ def drop_meta_columns(self, *keys: str, ignore_missing: bool = False) -> Self: ) new_d = self.copy(include_cache=False) new_d._meta = {k: v for k, v in self._meta.items() if k not in prefixed} - new_d._meta_python_schema = infer_python_schema_from_pylist_data( - [new_d._meta], default_type=str - ) + # Preserve existing schema types for remaining fields (see + # with_meta_columns for rationale). + new_d._meta_python_schema = Schema({ + k: v for k, v in self._meta_python_schema.items() if k not in prefixed + }) return new_d # ------------------------------------------------------------------ diff --git a/src/orcapod/core/nodes/function_node.py b/src/orcapod/core/nodes/function_node.py index 2221f6cc..aba06fb3 100644 --- a/src/orcapod/core/nodes/function_node.py +++ b/src/orcapod/core/nodes/function_node.py @@ -4,7 +4,6 @@ import asyncio import logging -import types from collections.abc import Iterator from typing import TYPE_CHECKING, Any, Literal, cast @@ -1207,49 +1206,23 @@ def as_table( converter = self.data_context.type_converter + # Derive the Python schema from the Arrow schema when available, + # rather than re-inferring from dict values. This preserves precise + # types for empty containers (e.g. {} infers as dict[Any, Any] but + # the Arrow schema knows it's dict[str, str]). + packet_python_schema = ( + converter.arrow_schema_to_python_schema(packet_schema) + if packet_schema is not None + else None + ) + struct_packets = converter.python_dicts_to_struct_dicts( + all_packets, python_schema=packet_python_schema + ) all_tags_as_tables: pa.Table = pa.Table.from_pylist( all_tags, schema=tag_schema ) if constants.CONTEXT_KEY in all_tags_as_tables.column_names: all_tags_as_tables = all_tags_as_tables.drop([constants.CONTEXT_KEY]) - - # Convert packet dicts using the converter's struct conversion. - # When a precise Arrow schema is available, derive the Python schema - # from it to preserve generic types (e.g. dict[str, str] stored as - # Arrow maps) that inference cannot recover from round-tripped data. - # Exclude fields that don't round-trip cleanly: - # - Any-typed fields (e.g. timestamps) that the converter can't handle - # - dict-typed fields stored as Arrow maps (round-trip as list[struct]) - # These are handled directly by pa.Table.from_pylist via the schema. 
- if packet_schema is not None: - import typing as _typing - packet_python_schema = converter.arrow_schema_to_python_schema( - packet_schema - ) - - def _is_convertible(python_type: type) -> bool: - if python_type is _typing.Any: - return False - origin = _typing.get_origin(python_type) - args = _typing.get_args(python_type) - if origin is types.UnionType and type(None) in args: - non_none = [a for a in args if a is not type(None)] - if len(non_none) == 1: - return _is_convertible(non_none[0]) - if origin is dict: - return False - return True - - convertible_schema = Schema({ - k: v for k, v in packet_python_schema.items() - if _is_convertible(v) - }) - else: - convertible_schema = None - - struct_packets = converter.python_dicts_to_struct_dicts( - all_packets, python_schema=convertible_schema - ) all_packets_as_tables: pa.Table = pa.Table.from_pylist( struct_packets, schema=packet_schema ) diff --git a/src/orcapod/core/result_cache.py b/src/orcapod/core/result_cache.py index e2de128e..f5ba4632 100644 --- a/src/orcapod/core/result_cache.py +++ b/src/orcapod/core/result_cache.py @@ -188,11 +188,15 @@ def store( pa.array([input_packet.content_hash().to_string()], type=pa.large_string()), ) - # Append timestamp - timestamp = datetime.now(timezone.utc) + # Append timestamp as ISO 8601 string. + # TODO: switch to pa.timestamp("us", tz="UTC") once the universal + # converter supports native Arrow timestamp ↔ Python datetime round-trip. + # ISO 8601 strings sort lexicographically in time order, so the + # conflict-resolution sort in lookup() still works correctly. + timestamp = datetime.now(timezone.utc).isoformat() data_table = data_table.append_column( constants.POD_TIMESTAMP, - pa.array([timestamp], type=pa.timestamp("us", tz="UTC")), + pa.array([timestamp], type=pa.large_string()), ) self._result_database.add_record( From f45bfe91425649909d60705f0ddf44f6d16606b9 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Wed, 8 Apr 2026 00:17:17 +0000 Subject: [PATCH 10/14] fix(ENG-374): align RayExecutor schema with actual get_executor_data() return types remote_opts is dict[str, Any] (values can be ints, nested dicts), not dict[str, str]. The PacketFunction layer handles stringification. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/executors/ray.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/orcapod/core/executors/ray.py b/src/orcapod/core/executors/ray.py index 54fd9b4e..c71c2e7e 100644 --- a/src/orcapod/core/executors/ray.py +++ b/src/orcapod/core/executors/ray.py @@ -344,9 +344,16 @@ def get_executor_data(self) -> dict[str, Any]: return executor_data def get_executor_data_schema(self) -> Schema: + """Schema reflecting actual return types of ``get_executor_data``. + + Note: ``remote_opts`` values are ``Any`` (ints, nested dicts, etc.) + but the PacketFunction layer stringifies them into ``dict[str, str]`` + before storage. The schema here describes the pre-stringification + executor output. + """ return Schema({ "executor_type": str, "ray_address": str, - "remote_opts": dict[str, str], + "remote_opts": dict[str, Any], "runtime_env": bool, }) From 082157cdb502e0d8bac1c9668d06a1be2c92eb94 Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Wed, 8 Apr 2026 00:17:56 +0000 Subject: [PATCH 11/14] refactor(ENG-374): rename LocalExecutor, update executor/protocol contracts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename LocalExecutor → LocalPythonFunctionExecutor for clarity - Rename get_execution_data() → get_executor_data() on executor protocol/base - Add get_executor_data_schema() to executor protocol and base - Add get_function_variation_data_schema() and get_execution_data_schema() to PacketFunctionProtocol - Remove unused ExecutionEngineProtocol - Update all test references Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/executors/__init__.py | 4 +-- src/orcapod/core/executors/base.py | 10 ++++++- src/orcapod/core/executors/local.py | 8 ++++-- .../core_protocols/execution_engine.py | 25 ----------------- .../protocols/core_protocols/executor.py | 8 +++++- .../core_protocols/packet_function.py | 8 ++++++ .../nodes/test_function_node_iteration.py | 4 +-- .../test_pipeline/test_composite_observer.py | 12 ++++---- .../test_logging_observer_integration.py | 24 ++++++++-------- .../test_status_observer_integration.py | 28 +++++++++---------- 10 files changed, 65 insertions(+), 66 deletions(-) delete mode 100644 src/orcapod/protocols/core_protocols/execution_engine.py diff --git a/src/orcapod/core/executors/__init__.py b/src/orcapod/core/executors/__init__.py index 2d0cd3cb..81379878 100644 --- a/src/orcapod/core/executors/__init__.py +++ b/src/orcapod/core/executors/__init__.py @@ -1,7 +1,7 @@ from orcapod.core.executors.base import PythonFunctionExecutorBase -from orcapod.core.executors.local import LocalExecutor +from orcapod.core.executors.local import LocalPythonFunctionExecutor __all__ = [ "PythonFunctionExecutorBase", - "LocalExecutor", + "LocalPythonFunctionExecutor", ] diff --git a/src/orcapod/core/executors/base.py b/src/orcapod/core/executors/base.py index 12d816f8..7cfee3f0 100644 --- a/src/orcapod/core/executors/base.py +++ b/src/orcapod/core/executors/base.py @@ -5,6 +5,8 @@ from collections.abc import Callable from typing import TYPE_CHECKING, Any +from orcapod.types import Schema + if TYPE_CHECKING: from orcapod.protocols.observability_protocols import PacketExecutionLoggerProtocol @@ -105,10 +107,16 @@ async def async_execute_callable( """ return self.execute_callable(fn, kwargs, executor_options, logger=logger) - def get_execution_data(self) -> dict[str, Any]: + def get_executor_data(self) -> dict[str, Any]: """Return metadata describing the execution environment. Recorded alongside results for observability but does not affect content or pipeline hashes. The default returns the executor type id. """ return {"executor_type": self.executor_type_id} + + def get_executor_data_schema(self) -> Schema: + """Return schema for the data returned by ``get_executor_data``.""" + return Schema({ + "executor_type": str + }) diff --git a/src/orcapod/core/executors/local.py b/src/orcapod/core/executors/local.py index 88740bde..f0f35b41 100644 --- a/src/orcapod/core/executors/local.py +++ b/src/orcapod/core/executors/local.py @@ -13,7 +13,7 @@ from orcapod.protocols.observability_protocols import PacketExecutionLoggerProtocol -class LocalExecutor(PythonFunctionExecutorBase): +class LocalPythonFunctionExecutor(PythonFunctionExecutorBase): """Default executor -- runs the packet function directly in the current process. 
Supports all packet function types (``supported_function_type_ids`` @@ -127,9 +127,11 @@ async def async_execute_callable( logger.record(**captured.as_dict()) return raw_result - def with_options(self, **opts: Any) -> LocalExecutor: + def with_options(self, **opts: Any) -> LocalPythonFunctionExecutor: """Return a new ``LocalExecutor``. ``LocalExecutor`` carries no state, so options are ignored. """ - return LocalExecutor() + return LocalPythonFunctionExecutor() + + diff --git a/src/orcapod/protocols/core_protocols/execution_engine.py b/src/orcapod/protocols/core_protocols/execution_engine.py deleted file mode 100644 index b7e970cf..00000000 --- a/src/orcapod/protocols/core_protocols/execution_engine.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from typing import Any, Protocol, runtime_checkable - - -@runtime_checkable -class ExecutionEngineProtocol(Protocol): - # canonical name for the execution engine -- used to label the execution information when saving - @property - def name(self) -> str: ... - - def submit_sync(self, function: Callable, *args, **kwargs) -> Any: - """ - Run the given function with the provided arguments. - This method should be implemented by the execution engine. - """ - ... - - async def submit_async(self, function: Callable, *args, **kwargs) -> Any: - """ - Asynchronously run the given function with the provided arguments. - This method should be implemented by the execution engine. - """ - ... diff --git a/src/orcapod/protocols/core_protocols/executor.py b/src/orcapod/protocols/core_protocols/executor.py index 4f151643..58fc0341 100644 --- a/src/orcapod/protocols/core_protocols/executor.py +++ b/src/orcapod/protocols/core_protocols/executor.py @@ -3,6 +3,8 @@ from collections.abc import Callable from typing import TYPE_CHECKING, Any, Protocol, Self, runtime_checkable +from orcapod.types import Schema + if TYPE_CHECKING: from orcapod.protocols.observability_protocols import PacketExecutionLoggerProtocol @@ -53,7 +55,7 @@ def with_options(self, **opts: Any) -> Self: """ ... - def get_execution_data(self) -> dict[str, Any]: + def get_executor_data(self) -> dict[str, Any]: """Return metadata describing the execution environment. Stored alongside results for observability/provenance but does not @@ -61,6 +63,10 @@ def get_execution_data(self) -> dict[str, Any]: """ ... + def get_executor_data_schema(self) -> Schema: + """Return schema for the data returned by ``get_executor_data``.""" + ... + @runtime_checkable class PythonFunctionExecutorProtocol(PacketFunctionExecutorProtocol, Protocol): diff --git a/src/orcapod/protocols/core_protocols/packet_function.py b/src/orcapod/protocols/core_protocols/packet_function.py index 7792e873..5ce1c35c 100644 --- a/src/orcapod/protocols/core_protocols/packet_function.py +++ b/src/orcapod/protocols/core_protocols/packet_function.py @@ -60,10 +60,18 @@ def get_function_variation_data(self) -> dict[str, Any]: """Raw data defining function variation - system computes hash""" ... + def get_function_variation_data_schema(self) -> Schema: + """Schema for the data returned by ``get_function_variation_data``.""" + ... + def get_execution_data(self) -> dict[str, Any]: """Raw data defining execution context - system computes hash""" ... + def get_execution_data_schema(self) -> Schema: + """Schema for the data returned by ``get_execution_data``.""" + ... 
+ # ==================== Executor ==================== @property diff --git a/tests/test_core/nodes/test_function_node_iteration.py b/tests/test_core/nodes/test_function_node_iteration.py index 9bef49eb..03c9b2f8 100644 --- a/tests/test_core/nodes/test_function_node_iteration.py +++ b/tests/test_core/nodes/test_function_node_iteration.py @@ -16,7 +16,7 @@ from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.sources import ArrowTableSource from orcapod.databases import InMemoryArrowDatabase -from orcapod.core.executors import LocalExecutor +from orcapod.core.executors import LocalPythonFunctionExecutor def _make_source(n: int = 3) -> ArrowTableSource: @@ -146,7 +146,7 @@ def sometimes_fail(x: int) -> int: return x * 2 pf = PythonPacketFunction(sometimes_fail, output_keys="result") - pf.executor = LocalExecutor() # sets executor (LocalExecutor.supports_concurrent_execution is False) + pf.executor = LocalPythonFunctionExecutor() # sets executor (LocalExecutor.supports_concurrent_execution is False) pod = FunctionPod(pf) db = InMemoryArrowDatabase() node = FunctionNode(pod, _make_source(n=3), pipeline_database=db) diff --git a/tests/test_pipeline/test_composite_observer.py b/tests/test_pipeline/test_composite_observer.py index 0a91143f..c04c761f 100644 --- a/tests/test_pipeline/test_composite_observer.py +++ b/tests/test_pipeline/test_composite_observer.py @@ -10,7 +10,7 @@ import pytest import pyarrow as pa -from orcapod.core.executors import LocalExecutor +from orcapod.core.executors import LocalPythonFunctionExecutor from orcapod.core.function_pod import FunctionPod from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.sources.arrow_table_source import ArrowTableSource @@ -53,7 +53,7 @@ def test_both_observers_populated(self): def double(x: int) -> int: return x * 2 - pf = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_composite", pipeline_database=pipeline_db) @@ -93,7 +93,7 @@ def identity(x: int) -> int: print("hello") return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_logger_delegation", pipeline_database=pipeline_db) @@ -132,9 +132,9 @@ def double(x: int) -> int: def triple(result: int) -> int: return result * 3 - pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod1 = FunctionPod(pf1) - pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalExecutor()) + pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalPythonFunctionExecutor()) pod2 = FunctionPod(pf2) pipeline = Pipeline(name="test_ctx_composite", pipeline_database=pipeline_db) @@ -174,7 +174,7 @@ def test_failures_tracked_by_both_observers(self): def failing(x: int) -> int: raise ValueError("boom") - pf = PythonPacketFunction(failing, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(failing, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_composite_fail", pipeline_database=pipeline_db) diff --git 
a/tests/test_pipeline/test_logging_observer_integration.py b/tests/test_pipeline/test_logging_observer_integration.py index dccaf7b8..57cf6c58 100644 --- a/tests/test_pipeline/test_logging_observer_integration.py +++ b/tests/test_pipeline/test_logging_observer_integration.py @@ -10,7 +10,7 @@ import pyarrow as pa import pytest -from orcapod.core.executors import LocalExecutor +from orcapod.core.executors import LocalPythonFunctionExecutor from orcapod.core.function_pod import FunctionPod from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.sources.arrow_table_source import ArrowTableSource @@ -60,7 +60,7 @@ def double(x: int) -> int: print(f"doubling {x}") return x * 2 - pf = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_logs", pipeline_database=db) @@ -92,7 +92,7 @@ def test_failure_logged_with_traceback(self): def failing(x: int) -> int: raise ValueError("boom") - pf = PythonPacketFunction(failing, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(failing, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_fail", pipeline_database=db) @@ -128,7 +128,7 @@ def test_log_stored_in_flat_table(self): def identity(x: int) -> int: return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_flat", pipeline_database=db) @@ -160,7 +160,7 @@ def test_tag_columns_in_log_table(self): def identity(x: int) -> int: return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_tags", pipeline_database=db) @@ -194,7 +194,7 @@ def test_async_pipeline_captures_logs(self): def double(x: int) -> int: return x * 2 - pf = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_async_logs", pipeline_database=db) @@ -225,7 +225,7 @@ def test_fail_fast_aborts_and_logs(self): def failing(x: int) -> int: raise RuntimeError("crash") - pf = PythonPacketFunction(failing, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(failing, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_failfast", pipeline_database=db) @@ -266,7 +266,7 @@ def test_mixed_results_logged_correctly(self): def safe_div(x: int) -> float: return 100 / x - pf = PythonPacketFunction(safe_div, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(safe_div, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_mixed", pipeline_database=db) @@ -302,9 +302,9 @@ def double(x: int) -> int: def triple(result: int) -> int: return result * 3 - pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod1 = FunctionPod(pf1) - pf2 = 
PythonPacketFunction(triple, output_keys="final", executor=LocalExecutor()) + pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalPythonFunctionExecutor()) pod2 = FunctionPod(pf2) pipeline = Pipeline(name="test_multi", pipeline_database=db) @@ -338,9 +338,9 @@ def double(x: int) -> int: def triple(result: int) -> int: return result * 3 - pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod1 = FunctionPod(pf1) - pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalExecutor()) + pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalPythonFunctionExecutor()) pod2 = FunctionPod(pf2) pipeline = Pipeline(name="test_filter", pipeline_database=db) diff --git a/tests/test_pipeline/test_status_observer_integration.py b/tests/test_pipeline/test_status_observer_integration.py index 0306dbd4..e9a2bc34 100644 --- a/tests/test_pipeline/test_status_observer_integration.py +++ b/tests/test_pipeline/test_status_observer_integration.py @@ -10,7 +10,7 @@ import pyarrow as pa import pytest -from orcapod.core.executors import LocalExecutor +from orcapod.core.executors import LocalPythonFunctionExecutor from orcapod.core.function_pod import FunctionPod from orcapod.core.packet_function import PythonPacketFunction from orcapod.core.sources.arrow_table_source import ArrowTableSource @@ -60,7 +60,7 @@ def test_success_produces_running_and_success_events(self): def double(x: int) -> int: return x * 2 - pf = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_status", pipeline_database=db) @@ -95,7 +95,7 @@ def test_failure_status_with_error_summary(self): def failing(x: int) -> int: raise ValueError("boom") - pf = PythonPacketFunction(failing, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(failing, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_fail_status", pipeline_database=db) @@ -139,7 +139,7 @@ def test_status_stored_in_flat_table(self): def identity(x: int) -> int: return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_flat_status", pipeline_database=db) @@ -169,7 +169,7 @@ def test_tag_columns_in_status_table(self): def identity(x: int) -> int: return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_tags_status", pipeline_database=db) @@ -202,7 +202,7 @@ def test_async_pipeline_captures_status(self): def double(x: int) -> int: return x * 2 - pf = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_async_status", pipeline_database=db) @@ -236,7 +236,7 @@ def test_fail_fast_aborts_and_records_status(self): def failing(x: int) -> int: raise RuntimeError("crash") - pf = PythonPacketFunction(failing, 
output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(failing, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_failfast_status", pipeline_database=db) @@ -279,7 +279,7 @@ def test_mixed_results_tracked_correctly(self): def safe_div(x: int) -> float: return 100 / x # x=0 will raise ZeroDivisionError - pf = PythonPacketFunction(safe_div, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(safe_div, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_mixed_status", pipeline_database=db) @@ -318,9 +318,9 @@ def double(x: int) -> int: def triple(result: int) -> int: return result * 3 - pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod1 = FunctionPod(pf1) - pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalExecutor()) + pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalPythonFunctionExecutor()) pod2 = FunctionPod(pf2) pipeline = Pipeline(name="test_multi_status", pipeline_database=db) @@ -355,9 +355,9 @@ def double(x: int) -> int: def triple(result: int) -> int: return result * 3 - pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalExecutor()) + pf1 = PythonPacketFunction(double, output_keys="result", executor=LocalPythonFunctionExecutor()) pod1 = FunctionPod(pf1) - pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalExecutor()) + pf2 = PythonPacketFunction(triple, output_keys="final", executor=LocalPythonFunctionExecutor()) pod2 = FunctionPod(pf2) pipeline = Pipeline(name="test_filter_status", pipeline_database=db) @@ -388,7 +388,7 @@ def test_all_expected_columns_present(self): def identity(x: int) -> int: return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_schema", pipeline_database=db) @@ -430,7 +430,7 @@ def test_run_id_populated_in_status(self): def identity(x: int) -> int: return x - pf = PythonPacketFunction(identity, output_keys="result", executor=LocalExecutor()) + pf = PythonPacketFunction(identity, output_keys="result", executor=LocalPythonFunctionExecutor()) pod = FunctionPod(pf) pipeline = Pipeline(name="test_runid", pipeline_database=db) From c16f98d2dbb9c425f71faa1875a8c0ffb8638cf7 Mon Sep 17 00:00:00 2001 From: "Edgar Y. 
Walker" Date: Wed, 8 Apr 2026 00:18:12 +0000 Subject: [PATCH 12/14] refactor(ENG-374): update graph executor reference, enhance git info with cwd fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pipeline/graph.py: update LocalExecutor → LocalPythonFunctionExecutor - git_utils: add try_cwd fallback for functions not in a git repo, track git_source ("function" vs "cwd") in returned metadata Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/pipeline/graph.py | 4 ++-- src/orcapod/utils/git_utils.py | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py index 6d2f5751..5eb38231 100644 --- a/src/orcapod/pipeline/graph.py +++ b/src/orcapod/pipeline/graph.py @@ -312,9 +312,9 @@ def compile(self) -> None: # Default to LocalExecutor so capture/logging works # out of the box. Replaced if execution_engine is set. if node.executor is None: - from orcapod.core.executors.local import LocalExecutor + from orcapod.core.executors.local import LocalPythonFunctionExecutor - node.executor = LocalExecutor() + node.executor = LocalPythonFunctionExecutor() elif isinstance(node, OperatorNode): # Rewire all input streams to persistent upstreams diff --git a/src/orcapod/utils/git_utils.py b/src/orcapod/utils/git_utils.py index b4daa2f6..36f33cc0 100644 --- a/src/orcapod/utils/git_utils.py +++ b/src/orcapod/utils/git_utils.py @@ -59,19 +59,29 @@ def get_git_info(path): return None -def get_git_info_for_python_object(python_object) -> dict[str, Any] | None: +def get_git_info_for_python_object(python_object, try_cwd:bool=False) -> dict[str, Any] | None: """Get git info for the file where the python object is defined""" try: file_path = inspect.getfile(python_object) git_info = get_git_info(file_path) + git_source = "function" if git_info is None: - return None + # If the file isn't in a git repo, optionally try the current working directory + if try_cwd: + git_info = get_git_info(".") + + if git_info is None: + return None + + git_source = "cwd" + env_info = {} env_info["git_commit_hash"] = git_info.get("commit_hash") env_info["git_repo_status"] = "dirty" if git_info.get("is_dirty") else "clean" env_info["has_untracked_files"] = ( "true" if git_info.get("has_untracked_files") else "false" ) + env_info["git_source"] = git_source return env_info except TypeError: return None From 21014c166b6767911aebd5de88fb56dcf16425a4 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Wed, 8 Apr 2026 00:27:39 +0000 Subject: [PATCH 13/14] =?UTF-8?q?fix(ENG-374):=20add=20warning=20and=20imp?= =?UTF-8?q?roved=20error=20for=20unmapped=20Arrow=E2=86=94Python=20types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit arrow_type_to_python_type silently returned typing.Any for unsupported Arrow types, causing cryptic "Unsupported Python type: typing.Any" errors downstream. Now logs a warning with the original Arrow type. The downstream error also includes a hint about common causes. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../semantic_types/universal_converter.py | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/orcapod/semantic_types/universal_converter.py b/src/orcapod/semantic_types/universal_converter.py index c601047a..aea9da3a 100644 --- a/src/orcapod/semantic_types/universal_converter.py +++ b/src/orcapod/semantic_types/universal_converter.py @@ -405,7 +405,17 @@ def _convert_python_to_arrow(self, python_type: DataType) -> pa.DataType: type_name = getattr(python_type, "__name__") if type_name in type_map: return type_map[type_name] - raise ValueError(f"Unsupported Python type: {python_type}") + hint = "" + if python_type is Any: + hint = ( + " Hint: typing.Any usually appears when an Arrow type had " + "no mapping in arrow_type_to_python_type (check warnings). " + "It can also come from schema inference on empty containers " + "(e.g. {} infers as dict[Any, Any])." + ) + raise ValueError( + f"Unsupported Python type: {python_type}.{hint}" + ) # Handle list types if origin is list: @@ -576,7 +586,17 @@ def _convert_arrow_to_python(self, arrow_type: pa.DataType) -> type | Any: return typing.Union[tuple(child_types)] else: - # Default case for unsupported types + # Default case for unsupported types. + # NOTE: this silent fallback to Any can cause cryptic errors + # downstream when code tries to convert Any back to Arrow + # (e.g. "Unsupported Python type: typing.Any"). If you hit that, + # the root cause is likely an unmapped Arrow type here. + logger.warning( + "arrow_type_to_python_type: no mapping for Arrow type %r, " + "falling back to typing.Any. This may cause errors downstream " + "when converting back to Arrow.", + arrow_type, + ) return Any def _get_or_create_typeddict_for_struct( From 7eb0f73e65ca4557fbf09df8451bb964f30b1b71 Mon Sep 17 00:00:00 2001 From: "Edgar Y. Walker" Date: Wed, 8 Apr 2026 01:28:09 +0000 Subject: [PATCH 14/14] =?UTF-8?q?fix(ENG-374):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20stale=20refs,=20missing=20git=20field,=20type=20ass?= =?UTF-8?q?ertions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add has_untracked_files to get_git_info() so the -untracked suffix logic in PythonPacketFunction actually triggers - Update stale LocalExecutor references in docstrings and comments - Strengthen TestStoreDictColumns to assert Arrow map type, not just column presence Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/core/executors/local.py | 4 ++-- src/orcapod/pipeline/graph.py | 2 +- src/orcapod/utils/git_utils.py | 5 +++++ tests/test_core/nodes/test_function_node_iteration.py | 2 +- tests/test_core/test_result_cache.py | 10 ++++++++-- 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/orcapod/core/executors/local.py b/src/orcapod/core/executors/local.py index f0f35b41..247ffae8 100644 --- a/src/orcapod/core/executors/local.py +++ b/src/orcapod/core/executors/local.py @@ -128,9 +128,9 @@ async def async_execute_callable( return raw_result def with_options(self, **opts: Any) -> LocalPythonFunctionExecutor: - """Return a new ``LocalExecutor``. + """Return a new ``LocalPythonFunctionExecutor``. - ``LocalExecutor`` carries no state, so options are ignored. + ``LocalPythonFunctionExecutor`` carries no state, so options are ignored. 
""" return LocalPythonFunctionExecutor() diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py index 5eb38231..eece0d0f 100644 --- a/src/orcapod/pipeline/graph.py +++ b/src/orcapod/pipeline/graph.py @@ -309,7 +309,7 @@ def compile(self) -> None: result_database=result_db, ) - # Default to LocalExecutor so capture/logging works + # Default to LocalPythonFunctionExecutor so capture/logging works # out of the box. Replaced if execution_engine is set. if node.executor is None: from orcapod.core.executors.local import LocalPythonFunctionExecutor diff --git a/src/orcapod/utils/git_utils.py b/src/orcapod/utils/git_utils.py index 36f33cc0..d6ee8943 100644 --- a/src/orcapod/utils/git_utils.py +++ b/src/orcapod/utils/git_utils.py @@ -46,11 +46,16 @@ def get_git_info(path): # Handle detached HEAD state branch_name = "HEAD (detached)" + # Check for untracked files (separate from is_dirty which only + # covers staged/unstaged changes to tracked files). + has_untracked_files = len(repo.untracked_files) > 0 + return { "is_repo": True, "commit_hash": commit_hash, "short_hash": short_hash, "is_dirty": is_dirty, + "has_untracked_files": has_untracked_files, "branch": branch_name, "repo_root": repo.working_dir, } diff --git a/tests/test_core/nodes/test_function_node_iteration.py b/tests/test_core/nodes/test_function_node_iteration.py index 03c9b2f8..1d611378 100644 --- a/tests/test_core/nodes/test_function_node_iteration.py +++ b/tests/test_core/nodes/test_function_node_iteration.py @@ -146,7 +146,7 @@ def sometimes_fail(x: int) -> int: return x * 2 pf = PythonPacketFunction(sometimes_fail, output_keys="result") - pf.executor = LocalPythonFunctionExecutor() # sets executor (LocalExecutor.supports_concurrent_execution is False) + pf.executor = LocalPythonFunctionExecutor() # supports_concurrent_execution is False pod = FunctionPod(pf) db = InMemoryArrowDatabase() node = FunctionNode(pod, _make_source(n=3), pipeline_database=db) diff --git a/tests/test_core/test_result_cache.py b/tests/test_core/test_result_cache.py index f465bf8b..0f0e0480 100644 --- a/tests/test_core/test_result_cache.py +++ b/tests/test_core/test_result_cache.py @@ -337,7 +337,7 @@ def test_exclude_system_columns_by_default(self): class TestStoreDictColumns: - def test_executor_info_column_stored(self): + def test_executor_info_column_stored_as_map(self): cache, db = _make_cache() pf = _make_pf() _compute_and_store(cache, pf, Packet({"x": 10})) @@ -346,8 +346,11 @@ def test_executor_info_column_stored(self): assert records is not None exec_info_col = f"{constants.PF_EXECUTION_PREFIX}executor_info" assert exec_info_col in records.column_names + # dict[str, str] should be stored as an Arrow map (list of key-value structs) + field_type = records.schema.field(exec_info_col).type + assert pa.types.is_large_list(field_type), f"Expected large_list (map), got {field_type}" - def test_extra_info_column_stored(self): + def test_extra_info_column_stored_as_map(self): cache, db = _make_cache() pf = _make_pf() _compute_and_store(cache, pf, Packet({"x": 10})) @@ -356,3 +359,6 @@ def test_extra_info_column_stored(self): assert records is not None extra_info_col = f"{constants.PF_EXECUTION_PREFIX}extra_info" assert extra_info_col in records.column_names + # dict[str, str] should be stored as an Arrow map (list of key-value structs) + field_type = records.schema.field(extra_info_col).type + assert pa.types.is_large_list(field_type), f"Expected large_list (map), got {field_type}"