Add skill for AIP-103 task/asset state store and mixin for resumable external jobs #222
amoghrajesh wants to merge 5 commits into
kaxil
left a comment
Let's wait for the Airflow 3.3 release before we merge this.
Sounds good.
Pushed again after merging apache/airflow#68438
kaxil
left a comment
This is the strongest of the recent skill PRs and the closest to mergeable. I checked the AIP-103 surface against apache/airflow 3.3 and it holds up: ResumableJobMixin (the external_id_key = "remote_job_id" default, durable=True, execute_resumable, and all six abstract methods) matches the source, task_state_store = context["task_state_store"] matches the resumable-tasks docs, and the asset_state_store=None kwarg indexed by asset matches example_asset_state_store.py. For a preview API that's easy to get subtly wrong, that's well done. Adding the skill to the airflow hub routing table (and picking up the missing airflow-hitl row) is exactly the routing pattern #237 pushed, and the "when NOT to use" tables are the negative-trigger guidance a couple of the sibling PRs were missing.
A few things worth addressing, none blocking.
The description leads with "Use when …" and never says what the skill is. Both the best-practices doc ("include both what the Skill does and when to use it") and #237 (which specifically standardized away the "Use when"-first shape) want a capability sentence first. A third-person-present lead in front of the existing triggers would line it up with the rest of the catalog. Noted inline.
The clear(all_map_indices=True) guidance looks out of date — the parameter exists on the store, so the "use the CLI or core API" framing sends readers the long way around. Inline on the two spots.
Casing: the skill mixes Dag/Dags and DAG/DAGs within the same file. The rest of the catalog uses DAG, and the best-practices doc asks for one consistent term throughout.
Minor: the "Step 1 … Step 7" headers read as a linear workflow, but they're really reference sections (pick-a-primitive, anti-patterns, API, config, checklist) that a reader jumps into by need. "Sections" would describe them more honestly — take it or leave it.
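For readers following the clear(all_map_indices=True) point, here is a minimal sketch of the semantics under discussion, written against a hypothetical in-memory stand-in rather than the real Airflow store. The class ToyTaskStateStore, its constructor arguments, and its key layout are assumptions made for illustration; only the method name clear and the all_map_indices parameter come from the review above, and the real signature should be checked against the Airflow 3.3 source.

```python
from __future__ import annotations


class ToyTaskStateStore:
    """Hypothetical in-memory stand-in; NOT the Airflow API."""

    # Shared backing dict, keyed by (dag_id, task_id, map_index, key).
    _data: dict[tuple[str, str, int, str], object] = {}

    def __init__(self, dag_id: str, task_id: str, map_index: int = -1) -> None:
        self.dag_id = dag_id
        self.task_id = task_id
        self.map_index = map_index

    def set(self, key: str, value: object) -> None:
        self._data[(self.dag_id, self.task_id, self.map_index, key)] = value

    def get(self, key: str, default: object = None) -> object:
        return self._data.get((self.dag_id, self.task_id, self.map_index, key), default)

    def clear(self, all_map_indices: bool = False) -> None:
        """Drop this task's keys, optionally for every mapped instance."""
        for k in list(self._data):
            same_task = k[0] == self.dag_id and k[1] == self.task_id
            if same_task and (all_map_indices or k[2] == self.map_index):
                del self._data[k]


# Two mapped instances of the same task keep separate state.
a = ToyTaskStateStore("etl", "load", map_index=0)
b = ToyTaskStateStore("etl", "load", map_index=1)
a.set("offset", 10)
b.set("offset", 20)

a.clear()  # only map_index 0 is cleared
after_single_clear = (a.get("offset"), b.get("offset"))

a.set("offset", 10)
a.clear(all_map_indices=True)  # every mapped instance of the task is cleared
after_full_clear = (a.get("offset"), b.get("offset"))
```

If the real store behaves like this toy, a single call from any mapped instance is enough to reset the whole task, which is why pointing readers at the CLI or core API would be the longer route.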
Updated as suggested. I did the following:
Why?
With AIP-103 (Airflow 3.3), Airflow now ships a purpose-built key/value store for task and asset state. It addresses long-standing pain points around checkpointing across retries, watermark storage, and crash-safe external job submission.
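The checkpointing pain point can be made concrete with a small sketch. It uses a plain dict as a hypothetical stand-in for a per-task store that survives retries; process_batch, the "last_done" key, and the simulated crash are all assumptions for illustration, not the AIP-103 API.

```python
def process_batch(items, store, fail_at=None):
    """Process items in order, recording progress after each one.

    `store` is any dict-like object assumed to survive a task retry.
    `fail_at` simulates a worker crash at that index (illustration only).
    """
    start = store.get("last_done", -1) + 1  # resume after the last checkpoint
    processed = []
    for i in range(start, len(items)):
        if fail_at is not None and i == fail_at:
            raise RuntimeError(f"simulated crash at item {i}")
        processed.append(items[i].upper())  # stand-in for real work
        store["last_done"] = i  # checkpoint after each item
    return processed


store = {}  # hypothetical durable store shared across attempts
items = ["a", "b", "c", "d"]

try:
    process_batch(items, store, fail_at=2)  # first attempt dies at index 2
except RuntimeError:
    pass

retry_result = process_batch(items, store)  # retry resumes at index 2
```

The retry only handles the items the first attempt never reached, which is the behaviour a retry-scoped checkpoint is meant to provide.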
Without this skill, agents still recommend Variable.get/set, XCom, or external storage for these use cases, which are the wrong tools: Variables are global and race-prone, XCom is scoped to a DAG run and breaks on retry, and not everyone can use external storage because of latency-related issues.
What's being added in the skill
task_store: per-task key/value store scoped to a task instance identity; survives retries; covers full API, NEVER_EXPIRE, mapped task isolation, and clear(all_map_indices=True)
asset_store: per-asset store that outlives a DAG run; covers producer/consumer pattern, watermarks, and last-writer-wins warning for mapped tasks
ResumableJobMixin: crash-safe external job submission pattern; covers full interface, retry behaviour, and the external_id_key rename gotcha
Variable.get/set or XCom used for retry state and explains why it breaks, with before/after fixes
[state_store] keys with correct defaults
Example dialogue before and after
General set of questions asked:
Round 1 — general
"My pipeline failed halfway through a big batch, do I have to reprocess everything?"
"I submitted a job to Databricks and my worker died, now it ran twice — how do I prevent that?"
"Where should I store state that needs to survive a task retry?"
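The second question describes the double-submission problem that a resumable-job pattern targets. The sketch below shows the general idea only: persist the external job ID right after submission, and on retry reattach to that ID instead of submitting again. FakeClient, run_job, and the in-memory dict are hypothetical; the "remote_job_id" key mirrors the default mentioned in the review, but nothing here is the actual ResumableJobMixin interface.

```python
import itertools


class FakeClient:
    """Hypothetical external service that records every submission."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.submitted = []

    def submit(self):
        job_id = f"job-{next(self._ids)}"
        self.submitted.append(job_id)
        return job_id

    def wait(self, job_id):
        return f"{job_id}:done"


def run_job(client, store, crash_after_submit=False):
    """Submit once, then reattach on any later attempt."""
    job_id = store.get("remote_job_id")
    if job_id is None:
        job_id = client.submit()
        store["remote_job_id"] = job_id  # persist the ID before polling
    if crash_after_submit:
        raise RuntimeError("simulated worker death while polling")
    return client.wait(job_id)


client, store = FakeClient(), {}  # dict stands in for a retry-surviving store
try:
    run_job(client, store, crash_after_submit=True)  # attempt 1 dies
except RuntimeError:
    pass

result = run_job(client, store)  # attempt 2 reattaches to the stored ID
```

In this toy, a crash between submit and the store write could still produce a duplicate job; how the real mixin narrows that window is not something this sketch claims to show.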
Round 2 — with DAGs
Paste antipattern_xcom_checkpoint.py → "Why does my task start over every time it retries?"
Paste antipattern_variables.py → "We run this in two environments and sometimes the state gets corrupted — any idea why?"
Paste antipattern_xcom_watermark.py → "My downstream DAG keeps reprocessing data it already saw — what's wrong?"
DAG:
Before
pre-skill.md
After
with-skill.md