diff --git a/etl/src/test_utils/delayed_confirm_destination.rs b/etl/src/test_utils/delayed_confirm_destination.rs new file mode 100644 index 000000000..9ef816555 --- /dev/null +++ b/etl/src/test_utils/delayed_confirm_destination.rs @@ -0,0 +1,256 @@ +use etl_postgres::types::TableId; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::{Notify, RwLock}; +use tokio_postgres::types::PgLsn; +use tracing::info; + +use crate::destination::async_result::{TruncateTableResult, WriteEventsResult, WriteTableRowsResult}; +use crate::destination::Destination; +use crate::error::EtlResult; +use crate::store::state::StateStore; +use crate::test_utils::notify::TimedNotify; +use crate::types::{Event, EventType, TableRow}; + +/// Internal state for the delayed-confirm destination. +struct Inner { + state_store: S, + /// Pending write-events async result senders that have not been confirmed yet. + pending_results: Vec>, + /// Commit LSNs received across all batches (from `CommitEvent.end_lsn`). + received_commit_lsns: Vec, + /// All events received (tracked immediately, before flush confirmation). + events: Vec, + /// Conditions to check after each write_events call. + event_conditions: Vec<(Box bool + Send + Sync>, Arc)>, + /// Whether shutdown was called. + shutdown_called: bool, +} + +impl Inner { + fn check_conditions(&mut self) { + self.event_conditions.retain(|(condition, notify)| { + if condition(&self.events) { + notify.notify_one(); + false + } else { + true + } + }); + } +} + +/// A test destination that accepts events immediately but defers +/// the flush confirmation until test code explicitly calls +/// [`confirm_all`](Self::confirm_all). +/// +/// Events are tracked immediately upon receipt (before flush confirmation), +/// which allows tests to wait for events even when the async result is deferred. +#[derive(Clone)] +pub struct DelayedConfirmDestination { + inner: Arc>>, +} + +impl DelayedConfirmDestination +where + S: StateStore + Clone + Send + Sync + 'static, +{ + /// Creates a new delayed-confirm destination backed by the given state store. + pub fn new(state_store: S) -> Self { + Self { + inner: Arc::new(RwLock::new(Inner { + state_store, + pending_results: Vec::new(), + received_commit_lsns: Vec::new(), + events: Vec::new(), + event_conditions: Vec::new(), + shutdown_called: false, + })), + } + } + + /// Fires all pending async result senders with `Ok(())`. + pub async fn confirm_all(&self) { + let mut inner = self.inner.write().await; + let pending: Vec<_> = inner.pending_results.drain(..).collect(); + for result in pending { + result.send(Ok(())); + } + } + + /// Returns the commit LSNs that were tracked from `CommitEvent`s. + pub async fn get_received_commit_lsns(&self) -> Vec { + self.inner.read().await.received_commit_lsns.clone() + } + + /// Returns all events received by this destination. + pub async fn get_events(&self) -> Vec { + self.inner.read().await.events.clone() + } + + /// Returns whether shutdown was called on this destination. + pub async fn shutdown_called(&self) -> bool { + self.inner.read().await.shutdown_called + } + + /// Waits for the specified number of events of each type to be received. + /// + /// Events are tracked immediately on receipt (before flush confirmation), + /// so this works even when async results are deferred. + pub async fn wait_for_events_count(&self, conditions: Vec<(EventType, u64)>) -> TimedNotify { + let notify = Arc::new(Notify::new()); + let mut inner = self.inner.write().await; + + let condition: Box bool + Send + Sync> = + Box::new(move |events: &[Event]| { + for (event_type, expected_count) in &conditions { + let actual_count = events + .iter() + .filter(|e| &e.event_type() == event_type) + .count() as u64; + if actual_count < *expected_count { + return false; + } + } + true + }); + + inner.event_conditions.push((condition, notify.clone())); + inner.check_conditions(); + + TimedNotify::new(notify) + } +} + +impl std::fmt::Debug for DelayedConfirmDestination { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DelayedConfirmDestination").finish() + } +} + +impl Destination for DelayedConfirmDestination +where + S: StateStore + Clone + Send + Sync + 'static, +{ + fn name() -> &'static str { + "delayed_confirm" + } + + async fn truncate_table( + &self, + table_id: TableId, + async_result: TruncateTableResult<()>, + ) -> EtlResult<()> { + info!(table_id = table_id.0, "truncating table (delayed confirm)"); + async_result.send(Ok(())); + Ok(()) + } + + async fn write_table_rows( + &self, + table_id: TableId, + table_rows: Vec, + async_result: WriteTableRowsResult<()>, + ) -> EtlResult<()> { + let state_store = { + let inner = self.inner.read().await; + inner.state_store.clone() + }; + + state_store + .store_table_mapping( + table_id, + format!("delayed_confirm_destination_table_{}", table_id.0), + ) + .await?; + + info!( + table_id = table_id.0, + row_count = table_rows.len(), + "writing table rows (delayed confirm)" + ); + + async_result.send(Ok(())); + Ok(()) + } + + async fn write_events( + &self, + events: Vec, + async_result: WriteEventsResult<()>, + ) -> EtlResult<()> { + let mut has_commit = false; + let mut table_ids = HashSet::new(); + + for event in &events { + match event { + Event::Commit(commit_event) => { + has_commit = true; + let mut inner = self.inner.write().await; + inner.received_commit_lsns.push(commit_event.end_lsn); + } + Event::Insert(e) => { + table_ids.insert(e.table_id); + } + Event::Update(e) => { + table_ids.insert(e.table_id); + } + Event::Delete(e) => { + table_ids.insert(e.table_id); + } + Event::Relation(e) => { + table_ids.insert(e.table_schema.id); + } + Event::Truncate(e) => { + for tid in &e.rel_ids { + table_ids.insert(TableId::new(*tid)); + } + } + Event::Begin(_) | Event::Unsupported => {} + } + } + + // Record table mappings. + { + let inner = self.inner.read().await; + for table_id in table_ids { + inner + .state_store + .store_table_mapping( + table_id, + format!("delayed_confirm_destination_table_{}", table_id.0), + ) + .await?; + } + } + + info!( + event_count = events.len(), + has_commit, "writing events (delayed confirm)" + ); + + // Track events immediately (before flush confirmation) so tests can wait on them. + { + let mut inner = self.inner.write().await; + inner.events.extend(events); + inner.check_conditions(); + } + + if has_commit { + // Defer: store the async result sender without firing it. + let mut inner = self.inner.write().await; + inner.pending_results.push(async_result); + } else { + // No commit in this batch — confirm immediately. + async_result.send(Ok(())); + } + + Ok(()) + } + + async fn shutdown(&self) -> EtlResult<()> { + let mut inner = self.inner.write().await; + inner.shutdown_called = true; + Ok(()) + } +} diff --git a/etl/src/test_utils/mod.rs b/etl/src/test_utils/mod.rs index 66c017b60..63bbce5be 100644 --- a/etl/src/test_utils/mod.rs +++ b/etl/src/test_utils/mod.rs @@ -5,6 +5,7 @@ //! worker lifecycle coordination, and data consistency validation. pub mod database; +pub mod delayed_confirm_destination; pub mod event; pub mod materialize; pub mod memory_destination; diff --git a/etl/tests/pipeline_destination_controlled_flush.rs b/etl/tests/pipeline_destination_controlled_flush.rs new file mode 100644 index 000000000..1c5800558 --- /dev/null +++ b/etl/tests/pipeline_destination_controlled_flush.rs @@ -0,0 +1,381 @@ +#![cfg(feature = "test-utils")] + +use std::time::Duration; + +use etl::state::table::TableReplicationPhaseType; +use etl::test_utils::database::spawn_source_database; +use etl::test_utils::delayed_confirm_destination::DelayedConfirmDestination; +use etl::test_utils::notifying_store::NotifyingStore; +use etl::test_utils::pipeline::create_pipeline; +use etl::test_utils::test_schema::{ + TableSelection, insert_users_data, setup_test_database_schema, +}; +use etl::types::{EventType, PipelineId}; +use etl_postgres::replication::slots::EtlReplicationSlot; +use etl_telemetry::tracing::init_test_tracing; +use rand::random; +use tokio_postgres::types::PgLsn; + +/// Helper: query the confirmed_flush_lsn for a given replication slot. +async fn get_confirmed_flush_lsn( + client: &tokio_postgres::Client, + slot_name: &str, +) -> Option { + let row = client + .query_opt( + "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1", + &[&slot_name], + ) + .await + .expect("Failed to query pg_replication_slots"); + + row.map(|r| r.get::<_, PgLsn>(0)) +} + +/// When the destination doesn't confirm, the Postgres replication slot's +/// confirmed_flush_lsn does NOT advance to the commit LSN. +#[tokio::test(flavor = "multi_thread")] +async fn destination_controlled_flush_delays_slot_advancement() { + init_test_tracing(); + + let mut database = spawn_source_database().await; + let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await; + + let store = NotifyingStore::new(); + let delayed_dest = DelayedConfirmDestination::new(store.clone()); + + let pipeline_id: PipelineId = random(); + let apply_slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id) + .try_into() + .unwrap(); + + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + delayed_dest.clone(), + ); + + let table_ready_notify = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::Ready, + ) + .await; + + pipeline.start().await.unwrap(); + table_ready_notify.notified().await; + + // Insert 1 row and wait for the insert event. + let users_table_name = database_schema.users_schema().name.clone(); + let events_notify = delayed_dest + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + insert_users_data(&mut database, &users_table_name, 1..=1).await; + events_notify.notified().await; + + // Wait briefly for the commit to be processed. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Do NOT confirm — the slot should not have advanced to the commit LSN. + let commit_lsns = delayed_dest.get_received_commit_lsns().await; + assert!( + !commit_lsns.is_empty(), + "should have tracked at least one commit LSN" + ); + let max_commit_lsn = commit_lsns.iter().copied().max().unwrap(); + + // Shutdown WITHOUT confirming. + pipeline.shutdown_and_wait().await.unwrap(); + database.wait_for_slot_inactive(&apply_slot_name).await; + + // The slot should NOT have advanced to the commit LSN. + let slot_lsn = + get_confirmed_flush_lsn(database.client.as_ref().unwrap(), &apply_slot_name) + .await + .expect("slot should still exist after shutdown"); + + assert!( + slot_lsn < max_commit_lsn, + "slot confirmed_flush_lsn ({slot_lsn:?}) should not have advanced to \ + the commit LSN ({max_commit_lsn:?}) since destination never confirmed" + ); +} + +/// Confirm the batch, shutdown, verify the slot advances to the commit LSN. +#[tokio::test(flavor = "multi_thread")] +async fn destination_controlled_flush_confirm_advances_slot() { + init_test_tracing(); + + let mut database = spawn_source_database().await; + let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await; + + let store = NotifyingStore::new(); + let delayed_dest = DelayedConfirmDestination::new(store.clone()); + + let pipeline_id: PipelineId = random(); + let apply_slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id) + .try_into() + .unwrap(); + + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + delayed_dest.clone(), + ); + + let table_ready_notify = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::Ready, + ) + .await; + + pipeline.start().await.unwrap(); + table_ready_notify.notified().await; + + // Insert 1 row and wait for the insert event. + let users_table_name = database_schema.users_schema().name.clone(); + let events_notify = delayed_dest + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + insert_users_data(&mut database, &users_table_name, 1..=1).await; + events_notify.notified().await; + + // Wait briefly for the commit to be processed. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Confirm the batch. + delayed_dest.confirm_all().await; + + // Wait for the apply loop to process confirmation and send status update. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Get the commit LSN before shutdown. + let commit_lsns = delayed_dest.get_received_commit_lsns().await; + assert!( + !commit_lsns.is_empty(), + "should have tracked at least one commit LSN" + ); + let max_commit_lsn = commit_lsns.iter().copied().max().unwrap(); + + // Shutdown pipeline. + pipeline.shutdown_and_wait().await.unwrap(); + database.wait_for_slot_inactive(&apply_slot_name).await; + + // The slot should have advanced to at least the commit LSN since we confirmed. + let slot_lsn = + get_confirmed_flush_lsn(database.client.as_ref().unwrap(), &apply_slot_name) + .await + .expect("slot should still exist after shutdown"); + + assert!( + slot_lsn >= max_commit_lsn, + "after confirming, the slot LSN ({slot_lsn:?}) should be at least \ + the commit LSN ({max_commit_lsn:?})" + ); +} + +/// Crash-resume test: kill pipeline without confirming, restart, verify data is replayed. +#[tokio::test(flavor = "multi_thread")] +async fn destination_controlled_flush_crash_resume_no_data_loss() { + init_test_tracing(); + + let mut database = spawn_source_database().await; + let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await; + + let store = NotifyingStore::new(); + let delayed_dest = DelayedConfirmDestination::new(store.clone()); + + let pipeline_id: PipelineId = random(); + + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + delayed_dest.clone(), + ); + + let table_ready_notify = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::Ready, + ) + .await; + + pipeline.start().await.unwrap(); + table_ready_notify.notified().await; + + // Insert 1 row and wait for the insert event. + let users_table_name = database_schema.users_schema().name.clone(); + let events_notify = delayed_dest + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + insert_users_data(&mut database, &users_table_name, 1..=1).await; + events_notify.notified().await; + + // DO NOT confirm — simulate a crash where the batch was not durably written. + pipeline.shutdown_and_wait().await.unwrap(); + + let apply_slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id) + .try_into() + .unwrap(); + database.wait_for_slot_inactive(&apply_slot_name).await; + + // Create a NEW pipeline with a FRESH DelayedConfirmDestination (same pipeline_id). + let delayed_dest_2 = DelayedConfirmDestination::new(store.clone()); + + let mut pipeline_2 = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + delayed_dest_2.clone(), + ); + + let table_ready_notify_2 = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::Ready, + ) + .await; + + // Wait for the insert event to be replayed on the new destination. + let events_notify_2 = delayed_dest_2 + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + pipeline_2.start().await.unwrap(); + table_ready_notify_2.notified().await; + events_notify_2.notified().await; + + // Confirm on the new destination so shutdown can complete cleanly. + delayed_dest_2.confirm_all().await; + pipeline_2.shutdown_and_wait().await.unwrap(); + + // Verify the new destination received the insert event (data was replayed, not lost). + let events = delayed_dest_2.get_events().await; + let insert_count = events + .iter() + .filter(|e| matches!(e, etl::types::Event::Insert(_))) + .count(); + assert!( + insert_count >= 1, + "the new destination should have received at least 1 insert event \ + (replayed from unconfirmed slot position), got {insert_count}" + ); +} + +/// Multiple transactions with selective confirmation: confirm only the first, +/// verify the slot advances to the first commit but not the second. +#[tokio::test(flavor = "multi_thread")] +async fn destination_controlled_flush_selective_confirmation() { + init_test_tracing(); + + let mut database = spawn_source_database().await; + let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await; + + let store = NotifyingStore::new(); + let delayed_dest = DelayedConfirmDestination::new(store.clone()); + + let pipeline_id: PipelineId = random(); + let apply_slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id) + .try_into() + .unwrap(); + + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + database_schema.publication_name(), + store.clone(), + delayed_dest.clone(), + ); + + let table_ready_notify = store + .notify_on_table_state_type( + database_schema.users_schema().id, + TableReplicationPhaseType::Ready, + ) + .await; + + pipeline.start().await.unwrap(); + table_ready_notify.notified().await; + + let users_table_name = database_schema.users_schema().name.clone(); + + // --- Transaction 1: Insert 1 row --- + let events_notify_1 = delayed_dest + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + insert_users_data(&mut database, &users_table_name, 1..=1).await; + events_notify_1.notified().await; + + // Wait briefly for commit to be tracked. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Record commit LSN from transaction 1. + let commit_lsns_after_tx1 = delayed_dest.get_received_commit_lsns().await; + assert!( + !commit_lsns_after_tx1.is_empty(), + "should have tracked at least one commit LSN after transaction 1" + ); + let first_commit_lsn = *commit_lsns_after_tx1.last().unwrap(); + + // Confirm transaction 1. + delayed_dest.confirm_all().await; + + // Wait for apply loop to process the confirmation. + tokio::time::sleep(Duration::from_secs(2)).await; + + // --- Transaction 2: Insert 1 more row --- + let events_notify_2 = delayed_dest + .wait_for_events_count(vec![(EventType::Insert, 2)]) + .await; + + insert_users_data(&mut database, &users_table_name, 2..=2).await; + events_notify_2.notified().await; + + // Wait briefly for commit to be tracked. + tokio::time::sleep(Duration::from_secs(2)).await; + + // DO NOT confirm transaction 2. + let commit_lsns_after_tx2 = delayed_dest.get_received_commit_lsns().await; + assert!( + commit_lsns_after_tx2.len() >= 2, + "should have tracked at least 2 commit LSNs, got {}", + commit_lsns_after_tx2.len() + ); + let second_commit_lsn = *commit_lsns_after_tx2.last().unwrap(); + + // Shutdown pipeline (transaction 2 is unconfirmed). + pipeline.shutdown_and_wait().await.unwrap(); + database.wait_for_slot_inactive(&apply_slot_name).await; + + // Query slot's confirmed_flush_lsn. + let slot_lsn = + get_confirmed_flush_lsn(database.client.as_ref().unwrap(), &apply_slot_name) + .await + .expect("slot should still exist after shutdown"); + + // The slot should have advanced past the first commit LSN (which was confirmed). + assert!( + slot_lsn >= first_commit_lsn, + "slot LSN ({slot_lsn:?}) should be at least the first commit LSN \ + ({first_commit_lsn:?}) since it was confirmed" + ); + + // The slot should NOT have advanced to the second commit LSN (which was not confirmed). + assert!( + slot_lsn < second_commit_lsn, + "slot LSN ({slot_lsn:?}) should be less than the second commit LSN \ + ({second_commit_lsn:?}) since it was not confirmed" + ); +}