From e6fc2fea86b7d061c73dd498332274ab93d00252 Mon Sep 17 00:00:00 2001 From: taran Date: Mon, 1 Jun 2026 19:49:32 +0200 Subject: [PATCH] feat(demo): add LakeTS Live streaming demo on Lakebase Autoscaling A living end-to-end demo: synthetic ticks stream into a Lakebase Autoscaling project while serverless jobs drive partitioning, DAG-ordered RollUp refresh, CDF-gated tiering, and retention; Lakebase CDF replicates to Unity Catalog. Adapted from the standalone lakets-live-demo to current capabilities: - autoscale M2M OAuth (w.postgres endpoint resolution) + psycopg3 - reuses the shipped databricks/workflows/* maintenance jobs (no drift); the demo adds only stream_ticks - cold tier via lakets.enable_sync (Lakebase CDF, shadow in lakets_cdf) - create_rollup without the removed p_refresh_mode arg - tiering_job (CDF durability gate) instead of a Spark compression step Also fixes the repo RollUp job to refresh in DAG order: - rollup_refresh.py now calls lakets.refresh_rollup_cascade() instead of an alphabetical refresh_rollup() loop that refreshed parents before children (1day before 1hour before 1min). Guard test added. Demos are no longer gitignored (demo/ + the any-depth grafana/ pattern removed; .env.example un-ignored). Old local demos archived out of the repo. New Docusaurus guide: guides/live-demo (step-by-step setup) under a Demos sidebar category. Grafana stack (hot Lakebase + cold UC datasources) included. 
--- .gitignore | 5 +- databricks/workflows/rollup_refresh.py | 57 ++- demo/live/README.md | 50 +++ demo/live/bundle/databricks.yml | 140 +++++++ demo/live/grafana/.env.example | 29 ++ demo/live/grafana/.gitignore | 1 + demo/live/grafana/README.md | 53 +++ .../grafana/dashboards/lakets_cold_tier.json | 193 +++++++++ .../grafana/dashboards/lakets_continuum.json | 370 ++++++++++++++++++ demo/live/grafana/dashboards/lakets_live.json | 291 ++++++++++++++ demo/live/grafana/docker-compose.yml | 38 ++ .../provisioning/dashboards/dashboards.yml | 13 + .../provisioning/datasources/databricks.yml | 36 ++ .../provisioning/datasources/lakebase.yml | 21 + demo/live/notebooks/stream_ticks.py | 180 +++++++++ demo/live/sql/setup.sql | 251 ++++++++++++ tests/test_python_patterns.py | 7 + website/docs/guides/live-demo.md | 188 +++++++++ website/sidebars.ts | 6 + 19 files changed, 1894 insertions(+), 35 deletions(-) create mode 100644 demo/live/README.md create mode 100644 demo/live/bundle/databricks.yml create mode 100644 demo/live/grafana/.env.example create mode 100644 demo/live/grafana/.gitignore create mode 100644 demo/live/grafana/README.md create mode 100644 demo/live/grafana/dashboards/lakets_cold_tier.json create mode 100644 demo/live/grafana/dashboards/lakets_continuum.json create mode 100644 demo/live/grafana/dashboards/lakets_live.json create mode 100644 demo/live/grafana/docker-compose.yml create mode 100644 demo/live/grafana/provisioning/dashboards/dashboards.yml create mode 100644 demo/live/grafana/provisioning/datasources/databricks.yml create mode 100644 demo/live/grafana/provisioning/datasources/lakebase.yml create mode 100644 demo/live/notebooks/stream_ticks.py create mode 100644 demo/live/sql/setup.sql create mode 100644 website/docs/guides/live-demo.md diff --git a/.gitignore b/.gitignore index 6604812..6495932 100644 --- a/.gitignore +++ b/.gitignore @@ -12,10 +12,6 @@ CLAUDE.md # Backstop: any stray scratch planning markdown left at repo root (move it to 
.local/) /*_PLAN.md -# Internal-only dirs kept at repo root (referenced by tracked config/docs/README) -demo/ -grafana/ - # Local website dev/debug helper scripts (not part of the site build) website/scripts/ @@ -63,6 +59,7 @@ venv/ # Environment .env .env.* +!.env.example # GitHub Actions runner cache _actions/ diff --git a/databricks/workflows/rollup_refresh.py b/databricks/workflows/rollup_refresh.py index 9eb2184..4692787 100644 --- a/databricks/workflows/rollup_refresh.py +++ b/databricks/workflows/rollup_refresh.py @@ -1,8 +1,9 @@ """ LakeTS RollUp Refresh Job -Refreshes all RollUps (incremental where configured). +Refreshes all RollUps in dependency (DAG) order via refresh_rollup_cascade(), +so a parent RollUp always reads freshly-refreshed children within one run. -Schedule: Every 15 minutes (configurable per RollUp via refresh_lag). +Schedule: configurable; each RollUp self-gates on its refresh_lag. """ import logging import os @@ -25,37 +26,31 @@ logger = logging.getLogger("lakets.rollup_refresh") -def run(project_name: str): - """Refresh all RollUps.""" +def run(project_name: str) -> int: + """Refresh all RollUps in DAG order. 
Each RollUp self-gates on refresh_lag, + so children are refreshed before the parents that read them.""" with lakebase_cursor(project_name) as cur: - rollups = fetch_all(cur, """ - SELECT name, refresh_lag, last_refreshed_at, watermark - FROM lakets._rollup_registry - ORDER BY name - """) - logger.info("Found %d RollUp(s)", len(rollups)) - - refreshed = 0 - skipped = 0 failures = [] - for rollup in rollups: - try: - cur.execute( - "SELECT lakets.refresh_rollup(%s)", - (rollup["name"],), - ) - result = cur.fetchone()[0] - if result: - refreshed += 1 - logger.info("Refreshed: %s", rollup["name"]) - else: - skipped += 1 - logger.info("Skipped (refresh_lag): %s", rollup["name"]) - except Exception as e: - logger.error("Failed to refresh %s: %s", rollup["name"], e) - failures.append(rollup["name"]) - - logger.info("Refreshed %d, skipped %d / %d total", refreshed, skipped, len(rollups)) + try: + results = fetch_all(cur, """ + SELECT rollup_name, refreshed, refresh_ms + FROM lakets.refresh_rollup_cascade() + """) + except Exception as e: + # Whole-cascade failure (e.g. a broken RollUp query). Surface it. 
+ failures.append("refresh_rollup_cascade") + logger.error("Cascade refresh failed: %s", e) + raise + + refreshed = sum(1 for r in results if r["refreshed"]) + skipped = len(results) - refreshed + for r in results: + if r["refreshed"]: + logger.info("Refreshed: %s (%.1f ms)", r["rollup_name"], r["refresh_ms"] or 0.0) + else: + logger.info("Skipped (refresh_lag): %s", r["rollup_name"]) + + logger.info("Refreshed %d, skipped %d / %d total", refreshed, skipped, len(results)) if failures: logger.error("Failed refreshes: %s", ", ".join(failures)) return refreshed diff --git a/demo/live/README.md b/demo/live/README.md new file mode 100644 index 0000000..30c45cb --- /dev/null +++ b/demo/live/README.md @@ -0,0 +1,50 @@ +# LakeTS Live Demo + +A **living** end-to-end demo: synthetic ticks stream into a Lakebase Autoscaling +project while Databricks serverless jobs drive partitioning, DAG-ordered RollUp +refresh, CDF-gated tiering, and retention. Lakebase CDF continuously replicates +to Unity Catalog. The audience watches partitions appear, watermarks advance, the +invalidation log fill and drain, and cold partitions evict — in real time. + +> **Full step-by-step setup is in the docs:** +> [`website/docs/guides/live-demo.md`](../../website/docs/guides/live-demo.md) +> (published at the Docusaurus site under **Guides → Live Demo**). + +## What's here + +``` +demo/live/ +├── sql/setup.sql ChronoTable + 3-level RollUp DAG + LVC + tiered +│ retention + enable_sync (CDF). Idempotent. +├── notebooks/stream_ticks.py Continuous synthetic ingest (psycopg3 + M2M OAuth). +├── bundle/databricks.yml 5 serverless jobs. Reuses the repo's +│ databricks/workflows/* maintenance jobs; adds stream_ticks. +└── grafana/ Local Grafana stack — hot (Lakebase) + cold (UC Delta). 
+``` + +## How it maps to current LakeTS capabilities + +| Layer | Mechanism | +|---|---| +| Ingest | `stream_ticks` notebook → `stock_ticks` ChronoTable | +| Partitioning | `partition_manager` job → `_ensure_partitions()` | +| RollUps | `rollup_refresh` job → `refresh_rollup_cascade()` (DAG order: 1min→1hour→1day) | +| Latest value | `enable_lvc()` trigger (no job) | +| Cold tier | `enable_sync()` → Lakebase CDF shadow in `lakets_cdf` → Unity Catalog | +| Tiering | `tiering` job → `tier_chunk()` drops partitions only after CDF flush (gated) | +| Retention | `retention` job → `execute_retention()` | + +All jobs authenticate with **machine-to-machine OAuth** against the Lakebase +Autoscaling project (no static passwords); the maintenance jobs are the exact +files shipped in `databricks/workflows/` — the demo only adds `stream_ticks`. + +## Quick deploy (dev) + +```bash +cd demo/live/bundle +databricks bundle deploy -t dev \ + --var="lakebase_project=<project-name>" -p <profile> +``` + +Then run `sql/setup.sql` against the project and start the `stream_ticks` job. +See the docs guide for the CDF prerequisite, Grafana wiring, and teardown. diff --git a/demo/live/bundle/databricks.yml b/demo/live/bundle/databricks.yml new file mode 100644 index 0000000..660cf22 --- /dev/null +++ b/demo/live/bundle/databricks.yml @@ -0,0 +1,140 @@ +bundle: + name: lakets-live-demo + +# Reuses the repo's production maintenance jobs (databricks/workflows/*.py) so the +# demo never drifts from the shipped code — it only adds stream_ticks. Both source +# dirs are synced to the workspace; the reused jobs sit next to lakebase_utils.py +# so their sibling import resolves. +sync: + paths: + - ../../../databricks/workflows + - ../notebooks + +variables: + lakebase_project: + description: "Lakebase Autoscaling project name (e.g. lakets-tiering-test)" + service_principal_name: + description: "Service principal the jobs run as (prod target). Must own a Lakebase Postgres role." 
+ default: "" + symbols_count: + description: "stream_ticks: number of symbols (10/100/1000)" + default: "10" + rows_per_sec: + description: "stream_ticks: ingest rate (1/10/100/1000)" + default: "10" + burst_mode: + description: "stream_ticks: periodic 10k-row bursts (on/off)" + default: "off" + +resources: + jobs: + # ------------------------------------------------------------------- + # stream_ticks — continuous ingest. The heartbeat of the demo (demo-only). + # ------------------------------------------------------------------- + lakets_demo_stream_ticks: + name: "[LakeTS Demo] stream_ticks (continuous)" + description: "Continuously writes synthetic ticks to stock_ticks." + max_concurrent_runs: 1 + continuous: + pause_status: UNPAUSED + tasks: + - task_key: stream + notebook_task: + notebook_path: ../notebooks/stream_ticks.py + base_parameters: + lakebase_project: ${var.lakebase_project} + symbols_count: ${var.symbols_count} + rows_per_sec: ${var.rows_per_sec} + burst_mode: ${var.burst_mode} + duration_minutes: "0" + environment_key: demo_env + environments: &demo_envs + - environment_key: demo_env + spec: + client: "3" + dependencies: + - "psycopg[binary]>=3.1,<4.0" + - "databricks-sdk>=0.81.0,<1.0.0" + + # ------------------------------------------------------------------- + # partition_manager — every 5 min. Watch partitions appear. (repo job) + # ------------------------------------------------------------------- + lakets_demo_partition_manager: + name: "[LakeTS Demo] partition_manager" + schedule: + quartz_cron_expression: "0 */5 * * * ?" + timezone_id: "UTC" + pause_status: UNPAUSED + environments: *demo_envs + tasks: + - task_key: ensure_partitions + spark_python_task: + python_file: ../../../databricks/workflows/partition_manager.py + parameters: + - ${var.lakebase_project} + environment_key: demo_env + + # ------------------------------------------------------------------- + # rollup_refresh — every 1 min. Watermarks advance in DAG order. 
(repo job) + # ------------------------------------------------------------------- + lakets_demo_rollup_refresh: + name: "[LakeTS Demo] rollup_refresh" + schedule: + quartz_cron_expression: "0 * * * * ?" + timezone_id: "UTC" + pause_status: UNPAUSED + environments: *demo_envs + tasks: + - task_key: refresh_rollups + spark_python_task: + python_file: ../../../databricks/workflows/rollup_refresh.py + parameters: + - ${var.lakebase_project} + environment_key: demo_env + + # ------------------------------------------------------------------- + # tiering — every 5 min. Drops cold partitions once CDF has flushed them + # to Unity Catalog (durability-gated). (repo job) + # ------------------------------------------------------------------- + lakets_demo_tiering: + name: "[LakeTS Demo] tiering" + schedule: + quartz_cron_expression: "0 */5 * * * ?" + timezone_id: "UTC" + pause_status: UNPAUSED + environments: *demo_envs + tasks: + - task_key: tier_cold_partitions + spark_python_task: + python_file: ../../../databricks/workflows/tiering_job.py + parameters: + - ${var.lakebase_project} + environment_key: demo_env + + # ------------------------------------------------------------------- + # retention — every 15 min. Hot tier shrinks; UC keeps history. (repo job) + # ------------------------------------------------------------------- + lakets_demo_retention: + name: "[LakeTS Demo] retention" + schedule: + quartz_cron_expression: "0 */15 * * * ?" 
+ timezone_id: "UTC" + pause_status: UNPAUSED + environments: *demo_envs + tasks: + - task_key: enforce_retention + spark_python_task: + python_file: ../../../databricks/workflows/retention_job.py + parameters: + - ${var.lakebase_project} + environment_key: demo_env + +targets: + dev: + mode: development + default: true + + prod: + mode: production + run_as: + service_principal_name: ${var.service_principal_name} diff --git a/demo/live/grafana/.env.example b/demo/live/grafana/.env.example new file mode 100644 index 0000000..6d21746 --- /dev/null +++ b/demo/live/grafana/.env.example @@ -0,0 +1,29 @@ +# Copy to .env and fill in. `.env` is gitignored. +# Used by docker-compose.yml (podman compose / docker compose). + +# --- Grafana --- +GRAFANA_PORT=3030 +GF_ADMIN_USER=admin +GF_ADMIN_PASSWORD=admin + +# --- Lakebase hot tier (Postgres datasource) --- +# Autoscaling OAuth tokens rotate ~hourly and Grafana cannot refresh them, so the +# hot-tier datasource needs a STATIC login. Enable native Postgres login on the +# project and create a dedicated role + password for Grafana (see README). +# Host = the project's primary read-write endpoint host. +LAKEBASE_HOST=ep-xxxxxxxx.database.us-east-1.cloud.databricks.com +LAKEBASE_PORT=5432 +LAKEBASE_DATABASE=databricks_postgres +LAKEBASE_USER=grafana +LAKEBASE_PASSWORD= + +# --- Databricks SQL cold tier (UC Delta) --- +# Optional. Leave blank to run only the hot-tier dashboard. +# DELTA_TABLE = the Unity Catalog Managed Table that Lakebase CDF syncs the +# lakets_cdf._shadow_stock_ticks shadow into. 
+DATABRICKS_HOST= +DATABRICKS_HTTP_PATH= +DATABRICKS_TOKEN= +DATABRICKS_CATALOG= +DATABRICKS_SCHEMA= +DELTA_TABLE= diff --git a/demo/live/grafana/.gitignore b/demo/live/grafana/.gitignore new file mode 100644 index 0000000..f10862a --- /dev/null +++ b/demo/live/grafana/.gitignore @@ -0,0 +1 @@ +/.env diff --git a/demo/live/grafana/README.md b/demo/live/grafana/README.md new file mode 100644 index 0000000..465174c --- /dev/null +++ b/demo/live/grafana/README.md @@ -0,0 +1,53 @@ +# LakeTS Live Demo — Grafana + +A local Grafana stack with two datasources: + +- **Lakebase (hot tier)** — Postgres datasource, queries `stock_ticks`, the LVC, + the RollUp tables, `lakets.show_chunks`, the invalidation log, etc. +- **Databricks (cold tier)** — the community `mullerpeter-databricks-datasource` + plugin, queries the Unity Catalog Managed Table that Lakebase CDF syncs from + `lakets_cdf._shadow_stock_ticks`. + +Dashboards (`dashboards/`): `lakets_live.json` (hot, fast refresh), +`lakets_cold_tier.json` (cold / UC), `lakets_continuum.json` (hot + cold combined). + +## Run + +```bash +cp .env.example .env # fill in the values below +podman compose --env-file .env up -d # or: docker compose +# open http://localhost:3030 (anonymous Viewer enabled) +``` + +## Auth model (important on Autoscaling) + +The jobs authenticate with rotating **M2M OAuth**, but Grafana's Postgres +datasource can't refresh a token, so the **hot tier needs a static login**: + +1. Enable native Postgres login on the project (one-time): + ```bash + databricks postgres update-project projects/<project-name> \ + --json '{"spec":{"enable_pg_native_login":true}}' -p <profile> + ``` +2. 
As a Lakebase admin, create a least-privilege, password-authenticated role for Grafana: + ```sql + CREATE ROLE grafana LOGIN PASSWORD '<strong-password>'; + GRANT USAGE ON SCHEMA public, lakets TO grafana; + GRANT SELECT ON ALL TABLES IN SCHEMA public, lakets TO grafana; + GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA lakets TO grafana; + ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO grafana; + ``` +3. Put that host / user / password in `.env` (`LAKEBASE_*`). + +The **cold tier** uses a Databricks **SQL warehouse** + a token. Prefer a +service-principal-owned token so audit logs attribute Grafana queries to the SP, +not your user. Set `DATABRICKS_HOST`, `DATABRICKS_HTTP_PATH`, `DATABRICKS_TOKEN`, +and `DELTA_TABLE` (the UC table CDF syncs into). Leave them blank to run only the +hot-tier dashboard. + +## Notes + +- The `mullerpeter-databricks-datasource` plugin is delisted from the Grafana + catalog but still maintained; it installs unsigned from its GitHub release via + `GF_INSTALL_PLUGINS` in `docker-compose.yml`. +- `.env` is gitignored — never commit credentials. 
diff --git a/demo/live/grafana/dashboards/lakets_cold_tier.json b/demo/live/grafana/dashboards/lakets_cold_tier.json new file mode 100644 index 0000000..67bb5bd --- /dev/null +++ b/demo/live/grafana/dashboards/lakets_cold_tier.json @@ -0,0 +1,193 @@ +{ + "uid": "lakets-cold-tier", + "title": "LakeTS — Cold Tier (Delta SCD Type 2)", + "tags": ["lakets", "databricks", "delta"], + "timezone": "browser", + "schemaVersion": 39, + "version": 1, + "refresh": "30s", + "time": { "from": "now-1h", "to": "now" }, + "templating": { + "list": [ + { + "name": "catalog", + "type": "textbox", + "label": "Catalog", + "current": { "text": "taran-interop", "value": "taran-interop" }, + "query": "taran-interop" + }, + { + "name": "schema", + "type": "textbox", + "label": "Schema", + "current": { "text": "lakets", "value": "lakets" }, + "query": "lakets" + }, + { + "name": "table", + "type": "textbox", + "label": "Delta table (Sync target)", + "current": { "text": "lb__shadow_stock_ticks_history", "value": "lb__shadow_stock_ticks_history" }, + "query": "lb__shadow_stock_ticks_history" + } + ] + }, + "annotations": { "list": [] }, + "panels": [ + { + "id": 100, + "type": "text", + "title": "", + "gridPos": { "x": 0, "y": 0, "w": 24, "h": 3 }, + "options": { + "mode": "markdown", + "content": "# Cold Tier — Lakehouse Sync SCD Type 2\n**Target**: `${catalog}.${schema}.${table}` · Every source change is appended with a `_pg_change_type` marker. Deletes never remove data — only add a tombstone row. Historical data is preserved forever." 
+ } + }, + { + "id": 101, + "type": "stat", + "title": "Total SCD Type 2 events", + "gridPos": { "x": 0, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "colorMode": "value", "graphMode": "area", "textMode": "auto" + }, + "fieldConfig": { "defaults": { "color": { "mode": "thresholds" }, "thresholds": { "mode": "absolute", "steps": [ { "color": "text", "value": null } ] }, "unit": "short" }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT COUNT(*) AS total FROM `${catalog}`.`${schema}`.`${table}`" + } + ] + }, + { + "id": 102, + "type": "stat", + "title": "Insert events", + "gridPos": { "x": 6, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "colorMode": "value", "graphMode": "area", "textMode": "auto" + }, + "fieldConfig": { "defaults": { "color": { "fixedColor": "green", "mode": "fixed" }, "unit": "short" }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT COUNT(*) AS inserts FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'insert'" + } + ] + }, + { + "id": 103, + "type": "stat", + "title": "Delete events (tombstones)", + "gridPos": { "x": 12, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "colorMode": "value", "graphMode": "area", "textMode": "auto" + }, + "fieldConfig": { "defaults": { "color": { "fixedColor": 
"orange", "mode": "fixed" }, "unit": "short" }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT COUNT(*) AS deletes FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'delete'" + } + ] + }, + { + "id": 104, + "type": "stat", + "title": "Distinct symbols ever seen", + "gridPos": { "x": 18, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "colorMode": "value", "graphMode": "none", "textMode": "auto" + }, + "fieldConfig": { "defaults": { "unit": "short" }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT COUNT(DISTINCT symbol) AS symbols FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type IN ('insert','update_postimage')" + } + ] + }, + { + "id": 110, + "type": "barchart", + "title": "Events by _pg_change_type", + "gridPos": { "x": 0, "y": 7, "w": 12, "h": 8 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "fieldConfig": { + "defaults": { "color": { "mode": "palette-classic" } }, + "overrides": [] + }, + "options": { + "orientation": "horizontal", + "xTickLabelRotation": 0, + "legend": { "displayMode": "list", "placement": "bottom" } + }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT _pg_change_type AS change_type, COUNT(*) AS events FROM `${catalog}`.`${schema}`.`${table}` GROUP BY _pg_change_type ORDER BY events DESC" + } + ] + }, + { + "id": 111, + "type": "timeseries", + "title": "Events landing per minute", + "gridPos": { "x": 12, "y": 7, "w": 12, "h": 8 }, + 
"datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { "drawStyle": "bars", "lineWidth": 0, "fillOpacity": 100 }, + "unit": "short" + }, + "overrides": [] + }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" }, "tooltip": { "mode": "multi" } }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "time_series", + "rawSql": "SELECT date_trunc('MINUTE', _timestamp) AS time, _pg_change_type AS metric, COUNT(*) AS value FROM `${catalog}`.`${schema}`.`${table}` WHERE _timestamp > date_sub(now(), 1) GROUP BY 1, 2 ORDER BY 1" + } + ] + }, + { + "id": 120, + "type": "table", + "title": "Recent change events (last 30)", + "gridPos": { "x": 0, "y": 15, "w": 24, "h": 10 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" } }, "overrides": [] }, + "options": { "showHeader": true }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT _timestamp, _pg_change_type, _pg_lsn, time, symbol, price, volume FROM `${catalog}`.`${schema}`.`${table}` ORDER BY _pg_lsn DESC LIMIT 30" + } + ] + } + ] +} diff --git a/demo/live/grafana/dashboards/lakets_continuum.json b/demo/live/grafana/dashboards/lakets_continuum.json new file mode 100644 index 0000000..fedda8a --- /dev/null +++ b/demo/live/grafana/dashboards/lakets_continuum.json @@ -0,0 +1,370 @@ +{ + "uid": "lakets-continuum", + "title": "LakeTS — Hot ↔ Cold Continuum", + "tags": ["lakets", "lakebase", "databricks", "delta", "union"], + "timezone": "browser", + "schemaVersion": 39, + "version": 6, + "refresh": "10s", + "time": { "from": "now-1h", "to": "now" }, + "templating": { + "list": [ + { + "name": "catalog", 
+ "type": "textbox", + "label": "Cold catalog", + "current": { "text": "taran-interop", "value": "taran-interop" }, + "query": "taran-interop" + }, + { + "name": "schema", + "type": "textbox", + "label": "Cold schema", + "current": { "text": "lakets", "value": "lakets" }, + "query": "lakets" + }, + { + "name": "table", + "type": "textbox", + "label": "Cold table (Sync target)", + "current": { "text": "lb__shadow_stock_ticks_history", "value": "lb__shadow_stock_ticks_history" }, + "query": "lb__shadow_stock_ticks_history" + } + ] + }, + "annotations": { "list": [] }, + "panels": [ + { + "id": 1, + "type": "text", + "title": "", + "gridPos": { "x": 0, "y": 0, "w": 24, "h": 3 }, + "options": { + "mode": "markdown", + "content": "# LakeTS — Hot ↔ Cold Continuum\nEvery panel UNIONs **Lakebase** (hot, last 10 min by retention policy) with **Databricks SQL** (cold, SCD Type 2 Delta via Lakehouse Sync — inserts kept forever). Watch the cold series extend left as time passes while hot stays anchored at `now`. Target: `${catalog}.${schema}.${table}`." + } + }, + + { + "id": 10, + "type": "stat", + "title": "Hot ticks (Lakebase live)", + "description": "stock_ticks row count. Bounded by drop_after=1h retention.", + "gridPos": { "x": 0, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "color": { "fixedColor": "yellow", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "table", + "rawSql": "SELECT count(*)::bigint AS hot FROM stock_ticks" + } + ] + }, + { + "id": 11, + "type": "stat", + "title": "Cold inserts (Delta forever)", + "description": "_pg_change_type='insert' rows in the SCD Type 2 history. 
Never shrinks.", + "gridPos": { "x": 6, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "color": { "fixedColor": "blue", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT count(*) AS cold_inserts FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'insert'" + } + ] + }, + { + "id": 12, + "type": "stat", + "title": "Cold tombstones (Delta)", + "description": "_pg_change_type='delete' rows. Each retention sweep adds tombstones; original inserts stay.", + "gridPos": { "x": 12, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "color": { "fixedColor": "orange", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT count(*) AS cold_deletes FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'delete'" + } + ] + }, + { + "id": 13, + "type": "stat", + "title": "CDC lag (sec)", + "description": "Wall-clock seconds since the cold tier's most recent insert. In steady state ~5-30s. Growing means Lakehouse CDF is falling behind. 
(Cold is the canonical record; hot is the last ~10 min of cold, so we don't sum.)", + "gridPos": { "x": 18, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "s", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }, { "color": "orange", "value": 30 }, { "color": "red", "value": 120 }] } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT unix_timestamp(current_timestamp()) - unix_timestamp(max(time)) AS lag_sec FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'insert'" + } + ] + }, + + { + "id": 50, + "type": "stat", + "title": "Lakebase total", + "description": "pg_database_size('databricks_postgres'). Lakebase is the live tier — bounded by retention. Steady state stays small no matter how long the demo runs.", + "gridPos": { "x": 0, "y": 7, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "bytes", "color": { "fixedColor": "yellow", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "table", + "rawSql": "SELECT pg_database_size('databricks_postgres')::bigint AS bytes" + } + ] + }, + { + "id": 51, + "type": "stat", + "title": "Hot tier (stock_ticks partitions)", + "description": "sum(pg_total_relation_size) across all RANGE partitions of stock_ticks. Capped by drop_after=1h. 
Old partitions are dropped, not deleted row-by-row.", + "gridPos": { "x": 6, "y": 7, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "bytes", "color": { "fixedColor": "yellow", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "table", + "rawSql": "SELECT coalesce(sum(pg_total_relation_size(c.oid)),0)::bigint AS bytes FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace WHERE n.nspname='public' AND c.relname LIKE 'stock_ticks_%'" + } + ] + }, + { + "id": 52, + "type": "stat", + "title": "Active partitions", + "description": "Hourly RANGE partitions managed by lakets.show_chunks(). Climbs as time advances, then plateaus once retention starts dropping old hours.", + "gridPos": { "x": 12, "y": 7, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "color": { "fixedColor": "yellow", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "table", + "rawSql": "SELECT count(*)::int AS partitions FROM lakets.show_chunks('stock_ticks') WHERE status='active'" + } + ] + }, + { + "id": 53, + "type": "stat", + "title": "Cold tier (Delta archive)", + "description": "DESCRIBE DETAIL sizeInBytes on the SCD Type 2 history table. Grows monotonically — every insert and tombstone is appended forever. 
The stat panel's reduceOptions.fields setting (not a transformation) selects just sizeInBytes from the DESCRIBE row.", + "gridPos": { "x": 18, "y": 7, "w": 6, "h": 4 }, + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "sizeInBytes", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "bytes", "color": { "fixedColor": "blue", "mode": "fixed" } }, "overrides": [] }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "DESCRIBE DETAIL `${catalog}`.`${schema}`.`${table}`" + } + ] + }, + + { + "id": 20, + "type": "text", + "title": "", + "gridPos": { "x": 0, "y": 11, "w": 24, "h": 3 }, + "options": { + "mode": "markdown", + "content": "## Ticks per minute — hot (yellow) overlaid on cold (blue)\nSame metric, two engines. Hot series only renders for the **last ≈10 min** (PG retention tier). Cold series spans the whole timeline. Where they overlap you can verify Lakehouse Sync is in lock-step; where only cold remains, you can see what would have been lost without it." + } + }, + { + "id": 21, + "type": "timeseries", + "title": "Ticks/min — mixed datasource UNION", + "description": "Series A: Lakebase stock_ticks. 
Series B: Delta SCD Type 2 history (inserts only).", + "gridPos": { "x": 0, "y": 14, "w": 24, "h": 10 }, + "datasource": { "type": "datasource", "uid": "-- Mixed --" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { "drawStyle": "bars", "lineWidth": 0, "fillOpacity": 80, "stacking": { "mode": "none" } }, + "unit": "short" + }, + "overrides": [ + { "matcher": { "id": "byFrameRefID", "options": "A" }, "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "yellow" } }, { "id": "displayName", "value": "hot (Lakebase)" }] }, + { "matcher": { "id": "byFrameRefID", "options": "B" }, "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "blue" } }, { "id": "displayName", "value": "cold (Delta)" }] } + ] + }, + "options": { + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum", "lastNotNull"] }, + "tooltip": { "mode": "multi", "sort": "none" } + }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "time_series", + "rawSql": "SELECT lakets.time_bucket('1 minute'::interval, time) AS time, count(*)::int AS ticks FROM stock_ticks WHERE $__timeFilter(time) GROUP BY 1 ORDER BY 1" + }, + { + "refId": "B", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "time_series", + "rawSql": "SELECT date_trunc('MINUTE', time) AS time, count(*) AS ticks FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'insert' AND $__timeFilter(time) GROUP BY 1 ORDER BY 1" + } + ] + }, + + { + "id": 30, + "type": "text", + "title": "", + "gridPos": { "x": 0, "y": 24, "w": 24, "h": 3 }, + "options": { + "mode": "markdown", + "content": "## Volume + symbols by tier\nLeft: aggregate **volume per minute** by tier (sum of `volume` column). Right: distinct symbol counts — hot shows what's live now, cold shows the full historical universe." 
+ } + }, + { + "id": 31, + "type": "timeseries", + "title": "Volume per minute — hot vs cold", + "description": "sum(volume) per minute from each tier.", + "gridPos": { "x": 0, "y": 27, "w": 16, "h": 9 }, + "datasource": { "type": "datasource", "uid": "-- Mixed --" }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { "drawStyle": "line", "lineWidth": 2, "fillOpacity": 10, "lineInterpolation": "smooth", "spanNulls": true }, + "unit": "short" + }, + "overrides": [ + { "matcher": { "id": "byFrameRefID", "options": "A" }, "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "yellow" } }, { "id": "displayName", "value": "hot volume" }] }, + { "matcher": { "id": "byFrameRefID", "options": "B" }, "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "blue" } }, { "id": "displayName", "value": "cold volume" }] } + ] + }, + "options": { "legend": { "displayMode": "table", "placement": "right", "calcs": ["sum", "max"] }, "tooltip": { "mode": "multi" } }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "time_series", + "rawSql": "SELECT lakets.time_bucket('1 minute'::interval, time) AS time, sum(volume)::bigint AS volume FROM stock_ticks WHERE $__timeFilter(time) GROUP BY 1 ORDER BY 1" + }, + { + "refId": "B", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "time_series", + "rawSql": "SELECT date_trunc('MINUTE', time) AS time, sum(volume) AS volume FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'insert' AND $__timeFilter(time) GROUP BY 1 ORDER BY 1" + } + ] + }, + { + "id": 32, + "type": "bargauge", + "title": "Distinct symbols by tier", + "description": "Hot: symbols currently in LVC. 
Cold: symbols ever seen in Delta history.", + "gridPos": { "x": 16, "y": 27, "w": 8, "h": 9 }, + "datasource": { "type": "datasource", "uid": "-- Mixed --" }, + "options": { + "orientation": "horizontal", + "displayMode": "gradient", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false } + }, + "fieldConfig": { + "defaults": { "unit": "short", "min": 0, "color": { "mode": "thresholds" }, "thresholds": { "mode": "absolute", "steps": [{ "color": "blue", "value": null }] } }, + "overrides": [ + { "matcher": { "id": "byFrameRefID", "options": "A" }, "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "yellow" } }, { "id": "displayName", "value": "hot (LVC)" }] }, + { "matcher": { "id": "byFrameRefID", "options": "B" }, "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "blue" } }, { "id": "displayName", "value": "cold (Delta)" }] } + ] + }, + "targets": [ + { + "refId": "A", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "table", + "rawSql": "SELECT count(*)::int AS symbols FROM _lvc_stock_ticks" + }, + { + "refId": "B", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT count(DISTINCT symbol) AS symbols FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type IN ('insert','update_postimage')" + } + ] + }, + + { + "id": 40, + "type": "text", + "title": "", + "gridPos": { "x": 0, "y": 36, "w": 24, "h": 3 }, + "options": { + "mode": "markdown", + "content": "## Recent events — last 25 from each tier\nUNIONed table. `tier` column identifies the source. Hot rows come from `stock_ticks`; cold rows from the Delta history. Sorted by event time DESC." + } + }, + { + "id": 41, + "type": "table", + "title": "UNION ALL — hot + cold (latest 25 each)", + "description": "Two queries, one panel. 
The 'concatenate fields' transformation configured on this panel merges the hot and cold frames into a single table and tags each field with its source frame via the 'tier' label.", + "gridPos": { "x": 0, "y": 39, "w": 24, "h": 11 }, + "datasource": { "type": "datasource", "uid": "-- Mixed --" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" } }, "overrides": [] }, + "options": { "showHeader": true, "footer": { "show": false } }, + "transformations": [ + { + "id": "concatenate", + "options": { "frameNameMode": "label", "frameNameLabel": "tier" } + }, + { + "id": "organize", + "options": { + "excludeByName": {}, + "indexByName": { "tier": 0, "event_time": 1, "symbol": 2, "price": 3, "volume": 4 }, + "renameByName": {} + } + } + ], + "targets": [ + { + "refId": "hot", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "format": "table", + "rawSql": "SELECT time AS event_time, symbol, price, volume FROM stock_ticks ORDER BY time DESC LIMIT 25" + }, + { + "refId": "cold", + "datasource": { "type": "mullerpeter-databricks-datasource", "uid": "databricks" }, + "format": "table", + "rawSql": "SELECT time AS event_time, symbol, price, volume FROM `${catalog}`.`${schema}`.`${table}` WHERE _pg_change_type = 'insert' ORDER BY time DESC LIMIT 25" + } + ] + } + ] +} diff --git a/demo/live/grafana/dashboards/lakets_live.json b/demo/live/grafana/dashboards/lakets_live.json new file mode 100644 index 0000000..7e48180 --- /dev/null +++ b/demo/live/grafana/dashboards/lakets_live.json @@ -0,0 +1,291 @@ +{ + "uid": "lakets-live", + "title": "LakeTS — Live Activity", + "tags": ["lakets", "lakebase", "time-series"], + "timezone": "browser", + "schemaVersion": 39, + "version": 2, + "refresh": "5s", + "time": { "from": "now-30m", "to": "now" }, + "templating": { + "list": [ + { + "name": "symbol", + "type": "query", + "label": "Symbol focus", + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "query": "SELECT symbol FROM stock_assets ORDER BY symbol", + 
"refresh": 1, + "current": { "text": "AAPL", "value": "AAPL" } + } + ] + }, + "annotations": { "list": [] }, + "panels": [ + { "id": 1, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 0, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "# LakeTS — Live Activity\nEverything below runs against **Lakebase Postgres** via LakeTS' PL/pgSQL functions. Every panel title names the LakeTS primitive driving it. Dashboard refreshes every 5s." } + }, + + { "id": 2, "type": "stat", "title": "Ticks/sec (last 60s) — ingest rate", + "description": "Live heartbeat. Every tick fires the LVC trigger, the shadow-sync trigger, and the invalidation-log trigger.", + "gridPos": { "x": 0, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "decimals": 1, "color": { "fixedColor": "green", "mode": "fixed" } }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT count(*)::float / 60.0 AS rate FROM stock_ticks WHERE time > now() - interval '60 seconds'" }] + }, + { "id": 3, "type": "stat", "title": "Active symbols (LVC)", + "description": "count(*) on _lvc_stock_ticks — trigger-maintained cache, sub-10ms.", + "gridPos": { "x": 6, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "color": { "fixedColor": "blue", "mode": "fixed" } }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": 
"lakebase" }, "format": "table", + "rawSql": "SELECT count(*)::bigint AS symbols FROM _lvc_stock_ticks" }] + }, + { "id": 4, "type": "stat", "title": "Active partitions", + "description": "ChronoTable auto-routes INSERTs to daily/hourly partitions. Count grows as time passes.", + "gridPos": { "x": 12, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short" }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT count(*)::bigint AS active FROM lakets._chunk_metadata c JOIN lakets._chronotable_registry h ON h.id=c.chronotable_id WHERE h.table_name='stock_ticks' AND c.status='active'" }] + }, + { "id": 5, "type": "stat", "title": "1-min rollup watermark lag", + "description": "Seconds behind now(). 
Advances each time refresh_rollup_cascade() fires (every 1 min).", + "gridPos": { "x": 18, "y": 3, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, "colorMode": "value", "graphMode": "area", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "s", "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": null }, { "color": "orange", "value": 120 }, { "color": "red", "value": 300 } ] } }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT EXTRACT(EPOCH FROM (now() - watermark))::int AS lag FROM lakets._rollup_registry WHERE name='ohlcv_1min'" }] + }, + + { "id": 10, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 7, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 1. ChronoTable — RANGE partitioning, auto-routing\nLakeTS converts `stock_ticks` into a PostgreSQL RANGE-partitioned table with one partition per hour. Each INSERT is routed to the right chunk by PG's native dispatcher — **no application logic**. Row counts below update live as ticks flow in." } + }, + { "id": 11, "type": "timeseries", "title": "Ticks per minute — time_bucket('1 minute', time)", + "description": "lakets.time_bucket() = date_bin() under the hood. 
Truncates timestamps to aligned bucket boundaries.", + "gridPos": { "x": 0, "y": 10, "w": 16, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "bars", "lineWidth": 0, "fillOpacity": 100 }, "unit": "short" }, "overrides": [] }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" }, "tooltip": { "mode": "single" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "SELECT lakets.time_bucket('1 minute'::interval, time) AS time, count(*)::int AS ticks FROM stock_ticks WHERE $__timeFilter(time) GROUP BY 1 ORDER BY 1" }] + }, + { "id": 12, "type": "table", "title": "Active partitions — show_chunks('stock_ticks')", + "description": "Each row = one RANGE partition. 'row_count' is auto-updated by the _touch_chunk_metadata() trigger on every write.", + "gridPos": { "x": 16, "y": 10, "w": 8, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" } }, "overrides": [] }, + "options": { "showHeader": true }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT split_part(chunk_name,'.',2) AS chunk, range_start::time AS start, status, row_count FROM lakets.show_chunks('stock_ticks') WHERE status='active' ORDER BY range_start DESC LIMIT 10" }] + }, + + { "id": 20, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 18, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 2. Time-series aggregation — `time_bucket` + `first()` + `last()`\nThese three primitives give you OHLCV candles in one GROUP BY. `lakets.first(price, time)` = the price associated with the **earliest** time in each bucket (open). 
`lakets.last(price, time)` = the price at the **latest** time (close). Driven by the 1-minute rollup, refreshed every minute by the `rollup_refresh_hot` Databricks job." } + }, + { "id": 21, "type": "timeseries", "title": "OHLC candles — ${symbol} (from _rollup_ohlcv_1min)", + "description": "Direct query against the rollup table. Low row count = fast. open/high/low/close all visible.", + "gridPos": { "x": 0, "y": 21, "w": 16, "h": 10 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "line", "lineInterpolation": "stepAfter", "lineWidth": 2, "fillOpacity": 0, "spanNulls": true }, "unit": "currencyUSD" }, "overrides": [] }, + "options": { "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull", "max", "min"] }, "tooltip": { "mode": "multi" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "SELECT bucket AS time, open, high, low, close FROM public._rollup_ohlcv_1min WHERE symbol = '${symbol}' AND bucket > now() - interval '30 minutes' ORDER BY 1" }] + }, + { "id": 22, "type": "table", "title": "Latest candle per symbol — 1-minute OHLCV", + "description": "All 10 symbols, their most recent 1-minute bar.", + "gridPos": { "x": 16, "y": 21, "w": 8, "h": 10 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" }, "decimals": 2 }, "overrides": [{"matcher":{"id":"byName","options":"volume"},"properties":[{"id":"decimals","value":0}]}] }, + "options": { "showHeader": true }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT DISTINCT ON (symbol) symbol, open, high, low, close, volume FROM public._rollup_ohlcv_1min ORDER BY symbol, bucket 
DESC" }] + }, + + { "id": 30, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 31, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 3. Change-detection primitives — `delta()` and `rate()`\n`lakets.delta(current, previous)` returns `current - previous` (with counter-reset handling). `lakets.rate(val, prev_val, time, prev_time)` divides by elapsed seconds. Common pairing: price change per bucket + VWAP." } + }, + { "id": 31, "type": "timeseries", "title": "1-min price Δ — delta() — ${symbol}", + "description": "Minute-over-minute close delta. Bars above/below zero show direction of change.", + "gridPos": { "x": 0, "y": 34, "w": 12, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "bars", "lineWidth": 0, "fillOpacity": 80 }, "unit": "currencyUSD" }, "overrides": [] }, + "options": { "legend": { "displayMode": "hidden" }, "tooltip": { "mode": "single" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "SELECT bucket AS time, lakets.delta(close, LAG(close) OVER (ORDER BY bucket)) AS price_delta FROM public._rollup_ohlcv_1min WHERE symbol='${symbol}' AND bucket > now() - interval '30 minutes' ORDER BY 1" }] + }, + { "id": 32, "type": "timeseries", "title": "VWAP — sum(price*volume)/sum(volume) per minute", + "description": "Volume-weighted average price. 
Shows where most of the trading volume happened.", + "gridPos": { "x": 12, "y": 34, "w": 12, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "line", "lineInterpolation": "smooth", "lineWidth": 2, "fillOpacity": 15 }, "unit": "currencyUSD" }, "overrides": [] }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" }, "tooltip": { "mode": "multi" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "SELECT lakets.time_bucket('1 minute'::interval, time) AS time, symbol AS metric, (sum(price*volume)/NULLIF(sum(volume),0)) AS value FROM stock_ticks WHERE $__timeFilter(time) GROUP BY 1, symbol ORDER BY 1" }] + }, + + { "id": 40, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 42, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 4. Window functions on rollups — Moving averages + Bollinger Bands\nBecause the rollup tables are compact, **window functions run in microseconds** instead of scanning millions of raw ticks. SMA5/SMA20 are `AVG() OVER (ROWS BETWEEN N PRECEDING)`. Bollinger = SMA20 ± 2·STDDEV(close, 20)." } + }, + { "id": 41, "type": "timeseries", "title": "Close + SMA5 + SMA20 — ${symbol}", + "description": "When SMA5 crosses above SMA20 it's a classic 'golden cross' signal. 
Read from _rollup_ohlcv_1min.", + "gridPos": { "x": 0, "y": 45, "w": 12, "h": 9 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "line", "lineInterpolation": "smooth", "lineWidth": 2 }, "unit": "currencyUSD" }, "overrides": [{"matcher":{"id":"byName","options":"close"},"properties":[{"id":"custom.lineWidth","value":1},{"id":"custom.fillOpacity","value":20}]}] }, + "options": { "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] }, "tooltip": { "mode": "multi" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "SELECT bucket AS time, close, AVG(close) OVER (ORDER BY bucket ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sma5, AVG(close) OVER (ORDER BY bucket ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS sma20 FROM public._rollup_ohlcv_1min WHERE symbol='${symbol}' AND bucket > now() - interval '30 minutes' ORDER BY 1" }] + }, + { "id": 42, "type": "timeseries", "title": "Bollinger Bands — ${symbol} (SMA20 ± 2σ)", + "description": "Upper/lower = SMA20 plus/minus 2·STDDEV(close, 20). 
Wider = higher recent volatility.", + "gridPos": { "x": 12, "y": 45, "w": 12, "h": 9 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "line", "lineWidth": 2, "fillOpacity": 10, "fillBelowTo": "lower_band" }, "unit": "currencyUSD" }, "overrides": [] }, + "options": { "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] }, "tooltip": { "mode": "multi" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "WITH w AS (SELECT bucket, close, AVG(close) OVER (ORDER BY bucket ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS sma, STDDEV(close) OVER (ORDER BY bucket ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS sd FROM public._rollup_ohlcv_1min WHERE symbol='${symbol}' AND bucket > now() - interval '30 minutes') SELECT bucket AS time, close, sma AS sma20, sma + 2*sd AS upper_band, sma - 2*sd AS lower_band FROM w ORDER BY 1" }] + }, + + { "id": 50, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 54, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 5. `histogram()` — price distribution across buckets\n`lakets.histogram(value, min, max, num_buckets)` maps each value to an integer bucket index. Gives you a density curve over a fixed range." 
} + }, + { "id": 51, "type": "barchart", "title": "Price distribution — ${symbol} last 30 min", + "description": "10 equal-width buckets between min and max seen in the window.", + "gridPos": { "x": 0, "y": 57, "w": 24, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "lineWidth": 0, "fillOpacity": 80 } }, "overrides": [] }, + "options": { "orientation": "vertical", "xTickLabelRotation": 0, "legend": { "displayMode": "hidden" }, "tooltip": { "mode": "single" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "WITH b AS (SELECT min(price) AS lo, max(price) AS hi FROM stock_ticks WHERE symbol='${symbol}' AND time > now() - interval '30 minutes') SELECT (lo + (hi-lo) * (h.bucket-1) / 10.0)::numeric(10,2)::text AS price_bucket, count(*)::int AS observations FROM stock_ticks s, b, LATERAL (SELECT lakets.histogram(s.price, b.lo::double precision, b.hi::double precision, 10) AS bucket) h WHERE s.symbol='${symbol}' AND s.time > now() - interval '30 minutes' GROUP BY price_bucket ORDER BY price_bucket" }] + }, + + { "id": 60, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 65, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 6. Gap-fill primitives — `time_bucket_gapfill()` + `locf()`\n`time_bucket_gapfill()` emits EVERY bucket boundary, even where no data exists. `locf()` (Last Observation Carried Forward) fills those NULLs from the most recent prior value." } + }, + { "id": 61, "type": "timeseries", "title": "time_bucket_gapfill + locf — carries forward across quiet periods", + "description": "Even if a minute has zero ticks, the series continues at the last-known price. 
Switch between symbols to see coverage.", + "gridPos": { "x": 0, "y": 68, "w": 24, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "line", "lineInterpolation": "stepAfter", "lineWidth": 2, "spanNulls": true }, "unit": "currencyUSD" }, "overrides": [] }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" }, "tooltip": { "mode": "single" } }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "WITH buckets AS (SELECT lakets.time_bucket_gapfill('1 minute'::interval, now() - interval '30 minutes', now()) AS bucket), raw AS (SELECT lakets.time_bucket('1 minute'::interval, time) AS bucket, lakets.last(price, time) AS price FROM stock_ticks WHERE symbol='${symbol}' AND time > now() - interval '30 minutes' GROUP BY 1) SELECT b.bucket AS time, lakets.locf(r.price, LAG(r.price) OVER (ORDER BY b.bucket)) AS gapfilled_close FROM buckets b LEFT JOIN raw r USING (bucket) ORDER BY 1" }] + }, + + { "id": 70, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 76, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 7. Rollup engine internals — DAG cascade + watermark + invalidation log\nThe rollup DAG (1min → 1hour → 1day) refreshes in topological order. Each level has its own watermark. Writes below the watermark add entries to `_rollup_invalidation_log` — the refresh drains them every minute." } + }, + { "id": 71, "type": "table", "title": "show_rollup_dag() — refresh order + watermarks", + "description": "Kahn's topological sort orders the refresh. 
Each row shows where we are vs now().", + "gridPos": { "x": 0, "y": 79, "w": 12, "h": 7 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" } }, "overrides": [] }, + "options": { "showHeader": true }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT r.name, d.refresh_order AS ord, r.bucket_interval::text AS bucket, d.depends_on_names AS depends_on, r.watermark, EXTRACT(EPOCH FROM (now() - r.watermark))::int AS wm_lag_s FROM lakets._rollup_registry r JOIN lakets.show_rollup_dag() d ON d.rollup_name=r.name ORDER BY d.refresh_order" }] + }, + { "id": 72, "type": "timeseries", "title": "Invalidation log depth + ticks/min (fill/drain cycle)", + "description": "Orange = dirty buckets still queued. Green = ticks entering the system. Every minute, the cascade drains orange.", + "gridPos": { "x": 12, "y": 79, "w": 12, "h": 7 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "color": { "mode": "palette-classic" }, "custom": { "drawStyle": "line", "lineInterpolation": "stepAfter", "lineWidth": 2 } }, "overrides": [{"matcher":{"id":"byName","options":"dirty_buckets"},"properties":[{"id":"color","value":{"mode":"fixed","fixedColor":"orange"}}]},{"matcher":{"id":"byName","options":"ticks_per_min"},"properties":[{"id":"color","value":{"mode":"fixed","fixedColor":"green"}}]}] }, + "options": { "legend": { "displayMode": "list", "placement": "bottom" }, "tooltip": { "mode": "multi" } }, + "targets": [ + { "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "time_series", + "rawSql": "SELECT now() AS time, (SELECT count(*)::int FROM lakets._rollup_invalidation_log) AS dirty_buckets" }, + { "refId": "B", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, 
"format": "time_series", + "rawSql": "SELECT lakets.time_bucket('1 minute'::interval, time) AS time, count(*)::int AS ticks_per_min FROM stock_ticks WHERE time > now() - interval '30 minutes' GROUP BY 1 ORDER BY 1" } + ] + }, + + { "id": 80, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 86, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 8. Last Value Cache (LVC) — sub-10ms latest state\nEvery INSERT on `stock_ticks` fires `trg_lakets_lvc` which upserts into `_lvc_stock_ticks`. PK lookup by symbol returns the latest price in O(1). Contrast with scanning millions of raw ticks to find `max(time)` per symbol." } + }, + { "id": 81, "type": "table", "title": "LVC — latest price + staleness per symbol", + "description": "Select a symbol from the dropdown at the top to focus the other panels on it.", + "gridPos": { "x": 0, "y": 89, "w": 24, "h": 8 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" }, "decimals": 2 }, "overrides": [{"matcher":{"id":"byName","options":"price"},"properties":[{"id":"unit","value":"currencyUSD"}]},{"matcher":{"id":"byName","options":"stale_seconds"},"properties":[{"id":"unit","value":"s"},{"id":"custom.displayMode","value":"color-background"},{"id":"thresholds","value":{"mode":"absolute","steps":[{"color":"green","value":null},{"color":"orange","value":30},{"color":"red","value":120}]}}]}] }, + "options": { "showHeader": true }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT l.symbol, a.sector, l.price, l.volume, l.last_updated, EXTRACT(EPOCH FROM (now() - l.last_updated))::int AS stale_seconds FROM _lvc_stock_ticks l LEFT JOIN stock_assets a USING (symbol) ORDER BY l.symbol" }] + }, + + { "id": 85, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 97, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": 
"## 9. Retention policy — declarative lifecycle\nThe policy lives in `lakets._policy_registry`. `tier_after` controls when the retention job `DELETE`s from `_shadow_stock_ticks` (triggers SCD Type 2 tombstones in Delta — history preserved). `drop_after` controls when `lakets.execute_retention()` drops hot partitions (DDL, not replicated). To change, run `lakets.add_tiered_retention_policy()` — no code deploy needed." } + }, + { "id": 86, "type": "table", "title": "show_retention_policy('stock_ticks')", + "description": "Config + last_run_at. Updated each time retention fires.", + "gridPos": { "x": 0, "y": 100, "w": 16, "h": 5 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "fieldConfig": { "defaults": { "custom": { "align": "auto" } }, "overrides": [] }, + "options": { "showHeader": true }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT policy_type, tier_after, drop_after, enabled, last_run_at, EXTRACT(EPOCH FROM (now() - last_run_at))::int AS since_last_s FROM lakets.show_retention_policy('stock_ticks')" }] + }, + { "id": 87, "type": "stat", "title": "Policies total", + "description": "From lakets_metrics() — Prometheus-compatible. 
Any policy registered counts here.", + "gridPos": { "x": 16, "y": 100, "w": 8, "h": 5 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short" }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT SUM(metric_value)::int AS total FROM lakets.lakets_metrics() WHERE metric_name = 'lakets_policies_total'" }] + }, + + { "id": 90, "type": "text", "title": "", + "gridPos": { "x": 0, "y": 105, "w": 24, "h": 3 }, + "options": { "mode": "markdown", "content": "## 10. Shadow table — ready for Lakehouse Sync\nPartitioned tables can't be Lakehouse Sync sources, so LakeTS maintains an unpartitioned `_shadow_stock_ticks` via an INSERT trigger. Lakehouse Sync continuously replicates shadow → Delta as **SCD Type 2** — every change is appended (DELETEs become `_pg_change_type='delete'` tombstones), preserving history forever." } + }, + { "id": 91, "type": "stat", "title": "stock_ticks total rows", + "gridPos": { "x": 0, "y": 108, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "decimals": 0 }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT count(*)::bigint AS n FROM stock_ticks" }] + }, + { "id": 92, "type": "stat", "title": "_shadow_stock_ticks rows", + "description": "Grows with INSERTs, SHRINKS when retention's shadow-prune runs (every 15 min). 
Delta still keeps history.", + "gridPos": { "x": 6, "y": 108, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "decimals": 0 }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT count(*)::bigint AS n FROM lakets_cdf._shadow_stock_ticks" }] + }, + { "id": 93, "type": "stat", "title": "Sync lag (shadow − Delta)", + "description": "Approximation only: counts shadow rows written in the last 30 seconds, i.e. rows Lakehouse Sync may not have replicated yet. The hot tier cannot see the Delta side — the cold-tier dashboard (Databricks Enterprise) shows it.", + "gridPos": { "x": 12, "y": 108, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", "decimals": 0, "color": { "fixedColor": "text", "mode": "fixed" } }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT count(*)::bigint AS n FROM lakets_cdf._shadow_stock_ticks WHERE time > now() - interval '30 seconds'" }] + }, + { "id": 94, "type": "stat", "title": "Rollup speedup — raw vs _rollup_ohlcv_1min", + "description": "Scanning _rollup_ohlcv_1min to answer 'avg price by minute' is N rows of rollup vs N×~600 raw ticks.", + "gridPos": { "x": 18, "y": 108, "w": 6, "h": 4 }, + "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "value", "graphMode": "none", "textMode": "auto" }, + "fieldConfig": { "defaults": { "unit": "short", 
"decimals": 0, "color": { "fixedColor": "purple", "mode": "fixed" } }, "overrides": [] }, + "targets": [{ "refId": "A", "datasource": { "type": "grafana-postgresql-datasource", "uid": "lakebase" }, "format": "table", + "rawSql": "SELECT (SELECT count(*) FROM stock_ticks WHERE time > now() - interval '30 minutes') / NULLIF((SELECT count(*) FROM public._rollup_ohlcv_1min WHERE bucket > now() - interval '30 minutes'), 0) AS speedup" }] + } + ] +} diff --git a/demo/live/grafana/docker-compose.yml b/demo/live/grafana/docker-compose.yml new file mode 100644 index 0000000..de453c8 --- /dev/null +++ b/demo/live/grafana/docker-compose.yml @@ -0,0 +1,38 @@ +services: + grafana: + image: grafana/grafana-oss:11.3.0 + container_name: lakets-grafana + ports: + - "${GRAFANA_PORT:-3030}:3000" + environment: + GF_SECURITY_ADMIN_USER: ${GF_ADMIN_USER:-admin} + GF_SECURITY_ADMIN_PASSWORD: ${GF_ADMIN_PASSWORD:-admin} + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_ROLE: Viewer + # mullerpeter-databricks-datasource was removed from the official Grafana + # plugin catalog but the project is still actively maintained on GitHub. + # Install from release zip + allow loading as unsigned (community plugin). 
+ # Repo: https://github.com/mullerpeter/databricks-grafana + GF_INSTALL_PLUGINS: "https://github.com/mullerpeter/databricks-grafana/releases/download/v1.3.8/mullerpeter-databricks-datasource.zip;mullerpeter-databricks-datasource" + GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: "mullerpeter-databricks-datasource" + # Lakebase (hot tier) — Postgres datasource + LAKEBASE_HOST: ${LAKEBASE_HOST:?LAKEBASE_HOST must be set} + LAKEBASE_PORT: ${LAKEBASE_PORT:-5432} + LAKEBASE_DATABASE: ${LAKEBASE_DATABASE:-databricks_postgres} + LAKEBASE_USER: ${LAKEBASE_USER:?LAKEBASE_USER must be set} + LAKEBASE_PASSWORD: ${LAKEBASE_PASSWORD:?LAKEBASE_PASSWORD must be set} + # Databricks SQL (cold tier / UC Delta) — set via setup_grafana_env.sh + DATABRICKS_HOST: ${DATABRICKS_HOST:-} + DATABRICKS_HTTP_PATH: ${DATABRICKS_HTTP_PATH:-} + DATABRICKS_TOKEN: ${DATABRICKS_TOKEN:-} + DATABRICKS_CATALOG: ${DATABRICKS_CATALOG:-} + DATABRICKS_SCHEMA: ${DATABRICKS_SCHEMA:-} + DELTA_TABLE: ${DELTA_TABLE:-stock_ticks_history} + volumes: + - ./provisioning:/etc/grafana/provisioning:ro + - ./dashboards:/var/lib/grafana/dashboards:ro + - grafana-data:/var/lib/grafana + restart: unless-stopped + +volumes: + grafana-data: {} diff --git a/demo/live/grafana/provisioning/dashboards/dashboards.yml b/demo/live/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000..32b3da7 --- /dev/null +++ b/demo/live/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,13 @@ +apiVersion: 1 + +providers: + - name: lakets-live + orgId: 1 + folder: LakeTS + type: file + disableDeletion: false + editable: true + updateIntervalSeconds: 10 + allowUiUpdates: true + options: + path: /var/lib/grafana/dashboards diff --git a/demo/live/grafana/provisioning/datasources/databricks.yml b/demo/live/grafana/provisioning/datasources/databricks.yml new file mode 100644 index 0000000..b518e15 --- /dev/null +++ b/demo/live/grafana/provisioning/datasources/databricks.yml @@ -0,0 +1,36 @@ +apiVersion: 1 + +# Databricks 
SQL (cold tier — UC Delta SCD Type 2 via Lakehouse Sync) +# +# Uses the community plugin mullerpeter-databricks-datasource (delisted from +# the Grafana catalog but still maintained at github.com/mullerpeter/databricks-grafana). +# Installed unsigned via GF_INSTALL_PLUGINS in docker-compose.yml. +# +# Auth: Personal Access Token (`dsn` mode). The token is SP-owned (see README +# "Identity model for the cold-tier datasource") so Databricks audit logs +# attribute queries to the SP, not to the operator's user account. +datasources: + - name: Databricks + type: mullerpeter-databricks-datasource + uid: databricks + access: proxy + isDefault: false + editable: true + jsonData: + hostname: ${DATABRICKS_HOST} + path: ${DATABRICKS_HTTP_PATH} + port: "443" + authenticationMethod: dsn + timeInterval: 30s + maxOpenConns: "5" + maxIdleConns: "2" + connMaxLifetime: "3600" + retries: "3" + retryBackoff: "1" + maxRetryDuration: "60" + timeout: "60" + maxRows: "10000" + defaultQueryFormat: table + defaultEditorMode: code + secureJsonData: + token: ${DATABRICKS_TOKEN} diff --git a/demo/live/grafana/provisioning/datasources/lakebase.yml b/demo/live/grafana/provisioning/datasources/lakebase.yml new file mode 100644 index 0000000..a7167f2 --- /dev/null +++ b/demo/live/grafana/provisioning/datasources/lakebase.yml @@ -0,0 +1,21 @@ +apiVersion: 1 + +datasources: + - name: Lakebase + type: postgres + access: proxy + uid: lakebase + url: ${LAKEBASE_HOST}:${LAKEBASE_PORT} + database: ${LAKEBASE_DATABASE} + user: ${LAKEBASE_USER} + secureJsonData: + password: ${LAKEBASE_PASSWORD} + jsonData: + sslmode: require + postgresVersion: 1600 + timescaledb: false + maxOpenConns: 5 + maxIdleConns: 2 + connMaxLifetime: 3300 + editable: true + isDefault: true diff --git a/demo/live/notebooks/stream_ticks.py b/demo/live/notebooks/stream_ticks.py new file mode 100644 index 0000000..9190166 --- /dev/null +++ b/demo/live/notebooks/stream_ticks.py @@ -0,0 +1,180 @@ +# Databricks notebook source +# MAGIC %md 
+# MAGIC # LakeTS Live Demo — Streaming Tick Generator +# MAGIC +# MAGIC Continuously writes synthetic ticks to `stock_ticks` on a **Lakebase +# MAGIC Autoscaling** project. Fires every downstream feature: rollup invalidation +# MAGIC triggers, LVC upserts, the CDF shadow, and Unity Catalog sync. +# MAGIC +# MAGIC **Runs on serverless compute.** Authenticates with machine-to-machine OAuth +# MAGIC (the job's service principal / your identity) — no static password. Change +# MAGIC the widgets and re-run to scale the ingest rate mid-demo. + +# COMMAND ---------- + +# MAGIC %pip install "psycopg[binary]>=3.1,<4.0" "databricks-sdk>=0.81.0,<1.0.0" +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +dbutils.widgets.text("lakebase_project", "", "Lakebase Autoscaling project name") +dbutils.widgets.text("pg_database", "databricks_postgres", "Database") +dbutils.widgets.dropdown("symbols_count", "10", ["10", "100", "1000"], "Symbols") +dbutils.widgets.dropdown("rows_per_sec", "10", ["1", "10", "100", "1000"], "Rows/sec") +dbutils.widgets.dropdown("burst_mode", "off", ["off", "on"], "Burst mode") +dbutils.widgets.text("duration_minutes", "0", "Duration (min, 0=forever)") + +PROJECT = dbutils.widgets.get("lakebase_project") +PG_DATABASE = dbutils.widgets.get("pg_database") +SYMBOLS_COUNT = int(dbutils.widgets.get("symbols_count")) +ROWS_PER_SEC = int(dbutils.widgets.get("rows_per_sec")) +BURST_MODE = dbutils.widgets.get("burst_mode") == "on" +DURATION_MINUTES = int(dbutils.widgets.get("duration_minutes")) + +if not PROJECT: + raise RuntimeError("lakebase_project widget is required (the Autoscaling project name)") +print(f"project={PROJECT} db={PG_DATABASE} symbols={SYMBOLS_COUNT} " + f"rows/s={ROWS_PER_SEC} burst={BURST_MODE} duration_min={DURATION_MINUTES}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Lakebase Autoscaling connection (M2M OAuth) +# MAGIC Mirrors `databricks/workflows/lakebase_utils.py`: resolve the project's +# MAGIC primary read-write 
endpoint, read its host, and mint a short-lived OAuth +# MAGIC credential used as the Postgres password. A single long-lived connection +# MAGIC outlives the ~1h token (expiry is enforced only at login); if the job +# MAGIC restarts it reconnects with a fresh token. + +# COMMAND ---------- + +import math +import os +import random +import time +from datetime import datetime, timezone + +import psycopg +from databricks.sdk import WorkspaceClient + +_w = WorkspaceClient() + + +def _resolve_endpoint(project_name): + explicit = os.environ.get("LAKETS_LAKEBASE_ENDPOINT") + if explicit: + return explicit + project = project_name if project_name.startswith("projects/") else f"projects/{project_name}" + branch = f"{project}/branches/production" + try: + default = next((b for b in _w.postgres.list_branches(parent=project) + if getattr(getattr(b, "status", None), "default", False)), None) + if default and getattr(default, "name", None): + branch = default.name + except Exception: + pass + endpoints = list(_w.postgres.list_endpoints(parent=branch)) + if not endpoints: + raise RuntimeError(f"No Lakebase endpoints found under {branch}") + def _rw(e): + et = getattr(e, "endpoint_type", None) or getattr(getattr(e, "spec", None), "endpoint_type", None) + return "READ_WRITE" in str(et) + return next((e for e in endpoints if _rw(e)), endpoints[0]).name + + +_ENDPOINT = _resolve_endpoint(PROJECT) +_HOST = _w.postgres.get_endpoint(name=_ENDPOINT).status.hosts.host +_PG_ROLE = os.environ.get("LAKETS_PG_ROLE") or _w.current_user.me().user_name +print(f"endpoint={_ENDPOINT}\nhost={_HOST}\nrole={_PG_ROLE}") + + +def connect(): + cred = _w.postgres.generate_database_credential(endpoint=_ENDPOINT) + conn = psycopg.connect( + host=_HOST, + port=5432, + dbname=PG_DATABASE, + user=_PG_ROLE, + password=cred.token, + sslmode="require", + connect_timeout=30, + options="-c statement_timeout=30000", + autocommit=True, + ) + return conn + + +def load_symbols(conn, limit): + with conn.cursor() as cur: 
+ cur.execute( + "SELECT symbol, base_price, volatility " + "FROM stock_assets ORDER BY symbol LIMIT %s", + (limit,), + ) + rows = cur.fetchall() + if not rows: + raise RuntimeError("stock_assets is empty — run sql/setup.sql first") + return [{"symbol": r[0], "base_price": float(r[1]), "volatility": float(r[2])} for r in rows] + + +def synth_tick(sym, t_epoch): + phase = (t_epoch % 3600) / 3600.0 * 2 * math.pi + noise = (random.random() - 0.5) * 0.005 + price = sym["base_price"] * (1.0 + sym["volatility"] * math.sin(phase) + noise) + volume = random.uniform(1000.0, 6000.0) + return (datetime.fromtimestamp(t_epoch, tz=timezone.utc), sym["symbol"], + round(price, 4), round(volume, 2)) + + +def batch_insert(conn, rows): + # psycopg3 executemany is pipelined and fast enough for demo rates. + with conn.cursor() as cur: + cur.executemany( + "INSERT INTO stock_ticks (time, symbol, price, volume) VALUES (%s, %s, %s, %s)", + rows, + ) + +# COMMAND ---------- + +conn = connect() +symbols = load_symbols(conn, SYMBOLS_COUNT) +print(f"Loaded {len(symbols)} symbols: " + f"{[s['symbol'] for s in symbols[:10]]}{'...' if len(symbols) > 10 else ''}") + +# COMMAND ---------- + +started = time.time() +end_at = started + DURATION_MINUTES * 60 if DURATION_MINUTES > 0 else None +total = 0 +last_burst = started + +try: + while True: + if end_at and time.time() >= end_at: + print(f"Duration reached ({DURATION_MINUTES} min). 
Stopping.") + break + + t_epoch = int(time.time()) + rows = [synth_tick(random.choice(symbols), t_epoch) for _ in range(ROWS_PER_SEC)] + batch_insert(conn, rows) + total += len(rows) + + if BURST_MODE and (time.time() - last_burst) >= 60: + burst_rows = [synth_tick(random.choice(symbols), t_epoch) for _ in range(10_000)] + batch_insert(conn, burst_rows) + total += len(burst_rows) + last_burst = time.time() + print(f" [burst] +10k rows total={total:,}") + + if total % max(ROWS_PER_SEC * 30, 100) < ROWS_PER_SEC: + elapsed = time.time() - started + rate = total / elapsed if elapsed else 0 + print(f" elapsed={elapsed:6.0f}s total={total:>10,} rate={rate:6.1f}/s") + + time.sleep(1.0) +finally: + try: + conn.close() + except Exception: + pass + print(f"Ingest stopped. Total rows written: {total:,}") diff --git a/demo/live/sql/setup.sql b/demo/live/sql/setup.sql new file mode 100644 index 0000000..3304597 --- /dev/null +++ b/demo/live/sql/setup.sql @@ -0,0 +1,251 @@ +-- ============================================================================= +-- LakeTS Live Demo — One-shot setup +-- Idempotent: safe to re-run. Each block drops/recreates its own state. +-- Run AFTER installing the lakets schema (dist/lakets.sql or sql/99_install.sql). +-- +-- Targets a Lakebase Autoscaling project. Cold-tier replication uses Lakebase +-- CDF (lakets.enable_sync), so CDF (wal2delta) must already be enabled on the +-- lakets_cdf schema — a one-time Databricks setup. See the live-demo guide. +-- ============================================================================= + +\set ON_ERROR_STOP on +SET client_min_messages TO NOTICE; + +-- --------------------------------------------------------------------------- +-- 0. Sanity: lakets schema must exist +-- --------------------------------------------------------------------------- +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'lakets') THEN + RAISE EXCEPTION 'lakets schema not installed. 
Run dist/lakets.sql first.'; + END IF; + RAISE NOTICE '[setup] lakets schema present'; +END $$; + +-- --------------------------------------------------------------------------- +-- 1. Reference data: stock_assets +-- --------------------------------------------------------------------------- +DROP TABLE IF EXISTS stock_assets CASCADE; +CREATE TABLE stock_assets ( + symbol TEXT PRIMARY KEY, + name TEXT NOT NULL, + sector TEXT NOT NULL, + exchange TEXT NOT NULL, + base_price DOUBLE PRECISION NOT NULL, + volatility DOUBLE PRECISION NOT NULL +); + +INSERT INTO stock_assets (symbol, name, sector, exchange, base_price, volatility) VALUES + ('AAPL', 'Apple', 'Tech', 'NASDAQ', 185.00, 0.020), + ('MSFT', 'Microsoft', 'Tech', 'NASDAQ', 420.00, 0.018), + ('NVDA', 'NVIDIA', 'Tech', 'NASDAQ', 880.00, 0.030), + ('GOOGL', 'Alphabet', 'Tech', 'NASDAQ', 175.00, 0.022), + ('AMZN', 'Amazon', 'Consumer','NASDAQ', 185.00, 0.025), + ('TSLA', 'Tesla', 'Auto', 'NASDAQ', 200.00, 0.040), + ('JPM', 'JPMorgan', 'Finance', 'NYSE', 210.00, 0.015), + ('V', 'Visa', 'Finance', 'NYSE', 280.00, 0.014), + ('BTC-USD', 'Bitcoin', 'Crypto', 'CRYPTO', 67000.00, 0.040), + ('ETH-USD', 'Ethereum', 'Crypto', 'CRYPTO', 3500.00, 0.045); + +-- --------------------------------------------------------------------------- +-- 2. Idempotent reset — tear down prior demo state in dependency order: +-- sync + LVC first (they own triggers/shadows), then rollups, then tables. 
+-- --------------------------------------------------------------------------- +DO $$ BEGIN + IF EXISTS (SELECT 1 FROM lakets._chronotable_registry WHERE table_name = 'stock_ticks') THEN + PERFORM lakets.disable_sync('stock_ticks'); -- drops lakets_cdf shadow + trigger + PERFORM lakets.disable_lvc('stock_ticks'); -- drops public._lvc_stock_ticks + trigger + RAISE NOTICE '[setup] disabled prior sync + LVC on stock_ticks'; + END IF; +END $$; + +DO $$ DECLARE r RECORD; BEGIN + FOR r IN SELECT name FROM lakets._rollup_registry WHERE name LIKE 'ohlcv_%' LOOP + PERFORM lakets.drop_rollup(r.name); + RAISE NOTICE '[setup] dropped prior rollup: %', r.name; + END LOOP; +END $$; + +-- Belt-and-suspenders for any orphaned rollup tables/views from older runs. +DROP VIEW IF EXISTS public._rollup_rt_ohlcv_1min CASCADE; +DROP VIEW IF EXISTS public._rollup_rt_ohlcv_1hour CASCADE; +DROP VIEW IF EXISTS public._rollup_rt_ohlcv_1day CASCADE; +DROP TABLE IF EXISTS public._rollup_ohlcv_1min CASCADE; +DROP TABLE IF EXISTS public._rollup_ohlcv_1hour CASCADE; +DROP TABLE IF EXISTS public._rollup_ohlcv_1day CASCADE; + +-- --------------------------------------------------------------------------- +-- 3. stock_ticks → ChronoTable (1-hour chunks for demo visibility) +-- --------------------------------------------------------------------------- +DROP TABLE IF EXISTS stock_ticks CASCADE; +DELETE FROM lakets._chronotable_registry WHERE table_name = 'stock_ticks'; + +CREATE TABLE stock_ticks ( + time TIMESTAMPTZ NOT NULL, + symbol TEXT NOT NULL, + price DOUBLE PRECISION NOT NULL, + volume DOUBLE PRECISION NOT NULL +); + +-- 1-hour chunks so partition creation is visible within a 30-min demo. +SELECT lakets.create_chronotable('stock_ticks', 'time', '1 hour'); + +-- Pre-create a window of partitions so streaming ingest always has a home. 
+DO $$ DECLARE v_id INT; BEGIN + SELECT id INTO v_id FROM lakets._chronotable_registry WHERE table_name='stock_ticks'; + PERFORM lakets._ensure_partitions(p_chronotable_id := v_id, + p_past_count := 2, p_future_count := 6); +END $$; + +-- --------------------------------------------------------------------------- +-- 4. RollUp DAG: 1min → 1hour → 1day (RollUps are always incremental now) +-- --------------------------------------------------------------------------- +-- Level 1: 1-minute OHLCV from raw ticks +SELECT lakets.create_rollup( + p_name => 'ohlcv_1min', + p_bucket_interval => '1 minute', + p_source_table => 'stock_ticks', + p_query => $q$ + SELECT + lakets.time_bucket('1 minute'::interval, time) AS bucket, + symbol, + lakets.first(price, time) AS open, + max(price) AS high, + min(price) AS low, + lakets.last(price, time) AS close, + sum(volume) AS volume, + count(*) AS tick_count + FROM stock_ticks + GROUP BY bucket, symbol + $q$ +); + +-- Level 2: 1-hour OHLCV from the 1-minute rollup +SELECT lakets.create_rollup( + p_name => 'ohlcv_1hour', + p_bucket_interval => '1 hour', + p_source_table => 'stock_ticks', + p_depends_on => ARRAY['ohlcv_1min'], + p_query => $q$ + SELECT + lakets.time_bucket('1 hour'::interval, bucket) AS bucket, + symbol, + lakets.first(open, bucket) AS open, + max(high) AS high, + min(low) AS low, + lakets.last(close, bucket) AS close, + sum(volume) AS volume + FROM public._rollup_ohlcv_1min + GROUP BY lakets.time_bucket('1 hour'::interval, bucket), symbol + $q$ +); + +-- Level 3: 1-day OHLCV from the 1-hour rollup +SELECT lakets.create_rollup( + p_name => 'ohlcv_1day', + p_bucket_interval => '1 day', + p_source_table => 'stock_ticks', + p_depends_on => ARRAY['ohlcv_1hour'], + p_query => $q$ + SELECT + lakets.time_bucket('1 day'::interval, bucket) AS bucket, + symbol, + lakets.first(open, bucket) AS open, + max(high) AS high, + min(low) AS low, + lakets.last(close, bucket) AS close, + sum(volume) AS volume + FROM 
public._rollup_ohlcv_1hour + GROUP BY lakets.time_bucket('1 day'::interval, bucket), symbol + $q$ +); + +-- Install invalidation triggers so incoming writes populate the dirty log. +SELECT lakets.enable_rollup_invalidation('ohlcv_1min'); + +-- Demo cadence: default refresh_lag is 1 hour (production-sensible). The demo +-- refreshes every minute, so drop the lag to 0s and let every cascade run. +UPDATE lakets._rollup_registry +SET refresh_lag = '0 seconds' +WHERE name IN ('ohlcv_1min', 'ohlcv_1hour', 'ohlcv_1day'); + +-- --------------------------------------------------------------------------- +-- 5. Last Value Cache — sub-10ms latest price per symbol +-- --------------------------------------------------------------------------- +SELECT lakets.enable_lvc( + p_table_name => 'stock_ticks', + p_key_columns => ARRAY['symbol'], + p_value_columns => ARRAY['price', 'volume'] +); + +-- --------------------------------------------------------------------------- +-- 6. Tiered retention policy +-- tier_after : age at which CDF is expected to have flushed the chunk to UC +-- drop_after : age at which the hot Lakebase partition is dropped (tiering job) +-- --------------------------------------------------------------------------- +DO $$ DECLARE v_id INT; BEGIN + SELECT id INTO v_id FROM lakets._chronotable_registry WHERE table_name='stock_ticks'; + DELETE FROM lakets._policy_registry + WHERE chronotable_id = v_id AND policy_type IN ('retention','tiered_retention'); +END $$; + +SELECT lakets.add_tiered_retention_policy( + p_table_name => 'stock_ticks', + p_tier_after => '10 minutes', + p_drop_after => '60 minutes' +); + +-- --------------------------------------------------------------------------- +-- 7. Unity Catalog sync via Lakebase CDF (Path A) +-- enable_sync() creates the unpartitioned shadow in lakets_cdf and a +-- true-mirror trigger. CDF (wal2delta) on the lakets_cdf schema replicates +-- it to the Unity Catalog Managed Table — no per-table UI step. 
The tiering +-- job only drops a hot partition once CDF has flushed it (durability gate). +-- --------------------------------------------------------------------------- +SELECT lakets.enable_sync('stock_ticks'); + +-- --------------------------------------------------------------------------- +-- 8. Pre-demo state reset — invalidation log + LVC start empty so the +-- audience watches them fill from zero. +-- --------------------------------------------------------------------------- +TRUNCATE lakets._rollup_invalidation_log; + +DO $$ DECLARE t RECORD; BEGIN + FOR t IN + SELECT schemaname, tablename FROM pg_tables + WHERE schemaname='public' AND tablename LIKE '\_lvc\_%' ESCAPE '\' + LOOP + EXECUTE format('TRUNCATE %I.%I', t.schemaname, t.tablename); + RAISE NOTICE '[setup] truncated LVC cache %.%', t.schemaname, t.tablename; + END LOOP; +END $$; + +-- --------------------------------------------------------------------------- +-- 9. Summary +-- --------------------------------------------------------------------------- +\echo '--- ChronoTable registry ---' +SELECT id, schema_name, table_name, time_column, chunk_interval +FROM lakets._chronotable_registry; + +\echo '--- Partitions for stock_ticks ---' +SELECT chunk_name, range_start, range_end, status +FROM lakets.show_chunks('stock_ticks') +ORDER BY range_start +LIMIT 12; + +\echo '--- RollUp DAG (refresh order) ---' +SELECT rollup_name, depends_on_names, refresh_order, bucket_interval +FROM lakets.show_rollup_dag() +ORDER BY refresh_order; + +\echo '--- CDF shadow + LVC + invalidation ---' +SELECT 'cdf_shadow' AS kind, COUNT(*) AS rows FROM lakets_cdf._shadow_stock_ticks +UNION ALL +SELECT 'lvc', COUNT(*) FROM public._lvc_stock_ticks +UNION ALL +SELECT 'invalidation', COUNT(*) FROM lakets._rollup_invalidation_log; + +\echo '--- Retention policy ---' +SELECT policy_type, tier_after, drop_after, enabled +FROM lakets.show_retention_policy('stock_ticks'); + +\echo '[setup] DONE. 
Start the stream_ticks job to begin ingesting data.' diff --git a/tests/test_python_patterns.py b/tests/test_python_patterns.py index f86416a..0e4a441 100644 --- a/tests/test_python_patterns.py +++ b/tests/test_python_patterns.py @@ -122,6 +122,13 @@ def test_tracks_failures(self): assert "failures.append" in source, \ "rollup_refresh.py should append to failures list" + def test_refreshes_in_dag_order(self): + """T16: rollup_refresh uses refresh_rollup_cascade (DAG order), not an + alphabetical refresh_rollup() loop that ignores dependencies.""" + source = _read_source(self.SOURCE_PATH) + assert "refresh_rollup_cascade" in source, \ + "rollup_refresh.py should refresh via refresh_rollup_cascade() for DAG-correct order" + # ───────────────────────────────────────────────────────────────────────────── # Schema-drift guard diff --git a/website/docs/guides/live-demo.md b/website/docs/guides/live-demo.md new file mode 100644 index 0000000..d328e3c --- /dev/null +++ b/website/docs/guides/live-demo.md @@ -0,0 +1,188 @@ +--- +title: Live Demo +sidebar_label: Live Demo +sidebar_position: 1 +description: Stand up the LakeTS "living" streaming demo on a Lakebase Autoscaling project — streaming ingest, DAG RollUps, CDF-gated tiering, and Grafana, step by step. +--- + +# LakeTS Live Demo + +A **living** end-to-end demo. Synthetic stock ticks stream into a Lakebase +Autoscaling project while Databricks serverless jobs drive partitioning, +DAG-ordered RollUp refresh, CDF-gated tiering, and retention. Lakebase CDF +continuously replicates the data to Unity Catalog. You watch the whole pipeline +move in real time. + +Source lives in [`demo/live/`](https://github.com/databricks-solutions/lakets/tree/main/demo/live). 
+ +## What you'll watch happen + +| Signal | Driven by | Cadence | +|---|---|---| +| Ticks written / minute | `stream_ticks` continuous job | continuous | +| Active partitions climb | `partition_manager` (5 min) + the RANGE dispatcher | ~every 5 min | +| Invalidation log fill → drain | write triggers fill it; `rollup_refresh` (1 min) drains it | drops to 0 each minute | +| RollUp watermarks advance | `rollup_refresh` via `refresh_rollup_cascade()` (DAG order) | every minute | +| Latest price per symbol | the LVC trigger fires inside Postgres on every insert | real time, no job | +| Rows replicated to Unity Catalog | Lakebase CDF on the `lakets_cdf` shadow | continuous | +| Active partitions drop | `tiering` (5 min) evicts chunks **only after CDF has flushed them** | once data ages past `drop_after` | + +## Architecture in one breath + +- **Postgres / Lakebase** runs everything synchronous: invalidation + LVC + triggers, partition routing, the time-series functions, and the + `refresh_rollup_cascade()` / `tier_chunk()` / `execute_retention()` SQL. +- **Databricks serverless jobs** exist only to *wake up and call* those SQL + functions on a schedule (Lakebase's allow-list excludes `pg_cron`). The demo + **reuses the exact maintenance jobs shipped in `databricks/workflows/`** — it + only adds `stream_ticks`. +- **Lakebase CDF** replicates the unpartitioned shadow in `lakets_cdf` to a Unity + Catalog Managed Table. No pipeline code, no schedule. + +## Prerequisites + +1. A **Lakebase Autoscaling project** in your workspace (e.g. `lakets-tiering-test`). + Note its name — every step takes the project name, not a host. +2. **Lakebase CDF enabled on the `lakets_cdf` schema** of that project. This is a + one-time Databricks setup and a hard prerequisite — without it the shadow + won't replicate and tiering will never evict a partition. See + [Lakebase CDF setup](./lakebase-cdf-setup.md). +3. **LakeTS installed** on the project (`dist/lakets.sql`). +4. 
**Databricks CLI** (`>= 1.0`) with a profile for the workspace: + `databricks auth login --host https://<workspace-host> -p <profile>`. +5. **`psql`** on PATH (to run `setup.sql`). +6. **Podman or Docker** + compose — only for the optional Grafana dashboards. + +:::note Authentication +Everything uses **machine-to-machine OAuth** against the Autoscaling project — no +static passwords. The jobs resolve the project's primary read-write endpoint and +mint a short-lived credential per connection (see +[`lakebase_utils.py`](https://github.com/databricks-solutions/lakets/blob/main/databricks/workflows/lakebase_utils.py)). +For `psql` you mint a token yourself (below). Grafana is the one exception — see +[its section](#optional-grafana-dashboards). +::: + +## Step 1 — Install LakeTS (if not already) + +Mint a short-lived credential and install the schema. Get the endpoint host and a +token: + +```bash +PROJECT=lakets-tiering-test +PROFILE=<your-profile> + +# Primary read-write endpoint of the production branch +EP=$(databricks postgres list-endpoints \ + projects/$PROJECT/branches/production -p $PROFILE -o json \ + | jq -r '.[] | select(.endpoint_type=="ENDPOINT_TYPE_READ_WRITE") | .name') +HOST=$(databricks postgres get-endpoint "$EP" -p $PROFILE -o json | jq -r '.status.hosts.host') +USER=$(databricks current-user me -p $PROFILE -o json | jq -r '.userName') +export PGPASSWORD=$(databricks postgres generate-database-credential \ + --json "{\"endpoint\":\"$EP\"}" -p $PROFILE -o json | jq -r '.token') + +PG_URL="host=$HOST port=5432 dbname=databricks_postgres user=$USER sslmode=require" + +psql "$PG_URL" -v ON_ERROR_STOP=1 -f dist/lakets.sql # from the repo root +``` + +## Step 2 — Run the demo setup + +`setup.sql` is idempotent. It creates `stock_assets`, the `stock_ticks` +ChronoTable (1-hour chunks), the 1min→1hour→1day RollUp DAG, the Last Value Cache, +a tiered-retention policy, and `enable_sync()` (the CDF shadow). It also resets the +invalidation log and LVC so the audience watches them fill from zero. 
+ +```bash +psql "$PG_URL" -v ON_ERROR_STOP=1 -f demo/live/sql/setup.sql +``` + +The summary at the end prints the ChronoTable registry, the pre-created +partitions, the RollUp DAG in refresh order, the CDF shadow / LVC / +invalidation-log row counts, and the retention policy. + +## Step 3 — Deploy the jobs + +```bash +cd demo/live/bundle +databricks bundle deploy -t dev \ + --var="lakebase_project=$PROJECT" -p $PROFILE +``` + +This deploys five serverless jobs: `stream_ticks` (continuous), `partition_manager` +(5 min), `rollup_refresh` (1 min), `tiering` (5 min), and `retention` (15 min). In +`dev` mode they are prefixed `[dev <your-username>]` and run as you. + +For a shared/prod deployment, use the `prod` target and a service principal that +owns a Lakebase Postgres role: + +```bash +databricks bundle deploy -t prod \ + --var="lakebase_project=$PROJECT" \ + --var="service_principal_name=<sp-application-id>" -p $PROFILE +``` + +The `stream_ticks` job is **continuous** and starts immediately. + +## Step 4 — Watch it move + +Open the jobs in the workspace. Within a minute or two you'll see ticks flowing, +the invalidation log draining each minute, RollUp watermarks advancing, and +partitions accruing. Query the hot tier directly: + +```sql +SELECT * FROM lakets.show_chunks('stock_ticks') ORDER BY range_start; +SELECT * FROM public._lvc_stock_ticks ORDER BY symbol; -- latest price/symbol +SELECT * FROM public._rollup_ohlcv_1min ORDER BY bucket DESC LIMIT 20; +SELECT count(*) FROM lakets_cdf._shadow_stock_ticks; -- replicating to UC +``` + +Tiering evicts a hot partition only once its data has aged past `drop_after` +(60 min in the demo) **and** CDF confirms it's flushed to Unity Catalog — so plan +to run the demo for over an hour to see partitions actually drop, or lower +`drop_after` in `setup.sql`. 
+ +### Mid-demo knobs + +Re-deploy with different variables, or edit the `stream_ticks` job widgets in the +UI and re-run: + +| Variable / widget | Effect | +|---|---| +| `symbols_count` | 10 / 100 / 1000 — widens LVC + RollUp cardinality | +| `rows_per_sec` | 1 / 10 / 100 / 1000 — ingest rate | +| `burst_mode` | `on` pushes 10k extra rows every 60s → watch the invalidation log spike | + +## Optional: Grafana dashboards + +`demo/live/grafana/` ships a local Grafana stack with two datasources: the hot +Lakebase tier (Postgres) and the cold Unity Catalog tier (Databricks SQL). See +[`demo/live/grafana/README.md`](https://github.com/databricks-solutions/lakets/tree/main/demo/live/grafana) for the full wiring. + +:::caution Grafana auth differs +Grafana's Postgres datasource can't rotate OAuth tokens, so the **hot-tier** +datasource needs a static login. Enable native Postgres login on the project and +create a role with a password for Grafana to use; the jobs keep using OAuth. The +**cold-tier** datasource uses a Databricks SQL warehouse + a service-principal +token. +::: + +## Teardown + +```bash +# Remove the jobs +cd demo/live/bundle && databricks bundle destroy -t dev --var="lakebase_project=$PROJECT" -p $PROFILE + +# Reset Lakebase state (keeps the lakets schema itself) +psql "$PG_URL" <<'SQL' + SELECT lakets.disable_sync('stock_ticks'); + SELECT lakets.disable_lvc('stock_ticks'); + SELECT lakets.drop_rollup('ohlcv_1day'); + SELECT lakets.drop_rollup('ohlcv_1hour'); + SELECT lakets.drop_rollup('ohlcv_1min'); + DROP TABLE IF EXISTS stock_ticks CASCADE; + DROP TABLE IF EXISTS stock_assets CASCADE; + DELETE FROM lakets._chronotable_registry WHERE table_name='stock_ticks'; +SQL +``` + +The Unity Catalog Managed Table populated by CDF is retained — drop it from +Catalog Explorer if you want a full reset. 
diff --git a/website/sidebars.ts b/website/sidebars.ts index 31e11cb..69adb0f 100644 --- a/website/sidebars.ts +++ b/website/sidebars.ts @@ -53,6 +53,12 @@ const sidebars: SidebarsConfig = { collapsed: false, items: ["examples/sensor-reading-journey"], }, + { + type: "category", + label: "Demos", + collapsed: false, + items: ["guides/live-demo"], + }, { type: "category", label: "Reference",