Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
FROM apache/airflow:2.9.3-python3.11

# Root is needed only for the OS package install and the jar download below.
USER root

# Java is required by PySpark (Spark runs on the JVM).
RUN apt-get update && \
    apt-get install -y --no-install-recommends default-jre-headless && \
    rm -rf /var/lib/apt/lists/*

# Iceberg Spark runtime + AWS bundle — same Iceberg version (1.6.0) as Flink stack.
# Placed in /opt/airflow/jars/ and passed to spark-submit via --jars.
# `curl -f` makes an HTTP error fail the build instead of saving an error page as a jar.
RUN mkdir -p /opt/airflow/jars && \
    curl -fL "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.0/iceberg-spark-runtime-3.5_2.12-1.6.0.jar" \
        -o /opt/airflow/jars/iceberg-spark-runtime.jar && \
    curl -fL "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/1.6.0/iceberg-aws-bundle-1.6.0.jar" \
        -o /opt/airflow/jars/iceberg-aws-bundle.jar

# Python packages are installed as the unprivileged airflow user.
USER airflow

# Re-state the base image's Airflow version (AIRFLOW_VERSION is exported by the
# official image) in the same pip invocation: the resolver then has to keep
# Airflow where it is, and an incompatible extra package fails the build
# instead of silently upgrading or downgrading Airflow.
RUN pip install --no-cache-dir \
    "apache-airflow==${AIRFLOW_VERSION}" \
    pyspark==3.5.1 \
    apache-airflow-providers-apache-spark==4.10.0 \
    apache-airflow-providers-trino==5.7.0 \
    great-expectations==0.18.19 \
    trino==0.328.0 \
    dbt-trino==1.8.0 \
    pandas==2.2.2
90 changes: 90 additions & 0 deletions airflow/dags/backfill_ohlcv_1m.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""backfill_ohlcv_1m — manually triggered Spark OHLCV backfill for a date range."""

from __future__ import annotations

from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

from airflow import DAG

# Iceberg jars handed to spark-submit; the operator takes one comma-separated string.
_JARS = ",".join(
    [
        "/opt/airflow/jars/iceberg-spark-runtime.jar",
        "/opt/airflow/jars/iceberg-aws-bundle.jar",
    ]
)

# Spark session settings: an Iceberg REST catalog named "iceberg" whose files
# are read and written through S3FileIO against the MinIO endpoint.
_SPARK_CONF = {
    # Iceberg SQL extensions.
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    # Catalog wiring.
    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.type": "rest",
    "spark.sql.catalog.iceberg.uri": "http://iceberg-rest:8181",
    "spark.sql.catalog.iceberg.warehouse": "s3://ticksense/warehouse",
    # Object-store access.
    "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.iceberg.s3.endpoint": "http://minio:9000",
    "spark.sql.catalog.iceberg.s3.path-style-access": "true",
    "spark.sql.catalog.iceberg.s3.access-key-id": "minioadmin",
    "spark.sql.catalog.iceberg.s3.secret-access-key": "minioadmin",
    # Overwrite only the partitions a write actually touches.
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
}

# Retry policy handed to the DAG as its default_args.
default_args = dict(
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)


def validate_params(**context: object) -> None:
    """Fail fast if date params are missing or malformed before Spark starts.

    Reads ``start_date`` and ``end_date`` from ``context["params"]`` and checks
    that both are present, that both parse with ``date.fromisoformat``, and
    that ``end_date`` is strictly later than ``start_date``.

    Args:
        **context: Task context; only the ``"params"`` mapping is used.

    Raises:
        AirflowFailException: If either param is missing or empty, is not a
            parseable ISO date, or if ``end_date`` is not after ``start_date``.
            This exception type fails the task without retrying, so bad input
            does not burn the DAG's retry budget.
    """
    from datetime import date

    params = context["params"]
    start = params.get("start_date")
    end = params.get("end_date")

    problem = None
    cause = None
    if not start or not end:
        problem = "Required params: start_date and end_date (YYYY-MM-DD)"
    else:
        try:
            first, last = date.fromisoformat(start), date.fromisoformat(end)
        except (TypeError, ValueError) as err:
            # A bare ValueError/TypeError would be treated as a transient
            # failure and retried; bad input can never succeed on retry.
            problem = f"start_date and end_date must be ISO dates (YYYY-MM-DD): {err}"
            cause = err
        else:
            if last <= first:
                problem = f"end_date ({last}) must be after start_date ({first})"

    if problem is not None:
        # Imported lazily, on the failure path only, as the original did.
        from airflow.exceptions import AirflowFailException

        raise AirflowFailException(problem) from cause


# Unscheduled DAG (schedule=None): validate the date params, then run the Spark job.
dag = DAG(
    dag_id="backfill_ohlcv_1m",
    schedule=None,
    start_date=datetime(2026, 5, 1),
    catchup=False,
    default_args=default_args,
    # Default date range used when a run does not override the params.
    params={"start_date": "2026-05-01", "end_date": "2026-05-02"},
    tags=["backfill", "spark", "ohlcv"],
    doc_md=(
        "Manually triggered. Recomputes normalized.ohlcv_1m from book_ticker "
        "for the given date range. Idempotent — reruns overwrite the same partitions."
    ),
)

with dag:
    # Cheap Python check of the params ahead of the Spark submission.
    check = PythonOperator(python_callable=validate_params, task_id="validate_params")

    backfill = SparkSubmitOperator(
        task_id="spark_backfill_ohlcv",
        conn_id="spark_default",
        application="/opt/airflow/spark-jobs/backfill_ohlcv.py",
        # Jinja placeholders for the run's start_date / end_date params.
        application_args=[
            "--start-date",
            "{{ params.start_date }}",
            "--end-date",
            "{{ params.end_date }}",
        ],
        jars=_JARS,
        conf=_SPARK_CONF,
        env_vars={
            "ICEBERG_REST_URI": "http://iceberg-rest:8181",
            "ICEBERG_WAREHOUSE": "s3://ticksense/warehouse",
            "S3_ENDPOINT": "http://minio:9000",
            "AWS_ACCESS_KEY_ID": "minioadmin",
            "AWS_SECRET_ACCESS_KEY": "minioadmin",
        },
    )

    # Spark runs only after validation has succeeded.
    check >> backfill
58 changes: 58 additions & 0 deletions airflow/dags/compact_iceberg_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""compact_iceberg_tables — nightly Spark rewrite_data_files for silver tables."""

from __future__ import annotations

from datetime import datetime, timedelta

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

from airflow import DAG

# Iceberg jars handed to spark-submit; the operator takes one comma-separated string.
_JARS = ",".join(
    [
        "/opt/airflow/jars/iceberg-spark-runtime.jar",
        "/opt/airflow/jars/iceberg-aws-bundle.jar",
    ]
)

# Spark session settings: an Iceberg REST catalog named "iceberg" whose files
# are read and written through S3FileIO against the MinIO endpoint.
_SPARK_CONF = {
    # Iceberg SQL extensions.
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    # Catalog wiring.
    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.type": "rest",
    "spark.sql.catalog.iceberg.uri": "http://iceberg-rest:8181",
    "spark.sql.catalog.iceberg.warehouse": "s3://ticksense/warehouse",
    # Object-store access.
    "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.iceberg.s3.endpoint": "http://minio:9000",
    "spark.sql.catalog.iceberg.s3.path-style-access": "true",
    "spark.sql.catalog.iceberg.s3.access-key-id": "minioadmin",
    "spark.sql.catalog.iceberg.s3.secret-access-key": "minioadmin",
}

# Retry policy handed to the DAG as its default_args.
default_args = dict(
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)

# Daily DAG (cron "0 2 * * *") with a single spark-submit task.
dag = DAG(
    dag_id="compact_iceberg_tables",
    schedule="0 2 * * *",
    start_date=datetime(2026, 5, 1),
    catchup=False,
    default_args=default_args,
    tags=["maintenance", "iceberg", "spark"],
    doc_md=(
        "Nightly at 02:00 UTC. Merges small Parquet files written by Flink checkpoints "
        "into larger files for efficient Trino batch scans."
    ),
)

with dag:
    # The compaction logic itself lives in the submitted application script.
    SparkSubmitOperator(
        task_id="rewrite_data_files",
        conn_id="spark_default",
        application="/opt/airflow/spark-jobs/compact_tables.py",
        jars=_JARS,
        conf=_SPARK_CONF,
        env_vars={
            "ICEBERG_REST_URI": "http://iceberg-rest:8181",
            "ICEBERG_WAREHOUSE": "s3://ticksense/warehouse",
            "S3_ENDPOINT": "http://minio:9000",
            "AWS_ACCESS_KEY_ID": "minioadmin",
            "AWS_SECRET_ACCESS_KEY": "minioadmin",
        },
    )
57 changes: 57 additions & 0 deletions airflow/dags/expire_iceberg_snapshots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""expire_iceberg_snapshots — retain last 7 days of Iceberg snapshots."""

from __future__ import annotations

import os
from datetime import datetime, timedelta

import trino
from airflow.operators.python import PythonOperator

from airflow import DAG

# Snapshot retention window, in days.
RETENTION_DAYS = 7

# Schema-qualified tables whose snapshots are expired.
TABLES = [
    f"normalized.{table_name}"
    for table_name in ("book_ticker", "ohlcv_1m", "symbol_config")
]

# Retry policy handed to the DAG as its default_args.
default_args = dict(
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)


def expire_snapshots(**_: object) -> None:
    """Expire old Iceberg snapshots on every table in ``TABLES`` via Trino.

    Connects to Trino (host and port come from ``TRINO_HOST`` / ``TRINO_PORT``,
    defaulting to ``trino:8080``) as user ``airflow`` on the ``iceberg``
    catalog, then runs one ``ALTER TABLE ... EXECUTE expire_snapshots``
    statement per table with a retention threshold of ``RETENTION_DAYS`` days.
    The keyword arguments Airflow passes in are ignored.
    """
    connection = trino.dbapi.connect(
        host=os.getenv("TRINO_HOST", "trino"),
        port=int(os.getenv("TRINO_PORT", "8080")),
        user="airflow",
        catalog="iceberg",
    )
    with connection as conn:
        with conn.cursor() as cur:
            for table in TABLES:
                cur.execute(
                    f"ALTER TABLE iceberg.{table} "
                    f"EXECUTE expire_snapshots(retention_threshold => '{RETENTION_DAYS}d')"
                )
                # Drain the result before the next statement — presumably so
                # the procedure has fully run; confirm against the Trino client.
                cur.fetchall()


# Daily DAG (cron "0 3 * * *") with a single Python task.
dag = DAG(
    dag_id="expire_iceberg_snapshots",
    schedule="0 3 * * *",
    start_date=datetime(2026, 5, 1),
    catchup=False,
    default_args=default_args,
    tags=["maintenance", "iceberg"],
    doc_md=f"Nightly at 03:00 UTC. Removes Iceberg snapshots older than {RETENTION_DAYS} days.",
)

with dag:
    PythonOperator(python_callable=expire_snapshots, task_id="expire_snapshots")
58 changes: 58 additions & 0 deletions airflow/dags/freshness_sla_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""freshness_sla_check — alert if any symbol is stale > 35 seconds."""

from __future__ import annotations

import os
from datetime import datetime, timedelta

import trino
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator

from airflow import DAG

# Maximum tolerated age, in seconds, of a symbol's newest tick.
STALENESS_THRESHOLD_S = 35

# A single quick retry, handed to the DAG as its default_args.
default_args = dict(retries=1, retry_delay=timedelta(minutes=1))


def check_freshness(**_: object) -> None:
    """Fail the task when any symbol's newest tick breaches the freshness SLA.

    Asks Trino, per symbol in ``normalized.book_ticker``, how many seconds lie
    between ``MAX(exchange_event_ts)`` and ``NOW()``, keeping only symbols
    above ``STALENESS_THRESHOLD_S``. The keyword arguments Airflow passes in
    are ignored.

    Raises:
        AirflowFailException: If the query returns any row; the message lists
            every stale symbol with its staleness in seconds.
    """
    # Only symbols over the threshold come back, stalest first.
    query = f"""
        SELECT symbol,
        DATE_DIFF('second', MAX(exchange_event_ts), NOW()) AS staleness_s
        FROM normalized.book_ticker
        GROUP BY symbol
        HAVING DATE_DIFF('second', MAX(exchange_event_ts), NOW()) > {STALENESS_THRESHOLD_S}
        ORDER BY staleness_s DESC
        """

    connection = trino.dbapi.connect(
        host=os.getenv("TRINO_HOST", "trino"),
        port=int(os.getenv("TRINO_PORT", "8080")),
        user="airflow",
        catalog="iceberg",
    )
    with connection as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            stale = cur.fetchall()

    # Guard clause: an empty result means nothing is over the threshold.
    if not stale:
        return

    details = ", ".join(f"{row[0]}={row[1]}s" for row in stale)
    raise AirflowFailException(f"SLA breach — stale symbols: {details}")


# Five-minute DAG (cron "*/5 * * * *") with a single Python task.
dag = DAG(
    dag_id="freshness_sla_check",
    schedule="*/5 * * * *",
    start_date=datetime(2026, 5, 1),
    catchup=False,
    default_args=default_args,
    tags=["sla", "monitoring"],
    doc_md="Runs every 5 min. Fails (alerts) if any symbol has no data within 35 s.",
)

