Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion databricks/bundles/databricks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ variables:
description: >-
Application ID of the service principal the jobs run as. It must hold a
Lakebase Postgres role with the privileges each job needs (see
docs/reference/workflow-jobs). Required for the prod target.
docs/reference/workflow-jobs). Required for the prod target; leave empty
for dev (runs as the deploying user).
default: ""

# The workflow source lives in ../workflows (a sibling of this bundle dir).
# Sync it to the workspace so the spark_python_task files — and the shared
Expand Down
11 changes: 9 additions & 2 deletions databricks/workflows/cold_rollup_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,15 @@
from psycopg import sql

# Ensure sibling modules (lakebase_utils) are importable when run as a
# spark_python_task — the entry file's directory may not be on sys.path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# spark_python_task. On serverless the file runs via exec() with no __file__
# defined, so fall back to the working directory (Databricks sets it to the
# file's workspace folder).
try:
_here = os.path.dirname(os.path.abspath(__file__))
except NameError:
_here = os.getcwd()
if _here not in sys.path:
sys.path.insert(0, _here)

from lakebase_utils import fetch_all, lakebase_cursor

Expand Down
11 changes: 9 additions & 2 deletions databricks/workflows/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
import sys

# Ensure sibling modules (lakebase_utils) are importable when run as a
# spark_python_task — the entry file's directory may not be on sys.path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# spark_python_task. On serverless the file runs via exec() with no __file__
# defined, so fall back to the working directory (Databricks sets it to the
# file's workspace folder).
try:
_here = os.path.dirname(os.path.abspath(__file__))
except NameError:
_here = os.getcwd()
if _here not in sys.path:
sys.path.insert(0, _here)

from lakebase_utils import fetch_all, lakebase_cursor

Expand Down
11 changes: 9 additions & 2 deletions databricks/workflows/retention_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
import sys

# Ensure sibling modules (lakebase_utils) are importable when run as a
# spark_python_task — the entry file's directory may not be on sys.path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# spark_python_task. On serverless the file runs via exec() with no __file__
# defined, so fall back to the working directory (Databricks sets it to the
# file's workspace folder).
try:
_here = os.path.dirname(os.path.abspath(__file__))
except NameError:
_here = os.getcwd()
if _here not in sys.path:
sys.path.insert(0, _here)

from lakebase_utils import fetch_all, lakebase_cursor

Expand Down
11 changes: 9 additions & 2 deletions databricks/workflows/rollup_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
import sys

# Ensure sibling modules (lakebase_utils) are importable when run as a
# spark_python_task — the entry file's directory may not be on sys.path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# spark_python_task. On serverless the file runs via exec() with no __file__
# defined, so fall back to the working directory (Databricks sets it to the
# file's workspace folder).
try:
_here = os.path.dirname(os.path.abspath(__file__))
except NameError:
_here = os.getcwd()
if _here not in sys.path:
sys.path.insert(0, _here)

from lakebase_utils import fetch_all, lakebase_cursor

Expand Down
11 changes: 9 additions & 2 deletions databricks/workflows/tiering_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@
import sys

# Ensure sibling modules (lakebase_utils) are importable when run as a
# spark_python_task — the entry file's directory may not be on sys.path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# spark_python_task. On serverless the file runs via exec() with no __file__
# defined, so fall back to the working directory (Databricks sets it to the
# file's workspace folder).
try:
_here = os.path.dirname(os.path.abspath(__file__))
except NameError:
_here = os.getcwd()
if _here not in sys.path:
sys.path.insert(0, _here)

from lakebase_utils import fetch_all, lakebase_cursor

Expand Down
38 changes: 35 additions & 3 deletions website/docs/guides/how-it-works/rollups.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,46 @@ On the next `refresh_rollup()` call, all hot-tier invalidation entries older tha

## Hot-tier vs cold-tier refresh

RollUp Tables persist in Lakebase permanently (aggregates are tiny compared to raw data). When raw data tiers to a Unity Catalog Managed Table:
RollUp Tables persist in Lakebase permanently (aggregates are tiny compared to raw data). The raw **source** data, however, does not: the [tiering job](../../reference/workflow-jobs.md) evicts cold partitions from Lakebase once Lakebase CDF has flushed them to the Unity Catalog Managed Table. So at any moment a RollUp's source buckets live in one of two places:

| Tier | Data Location | Refresh Engine | How It Works |
| Tier | Source data location | Refresh engine | How it works |
|------|--------------|----------------|--------------|
| **Hot** | Lakebase (Postgres) | `refresh_rollup()` SQL function | Watermark + invalidation log, runs every 15 min |
| **Cold** | Unity Catalog Managed Table | `cold_rollup_refresh.py` Databricks job | Reads cold invalidation entries, re-aggregates via Databricks SQL, writes back to Lakebase |

