From 60a4a8185437f63ce056f8195ab0b3b4875ee914 Mon Sep 17 00:00:00 2001
From: Benjamin <5719034+bnjjj@users.noreply.github.com>
Date: Mon, 27 Apr 2026 15:08:39 +0200
Subject: [PATCH 1/2] feat(ducklake): add catalog maintenance

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>
---
 etl-api/src/configs/destination.rs            |  31 ++
 ...ination_config_serialization_ducklake.snap |   5 +-
 etl-api/src/k8s/core.rs                       |   2 +
 etl-api/src/validation/validators.rs          |   6 +
 etl-config/src/shared/destination.rs          |   6 +
 etl-destinations/src/ducklake/config.rs       |  35 ++
 etl-destinations/src/ducklake/core.rs         |  38 +-
 etl-destinations/src/ducklake/maintenance.rs  | 442 ++++++++++++++++++
 .../tests/ducklake_destination.rs             |  55 +++
 etl-destinations/tests/ducklake_pipeline.rs   |   1 +
 etl-replicator/src/core.rs                    |   2 +
 11 files changed, 620 insertions(+), 3 deletions(-)

diff --git a/etl-api/src/configs/destination.rs b/etl-api/src/configs/destination.rs
index 52d40e856..af84f1a06 100644
--- a/etl-api/src/configs/destination.rs
+++ b/etl-api/src/configs/destination.rs
@@ -116,6 +116,13 @@ pub enum FullApiDestinationConfig {
             deserialize_with = "crate::utils::trim_option_string"
         )]
         maintenance_target_file_size: Option<String>,
+        #[schema(example = "7 days")]
+        #[serde(
+            default,
+            skip_serializing_if = "Option::is_none",
+            deserialize_with = "crate::utils::trim_option_string"
+        )]
+        expire_snapshots_older_than: Option<String>,
     },
 }
 
@@ -186,6 +193,7 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         } => Self::Ducklake {
             catalog_url,
             data_path,
@@ -199,6 +207,7 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         },
     }
 }
@@ -229,6 +238,7 @@ pub enum StoredDestinationConfig {
         metadata_schema: Option<String>,
         duckdb_memory_cache_limit: Option<String>,
         maintenance_target_file_size: Option<String>,
+        expire_snapshots_older_than: Option<String>,
     },
 }
 
@@ -299,6 +309,7 @@ impl StoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         } => DestinationConfig::Ducklake {
             catalog_url,
             data_path,
@@ -312,6 +323,7 @@ impl StoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         },
     }
 }
@@ -385,6 +397,7 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         } => Self::Ducklake {
             catalog_url,
             data_path,
@@ -398,6 +411,7 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         },
     }
 }
@@ -496,6 +510,7 @@ impl Encrypt for StoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         } => {
             let s3_access_key_id = s3_access_key_id
                 .map(|value| encrypt_text(value.expose_secret().to_owned(), encryption_key))
@@ -517,6 +532,7 @@ impl Encrypt for StoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         })
     }
 }
@@ -552,6 +568,7 @@ pub enum EncryptedStoredDestinationConfig {
         metadata_schema: Option<String>,
         duckdb_memory_cache_limit: Option<String>,
         maintenance_target_file_size: Option<String>,
+        expire_snapshots_older_than: Option<String>,
     },
 }
 
@@ -663,6 +680,7 @@ impl Decrypt for EncryptedStoredDestinationConfig {
             metadata_schema,
             duckdb_memory_cache_limit,
maintenance_target_file_size, + expire_snapshots_older_than, } => Ok(StoredDestinationConfig::Ducklake { catalog_url, data_path, @@ -684,6 +702,7 @@ impl Decrypt for EncryptedStoredDestinationConfig { metadata_schema, duckdb_memory_cache_limit, maintenance_target_file_size, + expire_snapshots_older_than, }), } } @@ -1141,6 +1160,7 @@ mod tests { metadata_schema: Some("ducklake".to_string()), duckdb_memory_cache_limit: Some("50MB".to_string()), maintenance_target_file_size: Some("10MB".to_string()), + expire_snapshots_older_than: Some("7 days".to_string()), }; let key = EncryptionKey { id: 1, key: generate_random_key::<32>().unwrap() }; @@ -1163,6 +1183,7 @@ mod tests { metadata_schema: m1, duckdb_memory_cache_limit: memory1, maintenance_target_file_size: target1, + expire_snapshots_older_than: expire1, }, StoredDestinationConfig::Ducklake { catalog_url: c2, @@ -1177,6 +1198,7 @@ mod tests { metadata_schema: m2, duckdb_memory_cache_limit: memory2, maintenance_target_file_size: target2, + expire_snapshots_older_than: expire2, }, ) => { assert_eq!(c1, c2); @@ -1197,6 +1219,7 @@ mod tests { assert_eq!(m1, m2); assert_eq!(memory1, memory2); assert_eq!(target1, target2); + assert_eq!(expire1, expire2); } _ => panic!("Config types don't match"), } @@ -1217,6 +1240,7 @@ mod tests { metadata_schema: Some("ducklake".to_string()), duckdb_memory_cache_limit: None, maintenance_target_file_size: None, + expire_snapshots_older_than: None, }; let stored: StoredDestinationConfig = full_config.clone().into(); @@ -1231,6 +1255,7 @@ mod tests { metadata_schema: m1, duckdb_memory_cache_limit: memory1, maintenance_target_file_size: target1, + expire_snapshots_older_than: expire1, .. }, FullApiDestinationConfig::Ducklake { @@ -1240,6 +1265,7 @@ mod tests { metadata_schema: m2, duckdb_memory_cache_limit: memory2, maintenance_target_file_size: target2, + expire_snapshots_older_than: expire2, .. 
                },
            ) => {
@@ -1250,6 +1276,7 @@ mod tests {
                assert_eq!(m1, m2);
                assert_eq!(memory1, memory2);
                assert_eq!(target1, target2);
+                assert_eq!(expire1, expire2);
            }
            _ => panic!("Config types don't match"),
        }
@@ -1270,6 +1297,7 @@ mod tests {
             metadata_schema: Some("ducklake".to_string()),
             duckdb_memory_cache_limit: Some("50MB".to_string()),
             maintenance_target_file_size: Some("10MB".to_string()),
+            expire_snapshots_older_than: Some("7 days".to_string()),
         };
 
         assert_json_snapshot!(full_config);
@@ -1291,6 +1319,7 @@ mod tests {
                 metadata_schema: m1,
                 duckdb_memory_cache_limit: memory1,
                 maintenance_target_file_size: target1,
+                expire_snapshots_older_than: expire1,
             },
             FullApiDestinationConfig::Ducklake {
                 catalog_url: c2,
@@ -1305,6 +1334,7 @@ mod tests {
                 metadata_schema: m2,
                 duckdb_memory_cache_limit: memory2,
                 maintenance_target_file_size: target2,
+                expire_snapshots_older_than: expire2,
             },
         ) => {
             assert_eq!(c1, &c2);
@@ -1325,6 +1355,7 @@ mod tests {
             assert_eq!(m1, &m2);
             assert_eq!(memory1, &memory2);
             assert_eq!(target1, &target2);
+            assert_eq!(expire1, &expire2);
         }
         _ => panic!("Deserialization failed or variant mismatch"),
     }
diff --git a/etl-api/src/configs/snapshots/etl_api__configs__destination__tests__full_api_destination_config_serialization_ducklake.snap b/etl-api/src/configs/snapshots/etl_api__configs__destination__tests__full_api_destination_config_serialization_ducklake.snap
index d34258355..532abe0be 100644
--- a/etl-api/src/configs/snapshots/etl_api__configs__destination__tests__full_api_destination_config_serialization_ducklake.snap
+++ b/etl-api/src/configs/snapshots/etl_api__configs__destination__tests__full_api_destination_config_serialization_ducklake.snap
@@ -1,6 +1,6 @@
 ---
 source: etl-api/src/configs/destination.rs
-assertion_line: 1234
+assertion_line: 1303
 expression: full_config
 ---
 {
@@ -16,6 +16,7 @@ expression: full_config
     "s3_use_ssl": false,
     "metadata_schema": "ducklake",
     "duckdb_memory_cache_limit": "50MB",
-    "maintenance_target_file_size": "10MB"
+    "maintenance_target_file_size": "10MB",
+    "expire_snapshots_older_than": "7 days"
   }
 }
diff --git a/etl-api/src/k8s/core.rs b/etl-api/src/k8s/core.rs
index 9b0a0c59a..193183f1d 100644
--- a/etl-api/src/k8s/core.rs
+++ b/etl-api/src/k8s/core.rs
@@ -604,6 +604,7 @@ mod tests {
             metadata_schema: None,
             duckdb_memory_cache_limit: None,
             maintenance_target_file_size: None,
+            expire_snapshots_older_than: None,
         };
 
         let secrets = build_secrets_from_configs(&source_config, &destination_config);
@@ -642,6 +643,7 @@ mod tests {
             metadata_schema: None,
             duckdb_memory_cache_limit: None,
             maintenance_target_file_size: None,
+            expire_snapshots_older_than: None,
         };
 
         let secrets = build_secrets_from_configs(&source_config, &destination_config);
diff --git a/etl-api/src/validation/validators.rs b/etl-api/src/validation/validators.rs
index 397f53e68..1956f8c22 100644
--- a/etl-api/src/validation/validators.rs
+++ b/etl-api/src/validation/validators.rs
@@ -651,6 +651,7 @@ struct DucklakeValidator {
     metadata_schema: Option<String>,
     duckdb_memory_cache_limit: Option<String>,
     maintenance_target_file_size: Option<String>,
+    expire_snapshots_older_than: Option<String>,
 }
 
 impl DucklakeValidator {
@@ -668,6 +669,7 @@ impl DucklakeValidator {
         metadata_schema: Option<String>,
         duckdb_memory_cache_limit: Option<String>,
         maintenance_target_file_size: Option<String>,
+        expire_snapshots_older_than: Option<String>,
     ) -> Self {
         Self {
             catalog_url,
@@ -682,6 +684,7 @@ impl DucklakeValidator {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         }
     }
 }
@@ -743,6 +746,7 @@ impl Validator for DucklakeValidator {
             self.metadata_schema.clone(),
             self.duckdb_memory_cache_limit.clone(),
             self.maintenance_target_file_size.clone(),
+            self.expire_snapshots_older_than.clone(),
             MemoryStore::new(),
         )
         .await
@@ -890,6 +894,7 @@ impl Validator for DestinationValidator {
                 metadata_schema,
                 duckdb_memory_cache_limit,
                 maintenance_target_file_size,
+                expire_snapshots_older_than,
             } => {
                 let validator = DucklakeValidator::new(
                     catalog_url.clone(),
@@ -906,6 +911,7 @@ impl Validator for DestinationValidator {
                     metadata_schema.clone(),
                     duckdb_memory_cache_limit.clone(),
                     maintenance_target_file_size.clone(),
+                    expire_snapshots_older_than.clone(),
                 );
                 validator.validate(ctx).await
             }
diff --git a/etl-config/src/shared/destination.rs b/etl-config/src/shared/destination.rs
index bc7178832..1e8255fad 100644
--- a/etl-config/src/shared/destination.rs
+++ b/etl-config/src/shared/destination.rs
@@ -77,6 +77,8 @@ pub enum DestinationConfig {
         duckdb_memory_cache_limit: Option<String>,
         /// Optional DuckLake maintenance target file size.
         maintenance_target_file_size: Option<String>,
+        /// Optional DuckLake snapshot-retention interval.
+        expire_snapshots_older_than: Option<String>,
     },
 }
 
@@ -256,6 +258,8 @@ pub enum DestinationConfigWithoutSecrets {
         duckdb_memory_cache_limit: Option<String>,
         /// Optional DuckLake maintenance target file size.
         maintenance_target_file_size: Option<String>,
+        /// Optional DuckLake snapshot-retention interval.
+        expire_snapshots_older_than: Option<String>,
     },
 }
 
@@ -290,6 +294,7 @@ impl From<DestinationConfig> for DestinationConfigWithoutSecrets {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         } => DestinationConfigWithoutSecrets::Ducklake {
             catalog_url,
             data_path,
@@ -301,6 +306,7 @@ impl From<DestinationConfig> for DestinationConfigWithoutSecrets {
             metadata_schema,
             duckdb_memory_cache_limit,
             maintenance_target_file_size,
+            expire_snapshots_older_than,
         },
     }
 }
diff --git a/etl-destinations/src/ducklake/config.rs b/etl-destinations/src/ducklake/config.rs
index e6cb52316..e8e03cce9 100644
--- a/etl-destinations/src/ducklake/config.rs
+++ b/etl-destinations/src/ducklake/config.rs
@@ -26,6 +26,7 @@ const HTTPFS_EXTENSION_FILE: &str = "httpfs.duckdb_extension";
 const POSTGRES_SCANNER_EXTENSION_FILE: &str = "postgres_scanner.duckdb_extension";
 pub(super) const TARGET_FILE_SIZE_OPTION_NAME: &str = "target_file_size";
 pub(super) const MAINTENANCE_TARGET_FILE_SIZE: &str = "10MB";
+pub(super) const EXPIRE_SNAPSHOTS_OLDER_THAN: &str = "7 days";
 pub(super) const DUCKDB_MEMORY_CACHE_LIMIT: &str = "150MB";
 
 /// Resolves the configured DuckDB memory limit or falls back to the default.
@@ -59,6 +60,24 @@ pub(super) fn maintenance_target_file_size_sql(
     )
 }
 
+/// Resolves the configured snapshot-retention window or falls back to the
+/// default.
+pub(super) fn resolve_expire_snapshots_older_than(
+    expire_snapshots_older_than: Option<&str>,
+) -> &str {
+    expire_snapshots_older_than.unwrap_or(EXPIRE_SNAPSHOTS_OLDER_THAN)
+}
+
+/// Builds the SQL used to validate the configured snapshot-retention window.
+pub(super) fn validate_expire_snapshots_older_than_sql(
+    expire_snapshots_older_than: &str,
+) -> String {
+    format!(
+        "SELECT CAST(now() AS TIMESTAMP) - CAST({} AS INTERVAL);",
+        quote_literal(expire_snapshots_older_than)
+    )
+}
+
 /// One named DuckDB setup phase for a DuckLake connection.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub(super) struct DuckLakeSetupStep {
@@ -1140,6 +1159,22 @@ mod tests {
         assert_eq!(plan.steps()[0].sql, format!("SET memory_limit = {};", quote_literal("256MB")));
     }
 
+    #[test]
+    fn resolve_expire_snapshots_older_than_uses_default() {
+        assert_eq!(resolve_expire_snapshots_older_than(None), EXPIRE_SNAPSHOTS_OLDER_THAN);
+        assert_eq!(resolve_expire_snapshots_older_than(Some("2 days")), "2 days");
+    }
+
+    #[test]
+    fn validate_expire_snapshots_older_than_sql_casts_interval() {
+        let sql = validate_expire_snapshots_older_than_sql("2 days");
+        let quoted_interval = quote_literal("2 days");
+
+        assert!(sql.contains("SELECT CAST(now() AS TIMESTAMP) - CAST("));
+        assert!(sql.contains(&quoted_interval));
+        assert!(sql.contains(" AS INTERVAL);"));
+    }
+
     #[test]
     fn vendored_extension_strategy_disables_autoload() {
         assert!(
diff --git a/etl-destinations/src/ducklake/core.rs b/etl-destinations/src/ducklake/core.rs
index 80b2a5e95..03f22b739 100644
--- a/etl-destinations/src/ducklake/core.rs
+++ b/etl-destinations/src/ducklake/core.rs
@@ -56,7 +56,8 @@ use crate::{
     },
     config::{
         MAINTENANCE_TARGET_FILE_SIZE, build_setup_plan, current_duckdb_extension_strategy,
-        maintenance_target_file_size_sql,
+        maintenance_target_file_size_sql, resolve_expire_snapshots_older_than,
+        validate_expire_snapshots_older_than_sql,
     },
     inline_size::DuckLakePendingInlineSizeSampler,
     maintenance::{
@@ -388,6 +389,8 @@ where
     ///   (e.g. `"150MB"`). Defaults to `150MB`.
     /// - `maintenance_target_file_size`: Optional DuckLake maintenance
     ///   `target_file_size` value (e.g. `"10MB"`). Defaults to `10MB`.
+    /// - `expire_snapshots_older_than`: Optional DuckLake snapshot-retention
+    ///   interval (e.g. `"7 days"`). Defaults to `7 days`.
     /// - `duckdb_log`: Optional DuckDB log storage and shutdown dump paths.
     /// - On Linux and macOS, DuckDB extensions are loaded from vendored local
     ///   files when a vendored directory is available. The root directory can
@@ -407,6 +410,7 @@ where
         metadata_schema: Option<String>,
         duckdb_memory_cache_limit: Option<String>,
         maintenance_target_file_size: Option<String>,
+        expire_snapshots_older_than: Option<String>,
         store: S,
     ) -> EtlResult<Self> {
         register_metrics();
@@ -433,6 +437,9 @@ where
             maintenance_target_file_size
                 .unwrap_or_else(|| MAINTENANCE_TARGET_FILE_SIZE.to_string()),
         );
+        let expire_snapshots_older_than = Arc::<str>::from(
+            resolve_expire_snapshots_older_than(expire_snapshots_older_than.as_deref()).to_owned(),
+        );
 
         if let crate::ducklake::config::DuckDbExtensionStrategy::VendoredLocal { platform_dir } =
             extension_strategy
         {
@@ -482,6 +489,31 @@ where
             },
         )
         .await?;
+        let expire_snapshots_validation_sql =
+            validate_expire_snapshots_older_than_sql(expire_snapshots_older_than.as_ref());
+        let expire_snapshots_older_than_for_error = Arc::clone(&expire_snapshots_older_than);
+        run_duckdb_blocking(
+            Arc::clone(&pool),
+            Arc::clone(&blocking_slots),
+            DuckDbBlockingOperationKind::Foreground,
+            move |conn| -> EtlResult<()> {
+                conn.query_row(&expire_snapshots_validation_sql, [], |_row| Ok(())).map_err(
+                    |source| {
+                        etl_error!(
+                            ErrorKind::ConfigError,
+                            "DuckLake expire_snapshots_older_than configuration failed",
+                            format!(
+                                "invalid expire_snapshots_older_than value `{}`",
+                                expire_snapshots_older_than_for_error
+                            ),
+                            source: source
+                        )
+                    },
+                )?;
+                Ok(())
+            },
+        )
+        .await?;
 
         let metadata_schema = match metadata_schema {
             Some(metadata_schema) => metadata_schema,
             None => {
@@ -537,6 +569,7 @@ where
                 Arc::clone(&inline_flush_requested),
                 Arc::clone(&inline_flush_requests),
                 pending_inline_size_sampler,
+                Arc::clone(&expire_snapshots_older_than),
             )?
             .into(),
         );
@@ -1645,6 +1678,7 @@ mod tests {
             None,
             None,
             None,
+            None,
             store,
         )
         .await
@@ -1710,6 +1744,7 @@ mod tests {
             None,
             None,
             None,
+            None,
             store,
         )
         .await
@@ -1771,6 +1806,7 @@ mod tests {
             None,
             None,
             None,
+            None,
             store,
         )
         .await
diff --git a/etl-destinations/src/ducklake/maintenance.rs b/etl-destinations/src/ducklake/maintenance.rs
index 8514c6d87..9553269f5 100644
--- a/etl-destinations/src/ducklake/maintenance.rs
+++ b/etl-destinations/src/ducklake/maintenance.rs
@@ -42,6 +42,10 @@ use crate::ducklake::{
 const MAINTENANCE_POOL_SIZE: u32 = 1;
 /// Poll interval for checking per-table inline flush thresholds.
 const MAINTENANCE_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(30);
+/// Fixed cadence for expiring old DuckLake snapshots.
+const MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL: Duration = Duration::from_secs(5 * 60 * 60);
+/// Fixed cadence for cleaning up old DuckLake files.
+const MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL: Duration = Duration::from_secs(6 * 60 * 60);
 /// Pending inlined bytes threshold that triggers a background inline flush.
 const MAINTENANCE_PENDING_INLINED_DATA_BYTES_THRESHOLD: u64 = 10_000_000;
 /// Estimated ratio from raw row payload to compressed parquet bytes.
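The two cadence constants above pin catalog maintenance to fixed five- and six-hour periods, while the retention window itself is validated once at startup by the probe added to `core.rs` earlier in this patch: instead of parsing the interval string, the destination asks DuckDB to cast it. A minimal, self-contained sketch of that check, assuming only the `duckdb` crate; `quote_literal` is inlined here as standard single-quote doubling, a simplified stand-in for the crate's own helper (not shown in this diff):

```rust
// Sketch of the startup probe for `expire_snapshots_older_than`. It runs the
// same query the patch builds in `validate_expire_snapshots_older_than_sql`.
fn quote_literal(value: &str) -> String {
    // Simplified SQL string-literal quoting (assumption, not the crate's helper).
    format!("'{}'", value.replace('\'', "''"))
}

fn validate_retention(interval: &str) -> duckdb::Result<()> {
    let conn = duckdb::Connection::open_in_memory()?;
    // If DuckDB cannot cast the configured value to an INTERVAL, this query
    // errors and destination construction is rejected with a ConfigError.
    let sql = format!(
        "SELECT CAST(now() AS TIMESTAMP) - CAST({} AS INTERVAL);",
        quote_literal(interval)
    );
    conn.query_row(&sql, [], |_row| Ok(()))
}

fn main() {
    assert!(validate_retention("7 days").is_ok());
    assert!(validate_retention("definitely not an interval").is_err());
}
```

Delegating the parse to DuckDB keeps the accepted grammar identical to what `ducklake_expire_snapshots` later receives, so a value that passes validation cannot fail at maintenance time for syntax reasons.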
@@ -72,6 +76,7 @@ const MAINTENANCE_EMERGENCY_REWRITE_DELETED_ROW_RATIO_THRESHOLD: f64 = 0.25;
 pub(super) const NOTIFICATION_SEND_TIMEOUT: Duration = Duration::from_secs(5);
 
 const MAINTENANCE_TASK_FLUSH: &str = "flush";
+const MAINTENANCE_TASK_CATALOG_MAINTENANCE: &str = "catalog_maintenance";
 const MAINTENANCE_TASK_TARGETED_MAINTENANCE: &str = "targeted_maintenance";
 
 #[cfg(test)]
@@ -81,6 +86,8 @@ static FAIL_REWRITE_SINGLE_OUTPUT_FILE_ONCE_FOR_TESTS: AtomicBool = AtomicBool::
 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
 enum MaintenanceOperation {
     FlushInlinedData,
+    ExpireSnapshots,
+    CleanupOldFiles,
     RewriteDataFiles,
 }
 
@@ -89,6 +96,8 @@ impl MaintenanceOperation {
     fn as_str(self) -> &'static str {
         match self {
             Self::FlushInlinedData => "flush_inlined_data",
+            Self::ExpireSnapshots => "expire_snapshots",
+            Self::CleanupOldFiles => "cleanup_old_files",
             Self::RewriteDataFiles => "rewrite_data_files",
         }
     }
@@ -101,6 +110,8 @@ enum MaintenanceReason {
     PendingInlinedDataBytesThreshold,
     PendingBytesThreshold,
     PendingInsertedRowsThreshold,
+    SnapshotRetentionThreshold,
+    CleanupIntervalElapsed,
     IdleRewriteMetricsThreshold,
     EmergencyRewriteMetricsThreshold,
 }
@@ -112,6 +123,8 @@ impl MaintenanceReason {
             Self::PendingInlinedDataBytesThreshold => "pending_inlined_data_bytes_threshold",
             Self::PendingBytesThreshold => "pending_bytes_threshold",
             Self::PendingInsertedRowsThreshold => "pending_inserted_rows_threshold",
+            Self::SnapshotRetentionThreshold => "snapshot_retention_threshold",
+            Self::CleanupIntervalElapsed => "cleanup_interval_elapsed",
             Self::IdleRewriteMetricsThreshold => "idle_rewrite_metrics_threshold",
             Self::EmergencyRewriteMetricsThreshold => "emergency_rewrite_metrics_threshold",
         }
@@ -237,6 +250,60 @@ impl TargetedMaintenancePlan {
     }
 }
 
+/// Static configuration for periodic catalog-level maintenance.
+#[derive(Debug, Clone)]
+struct CatalogMaintenanceConfig {
+    expire_snapshots_older_than: Arc<str>,
+    expire_snapshots_interval: Duration,
+    cleanup_old_files_interval: Duration,
+}
+
+impl CatalogMaintenanceConfig {
+    /// Builds the fixed catalog-maintenance configuration.
+    fn new(expire_snapshots_older_than: Arc<str>) -> Self {
+        Self {
+            expire_snapshots_older_than,
+            expire_snapshots_interval: MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL,
+            cleanup_old_files_interval: MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL,
+        }
+    }
+}
+
+/// Periodic catalog-maintenance state tracked by the background worker.
+#[derive(Debug, Default)]
+struct CatalogMaintenanceState {
+    last_expire_snapshots_completed_at: Option<Instant>,
+    last_cleanup_old_files_completed_at: Option<Instant>,
+}
+
+impl CatalogMaintenanceState {
+    /// Returns whether snapshot expiration is due now.
+    fn expire_snapshots_due(&self, now: Instant, interval: Duration) -> bool {
+        match self.last_expire_snapshots_completed_at {
+            Some(last_completed_at) => now.saturating_duration_since(last_completed_at) >= interval,
+            None => true,
+        }
+    }
+
+    /// Returns whether old-file cleanup is due now.
+    fn cleanup_old_files_due(&self, now: Instant, interval: Duration) -> bool {
+        match self.last_cleanup_old_files_completed_at {
+            Some(last_completed_at) => now.saturating_duration_since(last_completed_at) >= interval,
+            None => true,
+        }
+    }
+
+    /// Records one successful snapshot-expiration run.
+    fn complete_expire_snapshots(&mut self, now: Instant) {
+        self.last_expire_snapshots_completed_at = Some(now);
+    }
+
+    /// Records one successful cleanup-old-files run.
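+    /// The timestamp is a process-local `Instant`, so a worker restart resets
+    /// the cadence and makes the next cleanup immediately due again.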
+    fn complete_cleanup_old_files(&mut self, now: Instant) {
+        self.last_cleanup_old_files_completed_at = Some(now);
+    }
+}
+
 /// Coalesced maintenance state for one DuckLake table.
 #[derive(Debug, Default)]
 struct TableMaintenanceState {
@@ -551,6 +618,25 @@ fn record_skipped_targeted_maintenance(plan: TargetedMaintenancePlan) {
     }
 }
 
+/// Records skipped catalog-maintenance operations when the worker cannot enter
+/// the exclusive safe point yet.
+fn record_skipped_catalog_maintenance(expire_snapshots_due: bool, cleanup_old_files_due: bool) {
+    if expire_snapshots_due {
+        record_ducklake_maintenance_skipped(
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::ExpireSnapshots,
+            MaintenanceReason::SnapshotRetentionThreshold,
+        );
+    }
+    if cleanup_old_files_due {
+        record_ducklake_maintenance_skipped(
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::CleanupOldFiles,
+            MaintenanceReason::CleanupIntervalElapsed,
+        );
+    }
+}
+
 /// Returns whether this maintenance failure matches a known DuckLake compaction
 /// bug.
 fn is_known_ducklake_compaction_single_output_file_error(error: &EtlError) -> bool {
@@ -691,6 +777,7 @@ pub(super) fn spawn_ducklake_maintenance_worker(
     inline_flush_requested: Arc<AtomicBool>,
     pending_inline_flush_requests: Arc<PendingInlineFlushRequests>,
     pending_inline_size_sampler: Option<DuckLakePendingInlineSizeSampler>,
+    expire_snapshots_older_than: Arc<str>,
 ) -> EtlResult {
     let mut pool = LazyDuckLakePool::new(manager, MAINTENANCE_POOL_SIZE, "maintenance");
     pool.warm_in_background();
@@ -703,6 +790,7 @@ pub(super) fn spawn_ducklake_maintenance_worker(
         inline_flush_requested,
         pending_inline_flush_requests,
         pending_inline_size_sampler,
+        CatalogMaintenanceConfig::new(expire_snapshots_older_than),
         notification_rx,
         shutdown_rx,
     ));
@@ -723,6 +811,7 @@ async fn run_ducklake_maintenance_worker(
     inline_flush_requested: Arc<AtomicBool>,
     pending_inline_flush_requests: Arc<PendingInlineFlushRequests>,
     pending_inline_size_sampler: Option<DuckLakePendingInlineSizeSampler>,
+    catalog_maintenance_config: CatalogMaintenanceConfig,
     mut notification_rx: mpsc::Receiver,
     mut shutdown_rx: watch::Receiver<()>,
 ) {
@@ -731,6 +820,7 @@ async fn run_ducklake_maintenance_worker(
     flush_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
 
     let mut table_states: HashMap = HashMap::new();
+    let mut catalog_maintenance_state = CatalogMaintenanceState::default();
 
     loop {
         tokio::select! {
@@ -862,6 +952,15 @@ async fn run_ducklake_maintenance_worker(
                         || state.dirty_since_compaction
                         || state.latest_storage_metrics.is_some()
                 });
+
+                maybe_run_catalog_maintenance(
+                    &mut pool,
+                    Arc::clone(&checkpoint_gate),
+                    Arc::clone(&blocking_slots),
+                    &catalog_maintenance_config,
+                    &mut catalog_maintenance_state,
+                )
+                .await;
             }
         }
     }
@@ -960,6 +1059,95 @@ async fn run_targeted_table_maintenance(
     })
 }
 
+/// Runs catalog-level DuckLake maintenance when its fixed cadence is due.
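+/// Both operations are skipped (and a skip metric recorded) whenever the
+/// exclusive checkpoint gate cannot be acquired; completion timestamps only
+/// advance on success, so a failed run is retried on the next poll tick.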
+async fn maybe_run_catalog_maintenance(
+    pool: &mut LazyDuckLakePool,
+    checkpoint_gate: Arc>,
+    blocking_slots: Arc,
+    config: &CatalogMaintenanceConfig,
+    state: &mut CatalogMaintenanceState,
+) {
+    let now = Instant::now();
+    let expire_snapshots_due = state.expire_snapshots_due(now, config.expire_snapshots_interval);
+    let cleanup_old_files_due = state.cleanup_old_files_due(now, config.cleanup_old_files_interval);
+
+    if !expire_snapshots_due && !cleanup_old_files_due {
+        return;
+    }
+
+    let Ok(_checkpoint_guard) = checkpoint_gate.try_write_owned() else {
+        record_skipped_catalog_maintenance(expire_snapshots_due, cleanup_old_files_due);
+        return;
+    };
+
+    let pool = match pool.get_or_init_pool().await {
+        Ok(pool) => pool,
+        Err(error) => {
+            warn!(error = ?error, "ducklake maintenance pool initialization failed");
+            return;
+        }
+    };
+
+    match run_catalog_maintenance(
+        pool,
+        Arc::clone(&blocking_slots),
+        Arc::clone(&config.expire_snapshots_older_than),
+        expire_snapshots_due,
+        cleanup_old_files_due,
+    )
+    .await
+    {
+        Ok((expired_snapshots, cleaned_up_files)) => {
+            let completed_at = Instant::now();
+            if expire_snapshots_due {
+                state.complete_expire_snapshots(completed_at);
+            }
+            if cleanup_old_files_due {
+                state.complete_cleanup_old_files(completed_at);
+            }
+            info!(
+                expire_snapshots_older_than = %config.expire_snapshots_older_than,
+                expire_snapshots_due,
+                cleanup_old_files_due,
+                expired_snapshots,
+                cleaned_up_files,
+                "ducklake catalog maintenance completed"
+            );
+        }
+        Err(error) => {
+            warn!(
+                expire_snapshots_older_than = %config.expire_snapshots_older_than,
+                error = ?error,
+                "ducklake catalog maintenance failed"
+            );
+        }
+    }
+}
+
+/// Runs catalog-level maintenance inside one DuckDB blocking operation.
+async fn run_catalog_maintenance(
+    pool: Arc>,
+    blocking_slots: Arc,
+    expire_snapshots_older_than: Arc<str>,
+    expire_snapshots_due: bool,
+    cleanup_old_files_due: bool,
+) -> EtlResult<(u64, u64)> {
+    run_duckdb_blocking(
+        pool,
+        blocking_slots,
+        DuckDbBlockingOperationKind::Maintenance,
+        move |conn| {
+            run_catalog_maintenance_blocking(
+                conn,
+                expire_snapshots_older_than.as_ref(),
+                expire_snapshots_due,
+                cleanup_old_files_due,
+            )
+        },
+    )
+    .await
+}
+
 /// Runs requested inline flushes before foreground ingestion begins.
 pub(super) async fn maybe_run_requested_inline_flush(
     pool: Arc>,
@@ -1109,6 +1297,140 @@ fn run_targeted_table_maintenance_blocking(
     Ok(rewrite_outcome.unwrap_or(MaintenanceOutcome::Noop))
 }
 
+/// Builds the DuckLake snapshot-expiration call for one retention window.
+fn expire_snapshots_sql(expire_snapshots_older_than: &str) -> String {
+    format!(
+        "CALL ducklake_expire_snapshots({}, older_than => CAST(now() AS TIMESTAMP) - CAST({} AS \
+         INTERVAL));",
+        quote_literal(LAKE_CATALOG),
+        quote_literal(expire_snapshots_older_than),
+    )
+}
+
+/// Builds the DuckLake old-file cleanup call for one retention window.
+fn cleanup_old_files_sql(expire_snapshots_older_than: &str) -> String {
+    format!(
+        "CALL ducklake_cleanup_old_files({}, older_than => CAST(now() AS TIMESTAMP) - CAST({} AS \
+         INTERVAL));",
+        quote_literal(LAKE_CATALOG),
+        quote_literal(expire_snapshots_older_than),
+    )
+}
+
+/// Counts the rows returned by one DuckLake maintenance call.
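+/// Callers interpret the count as the number of expired snapshots or
+/// cleaned-up files, so it also decides the applied-versus-noop outcome
+/// recorded in the duration metrics.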
+fn count_ducklake_maintenance_rows(
+    conn: &duckdb::Connection,
+    sql: &str,
+    description: &'static str,
+) -> EtlResult<u64> {
+    let mut statement = conn.prepare(sql).map_err(|source| {
+        etl_error!(
+            ErrorKind::DestinationQueryFailed,
+            description,
+            format_query_error_detail(sql, &source),
+            source: source
+        )
+    })?;
+    let mut rows = statement.query([]).map_err(|source| {
+        etl_error!(
+            ErrorKind::DestinationQueryFailed,
+            description,
+            format_query_error_detail(sql, &source),
+            source: source
+        )
+    })?;
+    let mut count = 0u64;
+
+    while let Some(_row) = rows.next().map_err(|source| {
+        etl_error!(
+            ErrorKind::DestinationQueryFailed,
+            description,
+            format_query_error_detail(sql, &source),
+            source: source
+        )
+    })? {
+        count = count.saturating_add(1);
+    }
+
+    Ok(count)
+}
+
+/// Runs DuckLake catalog maintenance and records per-operation outcomes.
+fn run_catalog_maintenance_blocking(
+    conn: &duckdb::Connection,
+    expire_snapshots_older_than: &str,
+    expire_snapshots_due: bool,
+    cleanup_old_files_due: bool,
+) -> EtlResult<(u64, u64)> {
+    let mut expired_snapshots = 0u64;
+    let mut cleaned_up_files = 0u64;
+
+    if expire_snapshots_due {
+        let expire_reason = MaintenanceReason::SnapshotRetentionThreshold;
+        let expire_started = Instant::now();
+        let _expire_guard = DuckLakeMaintenanceInProgressGuard::start(
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::ExpireSnapshots,
+            expire_reason,
+        );
+        let expire_sql = expire_snapshots_sql(expire_snapshots_older_than);
+        expired_snapshots =
+            count_ducklake_maintenance_rows(conn, &expire_sql, "DuckLake expire snapshots failed")
+                .inspect_err(|_error| {
+                    record_ducklake_maintenance_duration(
+                        MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+                        MaintenanceOperation::ExpireSnapshots,
+                        expire_reason,
+                        MaintenanceOutcome::Failed,
+                        expire_started.elapsed().as_secs_f64(),
+                    );
+                })?;
+        let expire_outcome = MaintenanceOutcome::from(expired_snapshots);
+        record_ducklake_maintenance_duration(
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::ExpireSnapshots,
+            expire_reason,
+            expire_outcome,
+            expire_started.elapsed().as_secs_f64(),
+        );
+    }
+
+    if cleanup_old_files_due {
+        let cleanup_reason = MaintenanceReason::CleanupIntervalElapsed;
+        let cleanup_started = Instant::now();
+        let _cleanup_guard = DuckLakeMaintenanceInProgressGuard::start(
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::CleanupOldFiles,
+            cleanup_reason,
+        );
+        let cleanup_sql = cleanup_old_files_sql(expire_snapshots_older_than);
+        cleaned_up_files = count_ducklake_maintenance_rows(
+            conn,
+            &cleanup_sql,
+            "DuckLake cleanup old files failed",
+        )
+        .inspect_err(|_error| {
+            record_ducklake_maintenance_duration(
+                MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+                MaintenanceOperation::CleanupOldFiles,
+                cleanup_reason,
+                MaintenanceOutcome::Failed,
+                cleanup_started.elapsed().as_secs_f64(),
+            );
+        })?;
+        let cleanup_outcome = MaintenanceOutcome::from(cleaned_up_files);
+        record_ducklake_maintenance_duration(
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::CleanupOldFiles,
+            cleanup_reason,
+            cleanup_outcome,
+            cleanup_started.elapsed().as_secs_f64(),
+        );
+    }
+
+    Ok((expired_snapshots, cleaned_up_files))
+}
+
 /// Flushes inlined user data for one table after the write transaction commits.
pub(super) fn flush_table_inlined_data( conn: &duckdb::Connection, @@ -1488,6 +1810,45 @@ mod tests { assert!(skipped_after > skipped_before, "rewrite skip count did not increase"); } + #[tokio::test] + async fn catalog_maintenance_busy_emits_skip_counter_for_both_operations() { + let handle = init_metrics_handle().expect("failed to initialize prometheus handle"); + register_metrics(); + + let rendered_before = handle.render(); + let expire_before = maintenance_skipped_counter_value( + &rendered_before, + MAINTENANCE_TASK_CATALOG_MAINTENANCE, + MaintenanceOperation::ExpireSnapshots, + MaintenanceReason::SnapshotRetentionThreshold, + ); + let cleanup_before = maintenance_skipped_counter_value( + &rendered_before, + MAINTENANCE_TASK_CATALOG_MAINTENANCE, + MaintenanceOperation::CleanupOldFiles, + MaintenanceReason::CleanupIntervalElapsed, + ); + + record_skipped_catalog_maintenance(true, true); + + let rendered_after = handle.render(); + let expire_after = maintenance_skipped_counter_value( + &rendered_after, + MAINTENANCE_TASK_CATALOG_MAINTENANCE, + MaintenanceOperation::ExpireSnapshots, + MaintenanceReason::SnapshotRetentionThreshold, + ); + let cleanup_after = maintenance_skipped_counter_value( + &rendered_after, + MAINTENANCE_TASK_CATALOG_MAINTENANCE, + MaintenanceOperation::CleanupOldFiles, + MaintenanceReason::CleanupIntervalElapsed, + ); + + assert!(expire_after > expire_before, "expire snapshots skip count did not increase"); + assert!(cleanup_after > cleanup_before, "cleanup old files skip count did not increase"); + } + #[test] fn table_maintenance_state_prefers_pending_bytes_flush_reason() { let now = Instant::now(); @@ -1597,6 +1958,46 @@ mod tests { assert_eq!(MaintenanceOutcome::from(3), MaintenanceOutcome::Applied); } + #[test] + fn catalog_maintenance_state_tracks_independent_due_intervals() { + let now = Instant::now(); + let mut state = CatalogMaintenanceState::default(); + + assert!(state.expire_snapshots_due(now, MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL)); + assert!(state.cleanup_old_files_due(now, MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL)); + + state.complete_expire_snapshots(now); + state.complete_cleanup_old_files(now); + + assert!(!state.expire_snapshots_due( + now + MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL - Duration::from_secs(1), + MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL + )); + assert!(state.expire_snapshots_due( + now + MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL, + MAINTENANCE_EXPIRE_SNAPSHOTS_INTERVAL + )); + assert!(!state.cleanup_old_files_due( + now + MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL - Duration::from_secs(1), + MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL + )); + assert!(state.cleanup_old_files_due( + now + MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL, + MAINTENANCE_CLEANUP_OLD_FILES_INTERVAL + )); + } + + #[test] + fn catalog_maintenance_sql_builders_use_interval_casts() { + let expire_sql = expire_snapshots_sql("2 days"); + let cleanup_sql = cleanup_old_files_sql("2 days"); + + assert!(expire_sql.contains("ducklake_expire_snapshots")); + assert!(cleanup_sql.contains("ducklake_cleanup_old_files")); + assert!(expire_sql.contains("CAST('2 days' AS INTERVAL)")); + assert!(cleanup_sql.contains("CAST('2 days' AS INTERVAL)")); + } + #[tokio::test] async fn flush_failure_records_failed_metric() { let handle = init_metrics_handle().expect("failed to initialize prometheus handle"); @@ -1671,6 +2072,46 @@ mod tests { assert!(failed_after > failed_before, "rewrite failed duration count did not increase"); } + #[tokio::test] + async fn catalog_expire_failure_records_failed_duration() { + let 
handle = init_metrics_handle().expect("failed to initialize prometheus handle");
+        register_metrics();
+        let conn = duckdb::Connection::open_in_memory().expect("failed to open in-memory duckdb");
+
+        let rendered_before = handle.render();
+        let failed_before = maintenance_duration_count(
+            &rendered_before,
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::ExpireSnapshots,
+            MaintenanceReason::SnapshotRetentionThreshold,
+            MaintenanceOutcome::Failed,
+        );
+
+        let error = run_catalog_maintenance_blocking(&conn, "7 days", true, true)
+            .expect_err("catalog maintenance should fail without ducklake functions");
+
+        assert!(matches!(error.kind(), ErrorKind::DestinationQueryFailed));
+
+        let rendered_after = handle.render();
+        let failed_after = maintenance_duration_count(
+            &rendered_after,
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::ExpireSnapshots,
+            MaintenanceReason::SnapshotRetentionThreshold,
+            MaintenanceOutcome::Failed,
+        );
+        let cleanup_failed_after = maintenance_duration_count(
+            &rendered_after,
+            MAINTENANCE_TASK_CATALOG_MAINTENANCE,
+            MaintenanceOperation::CleanupOldFiles,
+            MaintenanceReason::CleanupIntervalElapsed,
+            MaintenanceOutcome::Failed,
+        );
+
+        assert!(failed_after > failed_before, "expire snapshots failed duration did not increase");
+        assert_eq!(cleanup_failed_after, 0.0, "cleanup should not run after expiration fails");
+    }
+
     #[cfg(feature = "test-utils")]
     #[tokio::test]
     async fn known_rewrite_single_output_file_error_is_suppressed_and_recycles_connection() {
@@ -1741,6 +2182,7 @@ mod tests {
             Arc::new(AtomicBool::new(false)),
             Arc::new(PendingInlineFlushRequests::default()),
             None,
+            Arc::<str>::from("7 days"),
         )
         .expect("failed to spawn maintenance worker");
diff --git a/etl-destinations/tests/ducklake_destination.rs b/etl-destinations/tests/ducklake_destination.rs
index 1e7124e9f..ae4bc3a2d 100644
--- a/etl-destinations/tests/ducklake_destination.rs
+++ b/etl-destinations/tests/ducklake_destination.rs
@@ -282,6 +282,7 @@ async fn write_table_rows_basic() {
         None,
         None,
         None,
+        None,
         store,
     )
     .await
@@ -340,6 +341,7 @@ async fn write_table_rows_small_batch_stays_inlined_after_return() {
         None,
         None,
         None,
+        None,
         store,
     )
     .await
@@ -383,6 +385,7 @@ async fn ducklake_rejects_zero_pool_size() {
         None,
         None,
         None,
+        None,
         MemoryStore::new(),
     )
     .await
@@ -417,6 +420,7 @@ async fn ducklake_rejects_non_postgres_catalog_url() {
         None,
         None,
         None,
+        None,
         MemoryStore::new(),
     )
     .await
@@ -426,6 +430,34 @@ async fn ducklake_rejects_non_postgres_catalog_url() {
     assert_eq!(err.kind(), ErrorKind::ConfigError);
 }
 
+/// Invalid snapshot-retention intervals should fail during destination
+/// initialization.
+#[tokio::test(flavor = "multi_thread")]
+async fn ducklake_rejects_invalid_expire_snapshots_retention() {
+    let lake = create_test_lake("ducklake_rejects_invalid_expire_snapshots_retention").await;
+
+    let err = DuckLakeDestination::new(
+        lake.catalog_url.clone(),
+        lake.data_url.clone(),
+        1,
+        None,
+        None,
+        None,
+        None,
+        Some("definitely not an interval".to_string()),
+        MemoryStore::new(),
+    )
+    .await
+    .err()
+    .expect("invalid expire_snapshots_older_than should fail");
+
+    assert_eq!(err.kind(), ErrorKind::ConfigError);
+    assert_eq!(
+        err.description(),
+        Some("DuckLake expire_snapshots_older_than configuration failed")
+    );
+}
+
 /// Repeated writes should reuse the warm pooled DuckDB connection.
#[cfg(feature = "test-utils")] #[tokio::test(flavor = "multi_thread")] @@ -452,6 +484,7 @@ async fn write_table_rows_reuses_warm_pooled_connection() { None, None, None, + None, store, ) .await @@ -509,6 +542,7 @@ async fn write_table_rows_replaces_broken_pooled_connection_after_retry() { None, None, None, + None, store, ) .await @@ -568,6 +602,7 @@ async fn write_table_rows_retry_after_post_commit_failure_is_idempotent() { None, None, None, + None, store, ) .await @@ -618,6 +653,7 @@ async fn concurrent_same_table_copy_batches_complete() { None, None, None, + None, store, ) .await @@ -743,6 +779,7 @@ async fn write_table_rows_empty_creates_table() { None, None, None, + None, store, ) .await @@ -779,6 +816,7 @@ async fn truncate_clears_rows() { None, None, None, + None, store, ) .await @@ -839,6 +877,7 @@ async fn truncate_clears_copy_markers_for_recopy() { None, None, None, + None, store, ) .await @@ -883,6 +922,7 @@ async fn write_events() { None, None, None, + None, store, ) .await @@ -973,6 +1013,7 @@ async fn write_events_small_batch_stays_inlined_after_return() { None, None, None, + None, store, ) .await @@ -1025,6 +1066,7 @@ async fn write_events_with_old_row_update() { None, None, None, + None, store, ) .await @@ -1101,6 +1143,7 @@ async fn write_events_with_partial_updates() { None, None, None, + None, store, ) .await @@ -1193,6 +1236,7 @@ async fn write_events_without_replica_identity_rejects_mutations() { None, None, None, + None, store, ) .await @@ -1280,6 +1324,7 @@ async fn write_events_replay_is_idempotent() { None, None, None, + None, store, ) .await @@ -1375,6 +1420,7 @@ async fn write_events_same_commit_lsn_higher_tx_ordinal_still_applies() { None, None, None, + None, store, ) .await @@ -1452,6 +1498,7 @@ async fn write_events_restart_overlap_rebatches_only_pending_suffix() { None, None, None, + None, store.clone(), ) .await @@ -1493,6 +1540,7 @@ async fn write_events_restart_overlap_rebatches_only_pending_suffix() { None, None, None, + None, store, ) .await @@ -1569,6 +1617,7 @@ async fn write_events_reuses_one_staging_table_per_atomic_batch() { None, None, None, + None, store, ) .await @@ -1655,6 +1704,7 @@ async fn applied_batches_table_uses_data_inlining() { None, None, None, + None, store, ) .await @@ -1704,6 +1754,7 @@ async fn write_events_mixed_multi_table_batches() { None, None, None, + None, store, ) .await @@ -1836,6 +1887,7 @@ async fn write_events_truncate_retry_after_post_commit_failure_is_idempotent() { None, None, None, + None, store, ) .await @@ -1930,6 +1982,7 @@ async fn write_events_retry_after_post_commit_failure_is_idempotent() { None, None, None, + None, store, ) .await @@ -2033,6 +2086,7 @@ async fn concurrent_writes_with_single_slot_complete() { None, None, None, + None, store, ) .await @@ -2097,6 +2151,7 @@ async fn type_mapping_round_trip() { None, None, None, + None, store, ) .await diff --git a/etl-destinations/tests/ducklake_pipeline.rs b/etl-destinations/tests/ducklake_pipeline.rs index 89352351c..94b1e3ed6 100644 --- a/etl-destinations/tests/ducklake_pipeline.rs +++ b/etl-destinations/tests/ducklake_pipeline.rs @@ -108,6 +108,7 @@ async fn build_destination( None, None, None, + None, store, ) .await diff --git a/etl-replicator/src/core.rs b/etl-replicator/src/core.rs index 71cf15c1f..c56471d94 100644 --- a/etl-replicator/src/core.rs +++ b/etl-replicator/src/core.rs @@ -160,6 +160,7 @@ pub(crate) async fn start_replicator_with_config( metadata_schema, duckdb_memory_cache_limit, maintenance_target_file_size, + expire_snapshots_older_than, } => 
{
            set_destination_scope::>();
@@ -189,6 +190,7 @@ pub(crate) async fn start_replicator_with_config(
                 metadata_schema.clone(),
                 duckdb_memory_cache_limit.clone(),
                 maintenance_target_file_size.clone(),
+                expire_snapshots_older_than.clone(),
                 state_store.clone(),
             )
             .await?;

From f17bc3236097efcf21a2dfbd61c1d8690c3962d1 Mon Sep 17 00:00:00 2001
From: Benjamin <5719034+bnjjj@users.noreply.github.com>
Date: Mon, 27 Apr 2026 16:07:31 +0200
Subject: [PATCH 2/2] feat(ducklake): add catalog maintenance

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>
---
 AGENTS.md                        | 2 ++
 etl-examples/src/bin/ducklake.rs | 1 +
 2 files changed, 3 insertions(+)

diff --git a/AGENTS.md b/AGENTS.md
index c8f9386c9..c162d9037 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -45,6 +45,7 @@
 - Never create commits, push branches, open pull requests, or perform other git write actions unless the user explicitly instructs you to do so.
 - Keep the workspace on the stable toolchain from `rust-toolchain.toml` for build, lint, and test commands; use the pinned nightly formatter only through `./scripts/fmt` and `./scripts/fmt-check`.
 - Treat `Cargo.toml` workspace lints, `rustfmt.toml`, and compiler diagnostics as the source of truth for enforceable style and correctness rules. Prefer adding or tightening static checks over adding prose rules here.
+- Always run the clippy lint command listed in the commands section at the end to check that everything compiles properly.
 
 ## Rust Style
 
 - This section is only for project-specific judgment that is not already covered by rustfmt, rustc, or Clippy.
@@ -101,6 +102,7 @@
 - When fixing a specific crate, run the narrowest relevant tests first, then broaden if needed.
 - Add or update tests when behavior changes, regressions are possible, or new logic is introduced.
 - Register `NotifyingStore::notify_on_*` and `TestDestinationWrapper::wait_for_*` handles before the producer can fire. These helpers only arm on updates that arrive *after* registration, so register the notifier first, then start the producer.
+- If your tests need `TESTS_DATABASE_HOST` to be set or a test PostgreSQL instance, you can use the `cargo xtask postgres create` command to spawn one.
 
 ```rust
 let ready = store.notify_on_table_state_type(id, Ready).await;
diff --git a/etl-examples/src/bin/ducklake.rs b/etl-examples/src/bin/ducklake.rs
index 665abff76..07b837649 100644
--- a/etl-examples/src/bin/ducklake.rs
+++ b/etl-examples/src/bin/ducklake.rs
@@ -230,6 +230,7 @@ async fn main_impl() -> Result<(), Box> {
         args.ducklake_args.metadata_schema,
         None,
         None,
+        None,
         store.clone(),
     )
     .await?;
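Taken together, the catalog pass the worker runs every few hours reduces to the two `CALL` statements built by `expire_snapshots_sql` and `cleanup_old_files_sql` in patch 1. A sketch that prints them for the default window; `my_lake` is a hypothetical stand-in for the crate's `LAKE_CATALOG` constant, whose value does not appear in this diff, and `quote_literal` is again a simplified single-quote-doubling stand-in for the crate's helper:

```rust
// Prints the two catalog-maintenance statements for a given retention window.
fn quote_literal(value: &str) -> String {
    format!("'{}'", value.replace('\'', "''"))
}

fn main() {
    let catalog = "my_lake"; // hypothetical; the crate passes LAKE_CATALOG here
    let retention = "7 days"; // the default EXPIRE_SNAPSHOTS_OLDER_THAN window
    for proc_name in ["ducklake_expire_snapshots", "ducklake_cleanup_old_files"] {
        println!(
            "CALL {}({}, older_than => CAST(now() AS TIMESTAMP) - CAST({} AS INTERVAL));",
            proc_name,
            quote_literal(catalog),
            quote_literal(retention)
        );
    }
}
```

The worker counts the rows these calls return to report `expired_snapshots` and `cleaned_up_files`, so each returned row is treated as one affected snapshot or file.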