diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65bda48..7413c29 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,8 +10,13 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version
### Added
+- **Per-chunk tiering durability gate.** A partition is dropped only when its CDF shadow is `STREAMING` in `wal2delta.tables` **and** `committed_lsn >= chunk.last_write_lsn` — proving Lakebase CDF has flushed every write to that chunk into the Unity Catalog Managed Table before the data leaves Lakebase. The gate compares against the chunk's own recorded write position (stamped by statement-level triggers), not the global WAL head, because a per-table `committed_lsn` does not advance while the shadow is idle. Tiering is fail-closed: it defers and retries whenever the gate is not satisfiable.
+- **`show_tiering_status()`** and the `lakets_tiering_pending_chunks` / `lakets_tiering_tiered_chunks_total` / `lakets_tiering_caught_up` Prometheus metrics for tiering observability (including CDF gate state).
+- `enable_sync` now warns when Lakebase CDF (`wal2delta`) is absent, since CDF is a prerequisite enabled on the `lakets_cdf` schema via Databricks, not by LakeTS.
+
### Changed
+- **Compression is now Tiering.** `add_compression_policy` → `add_tiering_policy` (drops the `segment_by`/`order_by` parameters); `compress_chunk`/`decompress_chunk` → `tier_chunk`/`untier_chunk` (`tier_chunk` returns `BOOLEAN` — `TRUE` dropped, `FALSE` deferred); `show_`/`remove_compression_policy` → `show_`/`remove_tiering_policy`. The `compression` policy type is now `tiering`; the `compressed` chunk status is removed (`active` → `tiered`). `_chronotable_registry.compression_enabled` is renamed `tiering_enabled`, and `_chunk_metadata` gains `last_write_lsn` (and drops the unused `compressed_at`). **Breaking — fresh release, no upgrade migration.**
- RollUps are now always incremental. Removed the `refresh_mode` toggle and the `'full'`
(`TRUNCATE`) refresh strategy; `create_rollup` no longer accepts a `p_refresh_mode` argument
and `show_rollups()` no longer returns a `refresh_mode` column.
@@ -21,6 +26,7 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version
### Removed
+- The Spark-based compression job and its `{table}_archive` Delta export path. Tiering is pure Lakebase SQL (`tiering_job.py`); CDF owns the Unity Catalog copy.
- `enable_rollup_export` / `disable_rollup_export` / `show_rollup_exports`, `sql/15_uc_integration.sql` and its functions (`register_uc_table`, `tag_uc_table`, `get_uc_registrations`, `unregister_uc_table`, `_uc_registry`), and the `rollup_export.py` / `uc_registration.py` Databricks jobs.
### Fixed
diff --git a/README.md b/README.md
index 995f2d3..ddd3545 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,7 @@ LakeTS is a time series toolkit for Databricks Lakebase — pure SQL (PL/pgSQL)
| **Time Series Functions** | `time_bucket`, `first`, `last`, `locf`, `interpolate`, `delta`, `rate`, `histogram` |
| **Gap-filling** | `time_bucket_gapfill` + LEFT JOIN for continuous time series |
| **RollUp Engine** | Incremental aggregates with per-bucket refresh, invalidation tracking, cold-tier re-aggregation, chunk-skip pruning, batch refresh, DAG orchestration, and Delta export |
-| **Compression & Tiering** | Policy-based tiering from Lakebase to Delta Lake |
+| **Tiering** | Policy-based eviction of cold partitions from Lakebase once CDF has flushed them to the Unity Catalog Managed Table |
| **Retention** | Automated data lifecycle management across both tiers |
| **Lakehouse Sync** | CDC-based replication to Delta via shadow table pattern |
| **Last Value Cache** | Sub-10ms latest-state queries via `enable_lvc()` |
@@ -112,7 +112,7 @@ FROM metrics GROUP BY 1 ORDER BY 1;
SELECT lakets.enable_lvc('system_metrics', ARRAY['host'], ARRAY['cpu','memory']);
-- Set up lifecycle policies
-SELECT lakets.add_compression_policy('metrics', '7 days');
+SELECT lakets.add_tiering_policy('metrics', '7 days');
SELECT lakets.add_retention_policy('metrics', '30 days');
```
@@ -182,7 +182,7 @@ LakeTS ships a pre-built **Databricks AI/BI dashboard** for monitoring your inst
| Page | Panels |
|------|--------|
-| **Partition Health** | Hypertable count, chunk counts by status (active/compressed/tiered/dropped), chunk health table per hypertable, estimated row counts |
+| **Partition Health** | Hypertable count, chunk counts by status (active/tiered/dropped), chunk health table per hypertable, estimated row counts |
| **RollUp Monitoring** | Stale RollUp counter, dirty bucket total, watermark lag bar chart per RollUp (colored by refresh mode), invalidation log depth, full RollUp status table |
| **LVC & System** | LVC-enabled table count, total cached series, database size (GB), LVC stats table, active policies by type |
diff --git a/build.sh b/build.sh
index c52592c..ca0e187 100755
--- a/build.sh
+++ b/build.sh
@@ -31,7 +31,7 @@ MODULES=(
"02_chronotable.sql"
"03_timeseries_functions.sql"
"04_rollup.sql"
- "05_compression.sql"
+ "05_tiering.sql"
"06_retention.sql"
"07_monitoring.sql"
"08_metric_table.sql"
diff --git a/databricks/bundles/databricks.yml b/databricks/bundles/databricks.yml
index 6b3e15b..66b1d4f 100644
--- a/databricks/bundles/databricks.yml
+++ b/databricks/bundles/databricks.yml
@@ -28,20 +28,19 @@ resources:
- ${var.lakebase_instance}
existing_cluster_id: ${var.cluster_id}
- lakets_compression:
- name: "LakeTS - Compression & Tiering"
- description: "Tiers old chunks from Lakebase to Delta Lake"
+ lakets_tiering:
+ name: "LakeTS - Tiering"
+ description: "Drops cold ChronoTable partitions once CDF has flushed them to Unity Catalog"
schedule:
quartz_cron_expression: "0 0 2 * * ?" # Daily at 2 AM UTC
timezone_id: "UTC"
tasks:
- - task_key: compress_and_tier
+ - task_key: tier_cold_partitions
python_wheel_task:
package_name: lakets
- entry_point: compression_job
+ entry_point: tiering_job
parameters:
- ${var.lakebase_instance}
- - ${var.delta_catalog}
existing_cluster_id: ${var.cluster_id}
lakets_retention:
diff --git a/databricks/workflows/compression_job.py b/databricks/workflows/compression_job.py
deleted file mode 100644
index b402f86..0000000
--- a/databricks/workflows/compression_job.py
+++ /dev/null
@@ -1,102 +0,0 @@
-"""
-LakeTS Compression & Tiering Job
-Databricks Workflow job that tiers old Lakebase chunks to Delta Lake.
-
-For each hypertable with a compression policy:
-1. Find chunks older than compress_after
-2. Verify data is in Delta (via Lakehouse Sync CDC)
-3. Mark chunk as compressed/tiered in metadata
-4. Optionally drop the Lakebase partition
-
-Schedule: Daily or per compression policy interval.
-
-Usage as Databricks Job:
- Pass instance_name and catalog as parameters.
-"""
-import logging
-
-from psycopg2 import sql
-from pyspark.sql import SparkSession
-
-from lakebase_utils import fetch_all, lakebase_cursor
-
-logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
-logger = logging.getLogger("lakets.compression_job")
-
-
-def run(instance_name: str, catalog: str, schema: str = "default", drop_partitions: bool = False):
- """Execute compression/tiering for all hypertables with policies."""
- spark = SparkSession.builder.getOrCreate()
-
- with lakebase_cursor(instance_name) as cur:
- # Find all hypertables with compression policies
- hypertables = fetch_all(cur, """
- SELECT hr.id, hr.schema_name, hr.table_name, hr.shadow_table_name,
- pr.config->>'compress_after' as compress_after,
- pr.config->>'segment_by' as segment_by,
- pr.config->>'order_by' as order_by
- FROM lakets._chronotable_registry hr
- JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id
- WHERE pr.policy_type = 'compression' AND pr.enabled = TRUE
- """)
- logger.info("Found %d hypertable(s) with compression policies", len(hypertables))
-
- total_compressed = 0
- for ht in hypertables:
- # Get eligible chunks
- chunks = fetch_all(cur, """
- SELECT * FROM lakets._get_chunks_to_compress(%s, %s)
- """, (ht["table_name"], ht["schema_name"]))
-
- if not chunks:
- logger.info("No chunks to compress for %s.%s", ht["schema_name"], ht["table_name"])
- continue
-
- logger.info(
- "Processing %d chunk(s) for %s.%s",
- len(chunks), ht["schema_name"], ht["table_name"],
- )
-
- for chunk in chunks:
- delta_table = f"{catalog}.{schema}.{ht['table_name']}_archive"
-
- chunk_parts = chunk["chunk_name"].split(".")
- logger.info("Tiering chunk %s -> %s", chunk["chunk_name"], delta_table)
-
- # Mark as compressed in metadata
- cur.execute("SELECT lakets.compress_chunk(%s)", (chunk["chunk_name"],))
-
- if drop_partitions:
- # Drop the Lakebase partition to free storage
- drop_query = sql.SQL("DROP TABLE IF EXISTS {}.{}").format(
- sql.Identifier(chunk_parts[0]),
- sql.Identifier(chunk_parts[1]),
- )
- cur.execute(drop_query)
- cur.execute("""
- UPDATE lakets._chunk_metadata
- SET status = 'tiered', tiered_at = now()
- WHERE chunk_name = %s
- """, (chunk["chunk_name"],))
- logger.info("Dropped partition %s", chunk["chunk_name"])
-
- total_compressed += 1
-
- # Update policy last_run
- cur.execute("""
- UPDATE lakets._policy_registry
- SET last_run_at = now()
- WHERE chronotable_id = %s AND policy_type = 'compression'
- """, (ht["id"],))
-
- logger.info("Total chunks compressed: %d", total_compressed)
- return total_compressed
-
-
-if __name__ == "__main__":
- import os
- import sys
-
- instance = sys.argv[1] if len(sys.argv) > 1 else os.environ["LAKETS_INSTANCE"]
- cat = sys.argv[2] if len(sys.argv) > 2 else "main"
- run(instance, cat)
diff --git a/databricks/workflows/tiering_job.py b/databricks/workflows/tiering_job.py
new file mode 100644
index 0000000..e05740e
--- /dev/null
+++ b/databricks/workflows/tiering_job.py
@@ -0,0 +1,81 @@
+"""
+LakeTS Tiering Job
+
+Drops cold ChronoTable partitions to reclaim Lakebase storage. The data is
+already durable in the Unity Catalog Managed Table via Lakebase CDF; this job
+only evicts partitions that lakets.tier_chunk confirms are safe to drop (the
+table's CDF shadow is STREAMING and has flushed past the WAL head). The drop
+and the metadata transition happen inside tier_chunk, so this job is pure
+Lakebase SQL — no Spark.
+
+Schedule: Daily or per tiering policy interval.
+
+Usage as Databricks Job:
+ Pass instance_name as a parameter.
+"""
+import logging
+
+from lakebase_utils import fetch_all, lakebase_cursor
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
+logger = logging.getLogger("lakets.tiering_job")
+
+
+def run(instance_name: str) -> int:
+ """Evict eligible, CDF-durable chunks for all tables with a tiering policy.
+
+ Returns the number of partitions actually dropped this run.
+ """
+ total_tiered = 0
+ deferred = 0
+ with lakebase_cursor(instance_name) as cur:
+ tables = fetch_all(cur, """
+ SELECT hr.id, hr.schema_name, hr.table_name
+ FROM lakets._chronotable_registry hr
+ JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id
+ WHERE pr.policy_type = 'tiering' AND pr.enabled = TRUE
+ """)
+ logger.info("Found %d table(s) with tiering policies", len(tables))
+
+ for t in tables:
+ chunks = fetch_all(cur, """
+ SELECT * FROM lakets._get_chunks_to_tier(%s, %s)
+ """, (t["table_name"], t["schema_name"]))
+
+ if not chunks:
+ logger.info(
+ "No eligible chunks for %s.%s (not aged out, or CDF not streaming)",
+ t["schema_name"], t["table_name"],
+ )
+ continue
+
+ logger.info(
+ "Evaluating %d chunk(s) for %s.%s",
+ len(chunks), t["schema_name"], t["table_name"],
+ )
+
+ for chunk in chunks:
+ cur.execute("SELECT lakets.tier_chunk(%s)", (chunk["chunk_name"],))
+ dropped = cur.fetchone()[0] # raw cursor returns tuples (see fetch_all)
+ if dropped:
+ total_tiered += 1
+ logger.info("Tiered (dropped) partition %s", chunk["chunk_name"])
+ else:
+ deferred += 1
+ logger.info("Deferred %s — CDF not caught up to WAL head", chunk["chunk_name"])
+
+ cur.execute("""
+ UPDATE lakets._policy_registry SET last_run_at = now()
+ WHERE chronotable_id = %s AND policy_type = 'tiering'
+ """, (t["id"],))
+
+ logger.info("Tiering complete: %d dropped, %d deferred", total_tiered, deferred)
+ return total_tiered
+
+
+if __name__ == "__main__":
+ import os
+ import sys
+
+ instance = sys.argv[1] if len(sys.argv) > 1 else os.environ["LAKETS_INSTANCE"]
+ run(instance)
diff --git a/sql/01_schema.sql b/sql/01_schema.sql
index 0561cc2..3890563 100644
--- a/sql/01_schema.sql
+++ b/sql/01_schema.sql
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS lakets._chronotable_registry (
chunk_interval INTERVAL NOT NULL DEFAULT '7 days',
space_column TEXT,
space_partitions INT DEFAULT 1,
- compression_enabled BOOLEAN DEFAULT FALSE,
+ tiering_enabled BOOLEAN DEFAULT FALSE,
retention_interval INTERVAL,
shadow_table_name TEXT,
sync_enabled BOOLEAN DEFAULT FALSE,
@@ -48,10 +48,13 @@ CREATE TABLE IF NOT EXISTS lakets._chunk_metadata (
status TEXT NOT NULL DEFAULT 'active',
row_count BIGINT,
size_bytes BIGINT,
- compressed_at TIMESTAMPTZ,
tiered_at TIMESTAMPTZ,
+ -- Highest WAL position written to this chunk, stamped by the tiering
+ -- write-tracking trigger. The tiering durability gate drops a chunk only
+ -- once CDF's committed_lsn for the shadow has flushed past this mark.
+ last_write_lsn PG_LSN,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
- CONSTRAINT valid_status CHECK (status IN ('active', 'compressed', 'tiered', 'dropped'))
+ CONSTRAINT valid_status CHECK (status IN ('active', 'tiered', 'dropped'))
);
CREATE INDEX IF NOT EXISTS idx_chunk_metadata_hypertable
@@ -61,7 +64,7 @@ CREATE INDEX IF NOT EXISTS idx_chunk_metadata_range
ON lakets._chunk_metadata(range_start, range_end);
-- ---------------------------------------------------------------------------
--- Policy Registry: tracks compression, retention, and tiering policies
+-- Policy Registry: tracks tiering, retention, and tiered-retention policies
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS lakets._policy_registry (
id SERIAL PRIMARY KEY,
@@ -71,7 +74,7 @@ CREATE TABLE IF NOT EXISTS lakets._policy_registry (
enabled BOOLEAN DEFAULT TRUE,
last_run_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
- CONSTRAINT valid_policy_type CHECK (policy_type IN ('compression', 'retention', 'tiered_retention'))
+ CONSTRAINT valid_policy_type CHECK (policy_type IN ('tiering', 'retention', 'tiered_retention'))
);
-- ---------------------------------------------------------------------------
@@ -153,7 +156,7 @@ END $$;
CREATE UNIQUE INDEX IF NOT EXISTS idx_chunk_metadata_chunk_name
ON lakets._chunk_metadata(chunk_name);
--- FK index on _policy_registry (scanned by compression/retention jobs)
+-- FK index on _policy_registry (scanned by tiering/retention jobs)
CREATE INDEX IF NOT EXISTS idx_policy_registry_ct_type
ON lakets._policy_registry(chronotable_id, policy_type) WHERE enabled = TRUE;
diff --git a/sql/05_compression.sql b/sql/05_compression.sql
deleted file mode 100644
index 6da3da0..0000000
--- a/sql/05_compression.sql
+++ /dev/null
@@ -1,229 +0,0 @@
--- =============================================================================
--- LakeTS Compression & Tiering Policies
--- Register policies for automatic data tiering from Lakebase to Delta Lake.
--- Actual tiering is executed by Databricks workflow jobs.
--- =============================================================================
-
--- ---------------------------------------------------------------------------
--- add_compression_policy: Registers a compression/tiering policy.
--- After compress_after interval, the Databricks compression job will:
--- 1. Ensure data is synced to Delta via Lakehouse Sync
--- 2. Optimize the Delta table (Z-ORDER / Liquid Clustering)
--- 3. Drop the Lakebase partition
--- 4. Update chunk metadata status to 'tiered'
--- ---------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION lakets.add_compression_policy(
- p_table_name TEXT,
- p_compress_after INTERVAL,
- p_segment_by TEXT DEFAULT NULL,
- p_order_by TEXT DEFAULT NULL,
- p_schema_name TEXT DEFAULT 'public'
-)
-RETURNS INT
-LANGUAGE plpgsql
-AS $$
-DECLARE
- v_chronotable_id INT;
- v_policy_id INT;
- v_config JSONB;
-BEGIN
- SELECT id INTO v_chronotable_id
- FROM lakets._chronotable_registry
- WHERE schema_name = p_schema_name AND table_name = p_table_name;
-
- IF NOT FOUND THEN
- RAISE EXCEPTION 'Table %.% is not a registered ChronoTable',
- p_schema_name, p_table_name;
- END IF;
-
- -- Check for existing compression policy
- IF EXISTS (
- SELECT 1 FROM lakets._policy_registry
- WHERE chronotable_id = v_chronotable_id AND policy_type = 'compression'
- ) THEN
- RAISE EXCEPTION 'Compression policy already exists for %.%',
- p_schema_name, p_table_name;
- END IF;
-
- v_config := jsonb_build_object(
- 'compress_after', p_compress_after::TEXT,
- 'segment_by', p_segment_by,
- 'order_by', COALESCE(p_order_by, (
- SELECT time_column FROM lakets._chronotable_registry WHERE id = v_chronotable_id
- ) || ' DESC')
- );
-
- INSERT INTO lakets._policy_registry
- (chronotable_id, policy_type, config, enabled)
- VALUES (v_chronotable_id, 'compression', v_config, TRUE)
- RETURNING id INTO v_policy_id;
-
- -- Mark the hypertable as compression-enabled
- UPDATE lakets._chronotable_registry
- SET compression_enabled = TRUE
- WHERE id = v_chronotable_id;
-
- RETURN v_policy_id;
-END;
-$$;
-
--- ---------------------------------------------------------------------------
--- compress_chunk: Marks a chunk for compression/tiering.
--- The actual tiering to Delta is done by the Databricks workflow job.
--- ---------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION lakets.compress_chunk(
- p_chunk_name TEXT
-)
-RETURNS VOID
-LANGUAGE plpgsql
-AS $$
-BEGIN
- UPDATE lakets._chunk_metadata
- SET status = 'compressed',
- compressed_at = now()
- WHERE chunk_name = p_chunk_name AND status = 'active';
-
- IF NOT FOUND THEN
- RAISE EXCEPTION 'Chunk % not found or not active', p_chunk_name;
- END IF;
-END;
-$$;
-
--- ---------------------------------------------------------------------------
--- decompress_chunk: Marks a tiered chunk for re-ingestion.
--- The actual data restoration from Delta is done by the Databricks workflow.
--- ---------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION lakets.decompress_chunk(
- p_chunk_name TEXT
-)
-RETURNS VOID
-LANGUAGE plpgsql
-AS $$
-BEGIN
- UPDATE lakets._chunk_metadata
- SET status = 'active',
- compressed_at = NULL,
- tiered_at = NULL
- WHERE chunk_name = p_chunk_name AND status IN ('compressed', 'tiered');
-
- IF NOT FOUND THEN
- RAISE EXCEPTION 'Chunk % not found or not compressed/tiered', p_chunk_name;
- END IF;
-END;
-$$;
-
--- ---------------------------------------------------------------------------
--- show_compression_policy: Returns the compression policy for a hypertable.
--- ---------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION lakets.show_compression_policy(
- p_table_name TEXT,
- p_schema_name TEXT DEFAULT 'public'
-)
-RETURNS TABLE (
- policy_id INT,
- compress_after TEXT,
- segment_by TEXT,
- order_by TEXT,
- enabled BOOLEAN,
- last_run_at TIMESTAMPTZ
-)
-LANGUAGE plpgsql
-AS $$
-DECLARE
- v_chronotable_id INT;
-BEGIN
- SELECT id INTO v_chronotable_id
- FROM lakets._chronotable_registry
- WHERE schema_name = p_schema_name AND table_name = p_table_name;
-
- IF NOT FOUND THEN
- RAISE EXCEPTION 'Table %.% is not a registered ChronoTable',
- p_schema_name, p_table_name;
- END IF;
-
- RETURN QUERY
- SELECT
- pr.id,
- pr.config->>'compress_after',
- pr.config->>'segment_by',
- pr.config->>'order_by',
- pr.enabled,
- pr.last_run_at
- FROM lakets._policy_registry pr
- WHERE pr.chronotable_id = v_chronotable_id AND pr.policy_type = 'compression';
-END;
-$$;
-
--- ---------------------------------------------------------------------------
--- remove_compression_policy: Removes the compression policy.
--- ---------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION lakets.remove_compression_policy(
- p_table_name TEXT,
- p_schema_name TEXT DEFAULT 'public'
-)
-RETURNS VOID
-LANGUAGE plpgsql
-AS $$
-DECLARE
- v_chronotable_id INT;
-BEGIN
- SELECT id INTO v_chronotable_id
- FROM lakets._chronotable_registry
- WHERE schema_name = p_schema_name AND table_name = p_table_name;
-
- IF NOT FOUND THEN
- RAISE EXCEPTION 'Table %.% is not a registered ChronoTable',
- p_schema_name, p_table_name;
- END IF;
-
- DELETE FROM lakets._policy_registry
- WHERE chronotable_id = v_chronotable_id AND policy_type = 'compression';
-
- UPDATE lakets._chronotable_registry
- SET compression_enabled = FALSE
- WHERE id = v_chronotable_id;
-END;
-$$;
-
--- ---------------------------------------------------------------------------
--- _get_chunks_to_compress: Returns chunks eligible for compression.
--- Used by the Databricks compression job to find work.
--- ---------------------------------------------------------------------------
-CREATE OR REPLACE FUNCTION lakets._get_chunks_to_compress(
- p_table_name TEXT,
- p_schema_name TEXT DEFAULT 'public'
-)
-RETURNS TABLE (
- chunk_id INT,
- chunk_name TEXT,
- range_start TIMESTAMPTZ,
- range_end TIMESTAMPTZ
-)
-LANGUAGE plpgsql
-AS $$
-DECLARE
- v_chronotable_id INT;
- v_compress_after INTERVAL;
-BEGIN
- SELECT hr.id, (pr.config->>'compress_after')::INTERVAL
- INTO v_chronotable_id, v_compress_after
- FROM lakets._chronotable_registry hr
- JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id
- WHERE hr.schema_name = p_schema_name
- AND hr.table_name = p_table_name
- AND pr.policy_type = 'compression'
- AND pr.enabled = TRUE;
-
- IF NOT FOUND THEN
- RETURN;
- END IF;
-
- RETURN QUERY
- SELECT cm.id, cm.chunk_name, cm.range_start, cm.range_end
- FROM lakets._chunk_metadata cm
- WHERE cm.chronotable_id = v_chronotable_id
- AND cm.status = 'active'
- AND cm.range_end <= (now() - v_compress_after)
- ORDER BY cm.range_start;
-END;
-$$;
diff --git a/sql/05_tiering.sql b/sql/05_tiering.sql
new file mode 100644
index 0000000..41b9494
--- /dev/null
+++ b/sql/05_tiering.sql
@@ -0,0 +1,487 @@
+-- =============================================================================
+-- LakeTS Tiering Policies
+-- A tiering policy drops cold ChronoTable partitions to reclaim Lakebase
+-- storage. The data is already durable in the Unity Catalog Managed Table via
+-- Lakebase CDF; tiering only evicts partitions once CDF has flushed past them.
+-- Executed by the Databricks tiering job.
+-- =============================================================================
+
+-- ---------------------------------------------------------------------------
+-- _cdf_committed_lsn: returns the CDF-flushed LSN for a shadow table, but
+-- ONLY if it is actively STREAMING in wal2delta.tables. Returns NULL (fail
+-- closed) if the wal2delta subsystem is absent, the shadow does not exist,
+-- or the table is not STREAMING (e.g. SKIPPED for missing REPLICA IDENTITY).
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets._cdf_committed_lsn(p_shadow_name TEXT)
+RETURNS PG_LSN
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ v_oid OID;
+ v_lsn PG_LSN;
+BEGIN
+ IF to_regclass('wal2delta.tables') IS NULL THEN
+ RETURN NULL;
+ END IF;
+ v_oid := to_regclass('lakets_cdf.' || quote_ident(p_shadow_name))::oid;
+ IF v_oid IS NULL THEN
+ RETURN NULL;
+ END IF;
+ SELECT committed_lsn INTO v_lsn
+ FROM wal2delta.tables
+ WHERE table_oid = v_oid AND status = 'STREAMING';
+ RETURN v_lsn; -- NULL if no STREAMING row
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- _stamp_tiered_chunk_lsn: statement-level trigger that records the current
+-- WAL position on every chunk that just received writes. Installed on the
+-- ChronoTable PARENT (with a transition table) so it covers all current and
+-- future partitions in one trigger and stamps ONLY the chunks actually
+-- touched -- a write to today's hot chunk never advances a cold chunk's
+-- watermark, which is what lets cold chunks become tier-eligible.
+--
+-- Why parent + transition table: a statement-level trigger on a partition does
+-- not fire for inserts routed through the parent, and chunk_name is stored
+-- schema-qualified, so the row->chunk mapping is recomputed here via date_bin
+-- (same origin '2000-01-01' as _ensure_partitions).
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets._stamp_tiered_chunk_lsn()
+RETURNS TRIGGER
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ v_parent TEXT;
+ v_ct_id INT;
+ v_time_col TEXT;
+ v_interval INTERVAL;
+BEGIN
+ v_parent := lakets._resolve_partition_parent(TG_TABLE_SCHEMA, TG_TABLE_NAME);
+
+ SELECT cr.id, cr.time_column, cr.chunk_interval
+ INTO v_ct_id, v_time_col, v_interval
+ FROM lakets._chronotable_registry cr
+ WHERE cr.schema_name = TG_TABLE_SCHEMA
+ AND cr.table_name = COALESCE(v_parent, TG_TABLE_NAME);
+
+ IF v_ct_id IS NULL THEN
+ RETURN NULL;
+ END IF;
+
+ -- pg_current_wal_lsn() here is a lower bound on the writing txn's commit
+ -- LSN. That imprecision is immaterial: tier_chunk only ever drops chunks
+ -- whose time window ended at least p_after ago (days), by which point
+ -- committed_lsn has long flushed past this mark.
+ IF TG_OP = 'DELETE' THEN
+ EXECUTE format(
+ 'UPDATE lakets._chunk_metadata cm
+ SET last_write_lsn = pg_current_wal_lsn()
+ WHERE cm.chronotable_id = $1
+ AND cm.range_start IN (SELECT DISTINCT date_bin($2, %I, $3) FROM _old_rows)',
+ v_time_col)
+ USING v_ct_id, v_interval, '2000-01-01'::timestamptz;
+ ELSE
+ EXECUTE format(
+ 'UPDATE lakets._chunk_metadata cm
+ SET last_write_lsn = pg_current_wal_lsn()
+ WHERE cm.chronotable_id = $1
+ AND cm.range_start IN (SELECT DISTINCT date_bin($2, %I, $3) FROM _new_rows)',
+ v_time_col)
+ USING v_ct_id, v_interval, '2000-01-01'::timestamptz;
+ END IF;
+
+ RETURN NULL;
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- _install_tiering_write_tracking / _remove_tiering_write_tracking: attach or
+-- detach the per-chunk write-LSN triggers on a ChronoTable parent. Three
+-- triggers: PostgreSQL allows a transition table for only ONE event each, so
+-- INSERT and UPDATE get separate NEW TABLE triggers and DELETE gets an OLD
+-- TABLE trigger. All three call the same trigger function.
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets._install_tiering_write_tracking(
+ p_schema_name TEXT, p_table_name TEXT
+)
+RETURNS VOID
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ EXECUTE format(
+ 'CREATE OR REPLACE TRIGGER trg_lakets_tier_lsn_insert
+ AFTER INSERT ON %I.%I
+ REFERENCING NEW TABLE AS _new_rows
+ FOR EACH STATEMENT EXECUTE FUNCTION lakets._stamp_tiered_chunk_lsn()',
+ p_schema_name, p_table_name);
+ EXECUTE format(
+ 'CREATE OR REPLACE TRIGGER trg_lakets_tier_lsn_update
+ AFTER UPDATE ON %I.%I
+ REFERENCING NEW TABLE AS _new_rows
+ FOR EACH STATEMENT EXECUTE FUNCTION lakets._stamp_tiered_chunk_lsn()',
+ p_schema_name, p_table_name);
+ EXECUTE format(
+ 'CREATE OR REPLACE TRIGGER trg_lakets_tier_lsn_delete
+ AFTER DELETE ON %I.%I
+ REFERENCING OLD TABLE AS _old_rows
+ FOR EACH STATEMENT EXECUTE FUNCTION lakets._stamp_tiered_chunk_lsn()',
+ p_schema_name, p_table_name);
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION lakets._remove_tiering_write_tracking(
+ p_schema_name TEXT, p_table_name TEXT
+)
+RETURNS VOID
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ EXECUTE format('DROP TRIGGER IF EXISTS trg_lakets_tier_lsn_insert ON %I.%I',
+ p_schema_name, p_table_name);
+ EXECUTE format('DROP TRIGGER IF EXISTS trg_lakets_tier_lsn_update ON %I.%I',
+ p_schema_name, p_table_name);
+ EXECUTE format('DROP TRIGGER IF EXISTS trg_lakets_tier_lsn_delete ON %I.%I',
+ p_schema_name, p_table_name);
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- add_tiering_policy: register a tiering policy. Chunks older than p_after are
+-- eligible for eviction once CDF has flushed their data to UC.
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets.add_tiering_policy(
+ p_table_name TEXT,
+ p_after INTERVAL,
+ p_schema_name TEXT DEFAULT 'public'
+)
+RETURNS INT
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ v_chronotable_id INT;
+ v_policy_id INT;
+ v_sync_enabled BOOLEAN;
+BEGIN
+ SELECT id, sync_enabled INTO v_chronotable_id, v_sync_enabled
+ FROM lakets._chronotable_registry
+ WHERE schema_name = p_schema_name AND table_name = p_table_name;
+
+ IF NOT FOUND THEN
+ RAISE EXCEPTION 'Table %.% is not a registered ChronoTable',
+ p_schema_name, p_table_name;
+ END IF;
+
+ IF NOT COALESCE(v_sync_enabled, FALSE) THEN
+ RAISE NOTICE 'Table %.% is not CDF-synced yet; tiering will not evict any '
+ 'partitions until enable_sync is called and CDF is streaming.',
+ p_schema_name, p_table_name;
+ END IF;
+
+ -- _policy_registry has no unique constraint on (chronotable_id, policy_type),
+ -- so guard with an existence check.
+ IF EXISTS (
+ SELECT 1 FROM lakets._policy_registry
+ WHERE chronotable_id = v_chronotable_id AND policy_type = 'tiering'
+ ) THEN
+ RAISE EXCEPTION 'Tiering policy already exists for %.%', p_schema_name, p_table_name;
+ END IF;
+
+ INSERT INTO lakets._policy_registry (chronotable_id, policy_type, config, enabled)
+ VALUES (v_chronotable_id, 'tiering', jsonb_build_object('after', p_after::TEXT), TRUE)
+ RETURNING id INTO v_policy_id;
+
+ UPDATE lakets._chronotable_registry
+ SET tiering_enabled = TRUE
+ WHERE id = v_chronotable_id;
+
+ -- Track per-chunk write positions from now on.
+ PERFORM lakets._install_tiering_write_tracking(p_schema_name, p_table_name);
+
+ -- Backfill: stamp existing active chunks with the current WAL head as a
+ -- conservative upper bound on their writes. A chunk drops only once CDF has
+ -- flushed past this mark, so pre-policy data is never dropped before it is
+ -- provably durable in UC. (NULL last_write_lsn is treated as "cannot prove
+ -- durable" by tier_chunk, so this backfill is what makes existing chunks
+ -- eligible at all.)
+ UPDATE lakets._chunk_metadata
+ SET last_write_lsn = pg_current_wal_lsn()
+ WHERE chronotable_id = v_chronotable_id
+ AND status = 'active'
+ AND last_write_lsn IS NULL;
+
+ RETURN v_policy_id;
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- remove_tiering_policy: Removes the tiering policy for a ChronoTable.
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets.remove_tiering_policy(
+ p_table_name TEXT,
+ p_schema_name TEXT DEFAULT 'public'
+)
+RETURNS VOID
+LANGUAGE plpgsql
+AS $$
+DECLARE v_chronotable_id INT;
+BEGIN
+ SELECT id INTO v_chronotable_id
+ FROM lakets._chronotable_registry
+ WHERE schema_name = p_schema_name AND table_name = p_table_name;
+ IF NOT FOUND THEN
+ RAISE EXCEPTION 'Table %.% is not a registered ChronoTable', p_schema_name, p_table_name;
+ END IF;
+
+ DELETE FROM lakets._policy_registry
+ WHERE chronotable_id = v_chronotable_id AND policy_type = 'tiering';
+
+ UPDATE lakets._chronotable_registry
+ SET tiering_enabled = FALSE
+ WHERE id = v_chronotable_id;
+
+ PERFORM lakets._remove_tiering_write_tracking(p_schema_name, p_table_name);
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- show_tiering_policy: Returns the tiering policy for a ChronoTable.
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets.show_tiering_policy(
+ p_table_name TEXT,
+ p_schema_name TEXT DEFAULT 'public'
+)
+RETURNS TABLE (policy_id INT, after TEXT, enabled BOOLEAN, last_run_at TIMESTAMPTZ)
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ RETURN QUERY
+ SELECT pr.id, pr.config->>'after', pr.enabled, pr.last_run_at
+ FROM lakets._policy_registry pr
+ JOIN lakets._chronotable_registry hr ON hr.id = pr.chronotable_id
+ WHERE hr.schema_name = p_schema_name
+ AND hr.table_name = p_table_name
+ AND pr.policy_type = 'tiering';
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- _get_chunks_to_tier: candidate chunks for eviction — active, older than the
+-- policy 'after' interval, AND the table's shadow is STREAMING in CDF. The
+-- exact per-chunk durability gate is enforced in tier_chunk at drop time.
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets._get_chunks_to_tier(
+ p_table_name TEXT,
+ p_schema_name TEXT DEFAULT 'public'
+)
+RETURNS TABLE (chunk_id INT, chunk_name TEXT, range_start TIMESTAMPTZ, range_end TIMESTAMPTZ)
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ v_chronotable_id INT;
+ v_after INTERVAL;
+ v_shadow TEXT;
+BEGIN
+ SELECT hr.id, (pr.config->>'after')::INTERVAL, hr.shadow_table_name
+ INTO v_chronotable_id, v_after, v_shadow
+ FROM lakets._chronotable_registry hr
+ JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id
+ WHERE hr.schema_name = p_schema_name
+ AND hr.table_name = p_table_name
+ AND pr.policy_type = 'tiering'
+ AND pr.enabled = TRUE;
+
+ IF NOT FOUND THEN
+ RETURN;
+ END IF;
+
+ -- Hard gate: skip entirely unless the shadow is actively STREAMING.
+ IF v_shadow IS NULL OR lakets._cdf_committed_lsn(v_shadow) IS NULL THEN
+ RETURN;
+ END IF;
+
+ RETURN QUERY
+ SELECT cm.id, cm.chunk_name, cm.range_start, cm.range_end
+ FROM lakets._chunk_metadata cm
+ WHERE cm.chronotable_id = v_chronotable_id
+ AND cm.status = 'active'
+ AND cm.range_end <= (now() - v_after)
+ ORDER BY cm.range_start;
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- tier_chunk: drop a chunk's partition and mark it tiered, but ONLY if CDF has
+-- provably flushed past every write to THAT chunk:
+-- shadow is STREAMING AND chunk.last_write_lsn IS NOT NULL
+-- AND committed_lsn >= chunk.last_write_lsn.
+-- The comparison is against the chunk's own recorded write position (stamped by
+-- _stamp_tiered_chunk_lsn), NOT the global WAL head -- the head keeps advancing
+-- from unrelated activity while a quiescent shadow's committed_lsn freezes, so a
+-- head comparison would never pass for the cold chunks we want to evict.
+-- Returns TRUE if dropped, FALSE if deferred (caller retries next run).
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets.tier_chunk(p_chunk_name TEXT)
+RETURNS BOOLEAN
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ v_chronotable_id INT;
+ v_shadow TEXT;
+ v_committed PG_LSN;
+ v_chunk_lsn PG_LSN;
+ v_parts TEXT[];
+BEGIN
+ SELECT cm.chronotable_id, cm.last_write_lsn INTO v_chronotable_id, v_chunk_lsn
+ FROM lakets._chunk_metadata cm
+ WHERE cm.chunk_name = p_chunk_name AND cm.status = 'active';
+ IF NOT FOUND THEN
+ RAISE NOTICE 'tier_chunk: % not found or not active', p_chunk_name;
+ RETURN FALSE;
+ END IF;
+
+ SELECT shadow_table_name INTO v_shadow
+ FROM lakets._chronotable_registry WHERE id = v_chronotable_id;
+
+ v_committed := lakets._cdf_committed_lsn(v_shadow);
+ IF v_committed IS NULL THEN
+ RAISE NOTICE 'tier_chunk: % skipped — shadow not STREAMING', p_chunk_name;
+ RETURN FALSE;
+ END IF;
+
+ IF v_chunk_lsn IS NULL THEN
+ RAISE NOTICE 'tier_chunk: % skipped — no recorded write position (cannot prove durable)',
+ p_chunk_name;
+ RETURN FALSE;
+ END IF;
+
+ IF v_committed < v_chunk_lsn THEN
+ RAISE NOTICE 'tier_chunk: % deferred — CDF has not flushed the chunk yet '
+ '(committed=%, chunk_last_write=%)', p_chunk_name, v_committed, v_chunk_lsn;
+ RETURN FALSE;
+ END IF;
+
+ -- Safe to drop: every write to this chunk is at or below committed_lsn, i.e.
+ -- provably durable in the Unity Catalog Managed Table.
+ v_parts := string_to_array(p_chunk_name, '.');
+ IF array_length(v_parts, 1) = 2 THEN
+ EXECUTE format('DROP TABLE IF EXISTS %I.%I', v_parts[1], v_parts[2]);
+ ELSE
+ EXECUTE format('DROP TABLE IF EXISTS %I', p_chunk_name);
+ END IF;
+
+ UPDATE lakets._chunk_metadata
+ SET status = 'tiered', tiered_at = now()
+ WHERE chunk_name = p_chunk_name;
+
+ RETURN TRUE;
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- untier_chunk: restore a tiered chunk's metadata to active (e.g. before a
+-- backfill re-ingests the partition from UC).
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets.untier_chunk(p_chunk_name TEXT)
+RETURNS VOID
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ UPDATE lakets._chunk_metadata
+ SET status = 'active', tiered_at = NULL
+ WHERE chunk_name = p_chunk_name AND status = 'tiered';
+ IF NOT FOUND THEN
+ RAISE EXCEPTION 'Chunk % not found or not tiered', p_chunk_name;
+ END IF;
+END;
+$$;
+
+-- ---------------------------------------------------------------------------
+-- show_tiering_status: per-table tiering observability, including the CDF gate
+-- state so deferrals are self-explaining.
+-- ---------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lakets.show_tiering_status(
+ p_table_name TEXT DEFAULT NULL,
+ p_schema_name TEXT DEFAULT 'public'
+)
+RETURNS TABLE (
+ schema_name TEXT,
+ table_name TEXT,
+ after TEXT,
+ active_chunks INT,
+ tiered_chunks INT,
+ pending_chunks INT,
+ reclaimable_bytes BIGINT,
+ reclaimed_bytes BIGINT,
+ cdf_status TEXT,
+ cdf_lag_bytes BIGINT,
+ caught_up BOOLEAN,
+ last_run_at TIMESTAMPTZ
+)
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ r RECORD;
+ v_after INTERVAL;
+ v_committed PG_LSN;
+ v_max_pending_lsn PG_LSN;
+ v_pending_unstamped INT;
+BEGIN
+ FOR r IN
+ SELECT hr.id, hr.schema_name AS sname, hr.table_name AS tname,
+ hr.shadow_table_name AS shadow,
+ pr.config->>'after' AS after_txt, pr.last_run_at AS lra
+ FROM lakets._chronotable_registry hr
+ JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id
+ WHERE pr.policy_type = 'tiering'
+ AND (p_table_name IS NULL OR
+ (hr.table_name = p_table_name AND hr.schema_name = p_schema_name))
+ LOOP
+ v_after := r.after_txt::INTERVAL;
+ v_committed := lakets._cdf_committed_lsn(r.shadow);
+
+ schema_name := r.sname;
+ table_name := r.tname;
+ after := r.after_txt;
+ last_run_at := r.lra;
+
+ -- pending = active AND aged out. v_max_pending_lsn is the furthest WAL
+ -- position CDF must flush past to clear the whole pending backlog;
+ -- v_pending_unstamped counts pending chunks with no recorded position
+ -- (cannot be proven durable, so they block "caught_up").
+ SELECT
+ count(*) FILTER (WHERE cm.status = 'active'),
+ count(*) FILTER (WHERE cm.status = 'tiered'),
+ count(*) FILTER (WHERE cm.status = 'active' AND cm.range_end <= now() - v_after),
+ COALESCE(sum(cm.size_bytes) FILTER (
+ WHERE cm.status = 'active' AND cm.range_end <= now() - v_after), 0),
+ COALESCE(sum(cm.size_bytes) FILTER (WHERE cm.status = 'tiered'), 0),
+ max(cm.last_write_lsn) FILTER (
+ WHERE cm.status = 'active' AND cm.range_end <= now() - v_after),
+ count(*) FILTER (
+ WHERE cm.status = 'active' AND cm.range_end <= now() - v_after
+ AND cm.last_write_lsn IS NULL)
+ INTO active_chunks, tiered_chunks, pending_chunks, reclaimable_bytes,
+ reclaimed_bytes, v_max_pending_lsn, v_pending_unstamped
+ FROM lakets._chunk_metadata cm
+ WHERE cm.chronotable_id = r.id;
+
+ IF v_committed IS NULL THEN
+ cdf_status := CASE WHEN r.shadow IS NULL THEN 'NONE' ELSE 'SKIPPED' END;
+ cdf_lag_bytes := NULL;
+ caught_up := FALSE;
+ ELSE
+ cdf_status := 'STREAMING';
+ -- How far CDF still has to flush to clear the pending backlog.
+ cdf_lag_bytes := GREATEST(
+ 0, COALESCE(pg_wal_lsn_diff(v_max_pending_lsn, v_committed), 0))::BIGINT;
+ -- Caught up iff CDF has flushed past every stamped pending chunk and
+ -- no pending chunk is unstamped.
+ caught_up := (v_pending_unstamped = 0)
+ AND (v_max_pending_lsn IS NULL OR v_committed >= v_max_pending_lsn);
+ END IF;
+
+ RETURN NEXT;
+ END LOOP;
+END;
+$$;
diff --git a/sql/07_monitoring.sql b/sql/07_monitoring.sql
index 44ba945..7884013 100644
--- a/sql/07_monitoring.sql
+++ b/sql/07_monitoring.sql
@@ -37,12 +37,25 @@ BEGIN
LEFT JOIN pg_stat_user_tables s
ON s.schemaname = hr.schema_name AND s.relname = hr.table_name;
- -- Compression policy status
+ -- Tiering metrics, sourced from show_tiering_status so the gate logic lives
+ -- in exactly one place (per-chunk committed_lsn >= last_write_lsn).
+ -- pending = aged-out chunks awaiting eviction; caught_up = CDF has flushed
+ -- past every pending chunk (the durability gate currently passes).
RETURN QUERY
- SELECT 'lakets_compression_enabled'::TEXT,
- (CASE WHEN hr.compression_enabled THEN 1 ELSE 0 END)::DOUBLE PRECISION,
- jsonb_build_object('table', hr.schema_name || '.' || hr.table_name)
- FROM lakets._chronotable_registry hr;
+ SELECT 'lakets_tiering_pending_chunks'::TEXT, sts.pending_chunks::DOUBLE PRECISION,
+ jsonb_build_object('table', sts.schema_name || '.' || sts.table_name)
+ FROM lakets.show_tiering_status() sts;
+
+ RETURN QUERY
+ SELECT 'lakets_tiering_tiered_chunks_total'::TEXT, sts.tiered_chunks::DOUBLE PRECISION,
+ jsonb_build_object('table', sts.schema_name || '.' || sts.table_name)
+ FROM lakets.show_tiering_status() sts;
+
+ RETURN QUERY
+ SELECT 'lakets_tiering_caught_up'::TEXT,
+ (CASE WHEN sts.caught_up THEN 1 ELSE 0 END)::DOUBLE PRECISION,
+ jsonb_build_object('table', sts.schema_name || '.' || sts.table_name)
+ FROM lakets.show_tiering_status() sts;
-- RollUp watermark lag (seconds between now and watermark per RollUp)
RETURN QUERY
@@ -100,7 +113,6 @@ RETURNS TABLE (
hypertable TEXT,
total_chunks BIGINT,
active_chunks BIGINT,
- compressed_chunks BIGINT,
tiered_chunks BIGINT,
dropped_chunks BIGINT,
oldest_active TIMESTAMPTZ,
@@ -114,7 +126,6 @@ BEGIN
hr.schema_name || '.' || hr.table_name,
count(*)::BIGINT,
count(*) FILTER (WHERE cm.status = 'active')::BIGINT,
- count(*) FILTER (WHERE cm.status = 'compressed')::BIGINT,
count(*) FILTER (WHERE cm.status = 'tiered')::BIGINT,
count(*) FILTER (WHERE cm.status = 'dropped')::BIGINT,
min(cm.range_start) FILTER (WHERE cm.status = 'active'),
diff --git a/sql/13_shadow_sync.sql b/sql/13_shadow_sync.sql
index 7975469..95b6ac0 100644
--- a/sql/13_shadow_sync.sql
+++ b/sql/13_shadow_sync.sql
@@ -125,6 +125,18 @@ BEGIN
WHERE schema_name = p_schema_name AND table_name = p_table_name) INTO v_is_ct;
SELECT EXISTS(SELECT 1 FROM lakets._rollup_registry WHERE name = p_table_name) INTO v_is_ru;
+ -- Preflight: LakeTS cannot enable Lakebase CDF itself -- it is a prerequisite
+ -- enabled on the lakets_cdf schema via Databricks. Without it, the shadow is
+ -- created but never streams to Unity Catalog, and tiering stays fail-closed
+ -- (nothing is ever evicted). Warn loudly rather than failing, so sync can be
+ -- wired before CDF is turned on.
+ IF to_regclass('wal2delta.tables') IS NULL THEN
+ RAISE WARNING 'Lakebase CDF (wal2delta) is not enabled on this database. The '
+ 'shadow will be created but will not replicate to Unity Catalog, and '
+ 'tiering will not evict any partitions. Enable CDF on the lakets_cdf '
+ 'schema (see the LakeTS prerequisites) to activate sync.';
+ END IF;
+
IF v_is_ct AND v_is_ru THEN
RAISE EXCEPTION 'Ambiguous: % is both a ChronoTable and a RollUp', p_table_name;
ELSIF v_is_ct THEN
diff --git a/sql/14_rollup_optimization.sql b/sql/14_rollup_optimization.sql
index 046ca07..de82634 100644
--- a/sql/14_rollup_optimization.sql
+++ b/sql/14_rollup_optimization.sql
@@ -508,7 +508,7 @@ RETURNS TEXT
LANGUAGE sql STABLE
AS $$
SELECT CASE
- WHEN cm.status IN ('active', 'compressed') THEN 'hot'
+ WHEN cm.status = 'active' THEN 'hot'
WHEN cm.status = 'tiered' THEN 'cold'
ELSE 'hot'
END
diff --git a/sql/99_install.sql b/sql/99_install.sql
index aea577d..faa0c9b 100644
--- a/sql/99_install.sql
+++ b/sql/99_install.sql
@@ -23,8 +23,8 @@
-- Step 4: RollUp Engine (incremental time-bucketed aggregations)
\ir 04_rollup.sql
--- Step 5: Compression & tiering policies
-\ir 05_compression.sql
+-- Step 5: Tiering policies
+\ir 05_tiering.sql
-- Step 6: Retention policies
\ir 06_retention.sql
diff --git a/tests/test_compression.sql b/tests/test_compression.sql
deleted file mode 100644
index 149858e..0000000
--- a/tests/test_compression.sql
+++ /dev/null
@@ -1,58 +0,0 @@
--- =============================================================================
--- LakeTS Compression & Tiering Tests
--- =============================================================================
-DROP TABLE IF EXISTS public.comp_test CASCADE;
-DELETE FROM lakets._policy_registry WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test');
-DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test');
-DELETE FROM lakets._chronotable_registry WHERE table_name='comp_test';
-
-CREATE TABLE public.comp_test (time TIMESTAMPTZ NOT NULL, val DOUBLE PRECISION);
-INSERT INTO comp_test SELECT now()-(i||' hours')::interval, random()*100 FROM generate_series(1,240) s(i);
-SELECT lakets.create_chronotable('comp_test','time','1 day');
-
--- T1: Add compression policy
-DO $$ DECLARE v INT; BEGIN
- SELECT lakets.add_compression_policy('comp_test', '3 days', 'val') INTO v;
- ASSERT v IS NOT NULL; RAISE NOTICE 'T1 PASSED: policy id=%', v;
-END $$;
-
--- T2: Show compression policy
-DO $$ DECLARE v TEXT; BEGIN
- SELECT compress_after INTO v FROM lakets.show_compression_policy('comp_test');
- ASSERT v = '3 days'; RAISE NOTICE 'T2 PASSED: compress_after=%', v;
-END $$;
-
--- T3: Get chunks to compress
-DO $$ DECLARE v BIGINT; BEGIN
- SELECT count(*) INTO v FROM lakets._get_chunks_to_compress('comp_test');
- ASSERT v > 0; RAISE NOTICE 'T3 PASSED: % eligible chunks', v;
-END $$;
-
--- T4: Compress a chunk
-DO $$ DECLARE v_name TEXT; v_status TEXT; BEGIN
- SELECT chunk_name INTO v_name FROM lakets._get_chunks_to_compress('comp_test') LIMIT 1;
- PERFORM lakets.compress_chunk(v_name);
- SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name=v_name;
- ASSERT v_status = 'compressed'; RAISE NOTICE 'T4 PASSED: status=%', v_status;
-END $$;
-
--- T5: Decompress chunk
-DO $$ DECLARE v_name TEXT; v_status TEXT; BEGIN
- SELECT chunk_name INTO v_name FROM lakets._chunk_metadata WHERE status='compressed' AND chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test') LIMIT 1;
- PERFORM lakets.decompress_chunk(v_name);
- SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name=v_name;
- ASSERT v_status = 'active'; RAISE NOTICE 'T5 PASSED: decompressed back to active';
-END $$;
-
--- T6: Remove compression policy
-DO $$ DECLARE v BOOLEAN; BEGIN
- PERFORM lakets.remove_compression_policy('comp_test');
- SELECT compression_enabled INTO v FROM lakets._chronotable_registry WHERE table_name='comp_test';
- ASSERT v = FALSE; RAISE NOTICE 'T6 PASSED: policy removed';
-END $$;
-
-DROP TABLE IF EXISTS public.comp_test CASCADE;
-DELETE FROM lakets._policy_registry WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test');
-DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN (SELECT id FROM lakets._chronotable_registry WHERE table_name='comp_test');
-DELETE FROM lakets._chronotable_registry WHERE table_name='comp_test';
-SELECT 'ALL COMPRESSION TESTS PASSED' as result;
diff --git a/tests/test_monitoring.sql b/tests/test_monitoring.sql
index 4b727b8..5ded1ba 100644
--- a/tests/test_monitoring.sql
+++ b/tests/test_monitoring.sql
@@ -62,52 +62,62 @@ BEGIN
DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_test';
END $$;
--- T5: chunk_health reports compressed chunks correctly
+-- T5: chunk_health reports tiered chunks correctly
DO $$
DECLARE
- v_compressed BIGINT;
+ v_tiered BIGINT;
v_chunk_name TEXT;
BEGIN
- -- Create ChronoTable with enough data to compress
- DROP TABLE IF EXISTS public.mon_comp_test CASCADE;
+ -- Create ChronoTable with enough data to age out a chunk
+ DROP TABLE IF EXISTS public.mon_tier_test CASCADE;
DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN (
- SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test'
+ SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test'
);
DELETE FROM lakets._policy_registry WHERE chronotable_id IN (
- SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test'
+ SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test'
);
- DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test';
+ DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test';
- CREATE TABLE public.mon_comp_test (time TIMESTAMPTZ NOT NULL, val DOUBLE PRECISION);
- INSERT INTO mon_comp_test SELECT now() - (i || ' hours')::interval, random() * 100
+ CREATE TABLE public.mon_tier_test (time TIMESTAMPTZ NOT NULL, val DOUBLE PRECISION);
+ INSERT INTO mon_tier_test SELECT now() - (i || ' hours')::interval, random() * 100
FROM generate_series(1, 240) s(i);
- PERFORM lakets.create_chronotable('mon_comp_test', 'time', '1 day');
- PERFORM lakets.add_compression_policy('mon_comp_test', '3 days', 'val');
+ PERFORM lakets.create_chronotable('mon_tier_test', 'time', '1 day');
+ PERFORM lakets.add_tiering_policy('mon_tier_test', '3 days');
+
+ -- Simulate a completed tiering. tier_chunk's actual drop needs live CDF;
+ -- here we only assert chunk_health surfaces the 'tiered' status.
+ SELECT chunk_name INTO v_chunk_name FROM lakets._get_chunks_to_tier('mon_tier_test') LIMIT 1;
+ IF v_chunk_name IS NULL THEN
+ SELECT cm.chunk_name INTO v_chunk_name
+ FROM lakets._chunk_metadata cm
+ JOIN lakets._chronotable_registry hr ON hr.id = cm.chronotable_id
+ WHERE hr.table_name = 'mon_tier_test' AND cm.status = 'active'
+ ORDER BY cm.range_start LIMIT 1;
+ END IF;
- -- Compress one chunk
- SELECT chunk_name INTO v_chunk_name FROM lakets._get_chunks_to_compress('mon_comp_test') LIMIT 1;
IF v_chunk_name IS NOT NULL THEN
- PERFORM lakets.compress_chunk(v_chunk_name);
+ UPDATE lakets._chunk_metadata SET status = 'tiered', tiered_at = now()
+ WHERE chunk_name = v_chunk_name;
- SELECT compressed_chunks INTO v_compressed
+ SELECT tiered_chunks INTO v_tiered
FROM lakets.chunk_health()
- WHERE hypertable = 'public.mon_comp_test';
+ WHERE hypertable = 'public.mon_tier_test';
- ASSERT v_compressed >= 1, format('expected >= 1 compressed chunk, got %s', v_compressed);
- RAISE NOTICE 'T5 PASSED: chunk_health reports % compressed chunks', v_compressed;
+ ASSERT v_tiered >= 1, format('expected >= 1 tiered chunk, got %s', v_tiered);
+ RAISE NOTICE 'T5 PASSED: chunk_health reports % tiered chunks', v_tiered;
ELSE
- RAISE NOTICE 'T5 SKIPPED: no eligible chunks to compress';
+ RAISE NOTICE 'T5 SKIPPED: no chunks materialized for mon_tier_test';
END IF;
-- Cleanup
- DROP TABLE IF EXISTS public.mon_comp_test CASCADE;
+ DROP TABLE IF EXISTS public.mon_tier_test CASCADE;
DELETE FROM lakets._policy_registry WHERE chronotable_id IN (
- SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test'
+ SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test'
);
DELETE FROM lakets._chunk_metadata WHERE chronotable_id IN (
- SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test'
+ SELECT id FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test'
);
- DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_comp_test';
+ DELETE FROM lakets._chronotable_registry WHERE table_name = 'mon_tier_test';
END $$;
-- T6: lakets_metrics includes all expected metric names
diff --git a/tests/test_python_patterns.py b/tests/test_python_patterns.py
index 083c696..9f7f3aa 100644
--- a/tests/test_python_patterns.py
+++ b/tests/test_python_patterns.py
@@ -55,30 +55,32 @@ def test_tracks_failures(self):
"cold_rollup_refresh.py should track failures"
-class TestCompressionJobPatterns:
- """Verify compression_job.py uses safe SQL patterns."""
+class TestTieringJobPatterns:
+ """Verify tiering_job.py uses safe SQL patterns and delegates the drop to SQL."""
- SOURCE_PATH = "databricks/workflows/compression_job.py"
+ SOURCE_PATH = "databricks/workflows/tiering_job.py"
- def test_drop_table_uses_identifier(self):
- """T9: DROP TABLE uses sql.Identifier, not %-formatting."""
+ def test_delegates_drop_to_tier_chunk(self):
+ """T9: The partition drop is delegated to lakets.tier_chunk (no raw DROP in Python)."""
source = _read_source(self.SOURCE_PATH)
- assert '"DROP TABLE IF EXISTS %s.%s" %' not in source, \
- "compression_job.py still uses %-format for DROP TABLE"
+ assert "lakets.tier_chunk" in source, \
+ "tiering_job.py should call lakets.tier_chunk to perform the gated drop"
+ assert "DROP TABLE" not in source, \
+ "tiering_job.py should not issue DROP TABLE directly; tier_chunk owns the drop"
def test_uses_chronotable_registry(self):
"""T10: References _chronotable_registry, not _hypertable_registry."""
source = _read_source(self.SOURCE_PATH)
assert "_hypertable_registry" not in source, \
- "compression_job.py still references legacy _hypertable_registry"
+ "tiering_job.py still references legacy _hypertable_registry"
assert "_chronotable_registry" in source, \
- "compression_job.py should reference _chronotable_registry"
+ "tiering_job.py should reference _chronotable_registry"
- def test_no_dead_jdbc_url(self):
- """T11: Dead jdbc_url code has been removed."""
+ def test_no_spark(self):
+ """T11: Tiering is pure Lakebase SQL — no Spark dependency."""
source = _read_source(self.SOURCE_PATH)
- assert "jdbc_url" not in source, \
- "compression_job.py still has dead jdbc_url code"
+ assert "pyspark" not in source and "SparkSession" not in source, \
+ "tiering_job.py should be Spark-free"
class TestLakebaseUtilsPatterns:
diff --git a/tests/test_tiering.sql b/tests/test_tiering.sql
new file mode 100644
index 0000000..d39fc7a
--- /dev/null
+++ b/tests/test_tiering.sql
@@ -0,0 +1,139 @@
+-- LakeTS Tiering tests. Run: psql "$LAKETS_URL" -v ON_ERROR_STOP=1 -f tests/test_tiering.sql
+\set ON_ERROR_STOP on
+
+-- TEST 1: _cdf_committed_lsn fails closed for a shadow that does not exist.
+DO $$
+BEGIN
+ ASSERT lakets._cdf_committed_lsn('_shadow_does_not_exist') IS NULL,
+ 'TEST 1 FAILED: expected NULL for a nonexistent shadow';
+ RAISE NOTICE 'TEST 1 PASSED: _cdf_committed_lsn fails closed for missing shadow';
+END $$;
+
+-- TEST 2: add_tiering_policy registers a 'tiering' policy and sets tiering_enabled.
+DO $$
+DECLARE
+ v_policy_id INT;
+ v_enabled BOOLEAN;
+ v_ptype TEXT;
+BEGIN
+ DROP TABLE IF EXISTS public.tier_test CASCADE;
+ CREATE TABLE public.tier_test (time TIMESTAMPTZ NOT NULL, v DOUBLE PRECISION);
+ PERFORM lakets.create_chronotable('tier_test', 'time', '1 day');
+
+ v_policy_id := lakets.add_tiering_policy('tier_test', '7 days');
+ ASSERT v_policy_id IS NOT NULL, 'TEST 2 FAILED: no policy id returned';
+
+ SELECT tiering_enabled INTO v_enabled
+ FROM lakets._chronotable_registry WHERE table_name = 'tier_test';
+ ASSERT v_enabled, 'TEST 2 FAILED: tiering_enabled not set';
+
+ SELECT policy_type INTO v_ptype
+ FROM lakets._policy_registry pr
+ JOIN lakets._chronotable_registry hr ON hr.id = pr.chronotable_id
+ WHERE hr.table_name = 'tier_test';
+ ASSERT v_ptype = 'tiering', 'TEST 2 FAILED: policy_type is not tiering';
+
+ RAISE NOTICE 'TEST 2 PASSED: add_tiering_policy works';
+END $$;
+
+-- TEST 3: show_tiering_policy returns the policy; remove_tiering_policy clears it.
+DO $$
+DECLARE v_cnt INT;
+BEGIN
+ PERFORM 1 FROM lakets.show_tiering_policy('tier_test');
+ PERFORM lakets.remove_tiering_policy('tier_test');
+ SELECT count(*) INTO v_cnt FROM lakets.show_tiering_policy('tier_test');
+ ASSERT v_cnt = 0, 'TEST 3 FAILED: policy not removed';
+ RAISE NOTICE 'TEST 3 PASSED: show/remove tiering policy works';
+END $$;
+
+-- TEST 4: _get_chunks_to_tier returns nothing for a non-streaming table
+-- (no STREAMING shadow => fail closed).
+DO $$
+DECLARE v_cnt INT;
+BEGIN
+ -- recreate policy from TEST 2 teardown
+ PERFORM lakets.add_tiering_policy('tier_test', '0 seconds'); -- everything eligible by age
+ INSERT INTO lakets._chunk_metadata (chronotable_id, chunk_name, range_start, range_end, status)
+ SELECT id, 'tier_test_oldchunk', now() - interval '10 days', now() - interval '9 days', 'active'
+ FROM lakets._chronotable_registry WHERE table_name = 'tier_test';
+
+ SELECT count(*) INTO v_cnt FROM lakets._get_chunks_to_tier('tier_test');
+ ASSERT v_cnt = 0,
+ 'TEST 4 FAILED: expected 0 eligible chunks when shadow is not STREAMING';
+ RAISE NOTICE 'TEST 4 PASSED: _get_chunks_to_tier fails closed without STREAMING CDF';
+END $$;
+
+-- TEST 5: tier_chunk refuses to drop (returns FALSE, partition row intact)
+-- when the CDF gate is not satisfiable.
+DO $$
+DECLARE v_ok BOOLEAN; v_status TEXT;
+BEGIN
+ v_ok := lakets.tier_chunk('tier_test_oldchunk');
+ ASSERT v_ok = FALSE, 'TEST 5 FAILED: tier_chunk should refuse without STREAMING CDF';
+ SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name = 'tier_test_oldchunk';
+ ASSERT v_status = 'active', 'TEST 5 FAILED: chunk should remain active (not tiered)';
+ RAISE NOTICE 'TEST 5 PASSED: tier_chunk fails closed';
+END $$;
+
+-- TEST 6: untier_chunk restores tiered -> active.
+DO $$
+DECLARE v_status TEXT;
+BEGIN
+ UPDATE lakets._chunk_metadata SET status = 'tiered', tiered_at = now()
+ WHERE chunk_name = 'tier_test_oldchunk';
+ PERFORM lakets.untier_chunk('tier_test_oldchunk');
+ SELECT status INTO v_status FROM lakets._chunk_metadata WHERE chunk_name = 'tier_test_oldchunk';
+ ASSERT v_status = 'active', 'TEST 6 FAILED: untier_chunk did not restore active';
+ RAISE NOTICE 'TEST 6 PASSED: untier_chunk restores active';
+END $$;
+
+-- TEST 7: show_tiering_status reports counts and classifies cdf_status='NONE'
+-- when the table is not in wal2delta.tables.
+DO $$
+DECLARE r RECORD;
+BEGIN
+ SELECT * INTO r FROM lakets.show_tiering_status('tier_test');
+ ASSERT r.table_name = 'tier_test', 'TEST 7 FAILED: wrong/no row';
+ ASSERT r.pending_chunks >= 1, 'TEST 7 FAILED: expected >=1 pending chunk';
+ ASSERT r.cdf_status = 'NONE', 'TEST 7 FAILED: expected cdf_status NONE without CDF';
+ ASSERT r.caught_up = FALSE, 'TEST 7 FAILED: caught_up should be false without CDF';
+ RAISE NOTICE 'TEST 7 PASSED: show_tiering_status reports gate state';
+END $$;
+
+-- TEST 8: the write-tracking trigger stamps last_write_lsn on the chunk that
+-- received rows and leaves other chunks untouched (no CDF needed).
+DO $$
+DECLARE
+ v_id INT;
+ v_lsn_hot PG_LSN;
+ v_lsn_seeded PG_LSN;
+BEGIN
+ SELECT id INTO v_id FROM lakets._chronotable_registry WHERE table_name = 'tier_test';
+
+ -- Ensure a partition exists for "now" so the insert has somewhere to route.
+ PERFORM lakets._ensure_partitions(v_id, p_range_start := now(), p_range_end := now());
+
+ -- Pin the seeded old chunk's watermark to a known value; it must NOT move.
+ UPDATE lakets._chunk_metadata SET last_write_lsn = '0/1'::pg_lsn
+ WHERE chunk_name = 'tier_test_oldchunk';
+
+ INSERT INTO public.tier_test (time, v) VALUES (now(), 1.0);
+
+ -- The hot chunk that received the row is stamped with a real WAL position.
+ SELECT max(last_write_lsn) INTO v_lsn_hot
+ FROM lakets._chunk_metadata
+ WHERE chronotable_id = v_id AND chunk_name <> 'tier_test_oldchunk';
+ ASSERT v_lsn_hot IS NOT NULL AND v_lsn_hot <> '0/1'::pg_lsn,
+ 'TEST 8 FAILED: hot chunk was not stamped by the write-tracking trigger';
+
+ -- The unrelated old chunk is untouched.
+ SELECT last_write_lsn INTO v_lsn_seeded
+ FROM lakets._chunk_metadata WHERE chunk_name = 'tier_test_oldchunk';
+ ASSERT v_lsn_seeded = '0/1'::pg_lsn,
+ 'TEST 8 FAILED: a hot-chunk write bumped an unrelated cold chunk watermark';
+
+ RAISE NOTICE 'TEST 8 PASSED: write-tracking stamps only the written chunk';
+END $$;
+
+SELECT 'ALL TIERING TESTS PASSED' AS result;
diff --git a/website/docs/architecture-patterns.md b/website/docs/architecture-patterns.md
index fa98bbe..c26f0d5 100644
--- a/website/docs/architecture-patterns.md
+++ b/website/docs/architecture-patterns.md
@@ -26,7 +26,7 @@ LakeTS's signature pattern. Recent data lives in Lakebase (Postgres) for sub-10m
- BI/analysis runs over weeks or months of history
- You want one query surface (LakeTS routes hot/cold automatically)
-**LakeTS pieces**: `ChronoTable`, `lakets.compression_policy`, `lakets.tiering_policy`, Lakebase CDF.
+**LakeTS pieces**: `ChronoTable`, `lakets.add_tiering_policy`, Lakebase CDF.
## 2. Streaming ingest → hot tier → rollup
diff --git a/website/docs/examples/sensor-reading-journey.md b/website/docs/examples/sensor-reading-journey.md
index af9886c..8b13d2b 100644
--- a/website/docs/examples/sensor-reading-journey.md
+++ b/website/docs/examples/sensor-reading-journey.md
@@ -2,7 +2,7 @@
title: Life of a sensor reading
sidebar_label: Sensor reading journey
sidebar_position: 1
-description: Follow a single sensor reading through every stage of the LakeTS lifecycle — ingestion, RollUps, compression, tiering, retention.
+description: Follow a single sensor reading through every stage of the LakeTS lifecycle — ingestion, RollUps, tiering, retention.
---
# Life of a sensor reading
@@ -30,17 +30,17 @@ Behind the scenes:
- The hourly RollUp includes the reading immediately
- `SELECT * FROM _rollup_rt_metrics_hourly` shows it in real time
-## Day 7 — compression job runs
+## Day 7 — tiering job runs
-- The chunk is now 7 days old; `_get_chunks_to_compress()` returns it
-- Chunk status flips from `active` → `compressed`
-- The partition is dropped from Lakebase; the data is already in the Unity Catalog Managed Table
+- The chunk is now 7 days old; `_get_chunks_to_tier()` returns it
+- `tier_chunk()` checks the durability gate — CDF has flushed the chunk's writes (`committed_lsn >= last_write_lsn`) into the Unity Catalog Managed Table
+- The gate passes, so the partition is dropped from Lakebase and the chunk status flips from `active` → `tiered`
## Day 7–90 — warm in Unity Catalog
- Queryable via Lakehouse Federation (100 ms – 1 s)
- The hourly RollUp Table still has the aggregation — unaffected by tiering
-- The UC Managed Table is Z-ordered for fast time-range scans
+- The data lives in the UC Managed Table, written there continuously by Lakebase CDF
## Day 90 — retention job runs
@@ -63,7 +63,7 @@ gantt
Active in partition :active, 2026-03-25, 7d
section Warm (Unity Catalog Managed Table)
- Compressed/Tiered :2026-04-01, 83d
+ Tiered :2026-04-01, 83d
section Dropped
Vacuumed :milestone, 2026-06-23, 0d
diff --git a/website/docs/glossary.md b/website/docs/glossary.md
index 2d406a3..2aec3d9 100644
--- a/website/docs/glossary.md
+++ b/website/docs/glossary.md
@@ -89,6 +89,10 @@ In a multi-metric ChronoTable, a column that identifies a series (`host`, `regio
Two-phase lifecycle policy: tier to the Unity Catalog Managed Table after age N, drop entirely after age M. Configured with `add_tiered_retention_policy()`.
+## Tiering
+
+Evicting cold chunks out of Lakebase to free hot-tier storage. The data already lives in the Unity Catalog Managed Table via Lakebase CDF, so `tier_chunk()` simply drops the old partition — but only once a CDF **durability gate** confirms every write to that chunk has been flushed to UC (the shadow is `STREAMING` and CDF's `committed_lsn` has reached the chunk's `last_write_lsn`). Configured with `add_tiering_policy()` and driven daily by the Databricks Tiering Job. See [How Tiering & Retention Works](./guides/how-it-works/tiering-and-retention.md).
+
## Unity Catalog Managed Table
Databricks's governed table format that abstracts the underlying storage (Delta or Iceberg). LakeTS uses it as the cold tier — `lakets.enable_sync()` targets a Unity Catalog Managed Table via Lakebase CDF.
diff --git a/website/docs/guides/getting-started.md b/website/docs/guides/getting-started.md
index b219cd9..b2eade6 100644
--- a/website/docs/guides/getting-started.md
+++ b/website/docs/guides/getting-started.md
@@ -185,4 +185,4 @@ Now you can think about how the application or dashboard reads the data:
- [Reference](../reference/index.md) — full catalog of 73 functions, 2 aggregates, 6 triggers, 9 metadata tables — organized by topic
- [Life of a sensor reading](../examples/sensor-reading-journey.md) — end-to-end worked example
- [Troubleshooting](../troubleshooting.md) — when something doesn't behave the way you expect
-- [Databricks workflows](https://github.com/databricks-solutions/lakets/blob/main/databricks/bundles/databricks.yml) — automated partition management, compression, retention, and aggregate refresh
+- [Databricks workflows](https://github.com/databricks-solutions/lakets/blob/main/databricks/bundles/databricks.yml) — automated partition management, tiering, retention, and aggregate refresh
diff --git a/website/docs/guides/how-it-works/chronotables.md b/website/docs/guides/how-it-works/chronotables.md
index 4008995..9e3ad23 100644
--- a/website/docs/guides/how-it-works/chronotables.md
+++ b/website/docs/guides/how-it-works/chronotables.md
@@ -60,8 +60,8 @@ LakeTS tracks everything in two tables:
```sql
-- Which tables are ChronoTables?
SELECT * FROM lakets._chronotable_registry;
--- id | schema | table | time_column | chunk_interval | compression_enabled | sync_enabled
--- 1 | public | metrics | time | 1 day | false | false
+-- id | schema | table | time_column | chunk_interval | tiering_enabled | sync_enabled
+-- 1 | public | metrics | time | 1 day | false | false
-- What chunks exist?
SELECT * FROM lakets._chunk_metadata;
diff --git a/website/docs/guides/how-it-works/compression-and-retention.md b/website/docs/guides/how-it-works/compression-and-retention.md
deleted file mode 100644
index dd5b2e3..0000000
--- a/website/docs/guides/how-it-works/compression-and-retention.md
+++ /dev/null
@@ -1,80 +0,0 @@
----
-title: Compression & retention
-sidebar_label: Compression & retention
-sidebar_position: 4
-description: How LakeTS tiers cold chunks to Unity Catalog and drops expired data.
----
-
-# How Compression & Tiering Works
-
-## The concept
-
-Think of your data like files on a desk:
-- **Hot desk** (Lakebase): Papers you're actively reading. Fast access.
-- **Filing cabinet** (Unity Catalog Managed Table): Papers from last month. Slower but organized.
-- **Shredder** (Retention): Papers older than a year. Gone.
-
-## How policies work
-
-When you add a compression policy, LakeTS stores the rule in `_policy_registry`:
-
-```sql
-SELECT lakets.add_compression_policy('metrics', '7 days');
--- Stores: {compress_after: "7 days", segment_by: null, order_by: "time DESC"}
-```
-
-Nothing happens immediately. The policy is a **declaration of intent**. The actual work is done by the Databricks compression job that runs on a schedule:
-
-```mermaid
-flowchart LR
- subgraph JOB["Compression Job (runs daily at 2 AM)"]
- A["1. Query _policy_registry
for compression policies"] --> B["2. Query _get_chunks_to_compress()
find chunks older than 7 days"]
- B --> C["3. For each chunk:
mark compressed in metadata"]
- C --> D["4. Optionally drop
Lakebase partition"]
- end
-
- subgraph BEFORE["Before"]
- P1["Mar 15 chunk (active)"]
- P2["Mar 16 chunk (active)"]
- P3["Mar 17 chunk (active)"]
- end
-
- subgraph AFTER["After (compress_after=7 days, today=Mar 25)"]
- P4["Mar 15 chunk (compressed)"]
- P5["Mar 16 chunk (compressed)"]
- P6["Mar 17 chunk (compressed)"]
- P7["Mar 18 chunk (active - only 7 days old)"]
- end
-```
-
-The data in the Unity Catalog Managed Table is already there (via Lakebase CDF). The compression job's main purpose is to **free Lakebase storage** by dropping old partitions once we know the data is safely in the cold tier.
-
----
-
-# How Retention Works
-
-Retention is simpler than compression — it just drops old chunks:
-
-```sql
-SELECT lakets.add_retention_policy('metrics', '30 days');
-```
-
-When `execute_retention` runs:
-
-1. Looks up the policy in `_policy_registry`
-2. Calls `drop_chunks('metrics', '30 days')` which:
- - Finds chunks where `range_end <= now() - 30 days`
- - Runs `DROP TABLE` on each partition (instant, no row-by-row delete)
- - Updates `_chunk_metadata` status to `dropped`
-
-**Tiered retention** adds a two-step lifecycle:
-
-```
-Day 0-7: HOT (Lakebase, fast queries)
-Day 7-90: WARM (Unity Catalog Managed Table, tiered via compression policy)
-Day 90+: DELETED (retention policy drops from both tiers)
-```
-
-:::tip RollUps survive retention
-You can drop all raw data older than 30 days while keeping your hourly/daily RollUp tables forever.
-:::
diff --git a/website/docs/guides/how-it-works/index.md b/website/docs/guides/how-it-works/index.md
index adb8936..194ba0e 100644
--- a/website/docs/guides/how-it-works/index.md
+++ b/website/docs/guides/how-it-works/index.md
@@ -14,7 +14,7 @@ Start here for the high-level picture, then follow the topic pages for each subs
- **[ChronoTables](./chronotables.md)** — how time-partitioned tables work under the hood
- **[Time series functions](./time-series-functions.md)** — `time_bucket`, `first`, `locf`, `interpolate`, `delta`, `rate`, `gapfill`
- **[RollUps](./rollups.md)** — incremental aggregates, DAG cascade, scale optimizations (chunk-skip pruning, batch refresh, Unity Catalog export)
-- **[Compression & retention](./compression-and-retention.md)** — lifecycle policies and chunk drops
+- **[Tiering & retention](./tiering-and-retention.md)** — lifecycle policies, the CDF durability gate, and chunk drops
- **[Lakebase CDF internals](./lakebase-cdf-internals.md)** — shadow tables, triggers, CDC routing
For a worked example following a single sensor reading from ingest through retention, see [Life of a sensor reading](../../examples/sensor-reading-journey.md).
@@ -64,7 +64,7 @@ LakeTS is built from these Postgres primitives — no custom extensions:
| Gap-filling | `generate_series()` + `LEFT JOIN` |
| RollUps | Regular `TABLE` + incremental refresh + `UNION ALL` view |
| RollUp optimization | Chunk-skip pruning, batch `ANY(array)`, Kahn's toposort, transition tables |
-| Compression / tiering | `_policy_registry` + Databricks Jobs |
+| Tiering | `_policy_registry` + Databricks Jobs + CDF durability gate |
| Retention | `DROP TABLE` on expired partitions |
| Lakebase CDF | Shadow table + trigger + `wal2delta` CDC |
| Monitoring | SQL functions over `pg_stat_*` + metadata |
diff --git a/website/docs/guides/how-it-works/rollups.md b/website/docs/guides/how-it-works/rollups.md
index 1d7fdbf..e04f43a 100644
--- a/website/docs/guides/how-it-works/rollups.md
+++ b/website/docs/guides/how-it-works/rollups.md
@@ -237,7 +237,6 @@ The default now resolves the tier from the chunk's status:
| Chunk status | Resolved tier | Meaning |
|---|---|---|
| `active` | hot | Data in Lakebase |
-| `compressed` | hot | Data in Lakebase (compressed) |
| `tiered` | cold | Data in Unity Catalog Managed Table |
You can still pass `p_tier` explicitly when you need to override.
diff --git a/website/docs/guides/how-it-works/tiering-and-retention.md b/website/docs/guides/how-it-works/tiering-and-retention.md
new file mode 100644
index 0000000..50c346a
--- /dev/null
+++ b/website/docs/guides/how-it-works/tiering-and-retention.md
@@ -0,0 +1,103 @@
+---
+title: Tiering & retention
+sidebar_label: Tiering & retention
+sidebar_position: 4
+description: How LakeTS evicts cold chunks from Lakebase once CDF has flushed them to Unity Catalog, and drops expired data.
+---
+
+# How Tiering Works
+
+## The concept
+
+Think of your data like files on a desk:
+- **Hot desk** (Lakebase): Papers you're actively reading. Fast access.
+- **Filing cabinet** (Unity Catalog Managed Table): Papers from last month. Slower but organized.
+- **Shredder** (Retention): Papers older than a year. Gone.
+
+## How policies work
+
+A tiering policy marks chunks older than `p_after` for eviction. The data is already in the Unity Catalog Managed Table via Lakebase CDF; the policy's job is to free Lakebase storage by dropping the old partitions once the data is safely cold.
+
+When you add a tiering policy, LakeTS stores the rule in `_policy_registry`:
+
+```sql
+SELECT lakets.add_tiering_policy('metrics', '7 days');
+-- Stores: {after: "7 days"} with policy_type = 'tiering'
+```
+
+Nothing happens immediately. The policy is a **declaration of intent**. The actual work is done by the Databricks Tiering Job that runs on a schedule:
+
+```mermaid
+flowchart LR
+ subgraph JOB["Tiering Job (runs daily at 2 AM)"]
+ A["1. Query _policy_registry
for tiering policies"] --> B["2. Query _get_chunks_to_tier()
find chunks older than 7 days"]
+ B --> C["3. For each chunk:
call tier_chunk()"]
+ C --> D["4. Gate passes →
drop Lakebase partition,
mark tiered"]
+ end
+
+ subgraph BEFORE["Before"]
+ P1["Mar 15 chunk (active)"]
+ P2["Mar 16 chunk (active)"]
+ P3["Mar 17 chunk (active)"]
+ end
+
+ subgraph AFTER["After (after=7 days, today=Mar 25)"]
+ P4["Mar 15 chunk (tiered)"]
+ P5["Mar 16 chunk (tiered)"]
+ P6["Mar 17 chunk (tiered)"]
+ P7["Mar 18 chunk (active - only 7 days old)"]
+ end
+```
+
+The data in the Unity Catalog Managed Table is already there (via Lakebase CDF). The Tiering Job's main purpose is to **free Lakebase storage** by dropping old partitions once we know the data is safely in the cold tier.
+
+## CDF is a prerequisite
+
+Tiering can only evict a chunk once Lakebase CDF has durably copied that chunk's data into the Unity Catalog Managed Table. CDF must be enabled (on the `lakets_cdf` schema, via Databricks) **before** tiering can drop anything — LakeTS cannot enable CDF itself, and a table must be CDF-synced via `lakets.enable_sync('