1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version
- RollUps now sync to Unity Catalog via Lakebase CDF (`enable_sync`), replacing the custom export pipeline. Synced tables are mirrored by unpartitioned shadow tables in the new `lakets_cdf` schema.
- Workflow jobs now depend on `psycopg[binary]` (psycopg 3) instead of `psycopg2-binary`. Imports moved from `psycopg2` / `psycopg2.sql` to `psycopg` / `psycopg.sql`, and connections set `autocommit=True` via the `connect()` argument.
- Workflow jobs run as Databricks `spark_python_task` pointing directly at the `databricks/workflows/*.py` files instead of a `python_wheel_task` — no `lakets` wheel is built. The Asset Bundle installs only the external libraries (`psycopg[binary]`, `databricks-sdk`) on the cluster via a task `libraries` block and ships the workflow sources via `sync.paths`.
- Workflow jobs authenticate to Lakebase with machine-to-machine (M2M) OAuth following the [psycopg3 docs pattern](https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3): `lakebase_utils` mints a fresh `generate_database_credential` token inside `connect()` (handles ~1 h token rotation), the Postgres role is overridable via `LAKETS_PG_ROLE`, and the bundle's prod target runs the jobs as a service principal via `run_as` (`service_principal_name` variable). The service principal must hold a Lakebase Postgres role with privileges for the operations — see the [Workflow jobs](./website/docs/reference/workflow-jobs.md) doc. Bumps `databricks-sdk` to `>=0.56.0` (M2M requirement).

### Deprecated

12 changes: 11 additions & 1 deletion databricks/bundles/databricks.yml
@@ -10,6 +10,11 @@ variables:
   delta_schema:
     description: "Unity Catalog schema for tiered data"
     default: "lakets"
+  service_principal_name:
+    description: >-
+      Application ID of the service principal the jobs run as. It must hold a
+      Lakebase Postgres role with the privileges each job needs (see
+      docs/reference/workflow-jobs). Required for the prod target.
 
 # The workflow source lives in ../workflows (a sibling of this bundle dir).
 # Sync it to the workspace so the spark_python_task files — and the shared
@@ -39,7 +44,7 @@ resources:
             - pypi:
                 package: "psycopg[binary]>=3.1,<4.0"
             - pypi:
-                package: "databricks-sdk>=0.20.0,<1.0.0"
+                package: "databricks-sdk>=0.56.0,<1.0.0"
           existing_cluster_id: ${var.cluster_id}
 
     lakets_tiering:
@@ -110,5 +115,10 @@ targets:
 
   prod:
     mode: production
+    # Jobs execute as this service principal — the identity that authenticates
+    # to Lakebase via M2M OAuth. It must own a Lakebase Postgres role with the
+    # privileges each job performs (see docs/reference/workflow-jobs).
+    run_as:
+      service_principal_name: ${var.service_principal_name}
     variables:
       cluster_id: "" # Set to your prod cluster ID
66 changes: 56 additions & 10 deletions databricks/workflows/lakebase_utils.py
@@ -1,34 +1,80 @@
 """
 LakeTS Lakebase Connection Utilities
-Shared helper for all Databricks workflow jobs to connect to Lakebase.
+
+Shared helper for all Databricks workflow jobs to connect to Lakebase using a
+short-lived OAuth credential minted via the Databricks SDK. Follows the
+psycopg3 connection pattern from the Databricks docs:
+https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3
+
+Authentication (machine-to-machine OAuth)
+-----------------------------------------
+The jobs run as a Databricks **service principal**. Inside a Databricks job the
+default ``WorkspaceClient()`` resolves that identity automatically. For runs
+outside Databricks, configure the service principal's M2M OAuth credentials via
+the standard SDK environment variables and the SDK picks them up unchanged:
+
+    DATABRICKS_HOST           https://<workspace-url>/
+    DATABRICKS_CLIENT_ID      <service principal application id>
+    DATABRICKS_CLIENT_SECRET  <service principal OAuth secret>
+
+See: https://docs.databricks.com/aws/en/oltp/instances/authentication#obtain-an-oauth-token-in-a-machine-to-machine-flow
+(requires databricks-sdk >= 0.56.0).
+
+The service principal must also have a matching **Postgres role** on the
+Lakebase instance, granted the privileges the job needs (see the "Workflow
+jobs" doc). The role name defaults to the running identity (``current_user``);
+override it with the ``LAKETS_PG_ROLE`` environment variable if it differs.
+
+Lakebase OAuth tokens are short-lived (~1 h) and rotate. ``_oauth_connection_class``
+mints a fresh token on every physical connect, so a reconnect never carries a
+stale password.
 """
+import os
 import uuid
 from contextlib import contextmanager
 
 import psycopg
 from databricks.sdk import WorkspaceClient
 
 
+def _oauth_connection_class(workspace: WorkspaceClient, instance_name: str):
+    """Build a psycopg connection class that injects a freshly minted Lakebase
+    OAuth token as the password on every connect (psycopg3 pattern from the docs).
+
+    Generating the credential inside ``connect()`` — rather than once up front —
+    means any reconnect transparently obtains a non-expired token.
+    """
+
+    class _OAuthConnection(psycopg.Connection):
+        @classmethod
+        def connect(cls, conninfo: str = "", **kwargs):
+            cred = workspace.database.generate_database_credential(
+                request_id=str(uuid.uuid4()),
+                instance_names=[instance_name],
+            )
+            kwargs["password"] = cred.token
+            return super().connect(conninfo, **kwargs)
+
+    return _OAuthConnection
+
+
 def get_lakebase_connection(instance_name: str, database: str = "databricks_postgres"):
-    """Create a fresh connection to a Lakebase instance using OAuth."""
+    """Open a fresh connection to a Lakebase instance, authenticating as the
+    job's service principal via a machine-to-machine OAuth credential."""
     w = WorkspaceClient()
-    cred = w.database.generate_database_credential(
-        instance_names=[instance_name],
-        request_id=str(uuid.uuid4()),
-    )
     instance = w.database.get_database_instance(name=instance_name)
-    conn = psycopg.connect(
+    pg_role = os.environ.get("LAKETS_PG_ROLE") or w.current_user.me().user_name
+    connection_class = _oauth_connection_class(w, instance_name)
+    return connection_class.connect(
         host=instance.read_write_dns,
         port=5432,
         dbname=database,
-        user=w.current_user.me().user_name,
-        password=cred.token,
+        user=pg_role,
         sslmode="require",
         connect_timeout=30,
         options="-c statement_timeout=600000 -c lock_timeout=30000",
         autocommit=True,
     )
-    return conn
 
 
 @contextmanager
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
-databricks-sdk>=0.20.0,<1.0.0
+databricks-sdk>=0.56.0,<1.0.0
 psycopg[binary]>=3.1,<4.0
61 changes: 61 additions & 0 deletions website/docs/reference/workflow-jobs.md
@@ -18,3 +18,64 @@ These scheduled Databricks Jobs drive the operational lifecycle of LakeTS. The b
| **Cold RollUp Refresh** | On-demand (no fixed schedule) | `refresh_rollup()` with cold-tier dirty buckets, run after cold-tier ETL corrections |

Each job is idempotent and stateless — re-running it cannot corrupt data. Lakebase remains the source of truth for state (registries, watermarks, invalidation log); the jobs read that state and execute against it.

## Authentication & permissions

The jobs connect to Lakebase with **machine-to-machine (M2M) OAuth** — there are no static passwords. Each job runs as a Databricks **service principal**, and the shared helper [`lakebase_utils.py`](https://github.com/databricks-solutions/lakets/blob/main/databricks/workflows/lakebase_utils.py) mints a short-lived Postgres credential for that identity on every connection, following the [psycopg3 connection pattern](https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3):

```python
cred = w.database.generate_database_credential(
    request_id=str(uuid.uuid4()), instance_names=[instance_name])
# cred.token is used as the Postgres password; it is minted inside connect()
# so any reconnect transparently gets a fresh, non-expired token (~1 h lifetime).
```
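
The mint-on-connect idea can be sketched without psycopg or the Databricks SDK. In the sketch below, `make_connect`, `mint_token`, and `open_conn` are hypothetical names that do not appear in `lakebase_utils.py`; the stand-in functions only show that every call to `connect()` requests a new credential instead of reusing one captured at start-up.

```python
import uuid
from typing import Any, Callable


def make_connect(
    mint_token: Callable[[str], str],
    open_conn: Callable[..., Any],
) -> Callable[..., Any]:
    """Return a connect() that requests a new token for every connection.

    mint_token and open_conn are hypothetical stand-ins for the SDK call
    (generate_database_credential) and the driver's connect call.
    """

    def connect(**kwargs: Any) -> Any:
        # A unique request ID per call, mirroring request_id=str(uuid.uuid4()).
        kwargs["password"] = mint_token(str(uuid.uuid4()))
        return open_conn(**kwargs)

    return connect


# Stand-ins so the sketch runs without a workspace or a database.
issued = []


def fake_mint(request_id: str) -> str:
    issued.append(request_id)
    return f"token-{len(issued)}"


def fake_open(**kwargs: Any) -> dict:
    return kwargs


connect = make_connect(fake_mint, fake_open)
first = connect(user="sp")
second = connect(user="sp")  # a "reconnect" receives its own token
```

Because the token is requested inside `connect()`, a long-running job that reconnects after the roughly one-hour lifetime does not present an expired password.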

You need a service principal to run these jobs, **and that service principal must have permission on the Lakebase instance to perform the operations** the jobs execute (creating/dropping partitions, refreshing RollUps, enforcing retention).

### 1. A service principal executes the jobs

The bundle's **prod** target runs every job as a service principal via `run_as`:

```yaml
# databricks/bundles/databricks.yml
targets:
  prod:
    run_as:
      service_principal_name: ${var.service_principal_name}
```

Deploy with the service principal's application ID:

```bash
databricks bundle deploy -t prod --var="service_principal_name=<sp-application-id>"
```
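
For scripted deployments (for example from CI), the same command can be assembled as an argument list. This is a minimal sketch: `deploy_command` is a hypothetical helper, not part of the repository, and it only builds the arguments shown above without running them.

```python
def deploy_command(sp_application_id: str, target: str = "prod") -> list:
    """Build the argv for `databricks bundle deploy` with the run_as variable.

    Hypothetical helper: it mirrors the CLI invocation documented above and
    rejects an empty application ID before anything is executed.
    """
    if not sp_application_id.strip():
        raise ValueError("service principal application ID must not be empty")
    return [
        "databricks", "bundle", "deploy",
        "-t", target,
        f"--var=service_principal_name={sp_application_id}",
    ]


# Placeholder ID; substitute the real application ID at deploy time.
argv = deploy_command("<sp-application-id>")
```

Passing `argv` as a list to `subprocess.run(argv, check=True)` avoids shell quoting issues with the `--var=` value.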

The service principal needs **long-lived OAuth (M2M) credentials** (a client ID and an OAuth secret); follow [Authorize service principal access to Databricks with OAuth](https://docs.databricks.com/aws/en/dev-tools/auth/oauth-m2m) and [Obtain an OAuth token in a machine-to-machine flow](https://docs.databricks.com/aws/en/oltp/instances/authentication#obtain-an-oauth-token-in-a-machine-to-machine-flow). Inside a Databricks job the SDK resolves this identity automatically; to run a job file **outside** Databricks, supply the same credentials via the standard environment variables:

```bash
export DATABRICKS_HOST="https://<workspace-url>/"
export DATABRICKS_CLIENT_ID="<sp-application-id>"
export DATABRICKS_CLIENT_SECRET="<sp-oauth-secret>"
```

(Requires `databricks-sdk >= 0.56.0`.)
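
When running a job file outside Databricks, a small pre-flight check can report missing variables before the SDK attempts to authenticate. The sketch below is an assumption about how one might do this; `REQUIRED_M2M_VARS` and `missing_m2m_vars` are hypothetical names, and only the three variable names come from the text above.

```python
import os
from typing import List, Mapping

# The three variables listed above for M2M OAuth outside Databricks.
REQUIRED_M2M_VARS = (
    "DATABRICKS_HOST",
    "DATABRICKS_CLIENT_ID",
    "DATABRICKS_CLIENT_SECRET",
)


def missing_m2m_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variables that are unset or blank, in listed order."""
    return [name for name in REQUIRED_M2M_VARS if not env.get(name, "").strip()]


# Example with a partially configured environment (values are made up).
example_env = {
    "DATABRICKS_HOST": "https://example.invalid/",
    "DATABRICKS_CLIENT_ID": "",
}
missing = missing_m2m_vars(example_env)
```

A job entry point could call `missing_m2m_vars()` first and exit with a clear message if the returned list is not empty.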

### 2. The service principal needs a Lakebase Postgres role

The OAuth token authenticates as a **Postgres role named after the service principal**. That role must exist on the Lakebase instance and be granted the privileges the jobs use. Connect as a Lakebase admin and grant, for example:

```sql
-- Grant the privileges the maintenance jobs need to the service principal's
-- Postgres role. The role must already exist on the Lakebase instance.
GRANT USAGE, CREATE ON SCHEMA public, lakets TO "<sp-application-id>";
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public, lakets TO "<sp-application-id>";
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA lakets TO "<sp-application-id>";

-- Cover tables created later (new partitions, new RollUp tables). Without
-- FOR ROLE, this applies only to objects created by the role that runs it:
ALTER DEFAULT PRIVILEGES IN SCHEMA public, lakets
    GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO "<sp-application-id>";
```

`CREATE` on `public`/`lakets` lets the Partition Manager add partitions. PostgreSQL lets only a table's owner (or the owner of its schema, or a superuser) drop it, so Tiering/Retention can drop only partitions that the service principal's role owns. `EXECUTE` on `lakets` functions covers `tier_chunk()`, `_ensure_partitions()`, `refresh_rollup()`, and friends. If your install scripts ran as a different owner, the simplest alternative is to make the service principal the **owner** of the LakeTS objects (or a member of the owning role).
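
If the grants are applied from a script, the role name has to be quoted as an identifier, because an application ID contains hyphens. The following is a minimal sketch under that assumption; `quote_ident` and `grant_statements` are hypothetical helpers that only render the statements from the example above as strings.

```python
from typing import List, Sequence


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def grant_statements(role: str, schemas: Sequence[str] = ("public", "lakets")) -> List[str]:
    """Render the GRANT statements from the example for one role."""
    role_sql = quote_ident(role)
    schema_sql = ", ".join(quote_ident(s) for s in schemas)
    lakets_sql = quote_ident("lakets")
    return [
        f"GRANT USAGE, CREATE ON SCHEMA {schema_sql} TO {role_sql};",
        f"GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA {schema_sql} TO {role_sql};",
        f"GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA {lakets_sql} TO {role_sql};",
        f"ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_sql} "
        f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO {role_sql};",
    ]


# Made-up application ID, used only to show the quoting.
statements = grant_statements("00000000-0000-0000-0000-000000000000")
```

The rendered strings can be reviewed or executed by a Lakebase admin; the sketch does not connect to a database itself.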

By default the helper uses the running identity's user name (`w.current_user.me().user_name`) as the Postgres role. If your Lakebase role name differs from the service principal's application ID, override it with the `LAKETS_PG_ROLE` environment variable on the job.
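
The override rule can be illustrated in isolation. In this sketch `resolve_pg_role` is a hypothetical function that mirrors the `os.environ.get("LAKETS_PG_ROLE") or ...` expression in the helper; the role name `lakets_jobs` and the application ID are made-up values.

```python
from typing import Mapping


def resolve_pg_role(env: Mapping[str, str], identity_user_name: str) -> str:
    """Use LAKETS_PG_ROLE when it is set and non-empty, else the identity name."""
    # `or` treats an empty string like an unset variable, as the helper does.
    return env.get("LAKETS_PG_ROLE") or identity_user_name


sp_app_id = "00000000-0000-0000-0000-000000000000"  # made-up application ID
default_role = resolve_pg_role({}, sp_app_id)
override_role = resolve_pg_role({"LAKETS_PG_ROLE": "lakets_jobs"}, sp_app_id)
blank_role = resolve_pg_role({"LAKETS_PG_ROLE": ""}, sp_app_id)
```

Note that setting `LAKETS_PG_ROLE` to an empty string does not clear the role; the helper falls back to the running identity in that case.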