From 00403c794f60a9205410cef4e5617958eb9f7883 Mon Sep 17 00:00:00 2001 From: taran Date: Mon, 1 Jun 2026 16:54:32 +0200 Subject: [PATCH] feat(jobs): authenticate to Lakebase via M2M OAuth as a service principal Adopt the documented psycopg3 connection pattern and make the service-principal execution + permission model explicit. - lakebase_utils: mint the OAuth credential inside a psycopg.Connection subclass connect() (fresh token per physical connect, handles ~1h rotation); Postgres role overridable via LAKETS_PG_ROLE; M2M auth via the SDK's default WorkspaceClient (resolves the job's SP, or env-var client_id/secret for external runs) - bundle: prod target runs jobs as a service principal via run_as (service_principal_name variable) - deps: bump databricks-sdk to >=0.56.0 (M2M requirement) in requirements.txt and the bundle libraries - docs: workflow-jobs gains an "Authentication & permissions" section covering M2M OAuth, run_as SP, and the Lakebase Postgres role/grants the SP needs --- CHANGELOG.md | 1 + databricks/bundles/databricks.yml | 12 ++++- databricks/workflows/lakebase_utils.py | 66 +++++++++++++++++++++---- requirements.txt | 2 +- website/docs/reference/workflow-jobs.md | 61 +++++++++++++++++++++++ 5 files changed, 130 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf02e79..209a853 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version - RollUps now sync to Unity Catalog via Lakebase CDF (`enable_sync`), replacing the custom export pipeline. Synced tables are mirrored by unpartitioned shadow tables in the new `lakets_cdf` schema. - Workflow jobs now depend on `psycopg[binary]` (psycopg 3) instead of `psycopg2-binary`. Imports moved from `psycopg2` / `psycopg2.sql` to `psycopg` / `psycopg.sql`, and connections set `autocommit=True` via the `connect()` argument. - Workflow jobs run as Databricks `spark_python_task` pointing directly at the `databricks/workflows/*.py` files instead of a `python_wheel_task` — no `lakets` wheel is built. The Asset Bundle installs only the external libraries (`psycopg[binary]`, `databricks-sdk`) on the cluster via a task `libraries` block and ships the workflow sources via `sync.paths`. +- Workflow jobs authenticate to Lakebase with machine-to-machine (M2M) OAuth following the [psycopg3 docs pattern](https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3): `lakebase_utils` mints a fresh `generate_database_credential` token inside `connect()` (handles ~1 h token rotation), the Postgres role is overridable via `LAKETS_PG_ROLE`, and the bundle's prod target runs the jobs as a service principal via `run_as` (`service_principal_name` variable). The service principal must hold a Lakebase Postgres role with privileges for the operations — see the [Workflow jobs](./website/docs/reference/workflow-jobs.md) doc. Bumps `databricks-sdk` to `>=0.56.0` (M2M requirement). ### Deprecated diff --git a/databricks/bundles/databricks.yml b/databricks/bundles/databricks.yml index 9a31adb..3fc7bb7 100644 --- a/databricks/bundles/databricks.yml +++ b/databricks/bundles/databricks.yml @@ -10,6 +10,11 @@ variables: delta_schema: description: "Unity Catalog schema for tiered data" default: "lakets" + service_principal_name: + description: >- + Application ID of the service principal the jobs run as. It must hold a + Lakebase Postgres role with the privileges each job needs (see + docs/reference/workflow-jobs). Required for the prod target. # The workflow source lives in ../workflows (a sibling of this bundle dir). # Sync it to the workspace so the spark_python_task files — and the shared @@ -39,7 +44,7 @@ resources: - pypi: package: "psycopg[binary]>=3.1,<4.0" - pypi: - package: "databricks-sdk>=0.20.0,<1.0.0" + package: "databricks-sdk>=0.56.0,<1.0.0" existing_cluster_id: ${var.cluster_id} lakets_tiering: @@ -110,5 +115,10 @@ targets: prod: mode: production + # Jobs execute as this service principal — the identity that authenticates + # to Lakebase via M2M OAuth. It must own a Lakebase Postgres role with the + # privileges each job performs (see docs/reference/workflow-jobs). + run_as: + service_principal_name: ${var.service_principal_name} variables: cluster_id: "" # Set to your prod cluster ID diff --git a/databricks/workflows/lakebase_utils.py b/databricks/workflows/lakebase_utils.py index 20c0828..869f7a9 100644 --- a/databricks/workflows/lakebase_utils.py +++ b/databricks/workflows/lakebase_utils.py @@ -1,7 +1,35 @@ """ LakeTS Lakebase Connection Utilities -Shared helper for all Databricks workflow jobs to connect to Lakebase. + +Shared helper for all Databricks workflow jobs to connect to Lakebase using a +short-lived OAuth credential minted via the Databricks SDK. Follows the +psycopg3 connection pattern from the Databricks docs: +https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3 + +Authentication (machine-to-machine OAuth) +----------------------------------------- +The jobs run as a Databricks **service principal**. Inside a Databricks job the +default ``WorkspaceClient()`` resolves that identity automatically. For runs +outside Databricks, configure the service principal's M2M OAuth credentials via +the standard SDK environment variables and the SDK picks them up unchanged: + + DATABRICKS_HOST https:/// + DATABRICKS_CLIENT_ID + DATABRICKS_CLIENT_SECRET + +See: https://docs.databricks.com/aws/en/oltp/instances/authentication#obtain-an-oauth-token-in-a-machine-to-machine-flow +(requires databricks-sdk >= 0.56.0). + +The service principal must also have a matching **Postgres role** on the +Lakebase instance, granted the privileges the job needs (see the "Workflow +jobs" doc). The role name defaults to the running identity (``current_user``); +override it with the ``LAKETS_PG_ROLE`` environment variable if it differs. + +Lakebase OAuth tokens are short-lived (~1 h) and rotate. ``_oauth_connection_class`` +mints a fresh token on every physical connect, so a reconnect never carries a +stale password. """ +import os import uuid from contextlib import contextmanager @@ -9,26 +37,44 @@ from databricks.sdk import WorkspaceClient +def _oauth_connection_class(workspace: WorkspaceClient, instance_name: str): + """Build a psycopg connection class that injects a freshly minted Lakebase + OAuth token as the password on every connect (psycopg3 pattern from the docs). + + Generating the credential inside ``connect()`` — rather than once up front — + means any reconnect transparently obtains a non-expired token. + """ + + class _OAuthConnection(psycopg.Connection): + @classmethod + def connect(cls, conninfo: str = "", **kwargs): + cred = workspace.database.generate_database_credential( + request_id=str(uuid.uuid4()), + instance_names=[instance_name], + ) + kwargs["password"] = cred.token + return super().connect(conninfo, **kwargs) + + return _OAuthConnection + + def get_lakebase_connection(instance_name: str, database: str = "databricks_postgres"): - """Create a fresh connection to a Lakebase instance using OAuth.""" + """Open a fresh connection to a Lakebase instance, authenticating as the + job's service principal via a machine-to-machine OAuth credential.""" w = WorkspaceClient() - cred = w.database.generate_database_credential( - instance_names=[instance_name], - request_id=str(uuid.uuid4()), - ) instance = w.database.get_database_instance(name=instance_name) - conn = psycopg.connect( + pg_role = os.environ.get("LAKETS_PG_ROLE") or w.current_user.me().user_name + connection_class = _oauth_connection_class(w, instance_name) + return connection_class.connect( host=instance.read_write_dns, port=5432, dbname=database, - user=w.current_user.me().user_name, - password=cred.token, + user=pg_role, sslmode="require", connect_timeout=30, options="-c statement_timeout=600000 -c lock_timeout=30000", autocommit=True, ) - return conn @contextmanager diff --git a/requirements.txt b/requirements.txt index 1b38a99..291cba4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -databricks-sdk>=0.20.0,<1.0.0 +databricks-sdk>=0.56.0,<1.0.0 psycopg[binary]>=3.1,<4.0 diff --git a/website/docs/reference/workflow-jobs.md b/website/docs/reference/workflow-jobs.md index 2c5a515..e794aa5 100644 --- a/website/docs/reference/workflow-jobs.md +++ b/website/docs/reference/workflow-jobs.md @@ -18,3 +18,64 @@ These scheduled Databricks Jobs drive the operational lifecycle of LakeTS. The b | **Cold RollUp Refresh** | On-demand (no fixed schedule) | `refresh_rollup()` with cold-tier dirty buckets, run after cold-tier ETL corrections | Each job is idempotent and stateless — re-running it cannot corrupt data. Lakebase remains the source of truth for state (registries, watermarks, invalidation log); the jobs read that state and execute against it. + +## Authentication & permissions + +The jobs connect to Lakebase with **machine-to-machine (M2M) OAuth** — there are no static passwords. Each job runs as a Databricks **service principal**, and the shared helper [`lakebase_utils.py`](https://github.com/databricks-solutions/lakets/blob/main/databricks/workflows/lakebase_utils.py) mints a short-lived Postgres credential for that identity on every connection, following the [psycopg3 connection pattern](https://docs.databricks.com/aws/en/oltp/instances/query/notebook#psycopg3): + +```python +cred = w.database.generate_database_credential( + request_id=str(uuid.uuid4()), instance_names=[instance_name]) +# cred.token is used as the Postgres password; it is minted inside connect() +# so any reconnect transparently gets a fresh, non-expired token (~1 h lifetime). +``` + +You need a service principal to run these jobs, **and that service principal must have permission on the Lakebase instance to perform the operations** the jobs execute (creating/dropping partitions, refreshing RollUps, enforcing retention). + +### 1. A service principal executes the jobs + +The bundle's **prod** target runs every job as a service principal via `run_as`: + +```yaml +# databricks/bundles/databricks.yml +targets: + prod: + run_as: + service_principal_name: ${var.service_principal_name} +``` + +Deploy with the service principal's application ID: + +```bash +databricks bundle deploy -t prod --var="service_principal_name=" +``` + +The service principal must have **indefinitely-lived OAuth (M2M) credentials**; follow [Authorize service principal access to Databricks with OAuth](https://docs.databricks.com/aws/en/dev-tools/auth/oauth-m2m) and [Obtain an OAuth token in a machine-to-machine flow](https://docs.databricks.com/aws/en/oltp/instances/authentication#obtain-an-oauth-token-in-a-machine-to-machine-flow). Inside a Databricks job the SDK resolves this identity automatically; to run a job file **outside** Databricks, supply the same credentials via the standard environment variables: + +```bash +export DATABRICKS_HOST="https:///" +export DATABRICKS_CLIENT_ID="" +export DATABRICKS_CLIENT_SECRET="" +``` + +(Requires `databricks-sdk >= 0.56.0`.) + +### 2. The service principal needs a Lakebase Postgres role + +The OAuth token authenticates as a **Postgres role named after the service principal**. That role must exist on the Lakebase instance and be granted the privileges the jobs use. Connect as a Lakebase admin and grant, for example: + +```sql +-- Register the service principal as a Postgres role (Databricks-managed identity) +-- and grant the privileges the maintenance jobs need. +GRANT USAGE, CREATE ON SCHEMA public, lakets TO ""; +GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public, lakets TO ""; +GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA lakets TO ""; + +-- Cover objects created later (new partitions, new RollUp tables): +ALTER DEFAULT PRIVILEGES IN SCHEMA public, lakets + GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO ""; +``` + +`CREATE` on `public`/`lakets` lets the Partition Manager add partitions and lets Tiering/Retention drop them; `EXECUTE` on `lakets` functions covers `tier_chunk()`, `_ensure_partitions()`, `refresh_rollup()`, and friends. If your install scripts ran as a different owner, the simplest alternative is to make the service principal the **owner** of the LakeTS objects (or a member of the owning role). + +By default the helper uses the running identity (`current_user`) as the Postgres role. If your Lakebase role name differs from the service principal's application ID, override it with the `LAKETS_PG_ROLE` environment variable on the job.