From e5a9d707607e9c23f672d16b597a68c06d23e0c6 Mon Sep 17 00:00:00 2001 From: Jordan McQueen Date: Mon, 27 Apr 2026 18:17:26 +0900 Subject: [PATCH] test(ducklake): classify postgres-backed tests --- .config/nextest.toml | 6 +- etl-destinations/src/ducklake/core.rs | 341 +++++++++++++------------- xtask/src/commands/nextest.rs | 7 +- 3 files changed, 180 insertions(+), 174 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index 513ac0ff1..02c759624 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -4,11 +4,11 @@ test-threads = "num-cpus" # All tests that share the source Postgres cluster must run serially. Separate # nextest test groups do not serialize against each other, so cluster-wide -# setting tests, etl replication tests, and PG-backed destination pipeline -# tests all share one group. Local-only destination tests stay parallel. +# setting tests, etl replication tests, and PG-backed destination tests all +# share one group. Local-only destination tests stay parallel. [test-groups.shared-pg] max-threads = 1 [[profile.default.overrides]] -filter = "test(exclusive_) | binary_id(etl::main) | (binary_id(etl-destinations::main) & test(/^(bigquery_pipeline|ducklake_pipeline|iceberg_destination)::/))" +filter = "test(exclusive_) | binary_id(etl::main) | (binary_id(etl-destinations::main) & test(/^(bigquery_pipeline|ducklake_destination|ducklake_pipeline|iceberg_destination)::/)) | (binary_id(etl-destinations) & test(/ducklake::core::tests::postgres_backed::/))" test-group = "shared-pg" diff --git a/etl-destinations/src/ducklake/core.rs b/etl-destinations/src/ducklake/core.rs index 80b2a5e95..b1c58a7e5 100644 --- a/etl-destinations/src/ducklake/core.rs +++ b/etl-destinations/src/ducklake/core.rs @@ -1625,186 +1625,191 @@ mod tests { assert!(!is_create_table_conflict(&error, "public_orders")); } - #[tokio::test(flavor = "multi_thread")] - async fn query_table_storage_metrics_reads_ducklake_metadata() { - let dir = TempDir::new().expect("failed to create temp dir"); - let data = path_to_file_url(&dir.path().join("data")); - let (_catalog_database, catalog) = create_catalog_database().await; - let store = MemoryStore::new(); - let schema = make_schema(1, "public", "users"); - let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone())); - let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap(); - - store.store_table_schema(schema.clone()).await.expect("failed to seed schema"); - - let destination = DuckLakeDestination::new( - catalog.clone(), - data.clone(), - 1, - None, - None, - None, - None, - store, - ) - .await - .expect("failed to create destination"); - - destination - .write_table_rows( - &replicated_table_schema, - vec![ - TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]), - TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]), - ], + mod postgres_backed { + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn query_table_storage_metrics_reads_ducklake_metadata() { + let dir = TempDir::new().expect("failed to create temp dir"); + let data = path_to_file_url(&dir.path().join("data")); + let (_catalog_database, catalog) = create_catalog_database().await; + let store = MemoryStore::new(); + let schema = make_schema(1, "public", "users"); + let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone())); + let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap(); + + store.store_table_schema(schema.clone()).await.expect("failed to seed 
schema"); + + let destination = DuckLakeDestination::new( + catalog.clone(), + data.clone(), + 1, + None, + None, + None, + None, + store, ) .await - .expect("failed to write rows"); - - let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await; - let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn) - .expect("failed to resolve metadata schema"); - let metadata_pg_pool = - build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool"); - let _rows_flushed = flush_table_inlined_data(&conn, &table_name) - .expect("failed to materialize inlined rows for storage metrics test"); - let deadline = Instant::now() + Duration::from_secs(10); - let metrics = loop { - let metrics = - query_table_storage_metrics(&metadata_pg_pool, &metadata_schema, &table_name) - .await - .expect("failed to query storage metrics"); - if metrics.active_data_files >= 1 { - break metrics; - } - assert!( - Instant::now() < deadline, - "timed out waiting for storage metrics after materialization" - ); - tokio::time::sleep(Duration::from_millis(100)).await; - }; + .expect("failed to create destination"); + + destination + .write_table_rows( + &replicated_table_schema, + vec![ + TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]), + TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]), + ], + ) + .await + .expect("failed to write rows"); + + let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await; + let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn) + .expect("failed to resolve metadata schema"); + let metadata_pg_pool = + build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool"); + let _rows_flushed = flush_table_inlined_data(&conn, &table_name) + .expect("failed to materialize inlined rows for storage metrics test"); + let deadline = Instant::now() + Duration::from_secs(10); + let metrics = loop { + let metrics = + query_table_storage_metrics(&metadata_pg_pool, &metadata_schema, &table_name) + .await + .expect("failed to query storage metrics"); + if metrics.active_data_files >= 1 { + break metrics; + } + assert!( + Instant::now() < deadline, + "timed out waiting for storage metrics after materialization" + ); + tokio::time::sleep(Duration::from_millis(100)).await; + }; - assert!(metrics.active_data_files >= 1); - assert!(metrics.active_data_bytes > 0); - assert_eq!(metrics.active_delete_files, 0); - assert_eq!(metrics.deleted_rows, 0); - } + assert!(metrics.active_data_files >= 1); + assert!(metrics.active_data_bytes > 0); + assert_eq!(metrics.active_delete_files, 0); + assert_eq!(metrics.deleted_rows, 0); + } - #[tokio::test(flavor = "multi_thread")] - async fn query_catalog_maintenance_metrics_reports_active_data_files_total() { - let dir = TempDir::new().expect("failed to create temp dir"); - let data = path_to_file_url(&dir.path().join("data")); - let (_catalog_database, catalog) = create_catalog_database().await; - let store = MemoryStore::new(); - let schema = make_schema(1, "public", "users"); - let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone())); - let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap(); - - store.store_table_schema(schema.clone()).await.expect("failed to seed schema"); - - let destination = DuckLakeDestination::new( - catalog.clone(), - data.clone(), - 1, - None, - None, - None, - None, - store, - ) - .await - .expect("failed to create destination"); - - destination - .write_table_rows( - 
&replicated_table_schema, - vec![ - TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]), - TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]), - ], + #[tokio::test(flavor = "multi_thread")] + async fn query_catalog_maintenance_metrics_reports_active_data_files_total() { + let dir = TempDir::new().expect("failed to create temp dir"); + let data = path_to_file_url(&dir.path().join("data")); + let (_catalog_database, catalog) = create_catalog_database().await; + let store = MemoryStore::new(); + let schema = make_schema(1, "public", "users"); + let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone())); + let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap(); + + store.store_table_schema(schema.clone()).await.expect("failed to seed schema"); + + let destination = DuckLakeDestination::new( + catalog.clone(), + data.clone(), + 1, + None, + None, + None, + None, + store, ) .await - .expect("failed to write rows"); - - let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await; - let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn) - .expect("failed to resolve metadata schema"); - let metadata_pg_pool = - build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool"); - let _rows_flushed = flush_table_inlined_data(&conn, &table_name) - .expect("failed to materialize inlined rows for catalog metrics test"); - let deadline = Instant::now() + Duration::from_secs(10); - let metrics = loop { - let metrics = query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema) + .expect("failed to create destination"); + + destination + .write_table_rows( + &replicated_table_schema, + vec![ + TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())]), + TableRow::new(vec![Cell::I32(2), Cell::String("bob".to_string())]), + ], + ) .await - .expect("failed to query catalog maintenance metrics"); - if metrics.active_data_files_total >= 1 { - break metrics; - } - assert!( - Instant::now() < deadline, - "timed out waiting for active data files total after materialization" - ); - tokio::time::sleep(Duration::from_millis(100)).await; - }; - - assert!(metrics.active_data_files_total >= 1); - } + .expect("failed to write rows"); + + let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await; + let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn) + .expect("failed to resolve metadata schema"); + let metadata_pg_pool = + build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool"); + let _rows_flushed = flush_table_inlined_data(&conn, &table_name) + .expect("failed to materialize inlined rows for catalog metrics test"); + let deadline = Instant::now() + Duration::from_secs(10); + let metrics = loop { + let metrics = + query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema) + .await + .expect("failed to query catalog maintenance metrics"); + if metrics.active_data_files_total >= 1 { + break metrics; + } + assert!( + Instant::now() < deadline, + "timed out waiting for active data files total after materialization" + ); + tokio::time::sleep(Duration::from_millis(100)).await; + }; - #[tokio::test(flavor = "multi_thread")] - async fn query_catalog_maintenance_metrics_reads_ducklake_metadata() { - let dir = TempDir::new().expect("failed to create temp dir"); - let data = path_to_file_url(&dir.path().join("data")); - let (_catalog_database, catalog) = create_catalog_database().await; - let store = 
MemoryStore::new(); - let schema = make_schema(1, "public", "users"); - let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone())); - let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap(); - - store.store_table_schema(schema.clone()).await.expect("failed to seed schema"); - - let destination = DuckLakeDestination::new( - catalog.clone(), - data.clone(), - 1, - None, - None, - None, - None, - store, - ) - .await - .expect("failed to create destination"); + assert!(metrics.active_data_files_total >= 1); + } - destination - .write_table_rows( - &replicated_table_schema, - vec![TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())])], + #[tokio::test(flavor = "multi_thread")] + async fn query_catalog_maintenance_metrics_reads_ducklake_metadata() { + let dir = TempDir::new().expect("failed to create temp dir"); + let data = path_to_file_url(&dir.path().join("data")); + let (_catalog_database, catalog) = create_catalog_database().await; + let store = MemoryStore::new(); + let schema = make_schema(1, "public", "users"); + let replicated_table_schema = ReplicatedTableSchema::all(Arc::new(schema.clone())); + let table_name = table_name_to_ducklake_table_name(&schema.name).unwrap(); + + store.store_table_schema(schema.clone()).await.expect("failed to seed schema"); + + let destination = DuckLakeDestination::new( + catalog.clone(), + data.clone(), + 1, + None, + None, + None, + None, + store, ) .await - .expect("failed to write rows"); - destination - .truncate_table(&replicated_table_schema) - .await - .expect("failed to truncate table"); + .expect("failed to create destination"); + + destination + .write_table_rows( + &replicated_table_schema, + vec![TableRow::new(vec![Cell::I32(1), Cell::String("alice".to_string())])], + ) + .await + .expect("failed to write rows"); + destination + .truncate_table(&replicated_table_schema) + .await + .expect("failed to truncate table"); - destination.shutdown().await.expect("failed to shutdown destination"); - drop(destination); + destination.shutdown().await.expect("failed to shutdown destination"); + drop(destination); - let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await; - let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn) - .expect("failed to resolve metadata schema"); - let metadata_pg_pool = - build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool"); - let metrics = query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema) - .await - .expect("failed to query catalog maintenance metrics"); - - assert!(metrics.active_data_files_total >= 0); - assert!(metrics.snapshots_total >= 1); - assert!(metrics.oldest_snapshot_age_seconds >= 0); - assert!(metrics.files_scheduled_for_deletion_total >= 0); - assert!(metrics.files_scheduled_for_deletion_bytes >= 0); - assert!(metrics.oldest_scheduled_deletion_age_seconds >= 0); + let conn = open_lake_conn_when_table_visible(&catalog, &data, &table_name).await; + let metadata_schema = resolve_ducklake_metadata_schema_blocking(&conn) + .expect("failed to resolve metadata schema"); + let metadata_pg_pool = + build_ducklake_metadata_pg_pool(&catalog).expect("failed to create metadata pool"); + let metrics = query_catalog_maintenance_metrics(&metadata_pg_pool, &metadata_schema) + .await + .expect("failed to query catalog maintenance metrics"); + + assert!(metrics.active_data_files_total >= 0); + assert!(metrics.snapshots_total >= 1); + assert!(metrics.oldest_snapshot_age_seconds >= 0); + 
assert!(metrics.files_scheduled_for_deletion_total >= 0); + assert!(metrics.files_scheduled_for_deletion_bytes >= 0); + assert!(metrics.oldest_scheduled_deletion_age_seconds >= 0); + } } } diff --git a/xtask/src/commands/nextest.rs b/xtask/src/commands/nextest.rs index 74d7dbaa1..fa6bfddea 100644 --- a/xtask/src/commands/nextest.rs +++ b/xtask/src/commands/nextest.rs @@ -11,10 +11,11 @@ use clap::{Args, ValueEnum}; /// /// This must stay in sync with the `shared-pg` test group in /// `.config/nextest.toml`. -const SHARED_PG_FILTER: &str = "\ +const SHARED_PG_FILTER: &str = + "\ test(exclusive_) | binary_id(etl::main) | (binary_id(etl-destinations::main) & \ - test(/^(bigquery_pipeline|ducklake_pipeline|iceberg_destination)::/\ - ))"; + test(/^(bigquery_pipeline|ducklake_destination|ducklake_pipeline|iceberg_destination)::/)) | \ + (binary_id(etl-destinations) & test(/ducklake::core::tests::postgres_backed::/))"; use super::shared::{DEFAULT_BASE_PORT, DEFAULT_PG_SHARD_COUNT};
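
Reviewer note (not part of the patch): the doc comment above says SHARED_PG_FILTER must
stay in sync with the `shared-pg` group in `.config/nextest.toml`, but nothing enforces
that. Below is a minimal sketch of a sync-check test that could live next to the
constant; the module and test names, the CARGO_MANIFEST_DIR-relative path, and the plain
string scan (rather than a TOML parser) are all assumptions for illustration, not part
of this change.

#[cfg(test)]
mod shared_pg_filter_sync {
    use super::SHARED_PG_FILTER;

    #[test]
    fn filter_matches_nextest_config() {
        // Assumes the xtask crate sits one level below the workspace root, so the
        // shared nextest config is reachable at ../.config/nextest.toml.
        let config = std::fs::read_to_string(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/../.config/nextest.toml"
        ))
        .expect("failed to read .config/nextest.toml");

        // Grab the first `filter = "..."` value. A TOML parser would be more robust;
        // string scanning keeps the sketch dependency-free.
        let config_filter = config
            .lines()
            .map(str::trim)
            .find(|line| line.starts_with("filter = "))
            .expect("no filter override found in nextest.toml")
            .trim_start_matches("filter = ")
            .trim_matches('"');

        assert_eq!(
            config_filter, SHARED_PG_FILTER,
            "SHARED_PG_FILTER has drifted from .config/nextest.toml"
        );
    }
}

If adopted, running the xtask crate's unit tests (e.g. `cargo test -p xtask`, assuming
the package is named xtask) would catch drift between the two filter definitions.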