
refactor(ENG-379): make iter_packets() read-only, consolidate DB join sites #131

Merged
eywalker merged 26 commits into dev from
eywalker/eng-379-refactor-functionnode-clean-up-redundant-logic-and-ensure
on Apr 7, 2026

Conversation

kurodo3 Bot commented Apr 7, 2026

Summary

  • iter_packets() is now strictly read-only: computation happens only via run()/execute()/async_execute(), so callers must explicitly trigger computation before iterating
  • New _load_cached_entries() helper: consolidates 4+ duplicated DB join sites into a single reusable method
  • execute() rewrite: fires on_packet_start before each packet (fixing StatusObserver RUNNING records), removes the incorrect concurrent asyncio.gather path (execute() is always sequential; async belongs in async_execute()), and calls _update_modified_time() after completion so subsequent iter_packets() calls serve from the in-memory cache without a stale DB reload
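The contract can be illustrated with a toy model (a hypothetical `ToyNode`, not the real `FunctionNode` API): iteration serves only what has already been computed, and `run()` is the explicit compute step.

```python
# Toy sketch of the ENG-379 contract (illustrative only, not orcapod code):
# iter_packets() never triggers computation; run() fills the cache explicitly.
class ToyNode:
    def __init__(self, inputs):
        self._inputs = inputs
        self._cache = {}  # entry_id -> computed result

    def run(self):
        """Explicitly compute every pending packet (the only compute path)."""
        for entry_id, value in self._inputs:
            if entry_id not in self._cache:
                self._cache[entry_id] = value * 2

    def iter_packets(self):
        """Strictly read-only: serve cached results, never compute."""
        yield from self._cache.items()

node = ToyNode([("a", 1), ("b", 2)])
assert list(node.iter_packets()) == []            # nothing computed yet
node.run()
assert dict(node.iter_packets()) == {"a": 2, "b": 4}
```

Before this PR, iterating would implicitly compute; now the first assertion above reflects the new behavior.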

Test Plan

  • All 3179 tests pass (uv run pytest)
  • test_fail_fast_aborts_and_records_status — RUNNING record written before crash
  • test_explicit_sync_config_overrides_async_default — sync path used when config=SYNCHRONOUS
  • test_cache_only_function_node_serves_cached_results — in-memory cache served after execute()
  • test_definition_save_load_run_roundtrip — loaded pipeline calls run() before iter_packets()

🤖 Generated with Claude Code

kurodo3 Bot and others added 23 commits April 7, 2026 08:04
Spec covering: read-only iter_packets, _load_cached_entries helper,
run() delegation to execute(), entry_id keyed result store, and
full method inventory / test impact analysis.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
15-task plan covering:
- _load_cached_entries() single DB join helper (replaces 4 duplicates)
- iter_packets() rewrite to strictly read-only
- run() rewrite with load_status guard delegating to execute()
- execute() selective DB reload + concurrent asyncio.gather path
- async_execute() Phase 1 simplification
- _process_packet_internal() entry_id keying (remove cache_index)
- get_cached_results() add-only semantics (no clear)
- as_table() empty-case fix
- Removal of 5 dead methods
- Test updates across 5 test files + 10 new iteration tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ache_index

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… execute()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…DB hot-load

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ator state

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ntries(), add-only semantics

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ached_entries()

Replace inline DB join logic with a call to _load_cached_entries(), which was
introduced in Task 2, eliminating duplicated code across cache execution paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ncurrent path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…d_entries()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ter_all_from_database, load_all_cached_records)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…fore compute-dependent assertions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… semantics

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ields nothing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ions

- Rewrite execute() to fire on_packet_start before each packet (not
  after all computation), fixing StatusObserver RUNNING record ordering
- Remove concurrent asyncio.gather path from execute(); it is always
  sequential — async path belongs in async_execute() only
- Call _update_modified_time() at end of execute() so iter_packets()
  skips is_stale check and serves from in-memory cache
- Update tests to call node.run() before iter_packets() — iter_packets()
  is strictly read-only per ENG-379; computation requires run()/execute()
- Rename test_function_node_uses_async_path to
  test_function_node_uses_sync_path_via_run, reflecting that execute()
  is always synchronous (async path only via async_execute())

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
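The observer-ordering fix in the commit above can be sketched with a toy observer (hypothetical names; the real StatusObserver API may differ):

```python
# Toy sketch of the ordering fix: on_packet_start fires before each packet's
# computation, so a RUNNING record exists even if the computation crashes.
events = []

class Observer:
    def on_packet_start(self, pkt):
        events.append(("start", pkt))  # e.g. write RUNNING record

    def on_packet_end(self, pkt):
        events.append(("end", pkt))    # e.g. write COMPLETED record

def execute(packets, obs):
    results = []
    for pkt in packets:                # always sequential; async lives in async_execute()
        obs.on_packet_start(pkt)       # fired BEFORE compute, per this commit
        results.append(pkt * 2)
        obs.on_packet_end(pkt)
    return results

assert execute([1, 2], Observer()) == [2, 4]
assert events == [("start", 1), ("end", 1), ("start", 2), ("end", 2)]
```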
@eywalker eywalker requested a review from Copilot April 7, 2026 14:19

codecov Bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 96.25000% with 3 lines in your changes missing coverage. Please review.

Files with missing lines:
  • src/orcapod/core/nodes/function_node.py — patch 96.25%, 3 lines missing ⚠️


…dd run() before iter_packets()

iter_packets() is strictly read-only; callers must call run() first to
trigger computation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Copilot AI left a comment


Pull request overview

This PR refactors FunctionNode iteration and caching semantics to make iter_packets() strictly read-only, moves all computation behind explicit run() / execute() / async_execute(), and consolidates repeated pipeline+result DB join logic into a single _load_cached_entries() helper.

Changes:

  • Make FunctionNode.iter_packets() yield only already-computed results (no implicit computation); callers must explicitly compute via run() / execute() / async_execute().
  • Consolidate duplicated DB join logic into _load_cached_entries() and switch _cached_output_packets to be keyed by pipeline entry_id (string).
  • Update/extend tests and add design docs to reflect the new read-only iteration contract.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Summary per file:
  • src/orcapod/core/nodes/function_node.py — Core refactor: read-only iter_packets(), _load_cached_entries(), entry-id keyed cache, updated execute()/run()/as_table() behavior.
  • tests/test_core/nodes/test_function_node_iteration.py — New test suite for ENG-379 iteration semantics and cache behavior.
  • tests/test_pipeline/test_serialization.py — Ensure loaded pipeline explicitly calls run() before iterating packets.
  • tests/test_data/test_polars_nullability/test_function_node_nullability.py — Update nullability test to use _load_cached_entries() instead of the removed DB-iteration helper.
  • tests/test_core/sources/test_derived_source.py — Update downstream derived-source test to explicitly compute before iterating.
  • tests/test_core/packet_function/test_executor.py — Adjust executor-path expectations now that run() triggers sync execute() semantics.
  • tests/test_core/function_pod/test_pipeline_hash_integration.py — Ensure nodes compute via run() before iter_packets().
  • tests/test_core/function_pod/test_function_pod_node.py — Update fixture to compute via run() before iterating.
  • tests/test_core/function_pod/test_function_pod_node_stream.py — Update many tests to call run() before iteration/as_table(), aligning with read-only iteration.
  • tests/test_core/function_pod/test_function_node_caching.py — Ensure nodes compute via run() prior to iter_packets() assertions.
  • tests/test_core/function_pod/test_function_node_attach_db.py — Ensure tests explicitly compute via run() before iterating after DB attach.
  • tests/test_channels/test_node_async_execute.py — Update patched internal signature to match _process_packet_internal(..., logger=...).
  • docs/superpowers/specs/2026-04-07-function-node-refactor-design.md — Design spec documenting the refactor rationale and intended behavior.
  • docs/superpowers/plans/2026-04-07-eng379-function-node-refactor.md — Implementation plan capturing tasks/steps and test migration guidance.


Comment thread src/orcapod/core/nodes/function_node.py Outdated
node_label, tag, packet, result, cached=True
)
ctx_obs.on_packet_end(node_label, tag, packet, result, cached=True)
output.append((tag_out, result))

Copilot AI Apr 7, 2026


In the cached-hit branch, the code appends result to output unconditionally. Since _cached_output_packets stores PacketProtocol | None (and get_cached_results() can return values from that store), result can be None, which violates the method contract (“excluding None outputs”) and can break callers expecting a PacketProtocol. Mirror the non-cached branch by only appending when result is not None (while still firing observer hooks as desired).

Suggested change
output.append((tag_out, result))
if result is not None:
output.append((tag_out, result))

Contributor Author


Fixed. Added if result is not None: guard in the cached-hit branch of execute() to match the non-cached branch, ensuring None outputs are never appended to the return list regardless of whether the result came from cache or fresh computation.

Comment on lines 691 to +723
def get_cached_results(
self, entry_ids: list[str]
) -> dict[str, tuple[TagProtocol, PacketProtocol]]:
"""Retrieve cached results for specific pipeline entry IDs.

Looks up the pipeline DB and result DB, joins them, and filters
to the requested entry IDs. Returns a mapping from entry ID to
(tag, output_packet).
Checks in-memory cache first. Loads only truly missing entries from DB.
Add-only semantics: existing in-memory entries are never cleared or
overwritten (overwrite is safe since in-memory and DB entries for the
same entry_id are always semantically equivalent).

Args:
entry_ids: Pipeline entry IDs to look up.

Returns:
Mapping from entry_id to (tag, output_packet) for found entries.
Mapping from entry_id to ``(tag, output_packet)`` for found entries.
Empty dict if no DB is attached or no matches found.
"""
if self._cached_function_pod is None or not entry_ids:
return {}

self._require_pipeline_database()

PIPELINE_ENTRY_ID_COL = "__pipeline_entry_id"

taginfo = self._pipeline_database.get_all_records(
self.node_identity_path,
record_id_column=PIPELINE_ENTRY_ID_COL,
)
results = self._cached_function_pod._result_database.get_all_records(
self._cached_function_pod.record_path,
record_id_column=constants.PACKET_RECORD_ID,
)

if taginfo is None or results is None:
return {}

taginfo = self._filter_by_content_hash(taginfo)
taginfo_schema = taginfo.schema
results_schema = results.schema
filtered = (
pl.DataFrame(taginfo)
.join(
pl.DataFrame(results),
on=constants.PACKET_RECORD_ID,
how="inner",
)
.filter(pl.col(PIPELINE_ENTRY_ID_COL).is_in(entry_ids))
.to_arrow()
)
filtered = arrow_utils.restore_schema_nullability(filtered, taginfo_schema, results_schema)

if filtered.num_rows == 0:
return {}

tag_keys = self._input_stream.keys()[0]
drop_cols = [
c
for c in filtered.column_names
if c.startswith(constants.META_PREFIX)
or c == PIPELINE_ENTRY_ID_COL
or c == constants.NODE_CONTENT_HASH_COL
]
data_table = filtered.drop([c for c in drop_cols if c in filtered.column_names])

stream = ArrowTableStream(data_table, tag_columns=tag_keys)
filtered_entry_ids = filtered.column(PIPELINE_ENTRY_ID_COL).to_pylist()

result_dict: dict[str, tuple[TagProtocol, PacketProtocol]] = {}
for entry_id, (tag, packet) in zip(filtered_entry_ids, stream.iter_packets()):
result_dict[entry_id] = (tag, packet)

# Populate internal cache with retrieved results (clear first to
# avoid duplicates on repeated orchestrator runs)
self._cached_output_packets.clear()
self._cached_output_table = None
self._cached_content_hash_column = None
for entry_id, (tag, packet) in result_dict.items():
next_idx = len(self._cached_output_packets)
self._cached_output_packets[next_idx] = (tag, packet)
self._cached_input_iterator = None
self._needs_iterator = False
missing = [eid for eid in entry_ids if eid not in self._cached_output_packets]
if missing:
loaded = self._load_cached_entries(missing)
self._cached_output_packets.update(loaded)
if loaded:
self._cached_output_table = None
self._cached_content_hash_column = None

return result_dict
return {
eid: self._cached_output_packets[eid]
for eid in entry_ids
if eid in self._cached_output_packets
}

Copilot AI Apr 7, 2026


get_cached_results() is annotated to return dict[str, tuple[TagProtocol, PacketProtocol]], but it returns values directly from _cached_output_packets, whose packet value is PacketProtocol | None. This can surface None packets to callers (and currently feeds into execute()), contradicting the docstring (“excluding None outputs”). Consider filtering out None packets in the returned mapping (and optionally treating cached None values as cache misses if recomputation/DB load should be allowed).

Contributor Author


Fixed. get_cached_results() now filters out entries where the packet is None before returning, so callers always receive (TagProtocol, PacketProtocol) pairs. Stored None values are still retained in _cached_output_packets to prevent unnecessary DB re-fetches — they just won't surface to callers.
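The None-sentinel pattern described in this reply can be sketched in isolation (hypothetical names and a fake DB loader, not the real orcapod code): a stored None marks an entry that was already looked up and had no result, preventing repeat DB queries, while callers never receive None packets.

```python
db_calls = []   # records each batch of entry_ids sent to the (fake) DB
_cache = {}     # entry_id -> (tag, packet), or None as a "known miss" sentinel

def _load_from_db(entry_ids):
    db_calls.append(list(entry_ids))
    # Stand-in for the real pipeline/result DB join: only ids ending in "0" hit.
    return {eid: ("tag", f"packet-{eid}") for eid in entry_ids if eid.endswith("0")}

def get_cached_results(entry_ids):
    missing = [eid for eid in entry_ids if eid not in _cache]
    if missing:
        loaded = _load_from_db(missing)
        for eid in missing:
            _cache[eid] = loaded.get(eid)  # store None sentinel on a DB miss
    # Filter sentinels: callers only ever see real (tag, packet) pairs.
    return {eid: _cache[eid] for eid in entry_ids if _cache[eid] is not None}

first = get_cached_results(["e10", "e11"])
second = get_cached_results(["e10", "e11"])  # served entirely from memory
assert first == second == {"e10": ("tag", "packet-e10")}
assert db_calls == [["e10", "e11"]]          # the DB was queried only once
```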

Comment on lines +1158 to +1169
from orcapod.pipeline.serialization import LoadStatus

def run(self) -> None:
"""Eagerly process all input packets, filling the pipeline and result databases."""
for _ in self.iter_packets():
pass
if self._load_status == LoadStatus.UNAVAILABLE:
raise RuntimeError(
f"FunctionNode {self.label!r} is unavailable: "
"no function pod and no database attached."
)
if self._load_status == LoadStatus.CACHE_ONLY:
# Upstream is unavailable; computation requires a live input stream.
# Callers should use iter_packets() to serve existing DB results.
return
self.execute(self._input_stream)

Copilot AI Apr 7, 2026


run() doesn’t handle LoadStatus.READ_ONLY (PacketFunctionProxy): it will call execute() and then fail with PacketFunctionUnavailableError. It also doesn’t clear in-memory caches when self.is_stale, which can leave stale entries in the add-only _cached_output_packets and cause iter_packets() to yield a mix of old+new results after a rerun. Recommend: (1) guard READ_ONLY with a clear RuntimeError/no-op (similar to CACHE_ONLY), and (2) if is_stale then clear_cache() before computing to avoid retaining stale entries.

Contributor Author


Fixed both issues. run() now treats READ_ONLY as a no-op (same as CACHE_ONLY) — proxy function pods cannot compute, so silently returning is the right behaviour. Also added if self.is_stale: self.clear_cache() before calling execute() so any stale in-memory entries are discarded before a fresh computation run.

Comment on lines +136 to +147
def test_execute_concurrent_error_policy_continue(self):
"""execute() fires on_packet_crash per failing packet and returns successes when error_policy='continue'."""
errors = []

def sometimes_fail(x: int) -> int:
if x == 1:
raise ValueError("intentional failure")
return x * 2

pf = PythonPacketFunction(sometimes_fail, output_keys="result")
pf.executor = LocalExecutor() # sets executor (LocalExecutor.supports_concurrent_execution is False)
pod = FunctionPod(pf)

Copilot AI Apr 7, 2026


Test name is misleading: test_execute_concurrent_error_policy_continue uses LocalExecutor with supports_concurrent_execution false, so it exercises the sequential execute() path. Rename the test to avoid “concurrent” in the name, or switch to an executor that actually enables the concurrent path (if that behavior is intended to be covered here).

Contributor Author


Fixed. Renamed to test_execute_error_policy_continue_skips_failures and added a clarifying note to the docstring that the test uses LocalExecutor (non-concurrent) and exercises the sequential execute() path.

Comment on lines 232 to 237
)

# node2 doubles the already-doubled values: 0*2*2=0, 1*2*2=4, 2*2*2=8
node2.run()
results = sorted(cast(int, p["result"]) for _, p in node2.iter_packets())
assert results == [0, 4, 8]

Copilot AI Apr 7, 2026


test_derived_source_matches_node_output in this file still collects node_results via node.iter_packets() without calling node.run() first. With ENG-379 making iter_packets() read-only, this will now yield an empty result set and the assertion will no longer validate the intended behavior. Update that test to call node.run() (or node.execute(...)) before comparing node vs DerivedSource outputs.

Contributor Author


Fixed. Added node.run() before node.iter_packets() in test_derived_source_matches_node_output so the test properly exercises the read-only iteration contract (compute via run(), then iterate).

…et_cached_results filter

- execute(): guard cached-hit branch with `if result is not None` to
  match non-cached branch and satisfy PacketProtocol return contract
- get_cached_results(): filter None-packet entries from returned mapping
  so callers always receive (tag, PacketProtocol) — stored None values
  still prevent unnecessary DB re-fetches (they remain in the cache)
- run(): handle LoadStatus.READ_ONLY as a no-op (like CACHE_ONLY) —
  proxy function pods cannot compute; also call clear_cache() when
  is_stale to discard stale entries before a fresh computation run
- test_execute_concurrent_error_policy_continue → rename to
  test_execute_error_policy_continue_skips_failures — the test uses
  LocalExecutor (non-concurrent), so "concurrent" in the name was
  misleading; add clarifying note to docstring
- test_derived_source_matches_node_output: add node.run() before
  iter_packets() to satisfy read-only iteration contract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

kurodo3 Bot commented Apr 7, 2026

Review round addressed — commit d67aca1

All 5 Copilot comments resolved in a single follow-up commit:

Fixes, by comment:
  1. Misleading test name (test_execute_concurrent_error_policy_continue used a non-concurrent executor) — renamed to test_execute_error_policy_continue_skips_failures; added a docstring note clarifying it exercises the sequential path via LocalExecutor.
  2. Cached-hit branch appended None unconditionally in execute() — added an `if result is not None:` guard to mirror the non-cached branch.
  3. test_derived_source_matches_node_output missing node.run() — added node.run() before node.iter_packets().
  4. run() missing READ_ONLY guard and stale cache not cleared — READ_ONLY now treated as a no-op (like CACHE_ONLY); added `if self.is_stale: self.clear_cache()` before execute().
  5. get_cached_results() could return None packets despite the PacketProtocol annotation — the returned dict now filters out entries whose packet is None; None values remain in _cached_output_packets as cache sentinels to avoid unnecessary DB re-fetches.

All 3724 tests pass (uv run pytest tests/ test-objective/).


Copilot AI left a comment


Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.



Comment thread src/orcapod/core/nodes/function_node.py Outdated
Comment on lines +611 to +615
if entry_id in cached:
tag_out, result = cached[entry_id]
ctx_obs.on_packet_end(
node_label, tag, packet, result, cached=True
)
output.append((tag_out, result))
ctx_obs.on_packet_end(node_label, tag, packet, result, cached=True)
if result is not None:
output.append((tag_out, result))

Copilot AI Apr 7, 2026


In execute(), cache-hit detection relies on the dict returned by get_cached_results(). That method intentionally filters out entries whose cached packet is None, so any entry_id already present in _cached_output_packets with a None output will be treated as a cache miss and recomputed on subsequent execute() calls (potentially re-triggering side effects). Consider using _cached_output_packets membership for the cache-hit check (including None) and only filtering out None when appending to the returned output list; keep get_cached_results() primarily for DB hot-loading of missing entry_ids.

Contributor Author


Fixed. execute() now uses _cached_output_packets directly for the cache-hit membership check (which includes None-output entries) and calls get_cached_results() only for its DB hot-load side effect (the returned dict is discarded). A None packet stored in the cache correctly prevents recomputation on subsequent execute() calls while still being excluded from the output list.
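The distinction this fix hinges on can be shown with toy data structures (not the real cache): membership in the raw store treats a cached None output as a hit (skip recompute), while the filtered mapping would wrongly treat it as a miss.

```python
# e2 was computed and produced no output; its None result is stored in the cache.
raw_store = {"e1": ("t1", "pkt1"), "e2": ("t2", None)}
filtered = {k: v for k, v in raw_store.items() if v[1] is not None}

recomputed = []
output = []
for eid in ["e1", "e2"]:
    if eid in raw_store:           # correct check: a None result still counts as a hit
        tag, result = raw_store[eid]
        if result is not None:     # but None is never appended to the output list
            output.append((tag, result))
    else:
        recomputed.append(eid)

assert recomputed == []            # e2 is NOT recomputed
assert output == [("t1", "pkt1")]
# Checking `eid in filtered` instead would have recomputed e2, re-triggering
# its side effects on every execute() call.
```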

Comment on lines +148 to +155
if _executor_supports_concurrent(self._packet_function) and to_compute:
inline concurrent gather (see below)
else:
for tag, pkt, eid in to_compute:
try:
_process_packet_internal(tag, pkt)
except Exception as exc:
log warning; on_packet_crash; re-raise if error_policy=="fail_fast"

Copilot AI Apr 7, 2026


This design doc describes an execute() “concurrent gather” path keyed off _executor_supports_concurrent(...). In the current implementation of FunctionNode.execute(), computation is always sequential and the async path is only in async_execute(). Please update this section to match the actual behavior (or clearly mark the concurrent path as out-of-scope/future work) so readers don’t implement against an outdated contract.

Suggested change
if _executor_supports_concurrent(self._packet_function) and to_compute:
inline concurrent gather (see below)
else:
for tag, pkt, eid in to_compute:
try:
_process_packet_internal(tag, pkt)
except Exception as exc:
log warning; on_packet_crash; re-raise if error_policy=="fail_fast"
# Current behavior: execute() is always synchronous/sequential.
# Any async/concurrent gather path is handled by async_execute()
# and is out of scope for execute() in this design.
for tag, pkt, eid in to_compute:
try:
_process_packet_internal(tag, pkt)
except Exception as exc:
log warning; on_packet_crash; re-raise if error_policy=="fail_fast"

Contributor Author


Fixed. Replaced the stale concurrent-gather pseudocode in execute() section of the design doc with accurate sequential-only pseudocode that matches the implementation. Added an explicit note that execute() is always synchronous and that async/concurrent execution is exclusively in async_execute(). Also updated testing item 10 and the migration-notes section accordingly.

Comment on lines +200 to +204
return x * 2

pf = PythonPacketFunction(sometimes_fail, output_keys="result")
pf.executor = LocalExecutor() # enables concurrent path
pod = FunctionPod(pf)

Copilot AI Apr 7, 2026


This plan snippet still says pf.executor = LocalExecutor() # enables concurrent path and uses the old test name test_execute_concurrent_error_policy_continue. In the current code/tests, LocalExecutor is non-concurrent and the execute() coverage here is sequential (the test was renamed accordingly). Please update the plan so it matches the implemented behavior.

Contributor Author


Fixed. Updated the plan doc in three places: (1) module docstring changed from "concurrent path via asyncio.gather" to "always sequential; async/concurrent path is only in async_execute()"; (2) test renamed from test_execute_concurrent_error_policy_continue to test_execute_error_policy_continue_skips_failures; (3) LocalExecutor comment changed from "enables concurrent path" to "non-concurrent; tests sequential execute() path". Also updated Task 9 heading, the pytest invocation, and the git commit message in the plan.

…rrectness

- execute(): use _cached_output_packets directly for cache-hit membership
  check (not the filtered get_cached_results() return value) so that
  None-output entries are correctly treated as cache hits and not
  recomputed on subsequent execute() calls; get_cached_results() is
  still called for its DB hot-load side effect
- design doc: replace pseudocode describing a concurrent asyncio.gather
  path in execute() with accurate sequential-only pseudocode; add a note
  that async/concurrent execution is exclusively in async_execute()
- design doc: update test item 10 and migration notes to use the renamed
  test and correct (sequential) path description
- plan doc: fix module docstring ("concurrent path via asyncio.gather"
  → "always sequential"), rename test_execute_concurrent_error_policy_continue
  → test_execute_error_policy_continue_skips_failures, correct LocalExecutor
  comment, and update Task 9 heading/commit message/pytest invocation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

kurodo3 Bot commented Apr 7, 2026

Review round 2 addressed — commit 289573f

All 3 Copilot comments resolved in a single follow-up commit:

Fixes, by comment:
  1. execute() cache-hit check relied on the filtered get_cached_results() return value, so None-output entries would be treated as cache misses and recomputed — execute() now uses self._cached_output_packets directly for the membership check (which includes None); get_cached_results() is still called, but its return value is discarded since the side effect (DB hot-load into _cached_output_packets) is all that's needed.
  2. Design doc described a stale concurrent asyncio.gather path in execute() — replaced the pseudocode with accurate sequential-only pseudocode; added an explicit note that execute() is always synchronous and that async/concurrent execution is exclusively in async_execute(); updated test item 10 and the migration notes.
  3. Plan doc still said LocalExecutor() # enables concurrent path and used the old test name — fixed the module docstring, test name, LocalExecutor comment, Task 9 heading, pytest invocation, and git commit message in the plan.

All 3724 tests pass (uv run pytest tests/ test-objective/).

def _make_source(n: int = 3) -> ArrowTableSource:
table = pa.table(
{
"id": pa.array(list(range(n)), type=pa.int64()),
Contributor


Shouldn't you be able to specify nullability on the arrays and skip having to specify it on the schema?

Contributor Author


PyArrow doesn't expose nullability at the array level — nullable is a property of pa.field only, not pa.array. There's no pa.array(..., nullable=False) API. The explicit schema=pa.schema([...]) block is the standard PyArrow idiom for enforcing non-nullability on a raw table, so I've kept it as-is.

@eywalker eywalker merged commit 0703470 into dev Apr 7, 2026
9 checks passed
@eywalker eywalker deleted the eywalker/eng-379-refactor-functionnode-clean-up-redundant-logic-and-ensure branch April 7, 2026 18:47