Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
- Never create commits, push branches, open pull requests, or perform other git write actions unless the user explicitly instructs you to do so.
- Keep the workspace on the stable toolchain from `rust-toolchain.toml` for build, lint, and test commands; use the pinned nightly formatter only through `./scripts/fmt` and `./scripts/fmt-check`.
- Treat `Cargo.toml` workspace lints, `rustfmt.toml`, and compiler diagnostics as the source of truth for enforceable style and correctness rules. Prefer adding or tightening static checks over adding prose rules here.
- Always run the clippy lint command listed in the commands section at the end to check that everything compiles properly.

## Rust Style
- This section is only for project-specific judgment that is not already covered by rustfmt, rustc, or Clippy.
Expand Down Expand Up @@ -101,6 +102,7 @@
- When fixing a specific crate, run the narrowest relevant tests first, then broaden if needed.
- Add or update tests when behavior changes, regressions are possible, or new logic is introduced.
- Register `NotifyingStore::notify_on_*` and `TestDestinationWrapper::wait_for_*` handles before the producer can fire. These helpers only arm on updates that arrive *after* registration, so register the notifier first, then start the producer.
- If your tests need `TESTS_DATABASE_HOST` to be set or require a test instance of PostgreSQL, you can use the `cargo xtask postgres create` command to spawn a PostgreSQL instance.

