Merged

Changes from all commits
6 changes: 3 additions & 3 deletions .config/nextest.toml
@@ -4,11 +4,11 @@ test-threads = "num-cpus"
 
 # All tests that share the source Postgres cluster must run serially. Separate
 # nextest test groups do not serialize against each other, so cluster-wide
-# setting tests, etl replication tests, and PG-backed destination pipeline
-# tests all share one group. Local-only destination tests stay parallel.
+# setting tests, etl replication tests, and PG-backed destination tests all
+# share one group. Local-only destination tests stay parallel.
 [test-groups.shared-pg]
 max-threads = 1
 
 [[profile.default.overrides]]
-filter = "test(exclusive_) | binary_id(etl::main) | (binary_id(etl-destinations::main) & test(/^(bigquery_pipeline|ducklake_pipeline|iceberg_destination)::/))"
+filter = "test(exclusive_) | binary_id(etl::main) | (binary_id(etl-destinations::main) & test(/^(bigquery_pipeline|ducklake_destination|ducklake_pipeline|iceberg_destination)::/)) | (binary_id(etl-destinations) & test(/ducklake::core::tests::postgres_backed::/))"
test-group = "shared-pg"
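
For context, the serialization works because every test matched by `filter` is assigned to the `shared-pg` test group, and `max-threads = 1` lets nextest run at most one test from that group at a time. As a hypothetical illustration (module and test names invented here; assumes tokio as a dev-dependency), a new integration test in etl-destinations/tests/main.rs would be routed into the group like this:

// The nextest binary ID for this integration-test binary is
// `etl-destinations::main`, and the test's full path is
// `ducklake_destination::writes_rows_to_shared_postgres`, so the
// `ducklake_destination::` alternative in the filter above matches it and
// nextest serializes it against everything else touching the shared Postgres
// cluster.
mod ducklake_destination {
    #[tokio::test(flavor = "multi_thread")]
    async fn writes_rows_to_shared_postgres() {
        // test body elided
    }
}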
347 changes: 176 additions & 171 deletions etl-destinations/src/ducklake/core.rs
@@ -1658,189 +1658,194 @@ mod tests {
         assert!(!is_create_table_conflict(&error, "public_orders"));
     }
 
-    #[tokio::test(flavor = "multi_thread")]
-    async fn query_table_storage_metrics_reads_ducklake_metadata() {
-        let dir = TempDir::new().expect("failed to create temp dir");
-        let data = path_to_file_url(&dir.path().join("data"));
-        let (_catalog_database, catalog) = create_catalog_database().await;
-        let store = MemoryStore::new();
-        let schema = make_schema(1, "public", "users");
-        let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone()));
-        let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap();
-
-        store.store_table_schema(schema.clone()).await.expect("failed to seed schema");
-
-        let destination = DuckLakeDestination::new(
-            catalog.clone(),
-            data.clone(),
-            1,
-            None,
-            None,
-            None,
-            None,
-            None,
-            store,
-        )
-        .await
-        .expect("failed to create destination");
-
-        destination
-            .write_table_rows(
-                &replicated_table_schema,
-                vec![
-                    TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]),
-                    TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]),
-                ],
-            )
-            .await
-            .expect("failed to write rows");
-
-        let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await;
-        let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn)
-            .expect("failed to resolve metadata schema");
-        let metadata_pg_pool =
-            build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool");
-        let _rows_flushed = flush_table_inlined_data(&conn, &table_name)
-            .expect("failed to materialize inlined rows for storage metrics test");
-        let deadline = Instant::now() + Duration::from_secs(10);
-        let metrics = loop {
-            let metrics =
-                query_table_storage_metrics(&metadata_pg_pool, &metadata_schema, &table_name)
-                    .await
-                    .expect("failed to query storage metrics");
-            if metrics.active_data_files >= 1 {
-                break metrics;
-            }
-            assert!(
-                Instant::now() < deadline,
-                "timed out waiting for storage metrics after materialization"
-            );
-            tokio::time::sleep(Duration::from_millis(100)).await;
-        };
-
-        assert!(metrics.active_data_files >= 1);
-        assert!(metrics.active_data_bytes > 0);
-        assert_eq!(metrics.active_delete_files, 0);
-        assert_eq!(metrics.deleted_rows, 0);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn query_catalog_maintenance_metrics_reports_active_data_files_total() {
-        let dir = TempDir::new().expect("failed to create temp dir");
-        let data = path_to_file_url(&dir.path().join("data"));
-        let (_catalog_database, catalog) = create_catalog_database().await;
-        let store = MemoryStore::new();
-        let schema = make_schema(1, "public", "users");
-        let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone()));
-        let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap();
-
-        store.store_table_schema(schema.clone()).await.expect("failed to seed schema");
-
-        let destination = DuckLakeDestination::new(
-            catalog.clone(),
-            data.clone(),
-            1,
-            None,
-            None,
-            None,
-            None,
-            None,
-            store,
-        )
-        .await
-        .expect("failed to create destination");
-
-        destination
-            .write_table_rows(
-                &replicated_table_schema,
-                vec![
-                    TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]),
-                    TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]),
-                ],
-            )
-            .await
-            .expect("failed to write rows");
-
-        let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await;
-        let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn)
-            .expect("failed to resolve metadata schema");
-        let metadata_pg_pool =
-            build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool");
-        let _rows_flushed = flush_table_inlined_data(&conn, &table_name)
-            .expect("failed to materialize inlined rows for catalog metrics test");
-        let deadline = Instant::now() + Duration::from_secs(10);
-        let metrics = loop {
-            let metrics = query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema)
-                .await
-                .expect("failed to query catalog maintenance metrics");
-            if metrics.active_data_files_total >= 1 {
-                break metrics;
-            }
-            assert!(
-                Instant::now() < deadline,
-                "timed out waiting for active data files total after materialization"
-            );
-            tokio::time::sleep(Duration::from_millis(100)).await;
-        };
-
-        assert!(metrics.active_data_files_total >= 1);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn query_catalog_maintenance_metrics_reads_ducklake_metadata() {
-        let dir = TempDir::new().expect("failed to create temp dir");
-        let data = path_to_file_url(&dir.path().join("data"));
-        let (_catalog_database, catalog) = create_catalog_database().await;
-        let store = MemoryStore::new();
-        let schema = make_schema(1, "public", "users");
-        let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone()));
-        let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap();
-
-        store.store_table_schema(schema.clone()).await.expect("failed to seed schema");
-
-        let destination = DuckLakeDestination::new(
-            catalog.clone(),
-            data.clone(),
-            1,
-            None,
-            None,
-            None,
-            None,
-            None,
-            store,
-        )
-        .await
-        .expect("failed to create destination");
-
-        destination
-            .write_table_rows(
-                &replicated_table_schema,
-                vec![TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())])],
-            )
-            .await
-            .expect("failed to write rows");
-        destination
-            .truncate_table(&replicated_table_schema)
-            .await
-            .expect("failed to truncate table");
-
-        destination.shutdown().await.expect("failed to shutdown destination");
-        drop(destination);
-
-        let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await;
-        let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn)
-            .expect("failed to resolve metadata schema");
-        let metadata_pg_pool =
-            build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool");
-        let metrics = query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema)
-            .await
-            .expect("failed to query catalog maintenance metrics");
-
-        assert!(metrics.active_data_files_total >= 0);
-        assert!(metrics.snapshots_total >= 1);
-        assert!(metrics.oldest_snapshot_age_seconds >= 0);
-        assert!(metrics.files_scheduled_for_deletion_total >= 0);
-        assert!(metrics.files_scheduled_for_deletion_bytes >= 0);
-        assert!(metrics.oldest_scheduled_deletion_age_seconds >= 0);
-    }
+    mod postgres_backed {
+        use super::*;
+
+        #[tokio::test(flavor = "multi_thread")]
+        async fn query_table_storage_metrics_reads_ducklake_metadata() {
+            let dir = TempDir::new().expect("failed to create temp dir");
+            let data = path_to_file_url(&dir.path().join("data"));
+            let (_catalog_database, catalog) = create_catalog_database().await;
+            let store = MemoryStore::new();
+            let schema = make_schema(1, "public", "users");
+            let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone()));
+            let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap();
+
+            store.store_table_schema(schema.clone()).await.expect("failed to seed schema");
+
+            let destination = DuckLakeDestination::new(
+                catalog.clone(),
+                data.clone(),
+                1,
+                None,
+                None,
+                None,
+                None,
+                None,
+                store,
+            )
+            .await
+            .expect("failed to create destination");
+
+            destination
+                .write_table_rows(
+                    &replicated_table_schema,
+                    vec![
+                        TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]),
+                        TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]),
+                    ],
+                )
+                .await
+                .expect("failed to write rows");
+
+            let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await;
+            let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn)
+                .expect("failed to resolve metadata schema");
+            let metadata_pg_pool =
+                build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool");
+            let _rows_flushed = flush_table_inlined_data(&conn, &table_name)
+                .expect("failed to materialize inlined rows for storage metrics test");
+            let deadline = Instant::now() + Duration::from_secs(10);
+            let metrics = loop {
+                let metrics =
+                    query_table_storage_metrics(&metadata_pg_pool, &metadata_schema, &table_name)
+                        .await
+                        .expect("failed to query storage metrics");
+                if metrics.active_data_files >= 1 {
+                    break metrics;
+                }
+                assert!(
+                    Instant::now() < deadline,
+                    "timed out waiting for storage metrics after materialization"
+                );
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            };
+
+            assert!(metrics.active_data_files >= 1);
+            assert!(metrics.active_data_bytes > 0);
+            assert_eq!(metrics.active_delete_files, 0);
+            assert_eq!(metrics.deleted_rows, 0);
+        }
+
+        #[tokio::test(flavor = "multi_thread")]
+        async fn query_catalog_maintenance_metrics_reports_active_data_files_total() {
+            let dir = TempDir::new().expect("failed to create temp dir");
+            let data = path_to_file_url(&dir.path().join("data"));
+            let (_catalog_database, catalog) = create_catalog_database().await;
+            let store = MemoryStore::new();
+            let schema = make_schema(1, "public", "users");
+            let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone()));
+            let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap();

+            store.store_table_schema(schema.clone()).await.expect("failed to seed schema");
+
+            let destination = DuckLakeDestination::new(
+                catalog.clone(),
+                data.clone(),
+                1,
+                None,
+                None,
+                None,
+                None,
+                None,
+                store,
+            )
+            .await
+            .expect("failed to create destination");
+
+            destination
+                .write_table_rows(
+                    &replicated_table_schema,
+                    vec![
+                        TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]),
+                        TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]),
+                    ],
+                )
+                .await
+                .expect("failed to write rows");
+
+            let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await;
+            let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn)
+                .expect("failed to resolve metadata schema");
+            let metadata_pg_pool =
+                build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool");
+            let _rows_flushed = flush_table_inlined_data(&conn, &table_name)
+                .expect("failed to materialize inlined rows for catalog metrics test");
+            let deadline = Instant::now() + Duration::from_secs(10);
+            let metrics = loop {
+                let metrics =
+                    query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema)
+                        .await
+                        .expect("failed to query catalog maintenance metrics");
+                if metrics.active_data_files_total >= 1 {
+                    break metrics;
+                }
+                assert!(
+                    Instant::now() < deadline,
+                    "timed out waiting for active data files total after materialization"
+                );
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            };
+
+            assert!(metrics.active_data_files_total >= 1);
+        }
+
+        #[tokio::test(flavor = "multi_thread")]
+        async fn query_catalog_maintenance_metrics_reads_ducklake_metadata() {
+            let dir = TempDir::new().expect("failed to create temp dir");
+            let data = path_to_file_url(&dir.path().join("data"));
+            let (_catalog_database, catalog) = create_catalog_database().await;
+            let store = MemoryStore::new();
+            let schema = make_schema(1, "public", "users");
+            let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone()));
+            let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap();
+
+            store.store_table_schema(schema.clone()).await.expect("failed to seed schema");
+
+            let destination = DuckLakeDestination::new(
+                catalog.clone(),
+                data.clone(),
+                1,
+                None,
+                None,
+                None,
+                None,
+                None,
+                store,
+            )
+            .await
+            .expect("failed to create destination");
+
+            destination
+                .write_table_rows(
+                    &replicated_table_schema,
+                    vec![TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())])],
+                )
+                .await
+                .expect("failed to write rows");
+            destination
+                .truncate_table(&replicated_table_schema)
+                .await
+                .expect("failed to truncate table");
+
+            destination.shutdown().await.expect("failed to shutdown destination");
+            drop(destination);
+
+            let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await;
+            let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn)
+                .expect("failed to resolve metadata schema");
+            let metadata_pg_pool =
+                build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool");
+            let metrics = query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema)
+                .await
+                .expect("failed to query catalog maintenance metrics");
+
+            assert!(metrics.active_data_files_total >= 0);
+            assert!(metrics.snapshots_total >= 1);
+            assert!(metrics.oldest_snapshot_age_seconds >= 0);
+            assert!(metrics.files_scheduled_for_deletion_total >= 0);
+            assert!(metrics.files_scheduled_for_deletion_bytes >= 0);
+            assert!(metrics.oldest_scheduled_deletion_age_seconds >= 0);
+        }
+    }
}
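
The restructuring into `mod postgres_backed` is what gives the new filter branch its anchor: unit tests compiled into the library binary report the binary ID `etl-destinations`, and the nested module gives every PG-backed test the stable path prefix `ducklake::core::tests::postgres_backed::` that both the nextest override and `SHARED_PG_FILTER` match. A minimal sketch of the resulting layout (bodies elided):

#[cfg(test)]
mod tests {
    // Local-only DuckLake tests stay directly in `tests` and keep running in
    // parallel.

    mod postgres_backed {
        use super::*;

        // Tests here get the path prefix
        // `ducklake::core::tests::postgres_backed::` and are therefore pinned
        // to the serialized `shared-pg` group.
    }
}

To run only this group locally, something like `cargo nextest run -E 'binary_id(etl-destinations) & test(/ducklake::core::tests::postgres_backed::/)'` should select exactly these tests.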
7 changes: 4 additions & 3 deletions xtask/src/commands/nextest.rs
@@ -11,10 +11,11 @@ use clap::{Args, ValueEnum};
 ///
 /// This must stay in sync with the `shared-pg` test group in
 /// `.config/nextest.toml`.
-const SHARED_PG_FILTER: &str = "\
+const SHARED_PG_FILTER: &str =
+    "\
     test(exclusive_) | binary_id(etl::main) | (binary_id(etl-destinations::main) & \
-    test(/^(bigquery_pipeline|ducklake_pipeline|iceberg_destination)::/\
-    ))";
+    test(/^(bigquery_pipeline|ducklake_destination|ducklake_pipeline|iceberg_destination)::/)) | \
+    (binary_id(etl-destinations) & test(/ducklake::core::tests::postgres_backed::/))";

use super::shared::{DEFAULT_BASE_PORT, DEFAULT_PG_SHARD_COUNT};
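
Since the doc comment already warns that this constant must mirror `.config/nextest.toml`, a guard test could enforce the invariant mechanically. This is a hypothetical sketch, not part of the PR; it assumes the xtask crate lives one directory below the workspace root:

#[cfg(test)]
mod sync_tests {
    use super::SHARED_PG_FILTER;

    #[test]
    fn shared_pg_filter_matches_nextest_config() {
        // Read the workspace-level nextest config relative to this crate.
        let config = std::fs::read_to_string(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/../.config/nextest.toml"
        ))
        .expect("failed to read .config/nextest.toml");
        // Rust's `\`-line-continuations strip the newline and leading
        // whitespace, so the runtime value of SHARED_PG_FILTER should appear
        // verbatim in the TOML file when the two are in sync.
        assert!(
            config.contains(SHARED_PG_FILTER),
            "SHARED_PG_FILTER is out of sync with .config/nextest.toml"
        );
    }
}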