Cold-tier invalidation is triggered manually via `invalidate_rollup_range()` with `tier = 'cold'` — typically after ETL corrections or bulk re-imports into the Unity Catalog Managed Table.
A bucket's tier is **auto-detected** from the covering chunk's status (`active` → hot, `tiered` → cold; see [Tier auto-routing](#tier-auto-routing--hot-vs-cold-detected-automatically)). The hot refresh processes only `tier = 'hot'` entries and re-aggregates from Lakebase; it deliberately **skips** cold entries, because the source rows are no longer in Postgres — they exist only in Delta. The cold job is the *only* thing that can refresh those buckets, by re-aggregating from the Delta copy via a SQL warehouse and writing the result back into the Lakebase RollUp Table.

### When you need cold-tier refresh

You need `cold_rollup_refresh` only when **data that has already been tiered to cold changes**, and you want the RollUp to reflect that change:

1. **Late-arriving data** — a record for an old time window whose chunk was already tiered (e.g. a delayed device or event lands days later).
2. **Historical corrections / restatements** — an ETL fix, reprocessing, or backfill rewrites a past day in the Unity Catalog Managed Table.
3. **Manual backfill** — you explicitly mark an old, now-cold window dirty after fixing upstream data.

In each case the invalidation is logged with `tier = 'cold'` (auto-detected, because the covering chunk is `tiered`), the 15-minute hot refresh skips it, and it stays unprocessed until the cold job runs.

### When you don't

If your time series is effectively **append-only / immutable once tiered** — the common case for metrics, logs, and observability data — cold buckets are never invalidated. The job simply logs `No cold-tier invalidations pending` and exits. You can leave it deployed (it is cheap and idempotent) or skip scheduling it entirely. The tradeoff if you remove it: once data tiers to cold, its RollUps are **frozen** — later corrections to historical data will not be reflected.

### How to use it

The job is **on-demand** — the bundle ships it without a schedule. Run it after the cold source data changes:

```sql
-- 1. Mark the affected (already-tiered) window dirty. The tier is auto-detected
-- from chunk status; pass p_tier => 'cold' to force it.
SELECT lakets.invalidate_rollup_range('metrics_hourly',
'2026-01-01'::timestamptz, '2026-01-02'::timestamptz);
```

```bash
# 2. Run the cold re-aggregation job (Databricks Asset Bundle).
databricks bundle run lakets_cold_rollup_refresh -t prod
```

The job reads the pending `tier = 'cold'` entries, re-aggregates each dirty bucket from `<catalog>.<schema>.<source_table>` in Unity Catalog (auto-starting a serverless SQL warehouse if none is running), `DELETE`s the stale rows and `INSERT`s the recomputed rows into the Lakebase RollUp Table, then clears the processed cold entries. It is idempotent — re-running with nothing pending is a no-op. For RollUps whose hot `query_text` cannot be rewritten to the Delta table by simple name substitution (multi-table joins, etc.), set an explicit `cold_query_text` on the RollUp.

---

Expand Down
16 changes: 15 additions & 1 deletion website/docs/reference/workflow-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,24 @@ All jobs run on **serverless compute** — there is no cluster to provision. Eac
| **Tiering** | Daily 2 AM | `_get_chunks_to_tier()` → `tier_chunk()` per candidate — drops cold partitions whose data CDF has flushed to UC (pure Lakebase SQL, no Spark) |
| **Retention** | Daily 3 AM | `execute_retention()` — drops expired chunks in Lakebase and the UC Managed Table |
| **RollUp Refresh** | Every 15 min | `refresh_rollup()` — incremental hot-tier refresh |
| **Cold RollUp Refresh** | On-demand (no fixed schedule) | `refresh_rollup()` with cold-tier dirty buckets, run after cold-tier ETL corrections |
| **Cold RollUp Refresh** | On-demand (no fixed schedule) | Re-aggregates already-tiered (cold) source data from Unity Catalog via a SQL warehouse and writes the corrected aggregates back to the Lakebase RollUp Table |

Each job is idempotent and stateless — re-running it cannot corrupt data. Lakebase remains the source of truth for state (registries, watermarks, invalidation log); the jobs read that state and execute against it.

### When to run Cold RollUp Refresh

The other four jobs run on a schedule; **Cold RollUp Refresh is on-demand** because it only has work when historical data that has *already been tiered out of Lakebase* changes. Run it after:

- **late-arriving data** for a time window whose chunk was already tiered,
- an **ETL correction / restatement / backfill** that rewrites a past period in the Unity Catalog Managed Table, or
- a manual `lakets.invalidate_rollup_range(...)` over an old, now-cold window.

The 15-minute hot **RollUp Refresh** skips cold buckets (their source rows are no longer in Postgres), so this job is the only thing that propagates such corrections into RollUps. If your data is append-only / immutable once tiered (typical for metrics and logs), it has nothing to do and can be left unscheduled. See [Hot-tier vs cold-tier refresh](../guides/how-it-works/rollups.md#hot-tier-vs-cold-tier-refresh) for the full mechanics and a worked example.

```bash
databricks bundle run lakets_cold_rollup_refresh -t prod
```

## Authentication & permissions

The jobs connect to Lakebase with **machine-to-machine (M2M) OAuth** — there are no static passwords. Each job runs as a Databricks **service principal**, and the shared helper [`lakebase_utils.py`](https://github.com/databricks-solutions/lakets/blob/main/databricks/workflows/lakebase_utils.py) mints a short-lived Postgres credential for that identity on every connection, following the [psycopg3 connection pattern](https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3):
Expand Down