Conversation
Spec covering: read-only iter_packets, _load_cached_entries helper, run() delegation to execute(), entry_id keyed result store, and full method inventory / test impact analysis. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
15-task plan covering:
- _load_cached_entries() single DB join helper (replaces 4 duplicates)
- iter_packets() rewrite to strictly read-only
- run() rewrite with load_status guard delegating to execute()
- execute() selective DB reload + concurrent asyncio.gather path
- async_execute() Phase 1 simplification
- _process_packet_internal() entry_id keying (remove cache_index)
- get_cached_results() add-only semantics (no clear)
- as_table() empty-case fix
- Removal of 5 dead methods
- Test updates across 5 test files + 10 new iteration tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ache_index Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tr | int for intermediate state
… execute() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…DB hot-load Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ator state Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ead iterator writes in get_cached_results
…ntries(), add-only semantics Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ached_entries()

Replace inline DB join logic with a call to _load_cached_entries(), which was introduced in Task 2, eliminating duplicated code across cache execution paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ncurrent path Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…d_entries() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ter_all_from_database, load_all_cached_records) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…fore compute-dependent assertions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… semantics Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ields nothing Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…for read-only iter_packets
…ions
- Rewrite execute() to fire on_packet_start before each packet (not after all computation), fixing StatusObserver RUNNING record ordering
- Remove concurrent asyncio.gather path from execute(); it is always sequential — async path belongs in async_execute() only
- Call _update_modified_time() at end of execute() so iter_packets() skips is_stale check and serves from in-memory cache
- Update tests to call node.run() before iter_packets() — iter_packets() is strictly read-only per ENG-379; computation requires run()/execute()
- Rename test_function_node_uses_async_path to test_function_node_uses_sync_path_via_run, reflecting that execute() is always synchronous (async path only via async_execute())

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dd run() before iter_packets()

iter_packets() is strictly read-only; callers must call run() first to trigger computation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
This PR refactors FunctionNode iteration and caching semantics to make iter_packets() strictly read-only, moves all computation behind explicit run() / execute() / async_execute(), and consolidates repeated pipeline+result DB join logic into a single _load_cached_entries() helper.
Changes:
- Make `FunctionNode.iter_packets()` yield only already-computed results (no implicit computation); callers must explicitly compute via `run()` / `execute()` / `async_execute()` (see the sketch after this list).
- Consolidate duplicated DB join logic into `_load_cached_entries()` and switch `_cached_output_packets` to be keyed by pipeline `entry_id` (string).
- Update/extend tests and add design docs to reflect the new read-only iteration contract.
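To make the read-only contract concrete, here is a minimal runnable sketch. `ToyNode` is a hypothetical stand-in for `FunctionNode`, not the real orcapod API; it only models the "compute via run(), then iterate" flow described above:

```python
from typing import Iterator

class ToyNode:
    """Toy stand-in for FunctionNode; real orcapod types differ."""

    def __init__(self, inputs: list[int]) -> None:
        self._inputs = inputs
        # keyed by entry_id, mirroring the new _cached_output_packets keying
        self._cache: dict[str, int] = {}

    def run(self) -> None:
        # Explicit computation path (stands in for run()/execute()).
        for i, x in enumerate(self._inputs):
            self._cache[f"entry-{i}"] = x * 2

    def iter_packets(self) -> Iterator[tuple[str, int]]:
        # Strictly read-only: yields only already-computed results.
        yield from self._cache.items()

node = ToyNode([0, 1, 2])
assert list(node.iter_packets()) == []   # nothing computed yet
node.run()
assert sorted(v for _, v in node.iter_packets()) == [0, 2, 4]
```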
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| src/orcapod/core/nodes/function_node.py | Core refactor: read-only iter_packets(), _load_cached_entries(), entry-id keyed cache, updated execute()/run()/as_table() behavior. |
| tests/test_core/nodes/test_function_node_iteration.py | New test suite for ENG-379 iteration semantics and cache behavior. |
| tests/test_pipeline/test_serialization.py | Ensure loaded pipeline explicitly calls run() before iterating packets. |
| tests/test_data/test_polars_nullability/test_function_node_nullability.py | Update nullability test to use _load_cached_entries() instead of removed DB-iteration helper. |
| tests/test_core/sources/test_derived_source.py | Update downstream derived-source test to explicitly compute before iterating. |
| tests/test_core/packet_function/test_executor.py | Adjust executor-path expectations now that run() triggers sync execute() semantics. |
| tests/test_core/function_pod/test_pipeline_hash_integration.py | Ensure nodes compute via run() before iter_packets(). |
| tests/test_core/function_pod/test_function_pod_node.py | Update fixture to compute via run() before iterating. |
| tests/test_core/function_pod/test_function_pod_node_stream.py | Update many tests to call run() before iteration / as_table(), aligning with read-only iteration. |
| tests/test_core/function_pod/test_function_node_caching.py | Ensure nodes compute via run() prior to iter_packets() assertions. |
| tests/test_core/function_pod/test_function_node_attach_db.py | Ensure tests explicitly compute via run() before iterating after DB attach. |
| tests/test_channels/test_node_async_execute.py | Update patched internal signature to match _process_packet_internal(..., logger=...). |
| docs/superpowers/specs/2026-04-07-function-node-refactor-design.md | Design spec documenting the refactor rationale and intended behavior. |
| docs/superpowers/plans/2026-04-07-eng379-function-node-refactor.md | Implementation plan capturing tasks/steps and test migration guidance. |
```diff
-                ctx_obs.on_packet_end(
-                    node_label, tag, packet, result, cached=True
-                )
+                ctx_obs.on_packet_end(node_label, tag, packet, result, cached=True)
                 output.append((tag_out, result))
```
In the cached-hit branch, the code appends result to output unconditionally. Since _cached_output_packets stores PacketProtocol | None (and get_cached_results() can return values from that store), result can be None, which violates the method contract (“excluding None outputs”) and can break callers expecting a PacketProtocol. Mirror the non-cached branch by only appending when result is not None (while still firing observer hooks as desired).
```diff
-                output.append((tag_out, result))
+                if result is not None:
+                    output.append((tag_out, result))
```
Fixed. Added if result is not None: guard in the cached-hit branch of execute() to match the non-cached branch, ensuring None outputs are never appended to the return list regardless of whether the result came from cache or fresh computation.
```diff
     def get_cached_results(
         self, entry_ids: list[str]
     ) -> dict[str, tuple[TagProtocol, PacketProtocol]]:
         """Retrieve cached results for specific pipeline entry IDs.

-        Looks up the pipeline DB and result DB, joins them, and filters
-        to the requested entry IDs. Returns a mapping from entry ID to
-        (tag, output_packet).
+        Checks in-memory cache first. Loads only truly missing entries from DB.
+        Add-only semantics: existing in-memory entries are never cleared or
+        overwritten (overwrite is safe since in-memory and DB entries for the
+        same entry_id are always semantically equivalent).

         Args:
             entry_ids: Pipeline entry IDs to look up.

         Returns:
-            Mapping from entry_id to (tag, output_packet) for found entries.
+            Mapping from entry_id to ``(tag, output_packet)`` for found entries.
             Empty dict if no DB is attached or no matches found.
         """
         if self._cached_function_pod is None or not entry_ids:
             return {}

-        self._require_pipeline_database()
-
-        PIPELINE_ENTRY_ID_COL = "__pipeline_entry_id"
-
-        taginfo = self._pipeline_database.get_all_records(
-            self.node_identity_path,
-            record_id_column=PIPELINE_ENTRY_ID_COL,
-        )
-        results = self._cached_function_pod._result_database.get_all_records(
-            self._cached_function_pod.record_path,
-            record_id_column=constants.PACKET_RECORD_ID,
-        )
-
-        if taginfo is None or results is None:
-            return {}
-
-        taginfo = self._filter_by_content_hash(taginfo)
-        taginfo_schema = taginfo.schema
-        results_schema = results.schema
-        filtered = (
-            pl.DataFrame(taginfo)
-            .join(
-                pl.DataFrame(results),
-                on=constants.PACKET_RECORD_ID,
-                how="inner",
-            )
-            .filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids))
-            .to_arrow()
-        )
-        filtered = arrow_utils.restore_schema_nullability(filtered, taginfo_schema, results_schema)
-
-        if filtered.num_rows == 0:
-            return {}
-
-        tag_keys = self._input_stream.keys()[0]
-        drop_cols = [
-            c
-            for c in filtered.column_names
-            if c.startswith(constants.META_PREFIX)
-            or c == PIPELINE_ENTRY_ID_COL
-            or c == constants.NODE_CONTENT_HASH_COL
-        ]
-        data_table = filtered.drop([c for c in drop_cols if c in filtered.column_names])
-
-        stream = ArrowTableStream(data_table, tag_columns=tag_keys)
-        filtered_entry_ids = filtered.column(PIPELINE_ENTRY_ID_COL).to_pylist()
-
-        result_dict: dict[str, tuple[TagProtocol, PacketProtocol]] = {}
-        for entry_id, (tag, packet) in zip(filtered_entry_ids, stream.iter_packets()):
-            result_dict[entry_id] = (tag, packet)
-
-        # Populate internal cache with retrieved results (clear first to
-        # avoid duplicates on repeated orchestrator runs)
-        self._cached_output_packets.clear()
-        self._cached_output_table = None
-        self._cached_content_hash_column = None
-        for entry_id, (tag, packet) in result_dict.items():
-            next_idx = len(self._cached_output_packets)
-            self._cached_output_packets[next_idx] = (tag, packet)
-        self._cached_input_iterator = None
-        self._needs_iterator = False
-
-        return result_dict
+        missing = [eid for eid in entry_ids if eid not in self._cached_output_packets]
+        if missing:
+            loaded = self._load_cached_entries(missing)
+            self._cached_output_packets.update(loaded)
+            if loaded:
+                self._cached_output_table = None
+                self._cached_content_hash_column = None
+
+        return {
+            eid: self._cached_output_packets[eid]
+            for eid in entry_ids
+            if eid in self._cached_output_packets
+        }
```
get_cached_results() is annotated to return dict[str, tuple[TagProtocol, PacketProtocol]], but it returns values directly from _cached_output_packets, whose packet value is PacketProtocol | None. This can surface None packets to callers (and currently feeds into execute()), contradicting the docstring (“excluding None outputs”). Consider filtering out None packets in the returned mapping (and optionally treating cached None values as cache misses if recomputation/DB load should be allowed).
Fixed. get_cached_results() now filters out entries where the packet is None before returning, so callers always receive (TagProtocol, PacketProtocol) pairs. Stored None values are still retained in _cached_output_packets to prevent unnecessary DB re-fetches — they just won't surface to callers.
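As a rough illustration of the behaviour described in this fix, here is a self-contained sketch. The `cache` dict shape and the simplified `get_cached_results` signature are assumptions for illustration, not the actual orcapod implementation:

```python
# Toy model: None-valued packets stay in the cache (to avoid DB re-fetches)
# but are filtered out of the mapping returned to callers.
cache: dict[str, tuple[str, dict | None]] = {
    "a": ("t1", {"x": 1}),
    "b": ("t2", None),  # memoised None output: kept, but never surfaced
}

def get_cached_results(entry_ids: list[str]) -> dict[str, tuple[str, dict]]:
    out = {}
    for eid in entry_ids:
        hit = cache.get(eid)
        if hit is not None and hit[1] is not None:  # filter None packets out
            out[eid] = hit
    return out

assert get_cached_results(["a", "b", "c"]) == {"a": ("t1", {"x": 1})}
assert "b" in cache  # the None entry stays cached, just never returned
```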
```diff
+from orcapod.pipeline.serialization import LoadStatus

     def run(self) -> None:
         """Eagerly process all input packets, filling the pipeline and result databases."""
-        for _ in self.iter_packets():
-            pass
+        if self._load_status == LoadStatus.UNAVAILABLE:
+            raise RuntimeError(
+                f"FunctionNode {self.label!r} is unavailable: "
+                "no function pod and no database attached."
+            )
+        if self._load_status == LoadStatus.CACHE_ONLY:
+            # Upstream is unavailable; computation requires a live input stream.
+            # Callers should use iter_packets() to serve existing DB results.
+            return
+        self.execute(self._input_stream)
```
run() doesn’t handle LoadStatus.READ_ONLY (PacketFunctionProxy): it will call execute() and then fail with PacketFunctionUnavailableError. It also doesn’t clear in-memory caches when self.is_stale, which can leave stale entries in the add-only _cached_output_packets and cause iter_packets() to yield a mix of old+new results after a rerun. Recommend: (1) guard READ_ONLY with a clear RuntimeError/no-op (similar to CACHE_ONLY), and (2) if is_stale then clear_cache() before computing to avoid retaining stale entries.
Fixed both issues. run() now treats READ_ONLY as a no-op (same as CACHE_ONLY) — proxy function pods cannot compute, so silently returning is the right behaviour. Also added if self.is_stale: self.clear_cache() before calling execute() so any stale in-memory entries are discarded before a fresh computation run.
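A minimal sketch of the guard ordering described above, using a toy node and an assumed `LoadStatus` enum (the real enum lives in `orcapod.pipeline.serialization`; names and values here are illustrative):

```python
from enum import Enum, auto

class LoadStatus(Enum):  # assumed values for illustration
    AVAILABLE = auto()
    CACHE_ONLY = auto()
    READ_ONLY = auto()
    UNAVAILABLE = auto()

class ToyRunNode:
    def __init__(self, status: LoadStatus, stale: bool) -> None:
        self._load_status = status
        self.is_stale = stale
        self.cache = {"old-entry": 1}
        self.executed = False

    def clear_cache(self) -> None:
        self.cache.clear()

    def run(self) -> None:
        if self._load_status is LoadStatus.UNAVAILABLE:
            raise RuntimeError("no function pod and no database attached")
        if self._load_status in (LoadStatus.CACHE_ONLY, LoadStatus.READ_ONLY):
            return  # cannot compute; iter_packets() serves existing results
        if self.is_stale:
            self.clear_cache()  # drop stale entries before fresh computation
        self.executed = True  # stands in for self.execute(self._input_stream)

node = ToyRunNode(LoadStatus.AVAILABLE, stale=True)
node.run()
assert node.cache == {} and node.executed
```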
```python
    def test_execute_concurrent_error_policy_continue(self):
        """execute() fires on_packet_crash per failing packet and returns successes when error_policy='continue'."""
        errors = []

        def sometimes_fail(x: int) -> int:
            if x == 1:
                raise ValueError("intentional failure")
            return x * 2

        pf = PythonPacketFunction(sometimes_fail, output_keys="result")
        pf.executor = LocalExecutor()  # sets executor (LocalExecutor.supports_concurrent_execution is False)
        pod = FunctionPod(pf)
```
Test name is misleading: test_execute_concurrent_error_policy_continue uses LocalExecutor with supports_concurrent_execution false, so it exercises the sequential execute() path. Rename the test to avoid “concurrent” in the name, or switch to an executor that actually enables the concurrent path (if that behavior is intended to be covered here).
Fixed. Renamed to test_execute_error_policy_continue_skips_failures and added a clarifying note to the docstring that the test uses LocalExecutor (non-concurrent) and exercises the sequential execute() path.
```python
        )

        # node2 doubles the already-doubled values: 0*2*2=0, 1*2*2=4, 2*2*2=8
        node2.run()
        results = sorted(cast(int, p["result"]) for _, p in node2.iter_packets())
        assert results == [0, 4, 8]
```
test_derived_source_matches_node_output in this file still collects node_results via node.iter_packets() without calling node.run() first. With ENG-379 making iter_packets() read-only, this will now yield an empty result set and the assertion will no longer validate the intended behavior. Update that test to call node.run() (or node.execute(...)) before comparing node vs DerivedSource outputs.
Fixed. Added node.run() before node.iter_packets() in test_derived_source_matches_node_output so the test properly exercises the read-only iteration contract (compute via run(), then iterate).
…et_cached_results filter
- execute(): guard cached-hit branch with `if result is not None` to match non-cached branch and satisfy PacketProtocol return contract
- get_cached_results(): filter None-packet entries from returned mapping so callers always receive (tag, PacketProtocol) — stored None values still prevent unnecessary DB re-fetches (they remain in the cache)
- run(): handle LoadStatus.READ_ONLY as a no-op (like CACHE_ONLY) — proxy function pods cannot compute; also call clear_cache() when is_stale to discard stale entries before a fresh computation run
- test_execute_concurrent_error_policy_continue → rename to test_execute_error_policy_continue_skips_failures — the test uses LocalExecutor (non-concurrent), so "concurrent" in the name was misleading; add clarifying note to docstring
- test_derived_source_matches_node_output: add node.run() before iter_packets() to satisfy read-only iteration contract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review round addressed — commit d67aca1. All 5 Copilot comments resolved in a single follow-up commit.
All 3724 tests pass.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
```diff
                 if entry_id in cached:
                     tag_out, result = cached[entry_id]
-                    ctx_obs.on_packet_end(
-                        node_label, tag, packet, result, cached=True
-                    )
-                    output.append((tag_out, result))
+                    ctx_obs.on_packet_end(node_label, tag, packet, result, cached=True)
+                    if result is not None:
+                        output.append((tag_out, result))
```
In execute(), cache-hit detection relies on the dict returned by get_cached_results(). That method intentionally filters out entries whose cached packet is None, so any entry_id already present in _cached_output_packets with a None output will be treated as a cache miss and recomputed on subsequent execute() calls (potentially re-triggering side effects). Consider using _cached_output_packets membership for the cache-hit check (including None) and only filtering out None when appending to the returned output list; keep get_cached_results() primarily for DB hot-loading of missing entry_ids.
Fixed. execute() now uses _cached_output_packets directly for the cache-hit membership check (which includes None-output entries) and calls get_cached_results() only for its DB hot-load side effect (the returned dict is discarded). A None packet stored in the cache correctly prevents recomputation on subsequent execute() calls while still being excluded from the output list.
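A small self-contained sketch of the corrected hit-detection logic; the dict shape and names are simplified assumptions, not the real orcapod code:

```python
# Membership in the raw store (which may hold None outputs) decides cache
# hits; None is filtered only when building the returned output list.
cached_output_packets = {"e1": ("t1", None), "e2": ("t2", {"result": 4})}
to_compute, output = [], []

for entry_id in ["e1", "e2", "e3"]:
    if entry_id in cached_output_packets:   # "e1" is a hit despite None output
        tag_out, result = cached_output_packets[entry_id]
        if result is not None:
            output.append((tag_out, result))
    else:
        to_compute.append(entry_id)          # only true misses get recomputed

assert to_compute == ["e3"]
assert output == [("t2", {"result": 4})]
```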
```text
if _executor_supports_concurrent(self._packet_function) and to_compute:
    inline concurrent gather (see below)
else:
    for tag, pkt, eid in to_compute:
        try:
            _process_packet_internal(tag, pkt)
        except Exception as exc:
            log warning; on_packet_crash; re-raise if error_policy=="fail_fast"
```
This design doc describes an execute() “concurrent gather” path keyed off _executor_supports_concurrent(...). In the current implementation of FunctionNode.execute(), computation is always sequential and the async path is only in async_execute(). Please update this section to match the actual behavior (or clearly mark the concurrent path as out-of-scope/future work) so readers don’t implement against an outdated contract.
```diff
-if _executor_supports_concurrent(self._packet_function) and to_compute:
-    inline concurrent gather (see below)
-else:
-    for tag, pkt, eid in to_compute:
-        try:
-            _process_packet_internal(tag, pkt)
-        except Exception as exc:
-            log warning; on_packet_crash; re-raise if error_policy=="fail_fast"
+# Current behavior: execute() is always synchronous/sequential.
+# Any async/concurrent gather path is handled by async_execute()
+# and is out of scope for execute() in this design.
+for tag, pkt, eid in to_compute:
+    try:
+        _process_packet_internal(tag, pkt)
+    except Exception as exc:
+        log warning; on_packet_crash; re-raise if error_policy=="fail_fast"
```
Fixed. Replaced the stale concurrent-gather pseudocode in execute() section of the design doc with accurate sequential-only pseudocode that matches the implementation. Added an explicit note that execute() is always synchronous and that async/concurrent execution is exclusively in async_execute(). Also updated testing item 10 and the migration-notes section accordingly.
```python
            return x * 2

        pf = PythonPacketFunction(sometimes_fail, output_keys="result")
        pf.executor = LocalExecutor()  # enables concurrent path
        pod = FunctionPod(pf)
```
This plan snippet still says pf.executor = LocalExecutor() # enables concurrent path and uses the old test name test_execute_concurrent_error_policy_continue. In the current code/tests, LocalExecutor is non-concurrent and the execute() coverage here is sequential (the test was renamed accordingly). Please update the plan so it matches the implemented behavior.
Fixed. Updated the plan doc in three places: (1) module docstring changed from "concurrent path via asyncio.gather" to "always sequential; async/concurrent path is only in async_execute()"; (2) test renamed from test_execute_concurrent_error_policy_continue to test_execute_error_policy_continue_skips_failures; (3) LocalExecutor comment changed from "enables concurrent path" to "non-concurrent; tests sequential execute() path". Also updated Task 9 heading, the pytest invocation, and the git commit message in the plan.
…rrectness
- execute(): use _cached_output_packets directly for cache-hit membership
check (not the filtered get_cached_results() return value) so that
None-output entries are correctly treated as cache hits and not
recomputed on subsequent execute() calls; get_cached_results() is
still called for its DB hot-load side effect
- design doc: replace pseudocode describing a concurrent asyncio.gather
path in execute() with accurate sequential-only pseudocode; add a note
that async/concurrent execution is exclusively in async_execute()
- design doc: update test item 10 and migration notes to use the renamed
test and correct (sequential) path description
- plan doc: fix module docstring ("concurrent path via asyncio.gather"
→ "always sequential"), rename test_execute_concurrent_error_policy_continue
→ test_execute_error_policy_continue_skips_failures, correct LocalExecutor
comment, and update Task 9 heading/commit message/pytest invocation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review round 2 addressed — commit 289573f. All 3 Copilot comments resolved in a single follow-up commit.
All 3724 tests pass.
```python
def _make_source(n: int = 3) -> ArrowTableSource:
    table = pa.table(
        {
            "id": pa.array(list(range(n)), type=pa.int64()),
```
Shouldn't you be able to specify nullability on the arrays and skip having to specify it on the schema?
PyArrow doesn't expose nullability at the array level — nullable is a property of pa.field only, not pa.array. There's no pa.array(..., nullable=False) API. The explicit schema=pa.schema([...]) block is the standard PyArrow idiom for enforcing non-nullability on a raw table, so I've kept it as-is.
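For reference, a minimal PyArrow example of the idiom described (public PyArrow APIs only; the column name is illustrative):

```python
import pyarrow as pa

# Nullability is a property of pa.field, not pa.array: enforce it by
# passing an explicit schema when constructing the table.
schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
table = pa.table({"id": pa.array([0, 1, 2], type=pa.int64())}, schema=schema)
assert table.schema.field("id").nullable is False
```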
Summary

- `iter_packets()` strictly read-only: computation now happens only via `run()` / `execute()` / `async_execute()`; callers must explicitly trigger computation before iterating
- `_load_cached_entries()` helper: consolidates 4+ duplicated DB join sites into a single reusable method
- `execute()` rewrite: fires `on_packet_start` before each packet (fixes StatusObserver RUNNING records), removes the incorrect concurrent `asyncio.gather` path (always sequential — async belongs in `async_execute()`), and calls `_update_modified_time()` after completion so subsequent `iter_packets()` serves from the in-memory cache without a stale DB reload

Test Plan

- All tests pass (`uv run pytest`)
- `test_fail_fast_aborts_and_records_status` — RUNNING record written before crash
- `test_explicit_sync_config_overrides_async_default` — sync path used when `config=SYNCHRONOUS`
- `test_cache_only_function_node_serves_cached_results` — in-memory cache served after `execute()`
- `test_definition_save_load_run_roundtrip` — loaded pipeline calls `run()` before `iter_packets()`

🤖 Generated with Claude Code