Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version
and `show_rollups()` no longer returns a `refresh_mode` column.
- RollUps now sync to Unity Catalog via Lakebase CDF (`enable_sync`), replacing the custom export pipeline. Synced tables are mirrored by unpartitioned shadow tables in the new `lakets_cdf` schema.
- Workflow jobs now depend on `psycopg[binary]` (psycopg 3) instead of `psycopg2-binary`. Imports moved from `psycopg2` / `psycopg2.sql` to `psycopg` / `psycopg.sql`, and connections set `autocommit=True` via the `connect()` argument.
-- Workflow jobs run as Databricks `spark_python_task` pointing directly at the `databricks/workflows/*.py` files instead of a `python_wheel_task`; no `lakets` wheel is built. The Asset Bundle installs only the external libraries (`psycopg[binary]`, `databricks-sdk`) on the cluster via a task `libraries` block and ships the workflow sources via `sync.paths`.
+- Workflow jobs run on **serverless compute** as Databricks `spark_python_task` pointing directly at the `databricks/workflows/*.py` files instead of a `python_wheel_task`; no `lakets` wheel is built and no cluster is provisioned. Dependencies (`psycopg[binary]`, `databricks-sdk`) are declared per-job in the bundle's `environments` block, and the workflow sources ship via `sync.paths`.
- Workflow jobs authenticate to Lakebase with machine-to-machine (M2M) OAuth following the [psycopg3 docs pattern](https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3): `lakebase_utils` mints a fresh `generate_database_credential` token inside `connect()` (handles ~1 h token rotation), the Postgres role is overridable via `LAKETS_PG_ROLE`, and the bundle's prod target runs the jobs as a service principal via `run_as` (`service_principal_name` variable). The service principal must hold a Lakebase Postgres role with privileges for the operations — see the [Workflow jobs](./website/docs/reference/workflow-jobs.md) doc. Bumps `databricks-sdk` to `>=0.56.0` (M2M requirement).
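
The token-rotation behaviour described in the last entry can be sketched roughly as follows. This is an illustration only, not the code in `lakebase_utils`: `connect_with_fresh_token`, `mint_token`, `open_connection`, and `default_role` are hypothetical names, and `sslmode="require"` is an assumption. In a real job, `mint_token` would wrap the SDK's `generate_database_credential` call and `open_connection` would be `psycopg.connect`.

```python
import os
from typing import Callable, Mapping


def connect_with_fresh_token(
    mint_token: Callable[[], str],          # hypothetical: wraps generate_database_credential
    open_connection: Callable[..., object],  # hypothetical: e.g. psycopg.connect
    *,
    host: str,
    dbname: str,
    default_role: str,
    environ: Mapping[str, str] = os.environ,
):
    """Open a connection with a token minted at call time.

    Minting inside this function (rather than once at import time) means a
    reconnect after the roughly one-hour token lifetime picks up a new token.
    """
    # LAKETS_PG_ROLE overrides the Postgres role; otherwise use the default.
    role = environ.get("LAKETS_PG_ROLE") or default_role
    token = mint_token()
    return open_connection(
        host=host,
        dbname=dbname,
        user=role,
        password=token,
        sslmode="require",  # assumption, not taken from the changelog
        autocommit=True,    # passed as a connect() argument, as the psycopg 3 entry says
    )
```

Injecting the two callables keeps the sketch runnable without a workspace and makes the "mint on every connect" behaviour easy to check with fakes.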

### Deprecated
42 changes: 21 additions & 21 deletions databricks/bundles/databricks.yml
@@ -19,7 +19,8 @@ variables:
# The workflow source lives in ../workflows (a sibling of this bundle dir).
# Sync it to the workspace so the spark_python_task files — and the shared
# lakebase_utils.py they import — land together in one folder. No wheel is built;
-# the code runs directly and only external libraries are pip-installed (see libraries).
+# the code runs directly. Jobs run on serverless compute, so dependencies are
+# declared per-job in the `environments` block (not as task `libraries`).
sync:
paths:
- ../workflows
@@ -32,69 +33,73 @@ resources:
schedule:
quartz_cron_expression: "0 0 */6 * * ?" # Every 6 hours
timezone_id: "UTC"
+# Serverless environment installed for every job. Defined once here and
+# reused via the *lakets_environments alias. `client` is the serverless
+# environment version; bump it as Databricks ships newer versions.
+environments: &lakets_environments
+- environment_key: lakets_env
+spec:
+client: "3"
+dependencies:
+- "psycopg[binary]>=3.1,<4.0"
+- "databricks-sdk>=0.56.0,<1.0.0"
tasks:
- task_key: ensure_partitions
spark_python_task:
python_file: ../workflows/partition_manager.py
parameters:
- ${var.lakebase_instance}
-# pip-installed on the cluster before the task runs. Defined once here,
-# reused by every other task via the *job_libraries alias.
-libraries: &job_libraries
-- pypi:
-package: "psycopg[binary]>=3.1,<4.0"
-- pypi:
-package: "databricks-sdk>=0.56.0,<1.0.0"
-existing_cluster_id: ${var.cluster_id}
+environment_key: lakets_env

lakets_tiering:
name: "LakeTS - Tiering"
description: "Drops cold ChronoTable partitions once CDF has flushed them to Unity Catalog"
schedule:
quartz_cron_expression: "0 0 2 * * ?" # Daily at 2 AM UTC
timezone_id: "UTC"
+environments: *lakets_environments
tasks:
- task_key: tier_cold_partitions
spark_python_task:
python_file: ../workflows/tiering_job.py
parameters:
- ${var.lakebase_instance}
-libraries: *job_libraries
-existing_cluster_id: ${var.cluster_id}
+environment_key: lakets_env

lakets_retention:
name: "LakeTS - Retention"
description: "Drops expired data from Lakebase and Delta Lake"
schedule:
quartz_cron_expression: "0 0 3 * * ?" # Daily at 3 AM UTC
timezone_id: "UTC"
+environments: *lakets_environments
tasks:
- task_key: enforce_retention
spark_python_task:
python_file: ../workflows/retention_job.py
parameters:
- ${var.lakebase_instance}
-libraries: *job_libraries
-existing_cluster_id: ${var.cluster_id}
+environment_key: lakets_env

lakets_rollup_refresh:
name: "LakeTS - RollUp Refresh"
description: "Incrementally refreshes all RollUps"
schedule:
quartz_cron_expression: "0 */15 * * * ?" # Every 15 minutes
timezone_id: "UTC"
+environments: *lakets_environments
tasks:
- task_key: refresh_rollups
spark_python_task:
python_file: ../workflows/rollup_refresh.py
parameters:
- ${var.lakebase_instance}
-libraries: *job_libraries
-existing_cluster_id: ${var.cluster_id}
+environment_key: lakets_env

lakets_cold_rollup_refresh:
name: "LakeTS - Cold-Tier RollUp Refresh"
description: "Re-aggregates cold-tier dirty buckets from Delta Lake"
+environments: *lakets_environments
tasks:
- task_key: cold_rollup_refresh
spark_python_task:
@@ -103,15 +108,12 @@ resources:
- ${var.lakebase_instance}
- ${var.delta_catalog}
- ${var.delta_schema}
-libraries: *job_libraries
-existing_cluster_id: ${var.cluster_id}
+environment_key: lakets_env

targets:
dev:
mode: development
default: true
-variables:
-cluster_id: "" # Set to your dev cluster ID

prod:
mode: production
@@ -120,5 +122,3 @@ targets:
# privileges each job performs (see docs/reference/workflow-jobs).
run_as:
service_principal_name: ${var.service_principal_name}
-variables:
-cluster_id: "" # Set to your prod cluster ID
3 changes: 2 additions & 1 deletion databricks/workflows/partition_manager.py
@@ -5,7 +5,8 @@
Schedule: Every 6 hours (or customize per chunk_interval).

Usage as Databricks Job:
-spark.conf.get("lakets.instance_name") -> Lakebase instance name
+Pass the Lakebase instance name as the first job parameter (sys.argv[1]),
+or set the LAKETS_INSTANCE environment variable.
"""
import logging
import os
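
The lookup described in the updated docstring might look like the sketch below. `resolve_instance_name` is a hypothetical helper, not necessarily the function in `partition_manager.py`. The precedence (job parameter first, then `LAKETS_INSTANCE`) and the error raised when neither is set are assumptions.

```python
import os
import sys
from typing import Mapping, Sequence


def resolve_instance_name(argv: Sequence[str], environ: Mapping[str, str]) -> str:
    """Return the Lakebase instance name from argv[1], else LAKETS_INSTANCE."""
    # The first job parameter arrives as sys.argv[1] in a spark_python_task.
    if len(argv) > 1 and argv[1].strip():
        return argv[1].strip()
    # Fall back to the environment variable named in the docstring.
    name = environ.get("LAKETS_INSTANCE", "").strip()
    if name:
        return name
    raise ValueError(
        "Lakebase instance name required: pass it as the first job "
        "parameter or set LAKETS_INSTANCE"
    )


# In a job this would be called as resolve_instance_name(sys.argv, os.environ);
# explicit arguments are used here so the example runs anywhere.
instance = resolve_instance_name(["partition_manager.py", "my-instance"], {})
```

Taking `argv` and `environ` as arguments rather than reading the globals directly keeps the helper easy to test.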
2 changes: 2 additions & 0 deletions website/docs/reference/workflow-jobs.md
@@ -9,6 +9,8 @@ description: The Databricks Jobs that drive the operational lifecycle of LakeTS.

These scheduled Databricks Jobs drive the operational lifecycle of LakeTS. The bundle at [`databricks/bundles/databricks.yml`](https://github.com/databricks-solutions/lakets/blob/main/databricks/bundles/databricks.yml) deploys all of them at once.

+All jobs run on **serverless compute**, so there is no cluster to provision. Each runs its `databricks/workflows/*.py` file as a `spark_python_task`, and the Python dependencies (`psycopg[binary]`, `databricks-sdk`) are declared per job in the bundle's `environments` block.

| Job | Schedule | What it does |
|-----|----------|--------------|
| **Partition Manager** | Every 6 h | Calls `_ensure_partitions()` — pre-creates future partitions |