refactor(ENG-379): make iter_packets() read-only, consolidate DB join sites #131
Merged: eywalker merged 26 commits into dev from eywalker/eng-379-refactor-functionnode-clean-up-redundant-logic-and-ensure on Apr 7, 2026.
Commits (26)
All 26 commits were authored by kurodo3[bot]:

- `a335ba0` docs: add FunctionNode refactor design spec (ENG-379)
- `e213e5d` docs(ENG-379): add implementation plan for FunctionNode refactor
- `99b05eb` test(ENG-379): add failing tests for read-only iter_packets semantics
- `67b0520` test(ENG-379): fix unused asyncio import and misleading concurrent do…
- `3bda290` test(ENG-379): remove stale asyncio.gather mention from module docstring
- `3f6bb33` feat(ENG-379): add _load_cached_entries() single DB join helper
- `3398301` refactor(ENG-379): extract entry_ids_col before join.drop() for clarity
- `da9ce26` refactor(ENG-379): store _cached_output_packets by entry_id, remove c…
- `508ed52` refactor(ENG-379): update _cached_output_packets type annotation to s…
- `be767ed` refactor(ENG-379): rewrite run() with load_status guard delegating to…
- `8472b98` refactor(ENG-379): rewrite iter_packets() as strictly read-only with …
- `6dd21dd` refactor(ENG-379): remove _cached_input_iterator/_needs_iterator iter…
- `f4c776b` refactor(ENG-379): revert str annotation to str|int interim, remove d…
- `4a1095f` refactor(ENG-379): simplify get_cached_results() using _load_cached_e…
- `6e57a3f` refactor(ENG-379): simplify _async_execute_cache_only() using _load_c…
- `a773c31` refactor(ENG-379): refactor execute() with selective DB reload and co…
- `1ff23f6` refactor(ENG-379): simplify async_execute() Phase 1 using _load_cache…
- `cf39ddf` refactor(ENG-379): remove dead methods (iter_sequential/concurrent, i…
- `bcf61e8` test(ENG-379): update test_function_pod_node_stream.py — add run() be…
- `7c5e71f` test(ENG-379): update remaining test files for read-only iter_packets…
- `c6dcf8b` fix(ENG-379): as_table() returns empty pa.Table when iter_packets() y…
- `b101c49` test(ENG-379): fix test_function_node_sequential_uses_execute_packet …
- `7f97abd` fix(ENG-379): fix execute() on_packet_start ordering and test regress…
- `065d7fc` test(ENG-379): fix test_caches_computed_results in test-objective — a…
- `d67aca1` fix(ENG-379): address Copilot review — None guard, run() READ_ONLY, g…
- `289573f` fix(ENG-379): address second Copilot review — cache-hit check, doc co…
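The pattern the commits describe can be sketched generically: iter_packets() serves an in-memory cache keyed by entry_id strings, falls back to a single DB-load helper, and never computes; computation lives only in run(). This is a hypothetical illustration, not the actual orcapod code; the class name, the dict-backed "database", and all method bodies are assumptions.

```python
# Hypothetical sketch of the ENG-379 pattern; not the actual orcapod code.
# The dict-backed "database" and all method bodies are stand-ins.
class ReadOnlyIterNode:
    def __init__(self, db: dict[str, int]):
        self._db = db
        self._cached_output_packets: dict[str, int] = {}  # keyed by entry_id (str)

    def _load_cached_entries(self) -> None:
        """The single DB-join site: hot-load previously computed results."""
        self._cached_output_packets.update(self._db)

    def iter_packets(self):
        """Strictly read-only: serve the cache, else do one DB load; never compute."""
        if not self._cached_output_packets:
            self._load_cached_entries()
        yield from self._cached_output_packets.items()

    def run(self) -> None:
        """Computation happens only here; results land in both cache and DB."""
        for i in range(3):
            entry_id, result = str(i), i * 2
            self._cached_output_packets[entry_id] = result
            self._db[entry_id] = result
```

A fresh node over an empty database yields nothing from iter_packets(); after run(), the same call serves the cache without touching the database again, which is the behavior the new tests below pin down.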
Files changed:

- `docs/superpowers/plans/2026-04-07-eng379-function-node-refactor.md` (1,630 additions, 0 deletions; large diff not rendered by default)
- `docs/superpowers/specs/2026-04-07-function-node-refactor-design.md` (315 additions, 0 deletions; large diff not rendered by default)
New test file (164 lines added):

```python
"""Tests for the refactored FunctionNode iteration semantics.

After ENG-379:
- iter_packets() is strictly read-only — never triggers computation
- Computation only via run() / execute() / async_execute()
"""
from __future__ import annotations

from unittest.mock import patch

import pyarrow as pa
import pytest

from orcapod.core.function_pod import FunctionPod
from orcapod.core.nodes import FunctionNode
from orcapod.core.packet_function import PythonPacketFunction
from orcapod.core.sources import ArrowTableSource
from orcapod.databases import InMemoryArrowDatabase
from orcapod.core.executors import LocalExecutor


def _make_source(n: int = 3) -> ArrowTableSource:
    table = pa.table(
        {
            "id": pa.array(list(range(n)), type=pa.int64()),
            "x": pa.array(list(range(n)), type=pa.int64()),
        },
        schema=pa.schema(
            [
                pa.field("id", pa.int64(), nullable=False),
                pa.field("x", pa.int64(), nullable=False),
            ]
        ),
    )
    return ArrowTableSource(table, tag_columns=["id"])


def _make_node(n: int = 3, db: InMemoryArrowDatabase | None = None) -> FunctionNode:
    def double(x: int) -> int:
        return x * 2

    pf = PythonPacketFunction(double, output_keys="result")
    pod = FunctionPod(pf)
    pipeline_db = db if db is not None else InMemoryArrowDatabase()
    return FunctionNode(pod, _make_source(n=n), pipeline_database=pipeline_db)


class TestIterPacketsReadOnly:
    def test_fresh_node_no_db_yields_nothing(self):
        """iter_packets() on a fresh node with no run() and empty DB yields nothing."""
        node = _make_node()
        assert list(node.iter_packets()) == []

    def test_iter_does_not_call_process_packet_internal(self):
        """iter_packets() never calls _process_packet_internal under any non-compute path."""
        node = _make_node()
        with patch.object(node, "_process_packet_internal") as mock_proc:
            list(node.iter_packets())
            mock_proc.assert_not_called()

    def test_iter_after_db_populated_hot_loads_without_compute(self):
        """iter_packets() on a node with DB records hot-loads without _process_packet_internal."""
        db = InMemoryArrowDatabase()
        node1 = _make_node(n=3, db=db)
        node1.run()  # populate DB

        node2 = _make_node(n=3, db=db)
        with patch.object(node2, "_process_packet_internal") as mock_proc:
            results = list(node2.iter_packets())
            mock_proc.assert_not_called()
            assert len(results) == 3

    def test_after_run_iter_yields_from_cache_no_db_query(self):
        """After run(), iter_packets() yields from _cached_output_packets without DB query."""
        node = _make_node()
        node.run()
        initial_count = len(node._cached_output_packets)
        assert initial_count == 3

        with patch.object(node, "_load_cached_entries") as mock_load:
            results = list(node.iter_packets())
            mock_load.assert_not_called()
            assert len(results) == 3

    def test_iter_twice_same_order_db_queried_once(self):
        """Two successive iter_packets() calls return same order; DB queried at most once."""
        db = InMemoryArrowDatabase()
        node1 = _make_node(n=3, db=db)
        node1.run()

        node2 = _make_node(n=3, db=db)
        with patch.object(node2, "_load_cached_entries", wraps=node2._load_cached_entries) as mock_load:
            first = [(t["id"], p["result"]) for t, p in node2.iter_packets()]
            second = [(t["id"], p["result"]) for t, p in node2.iter_packets()]
            assert mock_load.call_count <= 1  # at most one DB query
            assert first == second

    def test_cached_output_packets_keyed_by_entry_id_strings(self):
        """After run(), _cached_output_packets keys are entry_id strings, not ints."""
        node = _make_node()
        node.run()
        assert len(node._cached_output_packets) == 3
        for key in node._cached_output_packets:
            assert isinstance(key, str), f"Expected str key, got {type(key)}: {key!r}"

    def test_as_table_fresh_node_returns_empty_no_compute(self):
        """as_table() on a fresh node with no run() and empty DB returns empty table."""
        node = _make_node()
        with patch.object(node, "_process_packet_internal") as mock_proc:
            table = node.as_table()
            mock_proc.assert_not_called()
            assert isinstance(table, pa.Table)
            assert len(table) == 0

    def test_run_cache_only_is_noop(self):
        """run() on a CACHE_ONLY node returns without error and without computation."""
        from orcapod.pipeline.serialization import LoadStatus

        node = _make_node()
        node._load_status = LoadStatus.CACHE_ONLY
        node._input_stream = None  # simulate no upstream

        with patch.object(node, "execute") as mock_exec:
            node.run()
            mock_exec.assert_not_called()

    def test_run_unavailable_raises(self):
        """run() on an UNAVAILABLE node raises RuntimeError."""
        from orcapod.pipeline.serialization import LoadStatus

        node = _make_node()
        node._load_status = LoadStatus.UNAVAILABLE
        with pytest.raises(RuntimeError, match="unavailable"):
            node.run()

    def test_execute_error_policy_continue_skips_failures(self):
        """execute() fires on_packet_crash per failing packet and returns successes when error_policy='continue'.

        Uses LocalExecutor (non-concurrent) to test the sequential execute() path.
        """
        errors = []

        def sometimes_fail(x: int) -> int:
            if x == 1:
                raise ValueError("intentional failure")
            return x * 2

        pf = PythonPacketFunction(sometimes_fail, output_keys="result")
        pf.executor = LocalExecutor()  # LocalExecutor.supports_concurrent_execution is False
        pod = FunctionPod(pf)
        db = InMemoryArrowDatabase()
        node = FunctionNode(pod, _make_source(n=3), pipeline_database=db)

        from orcapod.pipeline.observer import NoOpObserver

        class CapturingObserver(NoOpObserver):
            def on_packet_crash(self, node_label, tag, packet, exc):
                errors.append(exc)

        results = node.execute(node._input_stream, observer=CapturingObserver(), error_policy="continue")
        assert len(errors) == 1
        assert isinstance(errors[0], ValueError)
        # Two non-failing packets should succeed
        assert len(results) == 2
```
Review comment:

Shouldn't you be able to specify nullability on the arrays and skip having to specify it on the schema?
Reply:

PyArrow doesn't expose nullability at the array level: `nullable` is a property of `pa.field` only, not `pa.array`. There's no `pa.array(..., nullable=False)` API. The explicit `schema=pa.schema([...])` block is the standard PyArrow idiom for enforcing non-nullability on a raw table, so I've kept it as-is.