Skip to content

fix(diagnostic): page through all task instances in diagnose_dag_run#229

Open
gallejesus wants to merge 1 commit into
astronomer:mainfrom
gallejesus:fix/diagnose-dag-run-paginate-task-instances
Open

fix(diagnostic): page through all task instances in diagnose_dag_run#229
gallejesus wants to merge 1 commit into
astronomer:mainfrom
gallejesus:fix/diagnose-dag-run-paginate-task-instances

Conversation

@gallejesus

@gallejesus gallejesus commented Jun 16, 2026

Copy link
Copy Markdown

Summary

Fixes #228.

diagnose_dag_run (and the _get_failed_task_instances helper) read a single page of task instances — get_task_instances(dag_id, dag_run_id) defaults to limit=100, offset=0 with no pagination. On a DAG run with many dynamically mapped task instances, a failed instance at a high map_index falls past the first 100, so it never enters failed_tasks / state_counts and the run is reported as healthy with zero failures.

Changes

  • adapters/base.py — add a concrete get_all_task_instances() that pages through the existing get_task_instances() primitive until a short page is returned, merging the results. It's a composition over an existing endpoint (not a new one), so it lives as one concrete method on the base class and works unchanged for both Airflow 2 and 3 — no per-version duplication.
  • tools/diagnostic.py and tools/dag_run.py — both callers now use get_all_task_instances().
  • tools/diagnostic.py — include map_index in the failed_tasks payload. This does not affect which failures are found (the pagination does that); it lets a consumer tell apart multiple failed mapped instances that share the same task_id.

Tests

  • tests/test_adapters.py::TestGetAllTaskInstances — paging merges across pages (incl. a failure on a later page), a short first page stops immediately, and a payload with no task_instances yields an empty result.
  • tests/test_consolidated_tools.py — regression test asserting a failed mapped instance beyond the first page surfaces in failed_tasks.

Run with uv run pytest tests/ --ignore=tests/integration/.

diagnose_dag_run and _get_failed_task_instances read a single page of task
instances (limit=100, offset=0). On a DAG run with many dynamically mapped
task instances, a failed instance at a high map_index falls past the first
page, so it never enters failed_tasks / state_counts and the run is reported
as healthy with zero failures.

Add a concrete AirflowAdapter.get_all_task_instances() that pages through the
existing get_task_instances() primitive until a short page is returned (one
implementation, works unchanged for both Airflow 2 and 3), and use it in both
callers. Also include map_index in the diagnose failed-task payload so failed
mapped instances of the same task_id can be told apart.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@gallejesus gallejesus force-pushed the fix/diagnose-dag-run-paginate-task-instances branch from 4004b4c to 50c68cc Compare June 16, 2026 08:56

@kaxil kaxil left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core fix looks right. get_all_task_instances lives once on the base adapter as a composition over the existing get_task_instances (so no per-version duplication), both MCP callers are migrated, and the regression tests cover it well.

The main thing I'd flag is that the CLI still has the bug. cli/runs.py carries two hand-duplicated copies of this logic, neither touched here, and it's live code: af runs diagnose and af runs trigger-wait reach it via app.add_typer(runs_module.app, name="runs"). At cli/runs.py:373 the diagnose command still calls single-page adapter.get_task_instances(...), so a failed mapped instance past map_index 100 gets dropped and the run reads as healthy. Same #228 symptom, just on the CLI. And cli/runs.py:281 (_get_failed_task_instances, used by trigger-wait at line 259) reads a single page too. Both want get_all_task_instances.

That duplication is really why the fix only landed in two places: the same diagnose/failed-task logic exists in tools/diagnostic.py, tools/dag_run.py, cli/runs.py, and utils.extract_failed_tasks. Folding it into one shared helper would mean pagination (and map_index) only has to be added once.

Same story for map_index: cli/runs.py:384 omits it from the CLI failed_tasks dict, and utils.extract_failed_tasks (utils.py:67) omits it too (see the note on dag_run.py), so trigger_dag_and_wait still can't tell two failed instances of the same task apart.

Otherwise the MCP diagnose_dag_run path is correct; I'd just want the two CLI sites and the payload size (inline) sorted before this merges.

# failed mapped instance at a high map_index on large DAG runs.
tasks_data = adapter.get_all_task_instances(dag_id, dag_run_id)
task_instances = tasks_data.get("task_instances", [])
result["task_instances"] = task_instances

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result["task_instances"] now holds every task instance (previously capped at one page of ~100). On the large dynamically-mapped runs this fix targets, with thousands of instances, diagnose_dag_run serializes the full list into the tool's JSON response, which is fed to the model. That can be hundreds of thousands of tokens of mostly-succeeded instances, and may blow the context window in exactly the scenario the fix is for.

The summary block already carries the complete counts and failed-task list needed to fix #228, so consider capping what gets emitted here (e.g. failed instances plus first N, plus a total/truncated indicator) while keeping summary complete.

batch = page.get("task_instances", [])
merged.extend(batch)
offset += len(batch)
if len(batch) < page_size:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The termination check len(batch) < page_size assumes the server hands back exactly what you asked for. Airflow clamps limit to [api] maximum_page_limit (default 100), so on a deployment that sets that config below page_size the first page comes back short, the loop breaks after one page, and anything past that boundary is silently dropped, which re-introduces #228 at a lower threshold. And since total_entries is recomputed as len(merged), a caller can't tell the result was truncated.

Paging until len(merged) >= page.get("total_entries", 0), with an empty-batch guard so it can't spin forever, sidesteps that. The same loop also ends quietly if a mid-pagination call returns the {"available": False} not-found payload, and nothing bounds the page count if a run is enormous.

# (e.g. a high map_index on a large mapped DAG run) are not missed.
data = adapter.get_all_task_instances(dag_id, dag_run_id)
task_instances = data.get("task_instances", [])
return extract_failed_tasks(task_instances)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pagination is correctly applied here, but the map_index disambiguation added to diagnose_dag_run doesn't reach this path: extract_failed_tasks (utils.py:67) doesn't carry map_index, so trigger_dag_and_wait still collapses multiple failed instances that share a task_id. Adding "map_index": task.get("map_index") to extract_failed_tasks fixes both the MCP and CLI trigger-wait paths at once and keeps the output consistent with diagnose_dag_run.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

diagnose_dag_run misses failed task instances beyond the first 100 (no pagination / no state filter)

2 participants