Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ CLAUDE.md
# Backstop: any stray scratch planning markdown left at repo root (move it to .local/)
/*_PLAN.md

# Internal-only dirs kept at repo root (referenced by tracked config/docs/README)
demo/
grafana/

# Local website dev/debug helper scripts (not part of the site build)
website/scripts/

Expand Down Expand Up @@ -63,6 +59,7 @@ venv/
# Environment
.env
.env.*
!.env.example

# GitHub Actions runner cache
_actions/
Expand Down
57 changes: 26 additions & 31 deletions databricks/workflows/rollup_refresh.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""
LakeTS RollUp Refresh Job
Refreshes all RollUps (incremental where configured).
Refreshes all RollUps in dependency (DAG) order via refresh_rollup_cascade(),
so a parent RollUp always reads freshly-refreshed children within one run.

Schedule: Every 15 minutes (configurable per RollUp via refresh_lag).
Schedule: configurable; each RollUp self-gates on its refresh_lag.
"""
import logging
import os
Expand All @@ -25,37 +26,31 @@
logger = logging.getLogger("lakets.rollup_refresh")


def run(project_name: str):
"""Refresh all RollUps."""
def run(project_name: str) -> int:
"""Refresh all RollUps in DAG order. Each RollUp self-gates on refresh_lag,
so children are refreshed before the parents that read them."""
with lakebase_cursor(project_name) as cur:
rollups = fetch_all(cur, """
SELECT name, refresh_lag, last_refreshed_at, watermark
FROM lakets._rollup_registry
ORDER BY name
""")
logger.info("Found %d RollUp(s)", len(rollups))

refreshed = 0
skipped = 0
failures = []
for rollup in rollups:
try:
cur.execute(
"SELECT lakets.refresh_rollup(%s)",
(rollup["name"],),
)
result = cur.fetchone()[0]
if result:
refreshed += 1
logger.info("Refreshed: %s", rollup["name"])
else:
skipped += 1
logger.info("Skipped (refresh_lag): %s", rollup["name"])
except Exception as e:
logger.error("Failed to refresh %s: %s", rollup["name"], e)
failures.append(rollup["name"])

logger.info("Refreshed %d, skipped %d / %d total", refreshed, skipped, len(rollups))
try:
results = fetch_all(cur, """
SELECT rollup_name, refreshed, refresh_ms
FROM lakets.refresh_rollup_cascade()
""")
except Exception as e:
# Whole-cascade failure (e.g. a broken RollUp query). Surface it.
failures.append("refresh_rollup_cascade")
logger.error("Cascade refresh failed: %s", e)
raise

refreshed = sum(1 for r in results if r["refreshed"])
skipped = len(results) - refreshed
for r in results:
if r["refreshed"]:
logger.info("Refreshed: %s (%.1f ms)", r["rollup_name"], r["refresh_ms"] or 0.0)
else:
logger.info("Skipped (refresh_lag): %s", r["rollup_name"])

logger.info("Refreshed %d, skipped %d / %d total", refreshed, skipped, len(results))
if failures:
logger.error("Failed refreshes: %s", ", ".join(failures))
return refreshed
Expand Down
50 changes: 50 additions & 0 deletions demo/live/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# LakeTS Live Demo

A **living** end-to-end demo: synthetic ticks stream into a Lakebase Autoscaling
project while Databricks serverless jobs drive partitioning, DAG-ordered RollUp
refresh, CDF-gated tiering, and retention. Lakebase CDF continuously replicates
to Unity Catalog. The audience watches partitions appear, watermarks advance, the
invalidation log fill and drain, and cold partitions evict — in real time.

> **Full step-by-step setup is in the docs:**
> [`website/docs/guides/live-demo.md`](../../website/docs/guides/live-demo.md)
> (published at the Docusaurus site under **Guides → Live Demo**).

## What's here

```
demo/live/
├── sql/setup.sql ChronoTable + 3-level RollUp DAG + LVC + tiered
│ retention + enable_sync (CDF). Idempotent.
├── notebooks/stream_ticks.py Continuous synthetic ingest (psycopg3 + M2M OAuth).
├── bundle/databricks.yml 5 serverless jobs. Reuses the repo's
│ databricks/workflows/* maintenance jobs; adds stream_ticks.
└── grafana/ Local Grafana stack — hot (Lakebase) + cold (UC Delta).
```

## How it maps to current LakeTS capabilities

| Layer | Mechanism |
|---|---|
| Ingest | `stream_ticks` notebook → `stock_ticks` ChronoTable |
| Partitioning | `partition_manager` job → `_ensure_partitions()` |
| RollUps | `rollup_refresh` job → `refresh_rollup_cascade()` (DAG order: 1min→1hour→1day) |
| Latest value | `enable_lvc()` trigger (no job) |
| Cold tier | `enable_sync()` → Lakebase CDF shadow in `lakets_cdf` → Unity Catalog |
| Tiering | `tiering` job → `tier_chunk()` drops partitions only after CDF flush (gated) |
| Retention | `retention` job → `execute_retention()` |

All jobs authenticate with **machine-to-machine OAuth** against the Lakebase
Autoscaling project (no static passwords); the maintenance jobs are the exact
files shipped in `databricks/workflows/` — the demo only adds `stream_ticks`.

## Quick deploy (dev)

```bash
cd demo/live/bundle
databricks bundle deploy -t dev \
--var="lakebase_project=<your-project>" -p <profile>
```

Then run `sql/setup.sql` against the project and start the `stream_ticks` job.
See the docs guide for the CDF prerequisite, Grafana wiring, and teardown.
140 changes: 140 additions & 0 deletions demo/live/bundle/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
bundle:
name: lakets-live-demo

# Reuses the repo's production maintenance jobs (databricks/workflows/*.py) so the
# demo never drifts from the shipped code — it only adds stream_ticks. Both source
# dirs are synced to the workspace; the reused jobs sit next to lakebase_utils.py
# so their sibling import resolves.
sync:
paths:
- ../../../databricks/workflows
- ../notebooks

variables:
lakebase_project:
description: "Lakebase Autoscaling project name (e.g. lakets-tiering-test)"
service_principal_name:
description: "Service principal the jobs run as (prod target). Must own a Lakebase Postgres role."
default: ""
symbols_count:
description: "stream_ticks: number of symbols (10/100/1000)"
default: "10"
rows_per_sec:
description: "stream_ticks: ingest rate (1/10/100/1000)"
default: "10"
burst_mode:
description: "stream_ticks: periodic 10k-row bursts (on/off)"
default: "off"

resources:
jobs:
# -------------------------------------------------------------------
# stream_ticks — continuous ingest. The heartbeat of the demo (demo-only).
# -------------------------------------------------------------------
lakets_demo_stream_ticks:
name: "[LakeTS Demo] stream_ticks (continuous)"
description: "Continuously writes synthetic ticks to stock_ticks."
max_concurrent_runs: 1
continuous:
pause_status: UNPAUSED
tasks:
- task_key: stream
notebook_task:
notebook_path: ../notebooks/stream_ticks.py
base_parameters:
lakebase_project: ${var.lakebase_project}
symbols_count: ${var.symbols_count}
rows_per_sec: ${var.rows_per_sec}
burst_mode: ${var.burst_mode}
duration_minutes: "0"
environment_key: demo_env
environments: &demo_envs
- environment_key: demo_env
spec:
client: "3"
dependencies:
- "psycopg[binary]>=3.1,<4.0"
- "databricks-sdk>=0.81.0,<1.0.0"

# -------------------------------------------------------------------
# partition_manager — every 5 min. Watch partitions appear. (repo job)
# -------------------------------------------------------------------
lakets_demo_partition_manager:
name: "[LakeTS Demo] partition_manager"
schedule:
quartz_cron_expression: "0 */5 * * * ?"
timezone_id: "UTC"
pause_status: UNPAUSED
environments: *demo_envs
tasks:
- task_key: ensure_partitions
spark_python_task:
python_file: ../../../databricks/workflows/partition_manager.py
parameters:
- ${var.lakebase_project}
environment_key: demo_env