```rust
let ready = store.notify_on_table_state_type(id, Ready).await;
Expand Down
31 changes: 31 additions & 0 deletions etl-api/src/configs/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ pub enum FullApiDestinationConfig {
deserialize_with = "crate::utils::trim_option_string"
)]
maintenance_target_file_size: Option<String>,
#[schema(example = "7 days")]
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "crate::utils::trim_option_string"
)]
expire_snapshots_older_than: Option<String>,
},
}

Expand Down Expand Up @@ -186,6 +193,7 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => Self::Ducklake {
catalog_url,
data_path,
Expand All @@ -199,6 +207,7 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
},
}
}
Expand Down Expand Up @@ -229,6 +238,7 @@ pub enum StoredDestinationConfig {
metadata_schema: Option<String>,
duckdb_memory_cache_limit: Option<String>,
maintenance_target_file_size: Option<String>,
expire_snapshots_older_than: Option<String>,
},
}

Expand Down Expand Up @@ -299,6 +309,7 @@ impl StoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => DestinationConfig::Ducklake {
catalog_url,
data_path,
Expand All @@ -312,6 +323,7 @@ impl StoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
},
}
}
Expand Down Expand Up @@ -385,6 +397,7 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => Self::Ducklake {
catalog_url,
data_path,
Expand All @@ -398,6 +411,7 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
},
}
}
Expand Down Expand Up @@ -496,6 +510,7 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => {
let s3_access_key_id = s3_access_key_id
.map(|value| encrypt_text(value.expose_secret().to_owned(), encryption_key))
Expand All @@ -517,6 +532,7 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
})
}
}
Expand Down Expand Up @@ -552,6 +568,7 @@ pub enum EncryptedStoredDestinationConfig {
metadata_schema: Option<String>,
duckdb_memory_cache_limit: Option<String>,
maintenance_target_file_size: Option<String>,
expire_snapshots_older_than: Option<String>,
},
}

Expand Down Expand Up @@ -663,6 +680,7 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => Ok(StoredDestinationConfig::Ducklake {
catalog_url,
data_path,
Expand All @@ -684,6 +702,7 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
}),
}
}
Expand Down Expand Up @@ -1141,6 +1160,7 @@ mod tests {
metadata_schema: Some("ducklake".to_string()),
duckdb_memory_cache_limit: Some("50MB".to_string()),
maintenance_target_file_size: Some("10MB".to_string()),
expire_snapshots_older_than: Some("7 days".to_string()),
};

let key = EncryptionKey { id: 1, key: generate_random_key::<32>().unwrap() };
Expand All @@ -1163,6 +1183,7 @@ mod tests {
metadata_schema: m1,
duckdb_memory_cache_limit: memory1,
maintenance_target_file_size: target1,
expire_snapshots_older_than: expire1,
},
StoredDestinationConfig::Ducklake {
catalog_url: c2,
Expand All @@ -1177,6 +1198,7 @@ mod tests {
metadata_schema: m2,
duckdb_memory_cache_limit: memory2,
maintenance_target_file_size: target2,
expire_snapshots_older_than: expire2,
},
) => {
assert_eq!(c1, c2);
Expand All @@ -1197,6 +1219,7 @@ mod tests {
assert_eq!(m1, m2);
assert_eq!(memory1, memory2);
assert_eq!(target1, target2);
assert_eq!(expire1, expire2);
}
_ => panic!("Config types don't match"),
}
Expand All @@ -1217,6 +1240,7 @@ mod tests {
metadata_schema: Some("ducklake".to_string()),
duckdb_memory_cache_limit: None,
maintenance_target_file_size: None,
expire_snapshots_older_than: None,
};

let stored: StoredDestinationConfig = full_config.clone().into();
Expand All @@ -1231,6 +1255,7 @@ mod tests {
metadata_schema: m1,
duckdb_memory_cache_limit: memory1,
maintenance_target_file_size: target1,
expire_snapshots_older_than: expire1,
..
},
FullApiDestinationConfig::Ducklake {
Expand All @@ -1240,6 +1265,7 @@ mod tests {
metadata_schema: m2,
duckdb_memory_cache_limit: memory2,
maintenance_target_file_size: target2,
expire_snapshots_older_than: expire2,
..
},
) => {
Expand All @@ -1250,6 +1276,7 @@ mod tests {
assert_eq!(m1, m2);
assert_eq!(memory1, memory2);
assert_eq!(target1, target2);
assert_eq!(expire1, expire2);
}
_ => panic!("Config types don't match"),
}
Expand All @@ -1270,6 +1297,7 @@ mod tests {
metadata_schema: Some("ducklake".to_string()),
duckdb_memory_cache_limit: Some("50MB".to_string()),
maintenance_target_file_size: Some("10MB".to_string()),
expire_snapshots_older_than: Some("7 days".to_string()),
};

assert_json_snapshot!(full_config);
Expand All @@ -1291,6 +1319,7 @@ mod tests {
metadata_schema: m1,
duckdb_memory_cache_limit: memory1,
maintenance_target_file_size: target1,
expire_snapshots_older_than: expire1,
},
FullApiDestinationConfig::Ducklake {
catalog_url: c2,
Expand All @@ -1305,6 +1334,7 @@ mod tests {
metadata_schema: m2,
duckdb_memory_cache_limit: memory2,
maintenance_target_file_size: target2,
expire_snapshots_older_than: expire2,
},
) => {
assert_eq!(c1, &c2);
Expand All @@ -1325,6 +1355,7 @@ mod tests {
assert_eq!(m1, &m2);
assert_eq!(memory1, &memory2);
assert_eq!(target1, &target2);
assert_eq!(expire1, &expire2);
}
_ => panic!("Deserialization failed or variant mismatch"),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: etl-api/src/configs/destination.rs
assertion_line: 1234
assertion_line: 1303
expression: full_config
---
{
Expand All @@ -16,6 +16,7 @@ expression: full_config
"s3_use_ssl": false,
"metadata_schema": "ducklake",
"duckdb_memory_cache_limit": "50MB",
"maintenance_target_file_size": "10MB"
"maintenance_target_file_size": "10MB",
"expire_snapshots_older_than": "7 days"
}
}
2 changes: 2 additions & 0 deletions etl-api/src/k8s/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ mod tests {
metadata_schema: None,
duckdb_memory_cache_limit: None,
maintenance_target_file_size: None,
expire_snapshots_older_than: None,
};

let secrets = build_secrets_from_configs(&source_config, &destination_config);
Expand Down Expand Up @@ -642,6 +643,7 @@ mod tests {
metadata_schema: None,
duckdb_memory_cache_limit: None,
maintenance_target_file_size: None,
expire_snapshots_older_than: None,
};

let secrets = build_secrets_from_configs(&source_config, &destination_config);
Expand Down
6 changes: 6 additions & 0 deletions etl-api/src/validation/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ struct DucklakeValidator {
metadata_schema: Option<String>,
duckdb_memory_cache_limit: Option<String>,
maintenance_target_file_size: Option<String>,
expire_snapshots_older_than: Option<String>,
}

impl DucklakeValidator {
Expand All @@ -668,6 +669,7 @@ impl DucklakeValidator {
metadata_schema: Option<String>,
duckdb_memory_cache_limit: Option<String>,
maintenance_target_file_size: Option<String>,
expire_snapshots_older_than: Option<String>,
) -> Self {
Self {
catalog_url,
Expand All @@ -682,6 +684,7 @@ impl DucklakeValidator {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
}
}
}
Expand Down Expand Up @@ -743,6 +746,7 @@ impl Validator for DucklakeValidator {
self.metadata_schema.clone(),
self.duckdb_memory_cache_limit.clone(),
self.maintenance_target_file_size.clone(),
self.expire_snapshots_older_than.clone(),
MemoryStore::new(),
)
.await
Expand Down Expand Up @@ -890,6 +894,7 @@ impl Validator for DestinationValidator {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => {
let validator = DucklakeValidator::new(
catalog_url.clone(),
Expand All @@ -906,6 +911,7 @@ impl Validator for DestinationValidator {
metadata_schema.clone(),
duckdb_memory_cache_limit.clone(),
maintenance_target_file_size.clone(),
expire_snapshots_older_than.clone(),
);
validator.validate(ctx).await
}
Expand Down
6 changes: 6 additions & 0 deletions etl-config/src/shared/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub enum DestinationConfig {
duckdb_memory_cache_limit: Option<String>,
/// Optional DuckLake maintenance target file size.
maintenance_target_file_size: Option<String>,
/// Optional DuckLake snapshot-retention interval.
expire_snapshots_older_than: Option<String>,
},
}

Expand Down Expand Up @@ -256,6 +258,8 @@ pub enum DestinationConfigWithoutSecrets {
duckdb_memory_cache_limit: Option<String>,
/// Optional DuckLake maintenance target file size.
maintenance_target_file_size: Option<String>,
/// Optional DuckLake snapshot-retention interval.
expire_snapshots_older_than: Option<String>,
},
}

Expand Down Expand Up @@ -290,6 +294,7 @@ impl From<DestinationConfig> for DestinationConfigWithoutSecrets {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
} => DestinationConfigWithoutSecrets::Ducklake {
catalog_url,
data_path,
Expand All @@ -301,6 +306,7 @@ impl From<DestinationConfig> for DestinationConfigWithoutSecrets {
metadata_schema,
duckdb_memory_cache_limit,
maintenance_target_file_size,
expire_snapshots_older_than,
},
}
}
Expand Down
35 changes: 35 additions & 0 deletions etl-destinations/src/ducklake/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const HTTPFS_EXTENSION_FILE: &str = "httpfs.duckdb_extension";
const POSTGRES_SCANNER_EXTENSION_FILE: &str = "postgres_scanner.duckdb_extension";
pub(super) const TARGET_FILE_SIZE_OPTION_NAME: &str = "target_file_size";
pub(super) const MAINTENANCE_TARGET_FILE_SIZE: &str = "10MB";
pub(super) const EXPIRE_SNAPSHOTS_OLDER_THAN: &str = "7 days";
pub(super) const DUCKDB_MEMORY_CACHE_LIMIT: &str = "150MB";

/// Resolves the configured DuckDB memory limit or falls back to the default.
Expand Down Expand Up @@ -59,6 +60,24 @@ pub(super) fn maintenance_target_file_size_sql(
)
}

/// Picks the snapshot-retention window to use: the caller-supplied value when
/// one is present, otherwise [`EXPIRE_SNAPSHOTS_OLDER_THAN`].
pub(super) fn resolve_expire_snapshots_older_than(
    expire_snapshots_older_than: Option<&str>,
) -> &str {
    // Guard clause: nothing configured, so hand back the built-in default.
    let Some(configured) = expire_snapshots_older_than else {
        return EXPIRE_SNAPSHOTS_OLDER_THAN;
    };

    configured
}

/// Produces a probe statement that casts the snapshot-retention window to
/// `INTERVAL` and subtracts it from the current timestamp.
///
/// The window is passed through `quote_literal` before being placed in the
/// statement text.
pub(super) fn validate_expire_snapshots_older_than_sql(
    expire_snapshots_older_than: &str,
) -> String {
    let interval_literal = quote_literal(expire_snapshots_older_than);

    format!("SELECT CAST(now() AS TIMESTAMP) - CAST({interval_literal} AS INTERVAL);")
}

/// One named DuckDB setup phase for a DuckLake connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct DuckLakeSetupStep {
Expand Down Expand Up @@ -1140,6 +1159,22 @@ mod tests {
assert_eq!(plan.steps()[0].sql, format!("SET memory_limit = {};", quote_literal("256MB")));
}

#[test]
fn resolve_expire_snapshots_older_than_uses_default() {
    // Nothing configured: the built-in default is returned.
    let fallback = resolve_expire_snapshots_older_than(None);
    assert_eq!(fallback, EXPIRE_SNAPSHOTS_OLDER_THAN);

    // An explicit value is passed through untouched.
    let configured = resolve_expire_snapshots_older_than(Some("2 days"));
    assert_eq!(configured, "2 days");
}

#[test]
fn validate_expire_snapshots_older_than_sql_casts_interval() {
    let interval_literal = quote_literal("2 days");
    let statement = validate_expire_snapshots_older_than_sql("2 days");

    // The statement must contain the fixed prefix, the quoted window, and the
    // interval cast suffix.
    let has = |fragment: &str| statement.contains(fragment);

    assert!(has("SELECT CAST(now() AS TIMESTAMP) - CAST("));
    assert!(has(&interval_literal));
    assert!(has(" AS INTERVAL);"));
}

#[test]
fn vendored_extension_strategy_disables_autoload() {
assert!(
Expand Down
Loading
Loading