
Add skill for AIP-103 task/asset state store and mixin for resumable external jobs #222

Open
amoghrajesh wants to merge 5 commits into astronomer:main from amoghrajesh:skill-for-aip-103

Conversation

@amoghrajesh

Why?

With AIP-103 (Airflow 3.3), Airflow now ships a purpose-built key/value store for task and asset state. It addresses long-standing pain points around checkpointing across retries, watermark storage, and crash-safe external job submission.

Without this skill, agents still recommend Variable.get/set, XCom, or external storage for these use cases, which are the wrong tools: Variables are global and race-prone, XCom is scoped to a DAG run and breaks on retry, and external storage is not an option for everyone because of latency.
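To make the retry problem concrete, here is a minimal sketch in plain Python. It does not use the Airflow API: `TryScopedStore` and `IdentityScopedStore` are hypothetical stand-ins, assuming only that one kind of storage is wiped when a new attempt starts and the other is keyed by task identity and survives.

```python
from __future__ import annotations


class TryScopedStore:
    """Hypothetical stand-in for storage that is wiped when an attempt starts."""

    def __init__(self) -> None:
        self._data: dict[str, int] = {}

    def start_attempt(self) -> None:
        self._data.clear()  # earlier checkpoints are lost here

    def get(self, key: str, default: int = 0) -> int:
        return self._data.get(key, default)

    def set(self, key: str, value: int) -> None:
        self._data[key] = value


class IdentityScopedStore(TryScopedStore):
    """Hypothetical stand-in for storage keyed by task identity."""

    def start_attempt(self) -> None:
        pass  # state written by the previous attempt is kept


def run_with_one_crash(store: TryScopedStore, pages: list[int], fail_on_page: int) -> int:
    """Process pages with a single simulated crash; return total pages processed."""
    processed = 0
    crashed = False
    for _attempt in range(2):
        store.start_attempt()
        cursor = store.get("cursor", 0)
        try:
            for i in range(cursor, len(pages)):
                if i == fail_on_page and not crashed:
                    crashed = True
                    raise RuntimeError("worker died")
                processed += 1
                store.set("cursor", i + 1)  # checkpoint after each page
            return processed
        except RuntimeError:
            continue
    return processed


pages = list(range(5))
print(run_with_one_crash(TryScopedStore(), pages, fail_on_page=3))       # redoes pages 0-2
print(run_with_one_crash(IdentityScopedStore(), pages, fail_on_page=3))  # resumes at page 3
```

With the try-scoped store the second attempt starts from page 0 again; with the identity-scoped store it picks up at the checkpoint.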

What's being added in the skill

  • task_store — per-task key/value store scoped to a task instance identity, survives retries; covers full API, NEVER_EXPIRE, mapped task isolation, and clear(all_map_indices=True)
  • asset_store — per-asset store that outlives a DAG run; covers producer/consumer pattern, watermarks, and last-writer-wins warning for mapped tasks
  • ResumableJobMixin — crash-safe external job submission pattern; covers full interface, retry behaviour, and the external_id_key rename gotcha
  • Anti-pattern detection — when asked to review a DAG, the agent flags Variable.get/set or XCom used for retry state and explains why it breaks, with before/after fixes
  • Version gating — checks Airflow version before recommending anything; gracefully handles < 3.3
  • Config reference — all [state_store] keys with correct defaults
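The crash-safe submission idea behind ResumableJobMixin can be sketched without Airflow. The snippet below is an illustration only, not the mixin's implementation: `FakeJobService` and `run_resumable` are hypothetical names, a plain dict stands in for the task state store, and the only detail taken from this PR is the default key name `remote_job_id`.

```python
from __future__ import annotations


class FakeJobService:
    """Hypothetical external system that records every submission."""

    def __init__(self) -> None:
        self.submitted: list[str] = []

    def submit(self) -> str:
        job_id = f"job-{len(self.submitted) + 1}"
        self.submitted.append(job_id)
        return job_id

    def wait(self, job_id: str) -> str:
        return "SUCCESS"


def run_resumable(store: dict[str, str], service: FakeJobService,
                  crash_while_waiting: bool = False) -> str:
    """Submit once, persist the external ID, and reattach on later attempts."""
    job_id = store.get("remote_job_id")
    if job_id is None:
        job_id = service.submit()
        store["remote_job_id"] = job_id  # persist before the long wait
    if crash_while_waiting:
        raise RuntimeError("worker died while waiting")
    return service.wait(job_id)


store: dict[str, str] = {}  # assumed to survive retries
service = FakeJobService()
try:
    run_resumable(store, service, crash_while_waiting=True)  # first attempt dies
except RuntimeError:
    pass
result = run_resumable(store, service)  # retry reattaches to job-1
print(result, service.submitted)
```

The retry finds the stored ID and waits on the existing job instead of submitting a second one. A crash between `submit()` and the store write is still possible in this sketch; narrowing that window is outside what it tries to show.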

Example dialogue before and after

Questions asked in both runs:

Round 1 — general

"My pipeline failed halfway through a big batch, do I have to reprocess everything?"
"I submitted a job to Databricks and my worker died, now it ran twice — how do I prevent that?"
"Where should I store state that needs to survive a task retry?"

Round 2 — with DAGs

Paste antipattern_xcom_checkpoint.py → "Why does my task start over every time it retries?"
Paste antipattern_variables.py → "We run this in two environments and sometimes the state gets corrupted — any idea why?"
Paste antipattern_xcom_watermark.py → "My downstream DAG keeps reprocessing data it already saw — what's wrong?"

DAGs:

# antipattern_xcom_watermark.py
from __future__ import annotations

from datetime import datetime

from airflow.models import Variable
from airflow.sdk import DAG, Asset, task

ORDERS = Asset(name="orders/daily", uri="s3://warehouse/orders/daily")

with DAG(
    dag_id="antipattern_xcom_watermark",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
):

    @task(retries=3, outlets=[ORDERS])
    def load_orders(**context):
        ti = context["ti"]

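        # Anti-pattern: XCom does not survive a retry of this task, so a retry
        # finds no job ID here and submits a second batch job.
        job_id = ti.xcom_pull(task_ids="load_orders", key="batch_job_id")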
        if job_id is None:
            job_id = batch_client.submit(...)
            ti.xcom_push(key="batch_job_id", value=job_id)
        batch_client.wait(job_id)

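        # Anti-pattern: Variables are global, so any other writer of
        # "orders_watermark" can overwrite the value between get and set.
        watermark = Variable.get("orders_watermark", default_var="2024-01-01")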
        records = fetch_since(watermark)
        load_to_warehouse(records)
        Variable.set("orders_watermark", datetime.utcnow().isoformat())

    load_orders()

# antipattern_variables.py

from __future__ import annotations

from datetime import datetime

from airflow.models import Variable
from airflow.sdk import DAG, task

with DAG(
    dag_id="antipattern_variables",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
):

    @task(retries=3)
    def ingest_records():
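        # Anti-pattern: a global Variable used as a per-task cursor; every
        # writer that shares this key reads and overwrites the same value.
        cursor = Variable.get("ingest_cursor", default_var="0")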
        records = fetch_records_after(cursor)
        for record in records:
            process(record)
            cursor = record["id"]
            Variable.set("ingest_cursor", cursor)

    @task(retries=3)
    def run_spark_job():
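        # Anti-pattern: the job ID lives in a global Variable that this task
        # never clears, so later runs can pick up a stale or foreign ID.
        job_id = Variable.get("spark_job_id", default_var=None)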
        if job_id:
            status = spark_client.get_status(job_id)
            if status == "RUNNING":
                spark_client.wait(job_id)
                return
        job_id = spark_client.submit(...)
        Variable.set("spark_job_id", job_id)
        spark_client.wait(job_id)

    ingest_records() >> run_spark_job()

# antipattern_xcom_checkpoint.py
from __future__ import annotations

from datetime import datetime, timedelta

from airflow.sdk import DAG, task

PAGE_SIZE = 1000

with DAG(
    dag_id="antipattern_xcom_checkpoint",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
):

    @task(retries=5, retry_delay=timedelta(seconds=30))
    def process_large_batch(**context):
        ti = context["ti"]

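        # Anti-pattern: the cursor pushed below does not survive a retry, so
        # this pull returns None on retry and the batch restarts from 0.
        last_cursor = ti.xcom_pull(task_ids="process_large_batch", key="cursor")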
        cursor = last_cursor or 0

        while True:
            page = fetch_page(after_id=cursor, limit=PAGE_SIZE)
            if not page:
                break
            for record in page:
                transform_and_load(record)
            cursor = page[-1]["id"]
            ti.xcom_push(key="cursor", value=cursor)

    process_large_batch()
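
The corruption reported for antipattern_variables.py comes down to key addressing. A minimal sketch, assuming two writers (for example two concurrent runs, or two deployments pointed at the same backend) that use the same Variable name; `global_key`, `scoped_key`, and the writer labels are hypothetical and not part of any Airflow API:

```python
from __future__ import annotations

from typing import Callable


def global_key(name: str, writer: str) -> str:
    """Variable-style addressing: every writer resolves to the same key."""
    return name


def scoped_key(name: str, writer: str) -> str:
    """Scoped addressing: the writer's identity is part of the key."""
    return f"{writer}/{name}"


def cursor_seen_by_a_after_b_writes(key_fn: Callable[[str, str], str]) -> str:
    """Writer A saves a cursor, writer B saves its own, then A reads back."""
    store: dict[str, str] = {}
    store[key_fn("ingest_cursor", "writer_a")] = "120"
    store[key_fn("ingest_cursor", "writer_b")] = "9000"
    return store[key_fn("ingest_cursor", "writer_a")]


print(cursor_seen_by_a_after_b_writes(global_key))  # A reads B's cursor
print(cursor_seen_by_a_after_b_writes(scoped_key))  # A reads its own cursor
```

With global addressing the last writer wins and writer A resumes from someone else's position. Scoping the key by identity removes the collision without any locking.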

Before

pre-skill.md

After

with-skill.md

@kaxil left a comment

Contributor

Let's wait for Airflow 3.3 release before we merge this

@amoghrajesh

Author

Sounds good.

@amoghrajesh

Author

Pushed again after merging apache/airflow#68438

@amoghrajesh changed the title from "Add skill for AIP-103 task/asset store and resumable jobs mixin" to "Add skill for AIP-103 task/asset state store and mixin for resumable external jobs" on Jun 23, 2026

@kaxil left a comment

Contributor

This is the strongest of the recent skill PRs and the closest to mergeable. I checked the AIP-103 surface against apache/airflow 3.3 and it holds up: ResumableJobMixin (the external_id_key = "remote_job_id" default, durable=True, execute_resumable, and all six abstract methods) matches the source, task_state_store = context["task_state_store"] matches the resumable-tasks docs, and the asset_state_store=None kwarg indexed by asset matches example_asset_state_store.py. For a preview API that's easy to get subtly wrong, that's well done. Adding the skill to the airflow hub routing table (and picking up the missing airflow-hitl row) is exactly the routing pattern #237 pushed, and the "when NOT to use" tables are the negative-trigger guidance a couple of the sibling PRs were missing.

A few things worth addressing, none blocking.

The description leads with "Use when …" and never says what the skill is. Both the best-practices doc ("include both what the Skill does and when to use it") and #237 (which specifically standardized away the "Use when"-first shape) want a capability sentence first. A third-person-present lead in front of the existing triggers would line it up with the rest of the catalog. Noted inline.

The clear(all_map_indices=True) guidance looks out of date — the parameter exists on the store, so the "use the CLI or core API" framing sends readers the long way around. Inline on the two spots.
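
For readers unfamiliar with the mapped-task case, the difference between clearing one map index and clearing all of them can be sketched with a toy in-memory store. `ToyTaskStore` is hypothetical and only mirrors the `clear(all_map_indices=True)` parameter name discussed here; it is not the Airflow implementation.

```python
from __future__ import annotations


class ToyTaskStore:
    """Hypothetical per-task store; entries are keyed by (task, map_index, key)."""

    def __init__(self, backing: dict, task_key: str, map_index: int) -> None:
        self._backing = backing
        self._task_key = task_key
        self._map_index = map_index

    def set(self, key: str, value: int) -> None:
        self._backing[(self._task_key, self._map_index, key)] = value

    def get(self, key: str, default: int | None = None) -> int | None:
        return self._backing.get((self._task_key, self._map_index, key), default)

    def clear(self, all_map_indices: bool = False) -> None:
        """Drop this map index's entries, or every index's when asked."""
        doomed = [
            k for k in self._backing
            if k[0] == self._task_key
            and (all_map_indices or k[1] == self._map_index)
        ]
        for k in doomed:
            del self._backing[k]


backing: dict = {}
first = ToyTaskStore(backing, "process_chunk", map_index=0)
second = ToyTaskStore(backing, "process_chunk", map_index=1)
first.set("cursor", 10)
second.set("cursor", 20)

first.clear()  # only map index 0 is cleared
after_single = (first.get("cursor"), second.get("cursor"))

first.set("cursor", 11)
first.clear(all_map_indices=True)  # every map index of this task is cleared
after_all = dict(backing)
print(after_single, after_all)
```

The default `clear()` leaves sibling map indices untouched, which is the isolation behaviour; the flag is the opt-in for wiping the whole mapped task at once.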

Casing: the skill mixes Dag/Dags and DAG/DAGs within the same file. The rest of the catalog uses DAG, and the best-practices doc asks for one consistent term throughout.

Minor: the "Step 1 … Step 7" headers read as a linear workflow, but they're really reference sections (pick-a-primitive, anti-patterns, API, config, checklist) that a reader jumps into by need. "Sections" would describe them more honestly — take it or leave it.

Comment thread skills/airflow-state-store/SKILL.md Outdated
Comment thread skills/airflow-state-store/SKILL.md
Comment thread skills/airflow-state-store/SKILL.md
@amoghrajesh

amoghrajesh commented Jul 2, 2026

Author

Updated as suggested. I did the following:

  1. Improved the opening of the description
  2. Replied to your comment on clear
  3. Fixed the casing
  4. Renamed the "Step" headers to "Section" for clarity

@amoghrajesh requested a review from kaxil on July 2, 2026 12:28
3 participants