diff --git a/CHANGELOG.md b/CHANGELOG.md index 65bda48..7413c29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,13 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version ### Added +- **Per-chunk tiering durability gate.** A partition is dropped only when its CDF shadow is `STREAMING` in `wal2delta.tables` **and** `committed_lsn >= chunk.last_write_lsn` — proving Lakebase CDF has flushed every write to that chunk into the Unity Catalog Managed Table before the data leaves Lakebase. The gate compares against the chunk's own recorded write position (stamped by statement-level triggers), not the global WAL head, because a per-table `committed_lsn` does not advance while the shadow is idle. Tiering is fail-closed: it defers and retries whenever the gate is not satisfiable. +- **`show_tiering_status()`** and the `lakets_tiering_pending_chunks` / `lakets_tiering_tiered_chunks_total` / `lakets_tiering_caught_up` Prometheus metrics for tiering observability (including CDF gate state). +- `enable_sync` now warns when Lakebase CDF (`wal2delta`) is absent, since CDF is a prerequisite enabled on the `lakets_cdf` schema via Databricks, not by LakeTS. + ### Changed +- **Compression is now Tiering.** `add_compression_policy` → `add_tiering_policy` (drops the `segment_by`/`order_by` parameters); `compress_chunk`/`decompress_chunk` → `tier_chunk`/`untier_chunk` (`tier_chunk` returns `BOOLEAN` — `TRUE` dropped, `FALSE` deferred); `show_`/`remove_compression_policy` → `show_`/`remove_tiering_policy`. The `compression` policy type is now `tiering`; the `compressed` chunk status is removed (`active` → `tiered`). `_chronotable_registry.compression_enabled` is renamed `tiering_enabled`, and `_chunk_metadata` gains `last_write_lsn` (and drops the unused `compressed_at`). **Breaking — fresh release, no upgrade migration.** - RollUps are now always incremental. Removed the `refresh_mode` toggle and the `'full'` (`TRUNCATE`) refresh strategy; `create_rollup` no longer accepts a `p_refresh_mode` argument and `show_rollups()` no longer returns a `refresh_mode` column. @@ -21,6 +26,7 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version ### Removed +- The Spark-based compression job and its `{table}_archive` Delta export path. Tiering is pure Lakebase SQL (`tiering_job.py`); CDF owns the Unity Catalog copy. - `enable_rollup_export` / `disable_rollup_export` / `show_rollup_exports`, `sql/15_uc_integration.sql` and its functions (`register_uc_table`, `tag_uc_table`, `get_uc_registrations`, `unregister_uc_table`, `_uc_registry`), and the `rollup_export.py` / `uc_registration.py` Databricks jobs. ### Fixed diff --git a/README.md b/README.md index 995f2d3..ddd3545 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ LakeTS is a time series toolkit for Databricks Lakebase — pure SQL (PL/pgSQL) | **Time Series Functions** | `time_bucket`, `first`, `last`, `locf`, `interpolate`, `delta`, `rate`, `histogram` | | **Gap-filling** | `time_bucket_gapfill` + LEFT JOIN for continuous time series | | **RollUp Engine** | Incremental aggregates with per-bucket refresh, invalidation tracking, cold-tier re-aggregation, chunk-skip pruning, batch refresh, DAG orchestration, and Delta export | -| **Compression & Tiering** | Policy-based tiering from Lakebase to Delta Lake | +| **Tiering** | Policy-based eviction of cold partitions from Lakebase once CDF has flushed them to the Unity Catalog Managed Table | | **Retention** | Automated data lifecycle management across both tiers | | **Lakehouse Sync** | CDC-based replication to Delta via shadow table pattern | | **Last Value Cache** | Sub-10ms latest-state queries via `enable_lvc()` | @@ -112,7 +112,7 @@ FROM metrics GROUP BY 1 ORDER BY 1; SELECT lakets.enable_lvc('system_metrics', ARRAY['host'], ARRAY['cpu','memory']); -- Set up lifecycle policies -SELECT lakets.add_compression_policy('metrics', '7 days'); +SELECT lakets.add_tiering_policy('metrics', '7 days'); SELECT lakets.add_retention_policy('metrics', '30 days'); ``` @@ -182,7 +182,7 @@ LakeTS ships a pre-built **Databricks AI/BI dashboard** for monitoring your inst | Page | Panels | |------|--------| -| **Partition Health** | Hypertable count, chunk counts by status (active/compressed/tiered/dropped), chunk health table per hypertable, estimated row counts | +| **Partition Health** | Hypertable count, chunk counts by status (active/tiered/dropped), chunk health table per hypertable, estimated row counts | | **RollUp Monitoring** | Stale RollUp counter, dirty bucket total, watermark lag bar chart per RollUp (colored by refresh mode), invalidation log depth, full RollUp status table | | **LVC & System** | LVC-enabled table count, total cached series, database size (GB), LVC stats table, active policies by type | diff --git a/build.sh b/build.sh index c52592c..ca0e187 100755 --- a/build.sh +++ b/build.sh @@ -31,7 +31,7 @@ MODULES=( "02_chronotable.sql" "03_timeseries_functions.sql" "04_rollup.sql" - "05_compression.sql" + "05_tiering.sql" "06_retention.sql" "07_monitoring.sql" "08_metric_table.sql" diff --git a/databricks/bundles/databricks.yml b/databricks/bundles/databricks.yml index 6b3e15b..66b1d4f 100644 --- a/databricks/bundles/databricks.yml +++ b/databricks/bundles/databricks.yml @@ -28,20 +28,19 @@ resources: - ${var.lakebase_instance} existing_cluster_id: ${var.cluster_id} - lakets_compression: - name: "LakeTS - Compression & Tiering" - description: "Tiers old chunks from Lakebase to Delta Lake" + lakets_tiering: + name: "LakeTS - Tiering" + description: "Drops cold ChronoTable partitions once CDF has flushed them to Unity Catalog" schedule: quartz_cron_expression: "0 0 2 * * ?" # Daily at 2 AM UTC timezone_id: "UTC" tasks: - - task_key: compress_and_tier + - task_key: tier_cold_partitions python_wheel_task: package_name: lakets - entry_point: compression_job + entry_point: tiering_job parameters: - ${var.lakebase_instance} - - ${var.delta_catalog} existing_cluster_id: ${var.cluster_id} lakets_retention: diff --git a/databricks/workflows/compression_job.py b/databricks/workflows/compression_job.py deleted file mode 100644 index b402f86..0000000 --- a/databricks/workflows/compression_job.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -LakeTS Compression & Tiering Job -Databricks Workflow job that tiers old Lakebase chunks to Delta Lake. - -For each hypertable with a compression policy: -1. Find chunks older than compress_after -2. Verify data is in Delta (via Lakehouse Sync CDC) -3. Mark chunk as compressed/tiered in metadata -4. Optionally drop the Lakebase partition - -Schedule: Daily or per compression policy interval. - -Usage as Databricks Job: - Pass instance_name and catalog as parameters. -""" -import logging - -from psycopg2 import sql -from pyspark.sql import SparkSession - -from lakebase_utils import fetch_all, lakebase_cursor - -logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") -logger = logging.getLogger("lakets.compression_job") - - -def run(instance_name: str, catalog: str, schema: str = "default", drop_partitions: bool = False): - """Execute compression/tiering for all hypertables with policies.""" - spark = SparkSession.builder.getOrCreate() - - with lakebase_cursor(instance_name) as cur: - # Find all hypertables with compression policies - hypertables = fetch_all(cur, """ - SELECT hr.id, hr.schema_name, hr.table_name, hr.shadow_table_name, - pr.config->>'compress_after' as compress_after, - pr.config->>'segment_by' as segment_by, - pr.config->>'order_by' as order_by - FROM lakets._chronotable_registry hr - JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id - WHERE pr.policy_type = 'compression' AND pr.enabled = TRUE - """) - logger.info("Found %d hypertable(s) with compression policies", len(hypertables)) - - total_compressed = 0 - for ht in hypertables: - # Get eligible chunks - chunks = fetch_all(cur, """ - SELECT * FROM lakets._get_chunks_to_compress(%s, %s) - """, (ht["table_name"], ht["schema_name"])) - - if not chunks: - logger.info("No chunks to compress for %s.%s", ht["schema_name"], ht["table_name"]) - continue - - logger.info( - "Processing %d chunk(s) for %s.%s", - len(chunks), ht["schema_name"], ht["table_name"], - ) - - for chunk in chunks: - delta_table = f"{catalog}.{schema}.{ht['table_name']}_archive" - - chunk_parts = chunk["chunk_name"].split(".") - logger.info("Tiering chunk %s -> %s", chunk["chunk_name"], delta_table) - - # Mark as compressed in metadata - cur.execute("SELECT lakets.compress_chunk(%s)", (chunk["chunk_name"],)) - - if drop_partitions: - # Drop the Lakebase partition to free storage - drop_query = sql.SQL("DROP TABLE IF EXISTS {}.{}").format( - sql.Identifier(chunk_parts[0]), - sql.Identifier(chunk_parts[1]), - ) - cur.execute(drop_query) - cur.execute(""" - UPDATE lakets._chunk_metadata - SET status = 'tiered', tiered_at = now() - WHERE chunk_name = %s - """, (chunk["chunk_name"],)) - logger.info("Dropped partition %s", chunk["chunk_name"]) - - total_compressed += 1 - - # Update policy last_run - cur.execute(""" - UPDATE lakets._policy_registry - SET last_run_at = now() - WHERE chronotable_id = %s AND policy_type = 'compression' - """, (ht["id"],)) - - logger.info("Total chunks compressed: %d", total_compressed) - return total_compressed - - -if __name__ == "__main__": - import os - import sys - - instance = sys.argv[1] if len(sys.argv) > 1 else os.environ["LAKETS_INSTANCE"] - cat = sys.argv[2] if len(sys.argv) > 2 else "main" - run(instance, cat) diff --git a/databricks/workflows/tiering_job.py b/databricks/workflows/tiering_job.py new file mode 100644 index 0000000..e05740e --- /dev/null +++ b/databricks/workflows/tiering_job.py @@ -0,0 +1,81 @@ +""" +LakeTS Tiering Job + +Drops cold ChronoTable partitions to reclaim Lakebase storage. The data is +already durable in the Unity Catalog Managed Table via Lakebase CDF; this job +only evicts partitions that lakets.tier_chunk confirms are safe to drop (the +table's CDF shadow is STREAMING and has flushed past the WAL head). The drop +and the metadata transition happen inside tier_chunk, so this job is pure +Lakebase SQL — no Spark. + +Schedule: Daily or per tiering policy interval. + +Usage as Databricks Job: + Pass instance_name as a parameter. +""" +import logging + +from lakebase_utils import fetch_all, lakebase_cursor + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("lakets.tiering_job") + + +def run(instance_name: str) -> int: + """Evict eligible, CDF-durable chunks for all tables with a tiering policy. + + Returns the number of partitions actually dropped this run. + """ + total_tiered = 0 + deferred = 0 + with lakebase_cursor(instance_name) as cur: + tables = fetch_all(cur, """ + SELECT hr.id, hr.schema_name, hr.table_name + FROM lakets._chronotable_registry hr + JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id + WHERE pr.policy_type = 'tiering' AND pr.enabled = TRUE + """) + logger.info("Found %d table(s) with tiering policies", len(tables)) + + for t in tables: + chunks = fetch_all(cur, """ + SELECT * FROM lakets._get_chunks_to_tier(%s, %s) + """, (t["table_name"], t["schema_name"])) + + if not chunks: + logger.info( + "No eligible chunks for %s.%s (not aged out, or CDF not streaming)", + t["schema_name"], t["table_name"], + ) + continue + + logger.info( + "Evaluating %d chunk(s) for %s.%s", + len(chunks), t["schema_name"], t["table_name"], + ) + + for chunk in chunks: + cur.execute("SELECT lakets.tier_chunk(%s)", (chunk["chunk_name"],)) + dropped = cur.fetchone()[0] # raw cursor returns tuples (see fetch_all) + if dropped: + total_tiered += 1 + logger.info("Tiered (dropped) partition %s", chunk["chunk_name"]) + else: + deferred += 1 + logger.info("Deferred %s — CDF not caught up to WAL head", chunk["chunk_name"]) + + cur.execute(""" + UPDATE lakets._policy_registry SET last_run_at = now() + WHERE chronotable_id = %s AND policy_type = 'tiering' + """, (t["id"],)) + + logger.info("Tiering complete: %d dropped, %d deferred", total_tiered, deferred) + return total_tiered + + +if __name__ == "__main__": + import os + import sys + + instance = sys.argv[1] if len(sys.argv) > 1 else os.environ["LAKETS_INSTANCE"] + run(instance) diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 0561cc2..3890563 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS lakets._chronotable_registry ( chunk_interval INTERVAL NOT NULL DEFAULT '7 days', space_column TEXT, space_partitions INT DEFAULT 1, - compression_enabled BOOLEAN DEFAULT FALSE, + tiering_enabled BOOLEAN DEFAULT FALSE, retention_interval INTERVAL, shadow_table_name TEXT, sync_enabled BOOLEAN DEFAULT FALSE, @@ -48,10 +48,13 @@ CREATE TABLE IF NOT EXISTS lakets._chunk_metadata ( status TEXT NOT NULL DEFAULT 'active', row_count BIGINT, size_bytes BIGINT, - compressed_at TIMESTAMPTZ, tiered_at TIMESTAMPTZ, + -- Highest WAL position written to this chunk, stamped by the tiering + -- write-tracking trigger. The tiering durability gate drops a chunk only + -- once CDF's committed_lsn for the shadow has flushed past this mark. + last_write_lsn PG_LSN, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT valid_status CHECK (status IN ('active', 'compressed', 'tiered', 'dropped')) + CONSTRAINT valid_status CHECK (status IN ('active', 'tiered', 'dropped')) ); CREATE INDEX IF NOT EXISTS idx_chunk_metadata_hypertable @@ -61,7 +64,7 @@ CREATE INDEX IF NOT EXISTS idx_chunk_metadata_range ON lakets._chunk_metadata(range_start, range_end); -- --------------------------------------------------------------------------- --- Policy Registry: tracks compression, retention, and tiering policies +-- Policy Registry: tracks tiering, retention, and tiered-retention policies -- --------------------------------------------------------------------------- CREATE TABLE IF NOT EXISTS lakets._policy_registry ( id SERIAL PRIMARY KEY, @@ -71,7 +74,7 @@ CREATE TABLE IF NOT EXISTS lakets._policy_registry ( enabled BOOLEAN DEFAULT TRUE, last_run_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT valid_policy_type CHECK (policy_type IN ('compression', 'retention', 'tiered_retention')) + CONSTRAINT valid_policy_type CHECK (policy_type IN ('tiering', 'retention', 'tiered_retention')) ); -- --------------------------------------------------------------------------- @@ -153,7 +156,7 @@ END $$; CREATE UNIQUE INDEX IF NOT EXISTS idx_chunk_metadata_chunk_name ON lakets._chunk_metadata(chunk_name); --- FK index on _policy_registry (scanned by compression/retention jobs) +-- FK index on _policy_registry (scanned by tiering/retention jobs) CREATE INDEX IF NOT EXISTS idx_policy_registry_ct_type ON lakets._policy_registry(chronotable_id, policy_type) WHERE enabled = TRUE; diff --git a/sql/05_compression.sql b/sql/05_compression.sql deleted file mode 100644 index 6da3da0..0000000 --- a/sql/05_compression.sql +++ /dev/null @@ -1,229 +0,0 @@ --- ============================================================================= --- LakeTS Compression & Tiering Policies --- Register policies for automatic data tiering from Lakebase to Delta Lake. --- Actual tiering is executed by Databricks workflow jobs. --- ============================================================================= - --- --------------------------------------------------------------------------- --- add_compression_policy: Registers a compression/tiering policy. --- After compress_after interval, the Databricks compression job will: --- 1. Ensure data is synced to Delta via Lakehouse Sync --- 2. Optimize the Delta table (Z-ORDER / Liquid Clustering) --- 3. Drop the Lakebase partition --- 4. Update chunk metadata status to 'tiered' --- --------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION lakets.add_compression_policy( - p_table_name TEXT, - p_compress_after INTERVAL, - p_segment_by TEXT DEFAULT NULL, - p_order_by TEXT DEFAULT NULL, - p_schema_name TEXT DEFAULT 'public' -) -RETURNS INT -LANGUAGE plpgsql -AS $$ -DECLARE - v_chronotable_id INT; - v_policy_id INT; - v_config JSONB; -BEGIN - SELECT id INTO v_chronotable_id - FROM lakets._chronotable_registry - WHERE schema_name = p_schema_name AND table_name = p_table_name; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Table %.% is not a registered ChronoTable', - p_schema_name, p_table_name; - END IF; - - -- Check for existing compression policy - IF EXISTS ( - SELECT 1 FROM lakets._policy_registry - WHERE chronotable_id = v_chronotable_id AND policy_type = 'compression' - ) THEN - RAISE EXCEPTION 'Compression policy already exists for %.%', - p_schema_name, p_table_name; - END IF; - - v_config := jsonb_build_object( - 'compress_after', p_compress_after::TEXT, - 'segment_by', p_segment_by, - 'order_by', COALESCE(p_order_by, ( - SELECT time_column FROM lakets._chronotable_registry WHERE id = v_chronotable_id - ) || ' DESC') - ); - - INSERT INTO lakets._policy_registry - (chronotable_id, policy_type, config, enabled) - VALUES (v_chronotable_id, 'compression', v_config, TRUE) - RETURNING id INTO v_policy_id; - - -- Mark the hypertable as compression-enabled - UPDATE lakets._chronotable_registry - SET compression_enabled = TRUE - WHERE id = v_chronotable_id; - - RETURN v_policy_id; -END; -$$; - --- --------------------------------------------------------------------------- --- compress_chunk: Marks a chunk for compression/tiering. --- The actual tiering to Delta is done by the Databricks workflow job. --- --------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION lakets.compress_chunk( - p_chunk_name TEXT -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -BEGIN - UPDATE lakets._chunk_metadata - SET status = 'compressed', - compressed_at = now() - WHERE chunk_name = p_chunk_name AND status = 'active'; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Chunk % not found or not active', p_chunk_name; - END IF; -END; -$$; - --- --------------------------------------------------------------------------- --- decompress_chunk: Marks a tiered chunk for re-ingestion. --- The actual data restoration from Delta is done by the Databricks workflow. --- --------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION lakets.decompress_chunk( - p_chunk_name TEXT -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -BEGIN - UPDATE lakets._chunk_metadata - SET status = 'active', - compressed_at = NULL, - tiered_at = NULL - WHERE chunk_name = p_chunk_name AND status IN ('compressed', 'tiered'); - - IF NOT FOUND THEN - RAISE EXCEPTION 'Chunk % not found or not compressed/tiered', p_chunk_name; - END IF; -END; -$$; - --- --------------------------------------------------------------------------- --- show_compression_policy: Returns the compression policy for a hypertable. --- --------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION lakets.show_compression_policy( - p_table_name TEXT, - p_schema_name TEXT DEFAULT 'public' -) -RETURNS TABLE ( - policy_id INT, - compress_after TEXT, - segment_by TEXT, - order_by TEXT, - enabled BOOLEAN, - last_run_at TIMESTAMPTZ -) -LANGUAGE plpgsql -AS $$ -DECLARE - v_chronotable_id INT; -BEGIN - SELECT id INTO v_chronotable_id - FROM lakets._chronotable_registry - WHERE schema_name = p_schema_name AND table_name = p_table_name; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Table %.% is not a registered ChronoTable', - p_schema_name, p_table_name; - END IF; - - RETURN QUERY - SELECT - pr.id, - pr.config->>'compress_after', - pr.config->>'segment_by', - pr.config->>'order_by', - pr.enabled, - pr.last_run_at - FROM lakets._policy_registry pr - WHERE pr.chronotable_id = v_chronotable_id AND pr.policy_type = 'compression'; -END; -$$; - --- --------------------------------------------------------------------------- --- remove_compression_policy: Removes the compression policy. --- --------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION lakets.remove_compression_policy( - p_table_name TEXT, - p_schema_name TEXT DEFAULT 'public' -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -DECLARE - v_chronotable_id INT; -BEGIN - SELECT id INTO v_chronotable_id - FROM lakets._chronotable_registry - WHERE schema_name = p_schema_name AND table_name = p_table_name; - - IF NOT FOUND THEN - RAISE EXCEPTION 'Table %.% is not a registered ChronoTable', - p_schema_name, p_table_name; - END IF; - - DELETE FROM lakets._policy_registry - WHERE chronotable_id = v_chronotable_id AND policy_type = 'compression'; - - UPDATE lakets._chronotable_registry - SET compression_enabled = FALSE - WHERE id = v_chronotable_id; -END; -$$; - --- --------------------------------------------------------------------------- --- _get_chunks_to_compress: Returns chunks eligible for compression. --- Used by the Databricks compression job to find work. --- --------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION lakets._get_chunks_to_compress( - p_table_name TEXT, - p_schema_name TEXT DEFAULT 'public' -) -RETURNS TABLE ( - chunk_id INT, - chunk_name TEXT, - range_start TIMESTAMPTZ, - range_end TIMESTAMPTZ -) -LANGUAGE plpgsql -AS $$ -DECLARE - v_chronotable_id INT; - v_compress_after INTERVAL; -BEGIN - SELECT hr.id, (pr.config->>'compress_after')::INTERVAL - INTO v_chronotable_id, v_compress_after - FROM lakets._chronotable_registry hr - JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id - WHERE hr.schema_name = p_schema_name - AND hr.table_name = p_table_name - AND pr.policy_type = 'compression' - AND pr.enabled = TRUE; - - IF NOT FOUND THEN - RETURN; - END IF; - - RETURN QUERY - SELECT cm.id, cm.chunk_name, cm.range_start, cm.range_end - FROM lakets._chunk_metadata cm - WHERE cm.chronotable_id = v_chronotable_id - AND cm.status = 'active' - AND cm.range_end <= (now() - v_compress_after) - ORDER BY cm.range_start; -END; -$$; diff --git a/sql/05_tiering.sql b/sql/05_tiering.sql new file mode 100644 index 0000000..41b9494 --- /dev/null +++ b/sql/05_tiering.sql @@ -0,0 +1,487 @@ +-- ============================================================================= +-- LakeTS Tiering Policies +-- A tiering policy drops cold ChronoTable partitions to reclaim Lakebase +-- storage. The data is already durable in the Unity Catalog Managed Table via +-- Lakebase CDF; tiering only evicts partitions once CDF has flushed past them. +-- Executed by the Databricks tiering job. +-- ============================================================================= + +-- --------------------------------------------------------------------------- +-- _cdf_committed_lsn: returns the CDF-flushed LSN for a shadow table, but +-- ONLY if it is actively STREAMING in wal2delta.tables. Returns NULL (fail +-- closed) if the wal2delta subsystem is absent, the shadow does not exist, +-- or the table is not STREAMING (e.g. SKIPPED for missing REPLICA IDENTITY). +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets._cdf_committed_lsn(p_shadow_name TEXT) +RETURNS PG_LSN +LANGUAGE plpgsql +AS $$ +DECLARE + v_oid OID; + v_lsn PG_LSN; +BEGIN + IF to_regclass('wal2delta.tables') IS NULL THEN + RETURN NULL; + END IF; + v_oid := to_regclass('lakets_cdf.' || quote_ident(p_shadow_name))::oid; + IF v_oid IS NULL THEN + RETURN NULL; + END IF; + SELECT committed_lsn INTO v_lsn + FROM wal2delta.tables + WHERE table_oid = v_oid AND status = 'STREAMING'; + RETURN v_lsn; -- NULL if no STREAMING row +END; +$$; + +-- --------------------------------------------------------------------------- +-- _stamp_tiered_chunk_lsn: statement-level trigger that records the current +-- WAL position on every chunk that just received writes. Installed on the +-- ChronoTable PARENT (with a transition table) so it covers all current and +-- future partitions in one trigger and stamps ONLY the chunks actually +-- touched -- a write to today's hot chunk never advances a cold chunk's +-- watermark, which is what lets cold chunks become tier-eligible. +-- +-- Why parent + transition table: a statement-level trigger on a partition does +-- not fire for inserts routed through the parent, and chunk_name is stored +-- schema-qualified, so the row->chunk mapping is recomputed here via date_bin +-- (same origin '2000-01-01' as _ensure_partitions). +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets._stamp_tiered_chunk_lsn() +RETURNS TRIGGER +LANGUAGE plpgsql +AS $$ +DECLARE + v_parent TEXT; + v_ct_id INT; + v_time_col TEXT; + v_interval INTERVAL; +BEGIN + v_parent := lakets._resolve_partition_parent(TG_TABLE_SCHEMA, TG_TABLE_NAME); + + SELECT cr.id, cr.time_column, cr.chunk_interval + INTO v_ct_id, v_time_col, v_interval + FROM lakets._chronotable_registry cr + WHERE cr.schema_name = TG_TABLE_SCHEMA + AND cr.table_name = COALESCE(v_parent, TG_TABLE_NAME); + + IF v_ct_id IS NULL THEN + RETURN NULL; + END IF; + + -- pg_current_wal_lsn() here is a lower bound on the writing txn's commit + -- LSN. That imprecision is immaterial: tier_chunk only ever drops chunks + -- whose time window ended at least p_after ago (days), by which point + -- committed_lsn has long flushed past this mark. + IF TG_OP = 'DELETE' THEN + EXECUTE format( + 'UPDATE lakets._chunk_metadata cm + SET last_write_lsn = pg_current_wal_lsn() + WHERE cm.chronotable_id = $1 + AND cm.range_start IN (SELECT DISTINCT date_bin($2, %I, $3) FROM _old_rows)', + v_time_col) + USING v_ct_id, v_interval, '2000-01-01'::timestamptz; + ELSE + EXECUTE format( + 'UPDATE lakets._chunk_metadata cm + SET last_write_lsn = pg_current_wal_lsn() + WHERE cm.chronotable_id = $1 + AND cm.range_start IN (SELECT DISTINCT date_bin($2, %I, $3) FROM _new_rows)', + v_time_col) + USING v_ct_id, v_interval, '2000-01-01'::timestamptz; + END IF; + + RETURN NULL; +END; +$$; + +-- --------------------------------------------------------------------------- +-- _install_tiering_write_tracking / _remove_tiering_write_tracking: attach or +-- detach the per-chunk write-LSN triggers on a ChronoTable parent. Three +-- triggers: PostgreSQL allows a transition table for only ONE event each, so +-- INSERT and UPDATE get separate NEW TABLE triggers and DELETE gets an OLD +-- TABLE trigger. All three call the same trigger function. +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets._install_tiering_write_tracking( + p_schema_name TEXT, p_table_name TEXT +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE format( + 'CREATE OR REPLACE TRIGGER trg_lakets_tier_lsn_insert + AFTER INSERT ON %I.%I + REFERENCING NEW TABLE AS _new_rows + FOR EACH STATEMENT EXECUTE FUNCTION lakets._stamp_tiered_chunk_lsn()', + p_schema_name, p_table_name); + EXECUTE format( + 'CREATE OR REPLACE TRIGGER trg_lakets_tier_lsn_update + AFTER UPDATE ON %I.%I + REFERENCING NEW TABLE AS _new_rows + FOR EACH STATEMENT EXECUTE FUNCTION lakets._stamp_tiered_chunk_lsn()', + p_schema_name, p_table_name); + EXECUTE format( + 'CREATE OR REPLACE TRIGGER trg_lakets_tier_lsn_delete + AFTER DELETE ON %I.%I + REFERENCING OLD TABLE AS _old_rows + FOR EACH STATEMENT EXECUTE FUNCTION lakets._stamp_tiered_chunk_lsn()', + p_schema_name, p_table_name); +END; +$$; + +CREATE OR REPLACE FUNCTION lakets._remove_tiering_write_tracking( + p_schema_name TEXT, p_table_name TEXT +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE format('DROP TRIGGER IF EXISTS trg_lakets_tier_lsn_insert ON %I.%I', + p_schema_name, p_table_name); + EXECUTE format('DROP TRIGGER IF EXISTS trg_lakets_tier_lsn_update ON %I.%I', + p_schema_name, p_table_name); + EXECUTE format('DROP TRIGGER IF EXISTS trg_lakets_tier_lsn_delete ON %I.%I', + p_schema_name, p_table_name); +END; +$$; + +-- --------------------------------------------------------------------------- +-- add_tiering_policy: register a tiering policy. Chunks older than p_after are +-- eligible for eviction once CDF has flushed their data to UC. +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets.add_tiering_policy( + p_table_name TEXT, + p_after INTERVAL, + p_schema_name TEXT DEFAULT 'public' +) +RETURNS INT +LANGUAGE plpgsql +AS $$ +DECLARE + v_chronotable_id INT; + v_policy_id INT; + v_sync_enabled BOOLEAN; +BEGIN + SELECT id, sync_enabled INTO v_chronotable_id, v_sync_enabled + FROM lakets._chronotable_registry + WHERE schema_name = p_schema_name AND table_name = p_table_name; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Table %.% is not a registered ChronoTable', + p_schema_name, p_table_name; + END IF; + + IF NOT COALESCE(v_sync_enabled, FALSE) THEN + RAISE NOTICE 'Table %.% is not CDF-synced yet; tiering will not evict any ' + 'partitions until enable_sync is called and CDF is streaming.', + p_schema_name, p_table_name; + END IF; + + -- _policy_registry has no unique constraint on (chronotable_id, policy_type), + -- so guard with an existence check. + IF EXISTS ( + SELECT 1 FROM lakets._policy_registry + WHERE chronotable_id = v_chronotable_id AND policy_type = 'tiering' + ) THEN + RAISE EXCEPTION 'Tiering policy already exists for %.%', p_schema_name, p_table_name; + END IF; + + INSERT INTO lakets._policy_registry (chronotable_id, policy_type, config, enabled) + VALUES (v_chronotable_id, 'tiering', jsonb_build_object('after', p_after::TEXT), TRUE) + RETURNING id INTO v_policy_id; + + UPDATE lakets._chronotable_registry + SET tiering_enabled = TRUE + WHERE id = v_chronotable_id; + + -- Track per-chunk write positions from now on. + PERFORM lakets._install_tiering_write_tracking(p_schema_name, p_table_name); + + -- Backfill: stamp existing active chunks with the current WAL head as a + -- conservative upper bound on their writes. A chunk drops only once CDF has + -- flushed past this mark, so pre-policy data is never dropped before it is + -- provably durable in UC. (NULL last_write_lsn is treated as "cannot prove + -- durable" by tier_chunk, so this backfill is what makes existing chunks + -- eligible at all.) + UPDATE lakets._chunk_metadata + SET last_write_lsn = pg_current_wal_lsn() + WHERE chronotable_id = v_chronotable_id + AND status = 'active' + AND last_write_lsn IS NULL; + + RETURN v_policy_id; +END; +$$; + +-- --------------------------------------------------------------------------- +-- remove_tiering_policy: Removes the tiering policy for a ChronoTable. +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets.remove_tiering_policy( + p_table_name TEXT, + p_schema_name TEXT DEFAULT 'public' +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE v_chronotable_id INT; +BEGIN + SELECT id INTO v_chronotable_id + FROM lakets._chronotable_registry + WHERE schema_name = p_schema_name AND table_name = p_table_name; + IF NOT FOUND THEN + RAISE EXCEPTION 'Table %.% is not a registered ChronoTable', p_schema_name, p_table_name; + END IF; + + DELETE FROM lakets._policy_registry + WHERE chronotable_id = v_chronotable_id AND policy_type = 'tiering'; + + UPDATE lakets._chronotable_registry + SET tiering_enabled = FALSE + WHERE id = v_chronotable_id; + + PERFORM lakets._remove_tiering_write_tracking(p_schema_name, p_table_name); +END; +$$; + +-- --------------------------------------------------------------------------- +-- show_tiering_policy: Returns the tiering policy for a ChronoTable. +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets.show_tiering_policy( + p_table_name TEXT, + p_schema_name TEXT DEFAULT 'public' +) +RETURNS TABLE (policy_id INT, after TEXT, enabled BOOLEAN, last_run_at TIMESTAMPTZ) +LANGUAGE plpgsql +AS $$ +BEGIN + RETURN QUERY + SELECT pr.id, pr.config->>'after', pr.enabled, pr.last_run_at + FROM lakets._policy_registry pr + JOIN lakets._chronotable_registry hr ON hr.id = pr.chronotable_id + WHERE hr.schema_name = p_schema_name + AND hr.table_name = p_table_name + AND pr.policy_type = 'tiering'; +END; +$$; + +-- --------------------------------------------------------------------------- +-- _get_chunks_to_tier: candidate chunks for eviction — active, older than the +-- policy 'after' interval, AND the table's shadow is STREAMING in CDF. The +-- exact per-chunk durability gate is enforced in tier_chunk at drop time. +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets._get_chunks_to_tier( + p_table_name TEXT, + p_schema_name TEXT DEFAULT 'public' +) +RETURNS TABLE (chunk_id INT, chunk_name TEXT, range_start TIMESTAMPTZ, range_end TIMESTAMPTZ) +LANGUAGE plpgsql +AS $$ +DECLARE + v_chronotable_id INT; + v_after INTERVAL; + v_shadow TEXT; +BEGIN + SELECT hr.id, (pr.config->>'after')::INTERVAL, hr.shadow_table_name + INTO v_chronotable_id, v_after, v_shadow + FROM lakets._chronotable_registry hr + JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id + WHERE hr.schema_name = p_schema_name + AND hr.table_name = p_table_name + AND pr.policy_type = 'tiering' + AND pr.enabled = TRUE; + + IF NOT FOUND THEN + RETURN; + END IF; + + -- Hard gate: skip entirely unless the shadow is actively STREAMING. + IF v_shadow IS NULL OR lakets._cdf_committed_lsn(v_shadow) IS NULL THEN + RETURN; + END IF; + + RETURN QUERY + SELECT cm.id, cm.chunk_name, cm.range_start, cm.range_end + FROM lakets._chunk_metadata cm + WHERE cm.chronotable_id = v_chronotable_id + AND cm.status = 'active' + AND cm.range_end <= (now() - v_after) + ORDER BY cm.range_start; +END; +$$; + +-- --------------------------------------------------------------------------- +-- tier_chunk: drop a chunk's partition and mark it tiered, but ONLY if CDF has +-- provably flushed past every write to THAT chunk: +-- shadow is STREAMING AND chunk.last_write_lsn IS NOT NULL +-- AND committed_lsn >= chunk.last_write_lsn. +-- The comparison is against the chunk's own recorded write position (stamped by +-- _stamp_tiered_chunk_lsn), NOT the global WAL head -- the head keeps advancing +-- from unrelated activity while a quiescent shadow's committed_lsn freezes, so a +-- head comparison would never pass for the cold chunks we want to evict. +-- Returns TRUE if dropped, FALSE if deferred (caller retries next run). +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets.tier_chunk(p_chunk_name TEXT) +RETURNS BOOLEAN +LANGUAGE plpgsql +AS $$ +DECLARE + v_chronotable_id INT; + v_shadow TEXT; + v_committed PG_LSN; + v_chunk_lsn PG_LSN; + v_parts TEXT[]; +BEGIN + SELECT cm.chronotable_id, cm.last_write_lsn INTO v_chronotable_id, v_chunk_lsn + FROM lakets._chunk_metadata cm + WHERE cm.chunk_name = p_chunk_name AND cm.status = 'active'; + IF NOT FOUND THEN + RAISE NOTICE 'tier_chunk: % not found or not active', p_chunk_name; + RETURN FALSE; + END IF; + + SELECT shadow_table_name INTO v_shadow + FROM lakets._chronotable_registry WHERE id = v_chronotable_id; + + v_committed := lakets._cdf_committed_lsn(v_shadow); + IF v_committed IS NULL THEN + RAISE NOTICE 'tier_chunk: % skipped — shadow not STREAMING', p_chunk_name; + RETURN FALSE; + END IF; + + IF v_chunk_lsn IS NULL THEN + RAISE NOTICE 'tier_chunk: % skipped — no recorded write position (cannot prove durable)', + p_chunk_name; + RETURN FALSE; + END IF; + + IF v_committed < v_chunk_lsn THEN + RAISE NOTICE 'tier_chunk: % deferred — CDF has not flushed the chunk yet ' + '(committed=%, chunk_last_write=%)', p_chunk_name, v_committed, v_chunk_lsn; + RETURN FALSE; + END IF; + + -- Safe to drop: every write to this chunk is at or below committed_lsn, i.e. + -- provably durable in the Unity Catalog Managed Table. + v_parts := string_to_array(p_chunk_name, '.'); + IF array_length(v_parts, 1) = 2 THEN + EXECUTE format('DROP TABLE IF EXISTS %I.%I', v_parts[1], v_parts[2]); + ELSE + EXECUTE format('DROP TABLE IF EXISTS %I', p_chunk_name); + END IF; + + UPDATE lakets._chunk_metadata + SET status = 'tiered', tiered_at = now() + WHERE chunk_name = p_chunk_name; + + RETURN TRUE; +END; +$$; + +-- --------------------------------------------------------------------------- +-- untier_chunk: restore a tiered chunk's metadata to active (e.g. before a +-- backfill re-ingests the partition from UC). +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets.untier_chunk(p_chunk_name TEXT) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + UPDATE lakets._chunk_metadata + SET status = 'active', tiered_at = NULL + WHERE chunk_name = p_chunk_name AND status = 'tiered'; + IF NOT FOUND THEN + RAISE EXCEPTION 'Chunk % not found or not tiered', p_chunk_name; + END IF; +END; +$$; + +-- --------------------------------------------------------------------------- +-- show_tiering_status: per-table tiering observability, including the CDF gate +-- state so deferrals are self-explaining. +-- --------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION lakets.show_tiering_status( + p_table_name TEXT DEFAULT NULL, + p_schema_name TEXT DEFAULT 'public' +) +RETURNS TABLE ( + schema_name TEXT, + table_name TEXT, + after TEXT, + active_chunks INT, + tiered_chunks INT, + pending_chunks INT, + reclaimable_bytes BIGINT, + reclaimed_bytes BIGINT, + cdf_status TEXT, + cdf_lag_bytes BIGINT, + caught_up BOOLEAN, + last_run_at TIMESTAMPTZ +) +LANGUAGE plpgsql +AS $$ +DECLARE + r RECORD; + v_after INTERVAL; + v_committed PG_LSN; + v_max_pending_lsn PG_LSN; + v_pending_unstamped INT; +BEGIN + FOR r IN + SELECT hr.id, hr.schema_name AS sname, hr.table_name AS tname, + hr.shadow_table_name AS shadow, + pr.config->>'after' AS after_txt, pr.last_run_at AS lra + FROM lakets._chronotable_registry hr + JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id + WHERE pr.policy_type = 'tiering' + AND (p_table_name IS NULL OR + (hr.table_name = p_table_name AND hr.schema_name = p_schema_name)) + LOOP + v_after := r.after_txt::INTERVAL; + v_committed := lakets._cdf_committed_lsn(r.shadow); + + schema_name := r.sname; + table_name := r.tname; + after := r.after_txt; + last_run_at := r.lra; + + -- pending = active AND aged out. v_max_pending_lsn is the furthest WAL + -- position CDF must flush past to clear the whole pending backlog; + -- v_pending_unstamped counts pending chunks with no recorded position + -- (cannot be proven durable, so they block "caught_up"). + SELECT + count(*) FILTER (WHERE cm.status = 'active'), + count(*) FILTER (WHERE cm.status = 'tiered'), + count(*) FILTER (WHERE cm.status = 'active' AND cm.range_end <= now() - v_after), + COALESCE(sum(cm.size_bytes) FILTER ( + WHERE cm.status = 'active' AND cm.range_end <= now() - v_after), 0), + COALESCE(sum(cm.size_bytes) FILTER (WHERE cm.status = 'tiered'), 0), + max(cm.last_write_lsn) FILTER ( + WHERE cm.status = 'active' AND cm.range_end <= now() - v_after), + count(*) FILTER ( + WHERE cm.status = 'active' AND cm.range_end <= now() - v_after + AND cm.last_write_lsn IS NULL) + INTO active_chunks, tiered_chunks, pending_chunks, reclaimable_bytes, + reclaimed_bytes, v_max_pending_lsn, v_pending_unstamped + FROM lakets._chunk_metadata cm + WHERE cm.chronotable_id = r.id; + + IF v_committed IS NULL THEN + cdf_status := CASE WHEN r.shadow IS NULL THEN 'NONE' ELSE 'SKIPPED' END; + cdf_lag_bytes := NULL; + caught_up := FALSE; + ELSE + cdf_status := 'STREAMING'; + -- How far CDF still has to flush to clear the pending backlog. + cdf_lag_bytes := GREATEST( + 0, COALESCE(pg_wal_lsn_diff(v_max_pending_lsn, v_committed), 0))::BIGINT; + -- Caught up iff CDF has flushed past every stamped pending chunk and + -- no pending chunk is unstamped. + caught_up := (v_pending_unstamped = 0) + AND (v_max_pending_lsn IS NULL OR v_committed >= v_max_pending_lsn); + END IF; + + RETURN NEXT; + END LOOP; +END; +$$; diff --git a/sql/07_monitoring.sql b/sql/07_monitoring.sql index 44ba945..7884013 100644 --- a/sql/07_monitoring.sql +++ b/sql/07_monitoring.sql @@ -37,12 +37,25 @@ BEGIN LEFT JOIN pg_stat_user_tables s ON s.schemaname = hr.schema_name AND s.relname = hr.table_name; - -- Compression policy status + -- Tiering metrics, sourced from show_tiering_status so the gate logic lives + -- in exactly one place (per-chunk committed_lsn >= last_write_lsn). + -- pending = aged-out chunks awaiting eviction; caught_up = CDF has flushed + -- past every pending chunk (the durability gate currently passes). RETURN QUERY - SELECT 'lakets_compression_enabled'::TEXT, - (CASE WHEN hr.compression_enabled THEN 1 ELSE 0 END)::DOUBLE PRECISION, - jsonb_build_object('table', hr.schema_name || '.' || hr.table_name) - FROM lakets._chronotable_registry hr; + SELECT 'lakets_tiering_pending_chunks'::TEXT, sts.pending_chunks::DOUBLE PRECISION, + jsonb_build_object('table', sts.schema_name || '.' || sts.table_name) + FROM lakets.show_tiering_status() sts; + + RETURN QUERY + SELECT 'lakets_tiering_tiered_chunks_total'::TEXT, sts.tiered_chunks::DOUBLE PRECISION, + jsonb_build_object('table', sts.schema_name || '.' || sts.table_name) + FROM lakets.show_tiering_status() sts; + + RETURN QUERY + SELECT 'lakets_tiering_caught_up'::TEXT, + (CASE WHEN sts.caught_up THEN 1 ELSE 0 END)::DOUBLE PRECISION, + jsonb_build_object('table', sts.schema_name || '.' || sts.table_name) + FROM lakets.show_tiering_status() sts; -- RollUp watermark lag (seconds between now and watermark per RollUp) RETURN QUERY @@ -100,7 +113,6 @@ RETURNS TABLE ( hypertable TEXT, total_chunks BIGINT, active_chunks BIGINT, - compressed_chunks BIGINT, tiered_chunks BIGINT, dropped_chunks BIGINT, oldest_active TIMESTAMPTZ, @@ -114,7 +126,6 @@ BEGIN hr.schema_name || '.' || hr.table_name, count(*)::BIGINT, count(*) FILTER (WHERE cm.status = 'active')::BIGINT, - count(*) FILTER (WHERE cm.status = 'compressed')::BIGINT, count(*) FILTER (WHERE cm.status = 'tiered')::BIGINT, count(*) FILTER (WHERE cm.status = 'dropped')::BIGINT, min(cm.range_start) FILTER (WHERE cm.status = 'active'), diff --git a/sql/13_shadow_sync.sql b/sql/13_shadow_sync.sql index 7975469..95b6ac0 100644 --- a/sql/13_shadow_sync.sql +++ b/sql/13_shadow_sync.sql @@ -125,6 +125,18 @@ BEGIN WHERE schema_name = p_schema_name AND table_name = p_table_name) INTO v_is_ct; SELECT EXISTS(SELECT 1 FROM lakets._rollup_registry WHERE name = p_table_name) INTO v_is_ru; + -- Preflight: LakeTS cannot enable Lakebase CDF itself -- it is a prerequisite + -- enabled on the lakets_cdf schema via Databricks. Without it, the shadow is + -- created but never streams to Unity Catalog, and tiering stays fail-closed + -- (nothing is ever evicted). Warn loudly rather than failing, so sync can be + -- wired before CDF is turned on. + IF to_regclass('wal2delta.tables') IS NULL THEN + RAISE WARNING 'Lakebase CDF (wal2delta) is not enabled on this database. The ' + 'shadow will be created but will not replicate to Unity Catalog, and ' + 'tiering will not evict any partitions. Enable CDF on the lakets_cdf ' + 'schema (see the LakeTS prerequisites) to activate sync.'; + END IF; + IF v_is_ct AND v_is_ru THEN RAISE EXCEPTION 'Ambiguous: % is both a ChronoTable and a RollUp', p_table_name; ELSIF v_is_ct THEN diff --git a/sql/14_rollup_optimization.sql b/sql/14_rollup_optimization.sql index 046ca07..de82634 100644 --- a/sql/14_rollup_optimization.sql +++ b/sql/14_rollup_optimization.sql @@ -508,7 +508,7 @@ RETURNS TEXT LANGUAGE sql STABLE AS $$ SELECT CASE - WHEN cm.status IN ('active', 'compressed') THEN 'hot' + WHEN cm.status = 'active' THEN 'hot' WHEN cm.status = 'tiered' THEN 'cold' ELSE 'hot' END diff --git a/sql/99_install.sql b/sql/99_install.sql index aea577d..faa0c9b 100644 --- a/sql/99_install.sql +++ b/sql/99_install.sql @@ -23,8 +23,8 @@ -- Step 4: RollUp Engine (incremental time-bucketed aggregations) \ir 04_rollup.sql --- Step 5: Compression & tiering policies -\ir 05_compression.sql +-- Step 5: Tiering policies +\ir 05_tiering.sql -- Step 6: Retention policies \ir 06_retention.sql diff --git a/tests/test_compression.sql b/tests/test_compression.sql deleted file mode 100644 index 149858e..0000000 --- a/tests/test_compression.sql +++ /dev/null @@ -1,58 +0,0 @@ --- ============================================================================= --- LakeTS Compression & Tiering Tests --- ============================================================================= -DROP TABLE IF EXISTS public.comp_test CASCADE; -DELETE FROM lakets._policy_registry WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test'); -DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test'); -DELETE FROM lakets._chronotable_registry WHERE table_name='comp_test'; - -CREATE TABLE public.comp_test (time TIMESTAMPTZ NOT NULL, val DOUBLE PRECISION); -INSERT INTO comp_test SELECT now()-(i||' hours')::interval, random()*100 FROM generate_series(1,240) s(i); -SELECT lakets.create_chronotable('comp_test','time','1 day'); - --- T1: Add compression policy -DO $$ DECLARE v INT; BEGIN - SELECT lakets.add_compression_policy('comp_test', '3 days', 'val') INTO v; - ASSERT v IS NOT NULL; RAISE NOTICE 'T1 PASSED: policy id=%', v; -END $$; - --- T2: Show compression policy -DO $$ DECLARE v TEXT; BEGIN - SELECT compress_after INTO v FROM lakets.show_compression_policy('comp_test'); - ASSERT v = '3 days'; RAISE NOTICE 'T2 PASSED: compress_after=%', v; -END $$; - --- T3: Get chunks to compress -DO $$ DECLARE v BIGINT; BEGIN - SELECT count(*) INTO v FROM lakets._get_chunks_to_compress('comp_test'); - ASSERT v > 0; RAISE NOTICE 'T3 PASSED: % eligible chunks', v; -END $$; - --- T4: Compress a chunk -DO $$ DECLARE v_name TEXT; v_status TEXT; BEGIN - SELECT chunk_name INTO v_name FROM lakets._get_chunks_to_compress('comp_test') LIMIT 1; - PERFORM lakets.compress_chunk(v_name); - SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name=v_name; - ASSERT v_status = 'compressed'; RAISE NOTICE 'T4 PASSED: status=%', v_status; -END $$; - --- T5: Decompress chunk -DO $$ DECLARE v_name TEXT; v_status TEXT; BEGIN - SELECT chunk_name INTO v_name FROM lakets._chunk_metadata WHERE status='compressed' AND chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test') LIMIT 1; - PERFORM lakets.decompress_chunk(v_name); - SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name=v_name; - ASSERT v_status = 'active'; RAISE NOTICE 'T5 PASSED: decompressed back to active'; -END $$; - --- T6: Remove compression policy -DO $$ DECLARE v BOOLEAN; BEGIN - PERFORM lakets.remove_compression_policy('comp_test'); - SELECT compression_enabled INTO v FROM lakets._chronotable_registry WHERE table_name='comp_test'; - ASSERT v = FALSE; RAISE NOTICE 'T6 PASSED: policy removed'; -END $$; - -DROP TABLE IF EXISTS public.comp_test CASCADE; -DELETE FROM lakets._policy_registry WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test'); -DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test'); -DELETE FROM lakets._chronotable_registry WHERE table_name='comp_test'; -SELECT 'ALL COMPRESSION TESTS PASSED' as result; diff --git a/tests/test_monitoring.sql b/tests/test_monitoring.sql index 4b727b8..5ded1ba 100644 --- a/tests/test_monitoring.sql +++ b/tests/test_monitoring.sql @@ -62,52 +62,62 @@ BEGIN DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_test'; END $$; --- T5: chunk_health reports compressed chunks correctly +-- T5: chunk_health reports tiered chunks correctly DO $$ DECLARE - v_compressed BIGINT; + v_tiered BIGINT; v_chunk_name TEXT; BEGIN - -- Create ChronoTable with enough data to compress - DROP TABLE IF EXISTS public.mon_comp_test CASCADE; + -- Create ChronoTable with enough data to age out a chunk + DROP TABLE IF EXISTS public.mon_tier_test CASCADE; DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN ( - SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test' + SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test' ); DELETE FROM lakets._policy_registry WHERE chronotable_id IN ( - SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test' + SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test' ); - DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test'; + DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test'; - CREATE TABLE public.mon_comp_test (time TIMESTAMPTZ NOT NULL, val DOUBLE PRECISION); - INSERT INTO mon_comp_test SELECT now() - (i || ' hours')::interval, random() * 100 + CREATE TABLE public.mon_tier_test (time TIMESTAMPTZ NOT NULL, val DOUBLE PRECISION); + INSERT INTO mon_tier_test SELECT now() - (i || ' hours')::interval, random() * 100 FROM generate_series(1, 240) s(i); - PERFORM lakets.create_chronotable('mon_comp_test', 'time', '1 day'); - PERFORM lakets.add_compression_policy('mon_comp_test', '3 days', 'val'); + PERFORM lakets.create_chronotable('mon_tier_test', 'time', '1 day'); + PERFORM lakets.add_tiering_policy('mon_tier_test', '3 days'); + + -- Simulate a completed tiering. tier_chunk's actual drop needs live CDF; + -- here we only assert chunk_health surfaces the 'tiered' status. + SELECT chunk_name INTO v_chunk_name FROM lakets._get_chunks_to_tier('mon_tier_test') LIMIT 1; + IF v_chunk_name IS NULL THEN + SELECT cm.chunk_name INTO v_chunk_name + FROM lakets._chunk_metadata cm + JOIN lakets._chronotable_registry hr ON hr.id = cm.chronotable_id + WHERE hr.table_name = 'mon_tier_test' AND cm.status = 'active' + ORDER BY cm.range_start LIMIT 1; + END IF; - -- Compress one chunk - SELECT chunk_name INTO v_chunk_name FROM lakets._get_chunks_to_compress('mon_comp_test') LIMIT 1; IF v_chunk_name IS NOT NULL THEN - PERFORM lakets.compress_chunk(v_chunk_name); + UPDATE lakets._chunk_metadata SET status = 'tiered', tiered_at = now() + WHERE chunk_name = v_chunk_name; - SELECT compressed_chunks INTO v_compressed + SELECT tiered_chunks INTO v_tiered FROM lakets.chunk_health() - WHERE hypertable = 'public.mon_comp_test'; + WHERE hypertable = 'public.mon_tier_test'; - ASSERT v_compressed >= 1, format('expected >= 1 compressed chunk, got %s', v_compressed); - RAISE NOTICE 'T5 PASSED: chunk_health reports % compressed chunks', v_compressed; + ASSERT v_tiered >= 1, format('expected >= 1 tiered chunk, got %s', v_tiered); + RAISE NOTICE 'T5 PASSED: chunk_health reports % tiered chunks', v_tiered; ELSE - RAISE NOTICE 'T5 SKIPPED: no eligible chunks to compress'; + RAISE NOTICE 'T5 SKIPPED: no chunks materialized for mon_tier_test'; END IF; -- Cleanup - DROP TABLE IF EXISTS public.mon_comp_test CASCADE; + DROP TABLE IF EXISTS public.mon_tier_test CASCADE; DELETE FROM lakets._policy_registry WHERE chronotable_id IN ( - SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test' + SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test' ); DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN ( - SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test' + SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test' ); - DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test'; + DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test'; END $$; -- T6: lakets_metrics includes all expected metric names diff --git a/tests/test_python_patterns.py b/tests/test_python_patterns.py index 083c696..9f7f3aa 100644 --- a/tests/test_python_patterns.py +++ b/tests/test_python_patterns.py @@ -55,30 +55,32 @@ def test_tracks_failures(self): "cold_rollup_refresh.py should track failures" -class TestCompressionJobPatterns: - """Verify compression_job.py uses safe SQL patterns.""" +class TestTieringJobPatterns: + """Verify tiering_job.py uses safe SQL patterns and delegates the drop to SQL.""" - SOURCE_PATH = "databricks/workflows/compression_job.py" + SOURCE_PATH = "databricks/workflows/tiering_job.py" - def test_drop_table_uses_identifier(self): - """T9: DROP TABLE uses sql.Identifier, not %-formatting.""" + def test_delegates_drop_to_tier_chunk(self): + """T9: The partition drop is delegated to lakets.tier_chunk (no raw DROP in Python).""" source = _read_source(self.SOURCE_PATH) - assert '"DROP TABLE IF EXISTS %s.%s" %' not in source, \ - "compression_job.py still uses %-format for DROP TABLE" + assert "lakets.tier_chunk" in source, \ + "tiering_job.py should call lakets.tier_chunk to perform the gated drop" + assert "DROP TABLE" not in source, \ + "tiering_job.py should not issue DROP TABLE directly; tier_chunk owns the drop" def test_uses_chronotable_registry(self): """T10: References _chronotable_registry, not _hypertable_registry.""" source = _read_source(self.SOURCE_PATH) assert "_hypertable_registry" not in source, \ - "compression_job.py still references legacy _hypertable_registry" + "tiering_job.py still references legacy _hypertable_registry" assert "_chronotable_registry" in source, \ - "compression_job.py should reference _chronotable_registry" + "tiering_job.py should reference _chronotable_registry" - def test_no_dead_jdbc_url(self): - """T11: Dead jdbc_url code has been removed.""" + def test_no_spark(self): + """T11: Tiering is pure Lakebase SQL — no Spark dependency.""" source = _read_source(self.SOURCE_PATH) - assert "jdbc_url" not in source, \ - "compression_job.py still has dead jdbc_url code" + assert "pyspark" not in source and "SparkSession" not in source, \ + "tiering_job.py should be Spark-free" class TestLakebaseUtilsPatterns: diff --git a/tests/test_tiering.sql b/tests/test_tiering.sql new file mode 100644 index 0000000..d39fc7a --- /dev/null +++ b/tests/test_tiering.sql @@ -0,0 +1,139 @@ +-- LakeTS Tiering tests. Run: psql "$LAKETS_URL" -v ON_ERROR_STOP=1 -f tests/test_tiering.sql +\set ON_ERROR_STOP on + +-- TEST 1: _cdf_committed_lsn fails closed for a shadow that does not exist. +DO $$ +BEGIN + ASSERT lakets._cdf_committed_lsn('_shadow_does_not_exist') IS NULL, + 'TEST 1 FAILED: expected NULL for a nonexistent shadow'; + RAISE NOTICE 'TEST 1 PASSED: _cdf_committed_lsn fails closed for missing shadow'; +END $$; + +-- TEST 2: add_tiering_policy registers a 'tiering' policy and sets tiering_enabled. +DO $$ +DECLARE + v_policy_id INT; + v_enabled BOOLEAN; + v_ptype TEXT; +BEGIN + DROP TABLE IF EXISTS public.tier_test CASCADE; + CREATE TABLE public.tier_test (time TIMESTAMPTZ NOT NULL, v DOUBLE PRECISION); + PERFORM lakets.create_chronotable('tier_test', 'time', '1 day'); + + v_policy_id := lakets.add_tiering_policy('tier_test', '7 days'); + ASSERT v_policy_id IS NOT NULL, 'TEST 2 FAILED: no policy id returned'; + + SELECT tiering_enabled INTO v_enabled + FROM lakets._chronotable_registry WHERE table_name = 'tier_test'; + ASSERT v_enabled, 'TEST 2 FAILED: tiering_enabled not set'; + + SELECT policy_type INTO v_ptype + FROM lakets._policy_registry pr + JOIN lakets._chronotable_registry hr ON hr.id = pr.chronotable_id + WHERE hr.table_name = 'tier_test'; + ASSERT v_ptype = 'tiering', 'TEST 2 FAILED: policy_type is not tiering'; + + RAISE NOTICE 'TEST 2 PASSED: add_tiering_policy works'; +END $$; + +-- TEST 3: show_tiering_policy returns the policy; remove_tiering_policy clears it. +DO $$ +DECLARE v_cnt INT; +BEGIN + PERFORM 1 FROM lakets.show_tiering_policy('tier_test'); + PERFORM lakets.remove_tiering_policy('tier_test'); + SELECT count(*) INTO v_cnt FROM lakets.show_tiering_policy('tier_test'); + ASSERT v_cnt = 0, 'TEST 3 FAILED: policy not removed'; + RAISE NOTICE 'TEST 3 PASSED: show/remove tiering policy works'; +END $$; + +-- TEST 4: _get_chunks_to_tier returns nothing for a non-streaming table +-- (no STREAMING shadow => fail closed). +DO $$ +DECLARE v_cnt INT; +BEGIN + -- recreate policy from TEST 2 teardown + PERFORM lakets.add_tiering_policy('tier_test', '0 seconds'); -- everything eligible by age + INSERT INTO lakets._chunk_metadata (chronotable_id, chunk_name, range_start, range_end, status) + SELECT id, 'tier_test_oldchunk', now() - interval '10 days', now() - interval '9 days', 'active' + FROM lakets._chronotable_registry WHERE table_name = 'tier_test'; + + SELECT count(*) INTO v_cnt FROM lakets._get_chunks_to_tier('tier_test'); + ASSERT v_cnt = 0, + 'TEST 4 FAILED: expected 0 eligible chunks when shadow is not STREAMING'; + RAISE NOTICE 'TEST 4 PASSED: _get_chunks_to_tier fails closed without STREAMING CDF'; +END $$; + +-- TEST 5: tier_chunk refuses to drop (returns FALSE, partition row intact) +-- when the CDF gate is not satisfiable. +DO $$ +DECLARE v_ok BOOLEAN; v_status TEXT; +BEGIN + v_ok := lakets.tier_chunk('tier_test_oldchunk'); + ASSERT v_ok = FALSE, 'TEST 5 FAILED: tier_chunk should refuse without STREAMING CDF'; + SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name = 'tier_test_oldchunk'; + ASSERT v_status = 'active', 'TEST 5 FAILED: chunk should remain active (not tiered)'; + RAISE NOTICE 'TEST 5 PASSED: tier_chunk fails closed'; +END $$; + +-- TEST 6: untier_chunk restores tiered -> active. +DO $$ +DECLARE v_status TEXT; +BEGIN + UPDATE lakets._chunk_metadata SET status = 'tiered', tiered_at = now() + WHERE chunk_name = 'tier_test_oldchunk'; + PERFORM lakets.untier_chunk('tier_test_oldchunk'); + SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name = 'tier_test_oldchunk'; + ASSERT v_status = 'active', 'TEST 6 FAILED: untier_chunk did not restore active'; + RAISE NOTICE 'TEST 6 PASSED: untier_chunk restores active'; +END $$; + +-- TEST 7: show_tiering_status reports counts and classifies cdf_status='NONE' +-- when the table is not in wal2delta.tables. +DO $$ +DECLARE r RECORD; +BEGIN + SELECT * INTO r FROM lakets.show_tiering_status('tier_test'); + ASSERT r.table_name = 'tier_test', 'TEST 7 FAILED: wrong/no row'; + ASSERT r.pending_chunks >= 1, 'TEST 7 FAILED: expected >=1 pending chunk'; + ASSERT r.cdf_status = 'NONE', 'TEST 7 FAILED: expected cdf_status NONE without CDF'; + ASSERT r.caught_up = FALSE, 'TEST 7 FAILED: caught_up should be false without CDF'; + RAISE NOTICE 'TEST 7 PASSED: show_tiering_status reports gate state'; +END $$; + +-- TEST 8: the write-tracking trigger stamps last_write_lsn on the chunk that +-- received rows and leaves other chunks untouched (no CDF needed). +DO $$ +DECLARE + v_id INT; + v_lsn_hot PG_LSN; + v_lsn_seeded PG_LSN; +BEGIN + SELECT id INTO v_id FROM lakets._chronotable_registry WHERE table_name = 'tier_test'; + + -- Ensure a partition exists for "now" so the insert has somewhere to route. + PERFORM lakets._ensure_partitions(v_id, p_range_start := now(), p_range_end := now()); + + -- Pin the seeded old chunk's watermark to a known value; it must NOT move. + UPDATE lakets._chunk_metadata SET last_write_lsn = '0/1'::pg_lsn + WHERE chunk_name = 'tier_test_oldchunk'; + + INSERT INTO public.tier_test (time, v) VALUES (now(), 1.0); + + -- The hot chunk that received the row is stamped with a real WAL position. + SELECT max(last_write_lsn) INTO v_lsn_hot + FROM lakets._chunk_metadata + WHERE chronotable_id = v_id AND chunk_name <> 'tier_test_oldchunk'; + ASSERT v_lsn_hot IS NOT NULL AND v_lsn_hot <> '0/1'::pg_lsn, + 'TEST 8 FAILED: hot chunk was not stamped by the write-tracking trigger'; + + -- The unrelated old chunk is untouched. + SELECT last_write_lsn INTO v_lsn_seeded + FROM lakets._chunk_metadata WHERE chunk_name = 'tier_test_oldchunk'; + ASSERT v_lsn_seeded = '0/1'::pg_lsn, + 'TEST 8 FAILED: a hot-chunk write bumped an unrelated cold chunk watermark'; + + RAISE NOTICE 'TEST 8 PASSED: write-tracking stamps only the written chunk'; +END $$; + +SELECT 'ALL TIERING TESTS PASSED' AS result; diff --git a/website/docs/architecture-patterns.md b/website/docs/architecture-patterns.md index fa98bbe..c26f0d5 100644 --- a/website/docs/architecture-patterns.md +++ b/website/docs/architecture-patterns.md @@ -26,7 +26,7 @@ LakeTS's signature pattern. Recent data lives in Lakebase (Postgres) for sub-10m - BI/analysis runs over weeks or months of history - You want one query surface (LakeTS routes hot/cold automatically) -**LakeTS pieces**: `ChronoTable`, `lakets.compression_policy`, `lakets.tiering_policy`, Lakebase CDF. +**LakeTS pieces**: `ChronoTable`, `lakets.add_tiering_policy`, Lakebase CDF. ## 2. Streaming ingest → hot tier → rollup diff --git a/website/docs/examples/sensor-reading-journey.md b/website/docs/examples/sensor-reading-journey.md index af9886c..8b13d2b 100644 --- a/website/docs/examples/sensor-reading-journey.md +++ b/website/docs/examples/sensor-reading-journey.md @@ -2,7 +2,7 @@ title: Life of a sensor reading sidebar_label: Sensor reading journey sidebar_position: 1 -description: Follow a single sensor reading through every stage of the LakeTS lifecycle — ingestion, RollUps, compression, tiering, retention. +description: Follow a single sensor reading through every stage of the LakeTS lifecycle — ingestion, RollUps, tiering, retention. --- # Life of a sensor reading @@ -30,17 +30,17 @@ Behind the scenes: - The hourly RollUp includes the reading immediately - `SELECT * FROM _rollup_rt_metrics_hourly` shows it in real time -## Day 7 — compression job runs +## Day 7 — tiering job runs -- The chunk is now 7 days old; `_get_chunks_to_compress()` returns it -- Chunk status flips from `active` → `compressed` -- The partition is dropped from Lakebase; the data is already in the Unity Catalog Managed Table +- The chunk is now 7 days old; `_get_chunks_to_tier()` returns it +- `tier_chunk()` checks the durability gate — CDF has flushed the chunk's writes (`committed_lsn >= last_write_lsn`) into the Unity Catalog Managed Table +- The gate passes, so the partition is dropped from Lakebase and the chunk status flips from `active` → `tiered` ## Day 7–90 — warm in Unity Catalog - Queryable via Lakehouse Federation (100 ms – 1 s) - The hourly RollUp Table still has the aggregation — unaffected by tiering -- The UC Managed Table is Z-ordered for fast time-range scans +- The data lives in the UC Managed Table, written there continuously by Lakebase CDF ## Day 90 — retention job runs @@ -63,7 +63,7 @@ gantt Active in partition :active, 2026-03-25, 7d section Warm (Unity Catalog Managed Table) - Compressed/Tiered :2026-04-01, 83d + Tiered :2026-04-01, 83d section Dropped Vacuumed :milestone, 2026-06-23, 0d diff --git a/website/docs/glossary.md b/website/docs/glossary.md index 2d406a3..2aec3d9 100644 --- a/website/docs/glossary.md +++ b/website/docs/glossary.md @@ -89,6 +89,10 @@ In a multi-metric ChronoTable, a column that identifies a series (`host`, `regio Two-phase lifecycle policy: tier to the Unity Catalog Managed Table after age N, drop entirely after age M. Configured with `add_tiered_retention_policy()`. +## Tiering + +Evicting cold chunks out of Lakebase to free hot-tier storage. The data already lives in the Unity Catalog Managed Table via Lakebase CDF, so `tier_chunk()` simply drops the old partition — but only once a CDF **durability gate** confirms every write to that chunk has been flushed to UC (the shadow is `STREAMING` and CDF's `committed_lsn` has reached the chunk's `last_write_lsn`). Configured with `add_tiering_policy()` and driven daily by the Databricks Tiering Job. See [How Tiering & Retention Works](./guides/how-it-works/tiering-and-retention.md). + ## Unity Catalog Managed Table Databricks's governed table format that abstracts the underlying storage (Delta or Iceberg). LakeTS uses it as the cold tier — `lakets.enable_sync()` targets a Unity Catalog Managed Table via Lakebase CDF. diff --git a/website/docs/guides/getting-started.md b/website/docs/guides/getting-started.md index b219cd9..b2eade6 100644 --- a/website/docs/guides/getting-started.md +++ b/website/docs/guides/getting-started.md @@ -185,4 +185,4 @@ Now you can think about how the application or dashboard reads the data: - [Reference](../reference/index.md) — full catalog of 73 functions, 2 aggregates, 6 triggers, 9 metadata tables — organized by topic - [Life of a sensor reading](../examples/sensor-reading-journey.md) — end-to-end worked example - [Troubleshooting](../troubleshooting.md) — when something doesn't behave the way you expect -- [Databricks workflows](https://github.com/databricks-solutions/lakets/blob/main/databricks/bundles/databricks.yml) — automated partition management, compression, retention, and aggregate refresh +- [Databricks workflows](https://github.com/databricks-solutions/lakets/blob/main/databricks/bundles/databricks.yml) — automated partition management, tiering, retention, and aggregate refresh diff --git a/website/docs/guides/how-it-works/chronotables.md b/website/docs/guides/how-it-works/chronotables.md index 4008995..9e3ad23 100644 --- a/website/docs/guides/how-it-works/chronotables.md +++ b/website/docs/guides/how-it-works/chronotables.md @@ -60,8 +60,8 @@ LakeTS tracks everything in two tables: ```sql -- Which tables are ChronoTables? SELECT * FROM lakets._chronotable_registry; --- id | schema | table | time_column | chunk_interval | compression_enabled | sync_enabled --- 1 | public | metrics | time | 1 day | false | false +-- id | schema | table | time_column | chunk_interval | tiering_enabled | sync_enabled +-- 1 | public | metrics | time | 1 day | false | false -- What chunks exist? SELECT * FROM lakets._chunk_metadata; diff --git a/website/docs/guides/how-it-works/compression-and-retention.md b/website/docs/guides/how-it-works/compression-and-retention.md deleted file mode 100644 index dd5b2e3..0000000 --- a/website/docs/guides/how-it-works/compression-and-retention.md +++ /dev/null @@ -1,80 +0,0 @@ ---- -title: Compression & retention -sidebar_label: Compression & retention -sidebar_position: 4 -description: How LakeTS tiers cold chunks to Unity Catalog and drops expired data. ---- - -# How Compression & Tiering Works - -## The concept - -Think of your data like files on a desk: -- **Hot desk** (Lakebase): Papers you're actively reading. Fast access. -- **Filing cabinet** (Unity Catalog Managed Table): Papers from last month. Slower but organized. -- **Shredder** (Retention): Papers older than a year. Gone. - -## How policies work - -When you add a compression policy, LakeTS stores the rule in `_policy_registry`: - -```sql -SELECT lakets.add_compression_policy('metrics', '7 days'); --- Stores: {compress_after: "7 days", segment_by: null, order_by: "time DESC"} -``` - -Nothing happens immediately. The policy is a **declaration of intent**. The actual work is done by the Databricks compression job that runs on a schedule: - -```mermaid -flowchart LR - subgraph JOB["Compression Job (runs daily at 2 AM)"] - A["1. Query _policy_registry
for compression policies"] --> B["2. Query _get_chunks_to_compress()
find chunks older than 7 days"] - B --> C["3. For each chunk:
mark compressed in metadata"] - C --> D["4. Optionally drop
Lakebase partition"] - end - - subgraph BEFORE["Before"] - P1["Mar 15 chunk (active)"] - P2["Mar 16 chunk (active)"] - P3["Mar 17 chunk (active)"] - end - - subgraph AFTER["After (compress_after=7 days, today=Mar 25)"] - P4["Mar 15 chunk (compressed)"] - P5["Mar 16 chunk (compressed)"] - P6["Mar 17 chunk (compressed)"] - P7["Mar 18 chunk (active - only 7 days old)"] - end -``` - -The data in the Unity Catalog Managed Table is already there (via Lakebase CDF). The compression job's main purpose is to **free Lakebase storage** by dropping old partitions once we know the data is safely in the cold tier. - ---- - -# How Retention Works - -Retention is simpler than compression — it just drops old chunks: - -```sql -SELECT lakets.add_retention_policy('metrics', '30 days'); -``` - -When `execute_retention` runs: - -1. Looks up the policy in `_policy_registry` -2. Calls `drop_chunks('metrics', '30 days')` which: - - Finds chunks where `range_end <= now() - 30 days` - - Runs `DROP TABLE` on each partition (instant, no row-by-row delete) - - Updates `_chunk_metadata` status to `dropped` - -**Tiered retention** adds a two-step lifecycle: - -``` -Day 0-7: HOT (Lakebase, fast queries) -Day 7-90: WARM (Unity Catalog Managed Table, tiered via compression policy) -Day 90+: DELETED (retention policy drops from both tiers) -``` - -:::tip RollUps survive retention -You can drop all raw data older than 30 days while keeping your hourly/daily RollUp tables forever. -::: diff --git a/website/docs/guides/how-it-works/index.md b/website/docs/guides/how-it-works/index.md index adb8936..194ba0e 100644 --- a/website/docs/guides/how-it-works/index.md +++ b/website/docs/guides/how-it-works/index.md @@ -14,7 +14,7 @@ Start here for the high-level picture, then follow the topic pages for each subs - **[ChronoTables](./chronotables.md)** — how time-partitioned tables work under the hood - **[Time series functions](./time-series-functions.md)** — `time_bucket`, `first`, `locf`, `interpolate`, `delta`, `rate`, `gapfill` - **[RollUps](./rollups.md)** — incremental aggregates, DAG cascade, scale optimizations (chunk-skip pruning, batch refresh, Unity Catalog export) -- **[Compression & retention](./compression-and-retention.md)** — lifecycle policies and chunk drops +- **[Tiering & retention](./tiering-and-retention.md)** — lifecycle policies, the CDF durability gate, and chunk drops - **[Lakebase CDF internals](./lakebase-cdf-internals.md)** — shadow tables, triggers, CDC routing For a worked example following a single sensor reading from ingest through retention, see [Life of a sensor reading](../../examples/sensor-reading-journey.md). @@ -64,7 +64,7 @@ LakeTS is built from these Postgres primitives — no custom extensions: | Gap-filling | `generate_series()` + `LEFT JOIN` | | RollUps | Regular `TABLE` + incremental refresh + `UNION ALL` view | | RollUp optimization | Chunk-skip pruning, batch `ANY(array)`, Kahn's toposort, transition tables | -| Compression / tiering | `_policy_registry` + Databricks Jobs | +| Tiering | `_policy_registry` + Databricks Jobs + CDF durability gate | | Retention | `DROP TABLE` on expired partitions | | Lakebase CDF | Shadow table + trigger + `wal2delta` CDC | | Monitoring | SQL functions over `pg_stat_*` + metadata | diff --git a/website/docs/guides/how-it-works/rollups.md b/website/docs/guides/how-it-works/rollups.md index 1d7fdbf..e04f43a 100644 --- a/website/docs/guides/how-it-works/rollups.md +++ b/website/docs/guides/how-it-works/rollups.md @@ -237,7 +237,6 @@ The default now resolves the tier from the chunk's status: | Chunk status | Resolved tier | Meaning | |---|---|---| | `active` | hot | Data in Lakebase | -| `compressed` | hot | Data in Lakebase (compressed) | | `tiered` | cold | Data in Unity Catalog Managed Table | You can still pass `p_tier` explicitly when you need to override. diff --git a/website/docs/guides/how-it-works/tiering-and-retention.md b/website/docs/guides/how-it-works/tiering-and-retention.md new file mode 100644 index 0000000..50c346a --- /dev/null +++ b/website/docs/guides/how-it-works/tiering-and-retention.md @@ -0,0 +1,103 @@ +--- +title: Tiering & retention +sidebar_label: Tiering & retention +sidebar_position: 4 +description: How LakeTS evicts cold chunks from Lakebase once CDF has flushed them to Unity Catalog, and drops expired data. +--- + +# How Tiering Works + +## The concept + +Think of your data like files on a desk: +- **Hot desk** (Lakebase): Papers you're actively reading. Fast access. +- **Filing cabinet** (Unity Catalog Managed Table): Papers from last month. Slower but organized. +- **Shredder** (Retention): Papers older than a year. Gone. + +## How policies work + +A tiering policy marks chunks older than `p_after` for eviction. The data is already in the Unity Catalog Managed Table via Lakebase CDF; the policy's job is to free Lakebase storage by dropping the old partitions once the data is safely cold. + +When you add a tiering policy, LakeTS stores the rule in `_policy_registry`: + +```sql +SELECT lakets.add_tiering_policy('metrics', '7 days'); +-- Stores: {after: "7 days"} with policy_type = 'tiering' +``` + +Nothing happens immediately. The policy is a **declaration of intent**. The actual work is done by the Databricks Tiering Job that runs on a schedule: + +```mermaid +flowchart LR + subgraph JOB["Tiering Job (runs daily at 2 AM)"] + A["1. Query _policy_registry
for tiering policies"] --> B["2. Query _get_chunks_to_tier()
find chunks older than 7 days"] + B --> C["3. For each chunk:
call tier_chunk()"] + C --> D["4. Gate passes →
drop Lakebase partition,
mark tiered"] + end + + subgraph BEFORE["Before"] + P1["Mar 15 chunk (active)"] + P2["Mar 16 chunk (active)"] + P3["Mar 17 chunk (active)"] + end + + subgraph AFTER["After (after=7 days, today=Mar 25)"] + P4["Mar 15 chunk (tiered)"] + P5["Mar 16 chunk (tiered)"] + P6["Mar 17 chunk (tiered)"] + P7["Mar 18 chunk (active - only 7 days old)"] + end +``` + +The data in the Unity Catalog Managed Table is already there (via Lakebase CDF). The Tiering Job's main purpose is to **free Lakebase storage** by dropping old partitions once we know the data is safely in the cold tier. + +## CDF is a prerequisite + +Tiering can only evict a chunk once Lakebase CDF has durably copied that chunk's data into the Unity Catalog Managed Table. CDF must be enabled (on the `lakets_cdf` schema, via Databricks) **before** tiering can drop anything — LakeTS cannot enable CDF itself, and a table must be CDF-synced via `lakets.enable_sync('')`. + +`add_tiering_policy` still creates the policy if the table isn't synced yet (it emits a NOTICE), but nothing will be evicted until sync and CDF are live. See [Lakebase CDF Setup](../lakebase-cdf-setup.md) for how to turn CDF on. + +## The durability gate + +`tier_chunk` drops a chunk's partition only when **both** of these hold: + +1. The chunk's CDF **shadow table** (in schema `lakets_cdf`) is `STREAMING` in `wal2delta.tables`, **and** +2. CDF's `committed_lsn` for that shadow is **>= the chunk's own `last_write_lsn`** (the WAL position of the chunk's most recent write). + +In plain terms: this proves Lakebase CDF has durably flushed every write to that chunk into the Unity Catalog Managed Table **before** the partition is dropped. + +The comparison is against the **chunk's own recorded write position**, not the global WAL head. A per-table `committed_lsn` does not advance while that shadow is idle — but the global WAL head keeps moving from unrelated activity, so a head comparison would never pass for a cold (idle) chunk. Per-chunk write positions are stamped automatically by triggers that are installed when you add a tiering policy. + +The gate is **fail-closed**. If it isn't satisfiable — CDF isn't streaming, or hasn't yet flushed past the chunk — `tier_chunk` defers and the chunk is retried on the next job run. A missing, degraded, or lagging CDF can never read as "safe to drop". + +`tier_chunk` returns `TRUE` when it dropped the partition and `FALSE` when it deferred. + +--- + +# How Retention Works + +Retention is simpler than tiering — it just drops old chunks: + +```sql +SELECT lakets.add_retention_policy('metrics', '30 days'); +``` + +When `execute_retention` runs: + +1. Looks up the policy in `_policy_registry` +2. Calls `drop_chunks('metrics', '30 days')` which: + - Finds chunks where `range_end <= now() - 30 days` + - Runs `DROP TABLE` on each partition (instant, no row-by-row delete) + - Updates `_chunk_metadata` status to `dropped` + +**Tiered retention** adds a two-step lifecycle: + +``` +Day 0-7: HOT (Lakebase, fast queries) +Day 7-90: WARM (Unity Catalog Managed Table, evicted via tiering policy) +Day 90+: DELETED (retention policy drops from both tiers) +``` + +:::tip RollUps survive retention +You can drop all raw data older than 30 days while keeping your hourly/daily RollUp tables forever. +::: diff --git a/website/docs/how-to/index.md b/website/docs/how-to/index.md index 7434905..e66940d 100644 --- a/website/docs/how-to/index.md +++ b/website/docs/how-to/index.md @@ -15,7 +15,7 @@ Pick the task you need to do — every page is self-contained: - **[Last Value Cache](./last-value-cache.md)** — sub-10ms reads for status widgets and "current value" tiles. - **[Bulk ingest](./bulk-ingest.md)** — write batches from edge devices, protocol adapters, or other writers using the JSONB ingest function. - **[Alerts](./alerts.md)** — SQL-native threshold and deadman alerts that run inside Lakebase. -- **[Data lifecycle](./lifecycle.md)** — add compression, retention, and tiered retention policies so old data tiers to Unity Catalog and eventually drops. +- **[Data lifecycle](./lifecycle.md)** — add tiering, retention, and tiered retention policies so old data tiers to Unity Catalog and eventually drops. - **[Monitoring](./monitoring.md)** — query operational metrics, chunk health, and top queries from inside Lakebase. - **[Manage tag cardinality](./cardinality.md)** — track distinct tag values to prevent label explosion in multi-metric ChronoTables. - **[Sync to Unity Catalog](./export-to-uc.md)** — expose RollUp Tables to Spark, BI, and ML pipelines via Lakebase CDF. diff --git a/website/docs/how-to/lifecycle.md b/website/docs/how-to/lifecycle.md index f76b32e..c99c026 100644 --- a/website/docs/how-to/lifecycle.md +++ b/website/docs/how-to/lifecycle.md @@ -2,22 +2,26 @@ title: Configure data lifecycle sidebar_label: Data lifecycle sidebar_position: 3 -description: Add compression, retention, and tiered retention policies so old data tiers to Unity Catalog and eventually drops. +description: Add tiering, retention, and tiered retention policies so old data tiers to Unity Catalog and eventually drops. --- # Configure data lifecycle -LakeTS gives you three policy primitives for old data: **compression** (tier to a Unity Catalog Managed Table), **retention** (drop partitions), and **tiered retention** (compose the two). +LakeTS gives you three policy primitives for old data: **tiering** (evict to a Unity Catalog Managed Table), **retention** (drop partitions), and **tiered retention** (compose the two). -## Compression — tier old data out of Lakebase +## Tiering — evict cold data out of Lakebase -A compression policy marks chunks older than `p_after` for tiering. The data is already in the Unity Catalog Managed Table via Lakebase CDF; the policy's job is to free Lakebase storage by dropping the old partitions once the data is safely cold. +A tiering policy marks chunks older than `p_after` for eviction. The data is +already in the Unity Catalog Managed Table via Lakebase CDF; the policy's job is +to free Lakebase storage by dropping the old partitions once the data is safely +cold. ```sql -SELECT lakets.add_compression_policy('metrics', '7 days'); +SELECT lakets.add_tiering_policy('metrics', '7 days'); ``` -The actual chunk-drop work runs in the Databricks Compression & Tiering Job (default schedule: daily at 2 AM in the bundle). +The drop runs in the Databricks Tiering Job (daily 2 AM). A partition is only +dropped once CDF has confirmed its data is durable in Unity Catalog. ## Retention — drop old chunks entirely @@ -47,4 +51,4 @@ Day 90+: GONE — retention drops both tiers You can drop all raw data older than 30 days while keeping your hourly/daily RollUp Tables forever. Aggregates are tiny compared to raw data, and RollUps are stored separately in `_rollup_*` tables. ::: -See [How Compression & Retention Works](../guides/how-it-works/compression-and-retention.md) for the internals. +See [How Tiering & Retention Works](../guides/how-it-works/tiering-and-retention.md) for the internals, including the CDF durability gate. diff --git a/website/docs/how-to/monitoring.md b/website/docs/how-to/monitoring.md index 6a45027..24bc486 100644 --- a/website/docs/how-to/monitoring.md +++ b/website/docs/how-to/monitoring.md @@ -15,7 +15,7 @@ LakeTS ships three SQL views that give you operational visibility into ChronoTab SELECT * FROM lakets.lakets_metrics(); ``` -Returns chunk counts, row counts per ChronoTable, RollUp lag, compression backlog, and last refresh times. +Returns chunk counts, row counts per ChronoTable, RollUp lag, tiering backlog (`lakets_tiering_pending_chunks`, `lakets_tiering_tiered_chunks_total`, `lakets_tiering_caught_up`), and last refresh times. ## Chunk health @@ -23,7 +23,23 @@ Returns chunk counts, row counts per ChronoTable, RollUp lag, compression backlo SELECT * FROM lakets.chunk_health(); ``` -Per-ChronoTable: count of `active` / `compressed` / `tiered` / `dropped` chunks, oldest chunk age, newest chunk age. Use this to confirm compression and retention are running. +Per-ChronoTable: count of `active` / `tiered` / `dropped` chunks, oldest chunk age, newest chunk age. Use this to confirm tiering and retention are running. + +## Why isn't my data tiering? + +When chunks past the policy age aren't being evicted, ask `show_tiering_status`: + +```sql +SELECT * FROM lakets.show_tiering_status('metrics'); +``` + +Read the result top-down: + +- **`cdf_status`** — `NONE` means sync was never enabled (`lakets.enable_sync('metrics')` was never called); `SKIPPED` means the shadow table isn't streaming (often a missing `REPLICA IDENTITY FULL`); `STREAMING` means CDF is healthy. +- **`cdf_lag_bytes`** — how far CDF must still flush before the durability gate passes. While this is non-zero for pending chunks, the gate keeps deferring. +- **`caught_up`** — `TRUE` when the gate passes for every pending chunk. Once true, the next Tiering Job run will drop those partitions. + +Tiering is fail-closed: if CDF is `NONE`, `SKIPPED`, or still lagging, partitions are kept until the data is provably durable in Unity Catalog. See [Lakebase CDF Setup](../guides/lakebase-cdf-setup.md) to bring sync up. ## Top queries diff --git a/website/docs/reference/chronotables.md b/website/docs/reference/chronotables.md index 04f72f6..b980b39 100644 --- a/website/docs/reference/chronotables.md +++ b/website/docs/reference/chronotables.md @@ -96,7 +96,7 @@ Lists all partitions for a ChronoTable with metadata. | `chunk_name` | TEXT | Partition name (e.g., `sensor_data_20260401_000000`) | | `range_start` | TIMESTAMPTZ | Partition lower bound | | `range_end` | TIMESTAMPTZ | Partition upper bound | -| `status` | TEXT | `active`, `compressed`, `tiered`, or `dropped` | +| `status` | TEXT | `active`, `tiered`, or `dropped` | | `row_count` | BIGINT | Approximate row count | | `size_bytes` | BIGINT | Partition size on disk | | `created_at` | TIMESTAMPTZ | When the partition was created | diff --git a/website/docs/reference/index.md b/website/docs/reference/index.md index 2c19da4..8d23c79 100644 --- a/website/docs/reference/index.md +++ b/website/docs/reference/index.md @@ -24,7 +24,7 @@ Every LakeTS function, custom aggregate, trigger, and metadata table — grouped - **[Downsampling pipelines](./downsampling.md)** — multi-resolution pipelines executed by Databricks Jobs ### Lifecycle -- **[Lifecycle policies](./lifecycle.md)** — compression, tiering, retention +- **[Lifecycle policies](./lifecycle.md)** — tiering, retention - **[Lakebase CDF](./lakebase-cdf.md)** — shadow-table sync to Unity Catalog (ChronoTables and RollUps) ### Operations @@ -41,7 +41,7 @@ Every LakeTS function, custom aggregate, trigger, and metadata table — grouped | ChronoTables (incl. multi-metric) | 9 | — | — | | Time-series analytics | 7 | 2 | — | | RollUps (engine + optimization) | 26 | — | 4 | -| Lifecycle (compression + retention) | 11 | — | — | +| Lifecycle (tiering + retention) | 11 | — | — | | Lakebase CDF | 3 | — | 1 | | Last Value Cache | 5 | — | 1 | | Downsampling | 4 | — | — | diff --git a/website/docs/reference/lifecycle.md b/website/docs/reference/lifecycle.md index 0548d74..b50bdc2 100644 --- a/website/docs/reference/lifecycle.md +++ b/website/docs/reference/lifecycle.md @@ -2,57 +2,61 @@ title: Lifecycle policies sidebar_label: Lifecycle policies sidebar_position: 4 -description: Compression, tiering, and retention policy functions. +description: Tiering and retention policy functions. --- # Lifecycle policies Functions that govern how chunks age out of the hot tier (Lakebase) and eventually out of the cold tier (Unity Catalog Managed Table). The actual data movement is performed by Databricks Jobs on a schedule; these functions register the policy + provide manual overrides. -## Compression & tiering +## Tiering -Tiering policies move data from the hot tier to the cold tier based on age. The Databricks Compression & Tiering job (daily at 2 AM) drives the actual movement. +Tiering evicts data from the hot tier (Lakebase) once Lakebase CDF has durably flushed it to the cold tier (Unity Catalog Managed Table), based on age. The Databricks Tiering job (daily at 2 AM) drives the actual eviction. -### `add_compression_policy(p_table_name, p_compress_after, p_segment_by, p_order_by, p_schema_name)` +CDF must be enabled and the table CDF-synced via `lakets.enable_sync()` before anything is evicted — see [Lakebase CDF Setup](../guides/lakebase-cdf-setup.md). -Registers a tiering policy for a ChronoTable. +### `add_tiering_policy(p_table_name, p_after, p_schema_name)` + +Registers a tiering policy for a ChronoTable. Also installs the triggers that stamp each chunk's `last_write_lsn` (used by the durability gate). Creates the policy even if the table isn't CDF-synced yet (with a NOTICE), but nothing is evicted until sync and CDF are live. | Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `p_table_name` | TEXT | — | ChronoTable name | -| `p_compress_after` | INTERVAL | — | Tier chunks older than this | -| `p_segment_by` | TEXT | `NULL` | Column for segment optimization in the cold tier | -| `p_order_by` | TEXT | `NULL` | Column for Z-order optimization in the cold tier | +| `p_after` | INTERVAL | — | Evict chunks older than this | | `p_schema_name` | TEXT | `'public'` | Schema | **Returns**: `INT` — policy_id ```sql --- Tier data older than 30 days, segment by device_id -SELECT lakets.add_compression_policy( - 'sensor_data', '30 days', - p_segment_by => 'device_id', - p_order_by => 'time' -); +-- Evict chunks older than 7 days +SELECT lakets.add_tiering_policy('metrics', '7 days'); ``` -### `compress_chunk(p_chunk_name)` / `decompress_chunk(p_chunk_name)` +### `tier_chunk(p_chunk_name)` + +Drops the chunk's Lakebase partition and marks it `tiered` — but **only if the durability gate passes** (the chunk's CDF shadow is `STREAMING` and CDF's `committed_lsn` for that shadow is `>=` the chunk's own `last_write_lsn`). The gate is fail-closed. + +**Returns**: `BOOLEAN` — `TRUE` if the partition was dropped, `FALSE` if deferred (retried on the next job run). + +### `untier_chunk(p_chunk_name)` + +Restores a `tiered` chunk's metadata to `active` — e.g. before re-ingesting it from the Unity Catalog Managed Table. -Manually mark a specific chunk for tiering to the cold tier or for re-ingestion from cold back to Lakebase. +**Returns**: `VOID` -### `show_compression_policy(p_table_name, p_schema_name)` +### `show_tiering_policy(p_table_name, p_schema_name)` -Returns the compression/tiering policy for a ChronoTable. +Returns the tiering policy for a ChronoTable. -**Returns**: TABLE — `policy_id`, `compress_after`, `segment_by`, `order_by`, `enabled`, `last_run_at` +**Returns**: TABLE — `policy_id` (INT), `after` (TEXT), `enabled` (BOOLEAN), `last_run_at` (TIMESTAMPTZ) -### `remove_compression_policy(p_table_name, p_schema_name)` +### `remove_tiering_policy(p_table_name, p_schema_name)` -Removes the compression/tiering policy. +Removes the tiering policy. -### `_get_chunks_to_compress(p_table_name, p_schema_name)` +### `_get_chunks_to_tier(p_table_name, p_schema_name)` -Internal. Returns chunks eligible for tiering (active chunks older than the `compress_after` threshold). Called by the Databricks Compression & Tiering workflow. +Internal. Returns chunks eligible for tiering (active chunks older than the `after` threshold). Called by the Databricks Tiering workflow, which then calls `tier_chunk()` per candidate. ## Retention diff --git a/website/docs/reference/metadata-tables.md b/website/docs/reference/metadata-tables.md index eaeaf80..de6bcd9 100644 --- a/website/docs/reference/metadata-tables.md +++ b/website/docs/reference/metadata-tables.md @@ -30,7 +30,7 @@ Upgrade guard: prevents downgrade or re-install of the same version. | `chunk_interval` | INTERVAL | Partition size (default `7 days`) | | `space_column` | TEXT | Optional secondary (hash) partition column | | `space_partitions` | INT | Number of space partitions (default `1`) | -| `compression_enabled` | BOOLEAN | Whether a compression policy is active | +| `tiering_enabled` | BOOLEAN | Whether a tiering policy is active | | `retention_interval` | INTERVAL | Retention window, if a policy is set | | `shadow_table_name` | TEXT | `lakets_cdf` shadow table name when sync is enabled | | `sync_enabled` | BOOLEAN | Whether Lakebase CDF sync is enabled | @@ -46,11 +46,11 @@ Upgrade guard: prevents downgrade or re-install of the same version. | `chunk_name` | TEXT | Partition name | | `range_start` | TIMESTAMPTZ | Lower bound | | `range_end` | TIMESTAMPTZ | Upper bound | -| `status` | TEXT | `active` / `compressed` / `tiered` / `dropped` | +| `status` | TEXT | `active` / `tiered` / `dropped` | | `row_count` | BIGINT | Estimated rows in the chunk | | `size_bytes` | BIGINT | Estimated chunk size on disk | -| `compressed_at` | TIMESTAMPTZ | When the chunk was compressed/tiered out | -| `tiered_at` | TIMESTAMPTZ | When the chunk was tiered to Delta | +| `tiered_at` | TIMESTAMPTZ | When the chunk was tiered (partition dropped) | +| `last_write_lsn` | PG_LSN | WAL position of the chunk's most recent write (used by the tiering durability gate) | | `last_modified_at` | TIMESTAMPTZ | Last write timestamp (powers chunk-skip pruning) | | `created_at` | TIMESTAMPTZ | Creation time | @@ -93,7 +93,7 @@ Upgrade guard: prevents downgrade or re-install of the same version. |--------|------|-------------| | `id` | SERIAL | Policy ID | | `chronotable_id` | INT | FK to `_chronotable_registry` | -| `policy_type` | TEXT | `compression` / `retention` / `tiered_retention` | +| `policy_type` | TEXT | `tiering` / `retention` / `tiered_retention` | | `config` | JSONB | Policy parameters | | `enabled` | BOOLEAN | Active flag | | `last_run_at` | TIMESTAMPTZ | Last execution time | diff --git a/website/docs/reference/monitoring.md b/website/docs/reference/monitoring.md index dced656..c5f7a54 100644 --- a/website/docs/reference/monitoring.md +++ b/website/docs/reference/monitoring.md @@ -27,11 +27,48 @@ SELECT * FROM lakets.lakets_metrics(); Chunk counts are emitted as a single `lakets_chunks_total` metric labelled by `status` (one row per status), and RollUp lag is emitted as two metrics: `lakets_rollup_watermark_lag_seconds` and `lakets_rollup_refresh_lag_seconds`. +### Tiering metrics + +`lakets_metrics()` emits three per-table tiering metrics (labelled by `table`): + +| Metric | Meaning | +|--------|---------| +| `lakets_tiering_pending_chunks{table}` | Chunks eligible for eviction that have not been dropped yet | +| `lakets_tiering_tiered_chunks_total{table}` | Total chunks dropped from Lakebase and now `tiered` | +| `lakets_tiering_caught_up{table}` | `1` when the durability gate currently passes for all pending chunks, else `0` | + ## `chunk_health()` Per-ChronoTable chunk-health breakdown. -**Returns**: TABLE — `hypertable`, `total_chunks`, `active_chunks`, `compressed_chunks`, `tiered_chunks`, `dropped_chunks`, `oldest_active`, `newest_active` +**Returns**: TABLE — `hypertable`, `total_chunks`, `active_chunks`, `tiered_chunks`, `dropped_chunks`, `oldest_active`, `newest_active` + +## `show_tiering_status(p_table_name, p_schema_name)` + +Per-ChronoTable view of tiering progress and CDF durability — the function to reach for when chunks aren't being evicted. Both parameters are optional; with no arguments it reports every table that has a tiering policy. + +**Returns**: TABLE + +| Column | Type | Description | +|--------|------|-------------| +| `schema_name` | TEXT | Schema of the ChronoTable | +| `table_name` | TEXT | ChronoTable name | +| `after` | TEXT | Policy threshold (e.g. `7 days`) | +| `active_chunks` | INT | Chunks still resident in Lakebase | +| `tiered_chunks` | INT | Chunks dropped from Lakebase (data in UC) | +| `pending_chunks` | INT | Chunks eligible for eviction but not yet dropped | +| `reclaimable_bytes` | BIGINT | Lakebase storage that pending chunks would free | +| `reclaimed_bytes` | BIGINT | Lakebase storage already freed by tiered chunks | +| `cdf_status` | TEXT | `STREAMING`, `SKIPPED`, or `NONE` | +| `cdf_lag_bytes` | BIGINT | How far CDF must still flush before the gate passes | +| `caught_up` | BOOLEAN | `TRUE` when the gate passes for all pending chunks | +| `last_run_at` | TIMESTAMPTZ | Last time the tiering job ran for this table | + +Read `cdf_status` as: + +- **`NONE`** — sync was never enabled for this table (`lakets.enable_sync()` not called). +- **`SKIPPED`** — the shadow exists but isn't streaming (e.g. missing `REPLICA IDENTITY FULL`). +- **`STREAMING`** — CDF is healthy; eviction can proceed once `caught_up` is `TRUE`. ## `query_stats(p_limit)` diff --git a/website/docs/reference/workflow-jobs.md b/website/docs/reference/workflow-jobs.md index d9c056d..2c5a515 100644 --- a/website/docs/reference/workflow-jobs.md +++ b/website/docs/reference/workflow-jobs.md @@ -12,7 +12,7 @@ These scheduled Databricks Jobs drive the operational lifecycle of LakeTS. The b | Job | Schedule | What it does | |-----|----------|--------------| | **Partition Manager** | Every 6 h | Calls `_ensure_partitions()` — pre-creates future partitions | -| **Compression & Tiering** | Daily 2 AM | `_get_chunks_to_compress()` → Spark JDBC read → write to UC Managed Table → `compress_chunk()` | +| **Tiering** | Daily 2 AM | `_get_chunks_to_tier()` → `tier_chunk()` per candidate — drops cold partitions whose data CDF has flushed to UC (pure Lakebase SQL, no Spark) | | **Retention** | Daily 3 AM | `execute_retention()` — drops expired chunks in Lakebase and the UC Managed Table | | **RollUp Refresh** | Every 15 min | `refresh_rollup()` — incremental hot-tier refresh | | **Cold RollUp Refresh** | On-demand (no fixed schedule) | `refresh_rollup()` with cold-tier dirty buckets, run after cold-tier ETL corrections | diff --git a/website/sidebars.ts b/website/sidebars.ts index b040f90..31e11cb 100644 --- a/website/sidebars.ts +++ b/website/sidebars.ts @@ -43,7 +43,7 @@ const sidebars: SidebarsConfig = { "guides/how-it-works/chronotables", "guides/how-it-works/time-series-functions", "guides/how-it-works/rollups", - "guides/how-it-works/compression-and-retention", + "guides/how-it-works/tiering-and-retention", "guides/how-it-works/lakebase-cdf-internals", ], }, diff --git a/website/src/css/custom.css b/website/src/css/custom.css index 7037367..8288e49 100644 --- a/website/src/css/custom.css +++ b/website/src/css/custom.css @@ -310,7 +310,7 @@ code { .menu__link[href*="/guides/how-it-works/chronotables"]::before { content: "layers"; } .menu__link[href*="/guides/how-it-works/time-series-functions"]::before { content: "function"; } .menu__link[href*="/guides/how-it-works/rollups"]::before { content: "account_tree"; } -.menu__link[href*="/guides/how-it-works/compression-and-retention"]::before { content: "archive"; } +.menu__link[href*="/guides/how-it-works/tiering-and-retention"]::before { content: "archive"; } .menu__link[href*="/guides/how-it-works/lakebase-cdf-internals"]::before { content: "hub"; } .menu__link[href$="/how-to/"]::before, .menu__link[href$="/how-to"]::before { content: "explore"; }