Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -10,8 +10,13 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version

### Added

- **Per-chunk tiering durability gate.** A partition is dropped only when its CDF shadow is `STREAMING` in `wal2delta.tables` **and** `committed_lsn >= chunk.last_write_lsn` — proving Lakebase CDF has flushed every write to that chunk into the Unity Catalog Managed Table before the data leaves Lakebase. The gate compares against the chunk's own recorded write position (stamped by statement-level triggers), not the global WAL head, because a per-table `committed_lsn` does not advance while the shadow is idle. Tiering is fail-closed: it defers and retries whenever the gate is not satisfiable.
- **`show_tiering_status()`** and the `lakets_tiering_pending_chunks` / `lakets_tiering_tiered_chunks_total` / `lakets_tiering_caught_up` Prometheus metrics for tiering observability (including CDF gate state).
- `enable_sync` now warns when Lakebase CDF (`wal2delta`) is absent, since CDF is a prerequisite enabled on the `lakets_cdf` schema via Databricks, not by LakeTS.
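
The durability gate in the first bullet can be illustrated with a short Python sketch. This is not LakeTS's implementation (the real check runs in PL/pgSQL inside `tier_chunk`); `ShadowState`, `parse_lsn`, and `safe_to_drop` are hypothetical names, and treating a missing LSN as "defer" is an assumption made here to keep the sketch fail-closed. The only facts taken from the entry above are the `STREAMING` state and the `committed_lsn >= chunk.last_write_lsn` comparison. It relies on PostgreSQL's textual `pg_lsn` format, two hexadecimal numbers separated by a slash.

```python
from dataclasses import dataclass
from typing import Optional


def parse_lsn(text: str) -> int:
    """Convert a PostgreSQL pg_lsn string such as '16/B374D848' to an integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


@dataclass
class ShadowState:
    # Hypothetical stand-in for one row of wal2delta.tables.
    state: str
    committed_lsn: Optional[str]


def safe_to_drop(shadow: Optional[ShadowState], last_write_lsn: Optional[str]) -> bool:
    """Fail-closed gate: return True only when every condition is provably met."""
    if shadow is None or shadow.state != "STREAMING":
        return False  # no shadow, or CDF is not streaming: defer
    if shadow.committed_lsn is None or last_write_lsn is None:
        return False  # assumption: an unknown position on either side means defer
    # Compare as integers; comparing the raw strings would order LSNs incorrectly.
    return parse_lsn(shadow.committed_lsn) >= parse_lsn(last_write_lsn)
```

Comparing against the chunk's own `last_write_lsn` rather than the global WAL head means an idle table, whose `committed_lsn` has stopped advancing, can still pass the gate once its last write is flushed.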

### Changed

- **Compression is now Tiering.** `add_compression_policy` → `add_tiering_policy` (drops the `segment_by`/`order_by` parameters); `compress_chunk`/`decompress_chunk` → `tier_chunk`/`untier_chunk` (`tier_chunk` returns `BOOLEAN` — `TRUE` dropped, `FALSE` deferred); `show_`/`remove_compression_policy` → `show_`/`remove_tiering_policy`. The `compression` policy type is now `tiering`; the `compressed` chunk status is removed (`active` → `tiered`). `_chronotable_registry.compression_enabled` is renamed `tiering_enabled`, and `_chunk_metadata` gains `last_write_lsn` (and drops the unused `compressed_at`). **Breaking — fresh release, no upgrade migration.**
- RollUps are now always incremental. Removed the `refresh_mode` toggle and the `'full'`
(`TRUNCATE`) refresh strategy; `create_rollup` no longer accepts a `p_refresh_mode` argument
and `show_rollups()` no longer returns a `refresh_mode` column.
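
Because the release is breaking with no upgrade migration, existing call sites have to be updated by hand. The sketch below shows one way the renames listed above could be applied to SQL strings. The `RENAMES` table is taken from the changelog entry; the `rewrite_calls` helper is hypothetical and not part of LakeTS. It only renames identifiers: calls that still pass the dropped `segment_by`/`order_by` arguments need manual editing.

```python
import re

# Old identifier -> new identifier, as listed in the changelog entry above.
RENAMES = {
    "add_compression_policy": "add_tiering_policy",
    "show_compression_policy": "show_tiering_policy",
    "remove_compression_policy": "remove_tiering_policy",
    "compress_chunk": "tier_chunk",
    "decompress_chunk": "untier_chunk",
    "compression_enabled": "tiering_enabled",
}

# Word boundaries keep "compress_chunk" from matching inside "decompress_chunk".
_PATTERN = re.compile(r"\b(" + "|".join(map(re.escape, RENAMES)) + r")\b")


def rewrite_calls(sql: str) -> str:
    """Replace every renamed identifier in a SQL string with its new name."""
    return _PATTERN.sub(lambda m: RENAMES[m.group(1)], sql)


print(rewrite_calls("SELECT lakets.add_compression_policy('metrics', '7 days');"))
# SELECT lakets.add_tiering_policy('metrics', '7 days');
```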
@@ -21,6 +26,7 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version

### Removed

- The Spark-based compression job and its `{table}_archive` Delta export path. Tiering is pure Lakebase SQL (`tiering_job.py`); CDF owns the Unity Catalog copy.
- `enable_rollup_export` / `disable_rollup_export` / `show_rollup_exports`, `sql/15_uc_integration.sql` and its functions (`register_uc_table`, `tag_uc_table`, `get_uc_registrations`, `unregister_uc_table`, `_uc_registry`), and the `rollup_export.py` / `uc_registration.py` Databricks jobs.

### Fixed
6 changes: 3 additions & 3 deletions README.md
@@ -17,7 +17,7 @@ LakeTS is a time series toolkit for Databricks Lakebase — pure SQL (PL/pgSQL)
| **Time Series Functions** | `time_bucket`, `first`, `last`, `locf`, `interpolate`, `delta`, `rate`, `histogram` |
| **Gap-filling** | `time_bucket_gapfill` + LEFT JOIN for continuous time series |
| **RollUp Engine** | Incremental aggregates with per-bucket refresh, invalidation tracking, cold-tier re-aggregation, chunk-skip pruning, batch refresh, DAG orchestration, and Delta export |
- | **Compression & Tiering** | Policy-based tiering from Lakebase to Delta Lake |
+ | **Tiering** | Policy-based eviction of cold partitions from Lakebase once CDF has flushed them to the Unity Catalog Managed Table |
| **Retention** | Automated data lifecycle management across both tiers |
| **Lakehouse Sync** | CDC-based replication to Delta via shadow table pattern |
| **Last Value Cache** | Sub-10ms latest-state queries via `enable_lvc()` |
@@ -112,7 +112,7 @@ FROM metrics GROUP BY 1 ORDER BY 1;
SELECT lakets.enable_lvc('system_metrics', ARRAY['host'], ARRAY['cpu','memory']);

-- Set up lifecycle policies
- SELECT lakets.add_compression_policy('metrics', '7 days');
+ SELECT lakets.add_tiering_policy('metrics', '7 days');
SELECT lakets.add_retention_policy('metrics', '30 days');
```

@@ -182,7 +182,7 @@ LakeTS ships a pre-built **Databricks AI/BI dashboard** for monitoring your inst

| Page | Panels |
|------|--------|
- | **Partition Health** | Hypertable count, chunk counts by status (active/compressed/tiered/dropped), chunk health table per hypertable, estimated row counts |
+ | **Partition Health** | Hypertable count, chunk counts by status (active/tiered/dropped), chunk health table per hypertable, estimated row counts |
| **RollUp Monitoring** | Stale RollUp counter, dirty bucket total, watermark lag bar chart per RollUp (colored by refresh mode), invalidation log depth, full RollUp status table |
| **LVC & System** | LVC-enabled table count, total cached series, database size (GB), LVC stats table, active policies by type |

2 changes: 1 addition & 1 deletion build.sh
@@ -31,7 +31,7 @@ MODULES=(
"02_chronotable.sql"
"03_timeseries_functions.sql"
"04_rollup.sql"
- "05_compression.sql"
+ "05_tiering.sql"
"06_retention.sql"
"07_monitoring.sql"
"08_metric_table.sql"
11 changes: 5 additions & 6 deletions databricks/bundles/databricks.yml
@@ -28,20 +28,19 @@ resources:
               - ${var.lakebase_instance}
           existing_cluster_id: ${var.cluster_id}

-    lakets_compression:
-      name: "LakeTS - Compression & Tiering"
-      description: "Tiers old chunks from Lakebase to Delta Lake"
+    lakets_tiering:
+      name: "LakeTS - Tiering"
+      description: "Drops cold ChronoTable partitions once CDF has flushed them to Unity Catalog"
       schedule:
         quartz_cron_expression: "0 0 2 * * ?"  # Daily at 2 AM UTC
         timezone_id: "UTC"
       tasks:
-        - task_key: compress_and_tier
+        - task_key: tier_cold_partitions
           python_wheel_task:
             package_name: lakets
-            entry_point: compression_job
+            entry_point: tiering_job
             parameters:
               - ${var.lakebase_instance}
-              - ${var.delta_catalog}
           existing_cluster_id: ${var.cluster_id}

     lakets_retention:
102 changes: 0 additions & 102 deletions databricks/workflows/compression_job.py

This file was deleted.

81 changes: 81 additions & 0 deletions databricks/workflows/tiering_job.py
@@ -0,0 +1,81 @@
"""
LakeTS Tiering Job

Drops cold ChronoTable partitions to reclaim Lakebase storage. The data is
already durable in the Unity Catalog Managed Table via Lakebase CDF; this job
only evicts partitions that lakets.tier_chunk confirms are safe to drop (the
table's CDF shadow is STREAMING and its committed_lsn has reached the chunk's
last_write_lsn). The drop and the metadata transition happen inside
tier_chunk, so this job is pure Lakebase SQL, with no Spark.

Schedule: Daily or per tiering policy interval.

Usage as Databricks Job:
Pass instance_name as a parameter.
"""
import logging

from lakebase_utils import fetch_all, lakebase_cursor

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("lakets.tiering_job")


def run(instance_name: str) -> int:
    """Evict eligible, CDF-durable chunks for all tables with a tiering policy.

    Returns the number of partitions actually dropped this run.
    """
    total_tiered = 0
    deferred = 0
    with lakebase_cursor(instance_name) as cur:
        tables = fetch_all(cur, """
            SELECT hr.id, hr.schema_name, hr.table_name
            FROM lakets._chronotable_registry hr
            JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id
            WHERE pr.policy_type = 'tiering' AND pr.enabled = TRUE
        """)
        logger.info("Found %d table(s) with tiering policies", len(tables))

        for t in tables:
            chunks = fetch_all(cur, """
                SELECT * FROM lakets._get_chunks_to_tier(%s, %s)
            """, (t["table_name"], t["schema_name"]))

            if not chunks:
                logger.info(
                    "No eligible chunks for %s.%s (not aged out, or CDF not streaming)",
                    t["schema_name"], t["table_name"],
                )
                continue

            logger.info(
                "Evaluating %d chunk(s) for %s.%s",
                len(chunks), t["schema_name"], t["table_name"],
            )

            for chunk in chunks:
                cur.execute("SELECT lakets.tier_chunk(%s)", (chunk["chunk_name"],))
                dropped = cur.fetchone()[0]  # raw cursor returns tuples (see fetch_all)
                if dropped:
                    total_tiered += 1
                    logger.info("Tiered (dropped) partition %s", chunk["chunk_name"])
                else:
                    deferred += 1
                    logger.info("Deferred %s: CDF durability gate not satisfied", chunk["chunk_name"])

            cur.execute("""
                UPDATE lakets._policy_registry SET last_run_at = now()
                WHERE chronotable_id = %s AND policy_type = 'tiering'
            """, (t["id"],))

    logger.info("Tiering complete: %d dropped, %d deferred", total_tiered, deferred)
    return total_tiered


if __name__ == "__main__":
    import os
    import sys

    instance = sys.argv[1] if len(sys.argv) > 1 else os.environ["LAKETS_INSTANCE"]
    run(instance)
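
The job's inner loop depends only on the `tier_chunk` contract (`TRUE` means dropped, `FALSE` means deferred), so it can be exercised without a Lakebase connection. The following is a minimal sketch under that assumption: `FakeCursor` and `tier_chunks` are hypothetical names introduced for illustration and do not exist in the repository.

```python
from typing import Dict, List, Optional, Tuple


class FakeCursor:
    """Hypothetical in-memory stand-in for a DB-API cursor (illustration only)."""

    def __init__(self, outcomes: Dict[str, bool]):
        self._outcomes = outcomes  # chunk_name -> scripted tier_chunk result
        self._last: Optional[Tuple[bool]] = None

    def execute(self, sql: str, params: Tuple = ()) -> None:
        # Only the tier_chunk call is modelled; params[0] is the chunk name.
        self._last = (self._outcomes[params[0]],)

    def fetchone(self) -> Optional[Tuple[bool]]:
        return self._last


def tier_chunks(cur, chunk_names: List[str]) -> Tuple[int, int]:
    """Mirror the job's inner loop: count dropped (TRUE) vs deferred (FALSE)."""
    dropped = deferred = 0
    for name in chunk_names:
        cur.execute("SELECT lakets.tier_chunk(%s)", (name,))
        if cur.fetchone()[0]:
            dropped += 1
        else:
            deferred += 1
    return dropped, deferred


cur = FakeCursor({"chunk_a": True, "chunk_b": False, "chunk_c": True})
print(tier_chunks(cur, ["chunk_a", "chunk_b", "chunk_c"]))  # (2, 1)
```

A deferred chunk is not an error in this model: it is simply counted and left for the next scheduled run, which matches the fail-closed behaviour described in the changelog.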
15 changes: 9 additions & 6 deletions sql/01_schema.sql
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS lakets._chronotable_registry (
chunk_interval INTERVAL NOT NULL DEFAULT '7 days',
space_column TEXT,
space_partitions INT DEFAULT 1,
- compression_enabled BOOLEAN DEFAULT FALSE,
+ tiering_enabled BOOLEAN DEFAULT FALSE,
retention_interval INTERVAL,
shadow_table_name TEXT,
sync_enabled BOOLEAN DEFAULT FALSE,
@@ -48,10 +48,13 @@ CREATE TABLE IF NOT EXISTS lakets._chunk_metadata (
status TEXT NOT NULL DEFAULT 'active',
row_count BIGINT,
size_bytes BIGINT,
- compressed_at TIMESTAMPTZ,
tiered_at TIMESTAMPTZ,
+ -- Highest WAL position written to this chunk, stamped by the tiering
+ -- write-tracking trigger. The tiering durability gate drops a chunk only
+ -- once CDF's committed_lsn for the shadow has flushed past this mark.
+ last_write_lsn PG_LSN,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
- CONSTRAINT valid_status CHECK (status IN ('active', 'compressed', 'tiered', 'dropped'))
+ CONSTRAINT valid_status CHECK (status IN ('active', 'tiered', 'dropped'))
);

CREATE INDEX IF NOT EXISTS idx_chunk_metadata_hypertable
@@ -61,7 +64,7 @@ CREATE INDEX IF NOT EXISTS idx_chunk_metadata_range
ON lakets._chunk_metadata(range_start, range_end);

-- ---------------------------------------------------------------------------
- -- Policy Registry: tracks compression, retention, and tiering policies
+ -- Policy Registry: tracks tiering, retention, and tiered-retention policies
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS lakets._policy_registry (
id SERIAL PRIMARY KEY,
@@ -71,7 +74,7 @@ CREATE TABLE IF NOT EXISTS lakets._policy_registry (
enabled BOOLEAN DEFAULT TRUE,
last_run_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
- CONSTRAINT valid_policy_type CHECK (policy_type IN ('compression', 'retention', 'tiered_retention'))
+ CONSTRAINT valid_policy_type CHECK (policy_type IN ('tiering', 'retention', 'tiered_retention'))
);

-- ---------------------------------------------------------------------------
@@ -153,7 +156,7 @@ END $$;
CREATE UNIQUE INDEX IF NOT EXISTS idx_chunk_metadata_chunk_name
ON lakets._chunk_metadata(chunk_name);

- -- FK index on _policy_registry (scanned by compression/retention jobs)
+ -- FK index on _policy_registry (scanned by tiering/retention jobs)
CREATE INDEX IF NOT EXISTS idx_policy_registry_ct_type
ON lakets._policy_registry(chronotable_id, policy_type) WHERE enabled = TRUE;