# -------------------------------------------------------------------
# rollup_refresh — every 1 min. Watermarks advance in DAG order. (repo job)
# -------------------------------------------------------------------
lakets_demo_rollup_refresh:
name: "[LakeTS Demo] rollup_refresh"
schedule:
quartz_cron_expression: "0 * * * * ?"
timezone_id: "UTC"
pause_status: UNPAUSED
environments: *demo_envs
tasks:
- task_key: refresh_rollups
spark_python_task:
python_file: ../../../databricks/workflows/rollup_refresh.py
parameters:
- ${var.lakebase_project}
environment_key: demo_env

# -------------------------------------------------------------------
# tiering — every 5 min. Drops cold partitions once CDF has flushed them
# to Unity Catalog (durability-gated). (repo job)
# -------------------------------------------------------------------
lakets_demo_tiering:
name: "[LakeTS Demo] tiering"
schedule:
quartz_cron_expression: "0 */5 * * * ?"
timezone_id: "UTC"
pause_status: UNPAUSED
environments: *demo_envs
tasks:
- task_key: tier_cold_partitions
spark_python_task:
python_file: ../../../databricks/workflows/tiering_job.py
parameters:
- ${var.lakebase_project}
environment_key: demo_env

# -------------------------------------------------------------------
# retention — every 15 min. Hot tier shrinks; UC keeps history. (repo job)
# -------------------------------------------------------------------
lakets_demo_retention:
name: "[LakeTS Demo] retention"
schedule:
quartz_cron_expression: "0 */15 * * * ?"
timezone_id: "UTC"
pause_status: UNPAUSED
environments: *demo_envs
tasks:
- task_key: enforce_retention
spark_python_task:
python_file: ../../../databricks/workflows/retention_job.py
parameters:
- ${var.lakebase_project}
environment_key: demo_env