with dag:
    PythonOperator(python_callable=check_freshness, task_id="check_freshness")
41 changes: 41 additions & 0 deletions airflow/dags/run_dbt_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""run_dbt_models — hourly dbt run + test against Iceberg silver tables."""

from __future__ import annotations

from datetime import datetime, timedelta

from airflow.operators.bash import BashOperator

from airflow import DAG

# dbt executable plus its global flag, shared by every dbt task.
_DBT = " ".join(["dbt", "--no-send-anonymous-usage-stats"])

# Profile and project both live in the same directory.
_OPTS = " ".join(
    [
        "--profiles-dir",
        "/opt/airflow/dbt",
        "--project-dir",
        "/opt/airflow/dbt",
    ]
)

# Retry policy handed to the DAG as its default_args.
default_args = dict(
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)

# Hourly DAG (cron "0 * * * *"): `dbt run`, then `dbt test` once the run succeeds.
with DAG(
    dag_id="run_dbt_models",
    start_date=datetime(2026, 5, 1),
    schedule="0 * * * *",
    catchup=False,
    default_args=default_args,
    tags=["dbt", "analytics"],
    doc_md="Hourly dbt run + test. Rebuilds staging views and mart tables over Iceberg.",
) as dag:
    # BashOperator uses `env` INSTEAD of the worker's environment unless
    # append_env=True. Without it the subprocess loses PATH and HOME, so the
    # pip-installed `dbt` entry point may not be found. append_env=True keeps
    # the inherited environment and layers the Trino settings on top.
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"{_DBT} run {_OPTS}",
        env={"TRINO_HOST": "trino", "TRINO_PORT": "8080"},
        append_env=True,
    )
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"{_DBT} test {_OPTS}",
        env={"TRINO_HOST": "trino", "TRINO_PORT": "8080"},
        append_env=True,
    )

    # Tests run only against freshly built models.
    dbt_run >> dbt_test
Loading
Loading