fix(diagnostic): page through all task instances in diagnose_dag_run#229
fix(diagnostic): page through all task instances in diagnose_dag_run#229gallejesus wants to merge 1 commit into
Conversation
diagnose_dag_run and _get_failed_task_instances read a single page of task instances (limit=100, offset=0). On a DAG run with many dynamically mapped task instances, a failed instance at a high map_index falls past the first page, so it never enters failed_tasks / state_counts and the run is reported as healthy with zero failures. Add a concrete AirflowAdapter.get_all_task_instances() that pages through the existing get_task_instances() primitive until a short page is returned (one implementation, works unchanged for both Airflow 2 and 3), and use it in both callers. Also include map_index in the diagnose failed-task payload so failed mapped instances of the same task_id can be told apart. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
4004b4c to
50c68cc
Compare
kaxil
left a comment
There was a problem hiding this comment.
The core fix looks right. get_all_task_instances lives once on the base adapter as a composition over the existing get_task_instances (so no per-version duplication), both MCP callers are migrated, and the regression tests cover it well.
The main thing I'd flag is that the CLI still has the bug. cli/runs.py carries two hand-duplicated copies of this logic, neither touched here, and it's live code: af runs diagnose and af runs trigger-wait reach it via app.add_typer(runs_module.app, name="runs"). At cli/runs.py:373 the diagnose command still calls single-page adapter.get_task_instances(...), so a failed mapped instance past map_index 100 gets dropped and the run reads as healthy. Same #228 symptom, just on the CLI. And cli/runs.py:281 (_get_failed_task_instances, used by trigger-wait at line 259) reads a single page too. Both want get_all_task_instances.
That duplication is really why the fix only landed in two places: the same diagnose/failed-task logic exists in tools/diagnostic.py, tools/dag_run.py, cli/runs.py, and utils.extract_failed_tasks. Folding it into one shared helper would mean pagination (and map_index) only has to be added once.
Same story for map_index: cli/runs.py:384 omits it from the CLI failed_tasks dict, and utils.extract_failed_tasks (utils.py:67) omits it too (see the note on dag_run.py), so trigger_dag_and_wait still can't tell two failed instances of the same task apart.
Otherwise the MCP diagnose_dag_run path is correct; I'd just want the two CLI sites and the payload size (inline) sorted before this merges.
| # failed mapped instance at a high map_index on large DAG runs. | ||
| tasks_data = adapter.get_all_task_instances(dag_id, dag_run_id) | ||
| task_instances = tasks_data.get("task_instances", []) | ||
| result["task_instances"] = task_instances |
There was a problem hiding this comment.
result["task_instances"] now holds every task instance (previously capped at one page of ~100). On the large dynamically-mapped runs this fix targets, with thousands of instances, diagnose_dag_run serializes the full list into the tool's JSON response, which is fed to the model. That can be hundreds of thousands of tokens of mostly-succeeded instances, and may blow the context window in exactly the scenario the fix is for.
The summary block already carries the complete counts and failed-task list needed to fix #228, so consider capping what gets emitted here (e.g. failed instances plus first N, plus a total/truncated indicator) while keeping summary complete.
| batch = page.get("task_instances", []) | ||
| merged.extend(batch) | ||
| offset += len(batch) | ||
| if len(batch) < page_size: |
There was a problem hiding this comment.
The termination check len(batch) < page_size assumes the server hands back exactly what you asked for. Airflow clamps limit to [api] maximum_page_limit (default 100), so on a deployment that sets that config below page_size the first page comes back short, the loop breaks after one page, and anything past that boundary is silently dropped, which re-introduces #228 at a lower threshold. And since total_entries is recomputed as len(merged), a caller can't tell the result was truncated.
Paging until len(merged) >= page.get("total_entries", 0), with an empty-batch guard so it can't spin forever, sidesteps that. The same loop also ends quietly if a mid-pagination call returns the {"available": False} not-found payload, and nothing bounds the page count if a run is enormous.
| # (e.g. a high map_index on a large mapped DAG run) are not missed. | ||
| data = adapter.get_all_task_instances(dag_id, dag_run_id) | ||
| task_instances = data.get("task_instances", []) | ||
| return extract_failed_tasks(task_instances) |
There was a problem hiding this comment.
Pagination is correctly applied here, but the map_index disambiguation added to diagnose_dag_run doesn't reach this path: extract_failed_tasks (utils.py:67) doesn't carry map_index, so trigger_dag_and_wait still collapses multiple failed instances that share a task_id. Adding "map_index": task.get("map_index") to extract_failed_tasks fixes both the MCP and CLI trigger-wait paths at once and keeps the output consistent with diagnose_dag_run.
Summary
Fixes #228.
diagnose_dag_run(and the_get_failed_task_instanceshelper) read a single page of task instances —get_task_instances(dag_id, dag_run_id)defaults tolimit=100, offset=0with no pagination. On a DAG run with many dynamically mapped task instances, a failed instance at a highmap_indexfalls past the first 100, so it never entersfailed_tasks/state_countsand the run is reported as healthy with zero failures.Changes
adapters/base.py— add a concreteget_all_task_instances()that pages through the existingget_task_instances()primitive until a short page is returned, merging the results. It's a composition over an existing endpoint (not a new one), so it lives as one concrete method on the base class and works unchanged for both Airflow 2 and 3 — no per-version duplication.tools/diagnostic.pyandtools/dag_run.py— both callers now useget_all_task_instances().tools/diagnostic.py— includemap_indexin thefailed_taskspayload. This does not affect which failures are found (the pagination does that); it lets a consumer tell apart multiple failed mapped instances that share the sametask_id.Tests
tests/test_adapters.py::TestGetAllTaskInstances— paging merges across pages (incl. a failure on a later page), a short first page stops immediately, and a payload with notask_instancesyields an empty result.tests/test_consolidated_tools.py— regression test asserting a failed mapped instance beyond the first page surfaces infailed_tasks.Run with
uv run pytest tests/ --ignore=tests/integration/.