targets:
dev:
mode: development
default: true

prod:
mode: production
run_as:
service_principal_name: ${var.service_principal_name}
29 changes: 29 additions & 0 deletions demo/live/grafana/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copy to .env and fill in. `.env` is gitignored.
# Used by docker-compose.yml (podman compose / docker compose).

# --- Grafana ---
GRAFANA_PORT=3030
GF_ADMIN_USER=admin
GF_ADMIN_PASSWORD=admin

# --- Lakebase hot tier (Postgres datasource) ---
# Autoscaling OAuth tokens rotate ~hourly and Grafana cannot refresh them, so the
# hot-tier datasource needs a STATIC login. Enable native Postgres login on the
# project and create a dedicated role + password for Grafana (see README).
# Host = the project's primary read-write endpoint host.
LAKEBASE_HOST=ep-xxxxxxxx.database.us-east-1.cloud.databricks.com
LAKEBASE_PORT=5432
LAKEBASE_DATABASE=databricks_postgres
LAKEBASE_USER=grafana
LAKEBASE_PASSWORD=

# --- Databricks SQL cold tier (UC Delta) ---
# Optional. Leave blank to run only the hot-tier dashboard.
# DELTA_TABLE = the Unity Catalog Managed Table that Lakebase CDF syncs the
# lakets_cdf._shadow_stock_ticks shadow into.
DATABRICKS_HOST=
DATABRICKS_HTTP_PATH=
DATABRICKS_TOKEN=
DATABRICKS_CATALOG=
DATABRICKS_SCHEMA=
DELTA_TABLE=
1 change: 1 addition & 0 deletions demo/live/grafana/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/.env
53 changes: 53 additions & 0 deletions demo/live/grafana/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# LakeTS Live Demo — Grafana

A local Grafana stack with two datasources:

- **Lakebase (hot tier)** — Postgres datasource, queries `stock_ticks`, the LVC,
the RollUp tables, `lakets.show_chunks`, the invalidation log, etc.
- **Databricks (cold tier)** — the community `mullerpeter-databricks-datasource`
plugin, queries the Unity Catalog Managed Table that Lakebase CDF syncs from
`lakets_cdf._shadow_stock_ticks`.

Dashboards (`dashboards/`): `lakets_live.json` (hot, fast refresh),
`lakets_cold_tier.json` (cold / UC), `lakets_continuum.json` (hot + cold combined).

## Run

```bash
cp .env.example .env # fill in the values below
podman compose --env-file .env up -d # or: docker compose
# open http://localhost:3030 (anonymous Viewer enabled)
```

## Auth model (important on Autoscaling)

The jobs authenticate with rotating **M2M OAuth**, but Grafana's Postgres
datasource can't refresh a token, so the **hot tier needs a static login**:

1. Enable native Postgres login on the project (one-time):
```bash
databricks postgres update-project projects/<project> \
--json '{"spec":{"enable_pg_native_login":true}}' -p <profile>
```
2. As a Lakebase admin, create a least-privilege role for Grafana and a password:
```sql
CREATE ROLE grafana LOGIN PASSWORD '<strong-password>';
GRANT USAGE ON SCHEMA public, lakets TO grafana;
GRANT SELECT ON ALL TABLES IN SCHEMA public, lakets TO grafana;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA lakets TO grafana;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO grafana;
```
3. Put that host / user / password in `.env` (`LAKEBASE_*`).

The **cold tier** uses a Databricks **SQL warehouse** + a token. Prefer a
service-principal-owned token so audit logs attribute Grafana queries to the SP,
not your user. Set `DATABRICKS_HOST`, `DATABRICKS_HTTP_PATH`, `DATABRICKS_TOKEN`,
and `DELTA_TABLE` (the UC table CDF syncs into). Leave them blank to run only the
hot-tier dashboard.

## Notes

- The `mullerpeter-databricks-datasource` plugin is delisted from the Grafana
catalog but still maintained; it installs unsigned from its GitHub release via
`GF_INSTALL_PLUGINS` in `docker-compose.yml`.
- `.env` is gitignored — never commit credentials.
Loading
Loading