From a1a485d0735c68b316e0058cfac7bb6a432b25b8 Mon Sep 17 00:00:00 2001 From: aleksandarskrbic Date: Mon, 24 Nov 2025 11:09:39 +0100 Subject: [PATCH 1/3] feat(etl-api): handle case when publication is deleted --- ...88d8e4ba7aa392349207a6dcff6cbb1a90907.json | 22 + ...5e6e50de3ca5e59e8ce9d2bedebb50eab9b85.json | 66 +++ etl-api/src/db/pipelines.rs | 52 ++ etl-api/src/db/publications.rs | 18 + etl-api/src/routes/pipelines.rs | 90 +++- etl-api/src/routes/sources/publications.rs | 101 +++- etl-api/src/startup.rs | 10 +- etl-api/tests/publications.rs | 476 ++++++++++++++++++ etl-api/tests/support/test_app.rs | 99 ++++ 9 files changed, 914 insertions(+), 20 deletions(-) create mode 100644 etl-api/.sqlx/query-67e9d0f49e0645a37b5b95a636a88d8e4ba7aa392349207a6dcff6cbb1a90907.json create mode 100644 etl-api/.sqlx/query-8d3d991029f43c1ec0dd42bf7835e6e50de3ca5e59e8ce9d2bedebb50eab9b85.json create mode 100644 etl-api/tests/publications.rs diff --git a/etl-api/.sqlx/query-67e9d0f49e0645a37b5b95a636a88d8e4ba7aa392349207a6dcff6cbb1a90907.json b/etl-api/.sqlx/query-67e9d0f49e0645a37b5b95a636a88d8e4ba7aa392349207a6dcff6cbb1a90907.json new file mode 100644 index 000000000..de0c3295a --- /dev/null +++ b/etl-api/.sqlx/query-67e9d0f49e0645a37b5b95a636a88d8e4ba7aa392349207a6dcff6cbb1a90907.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select exists(\n select 1 from pg_publication where pubname = $1\n ) as \"exists!\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + null + ] + }, + "hash": "67e9d0f49e0645a37b5b95a636a88d8e4ba7aa392349207a6dcff6cbb1a90907" +} diff --git a/etl-api/.sqlx/query-8d3d991029f43c1ec0dd42bf7835e6e50de3ca5e59e8ce9d2bedebb50eab9b85.json b/etl-api/.sqlx/query-8d3d991029f43c1ec0dd42bf7835e6e50de3ca5e59e8ce9d2bedebb50eab9b85.json new file mode 100644 index 000000000..2dbecd9e7 --- /dev/null +++ b/etl-api/.sqlx/query-8d3d991029f43c1ec0dd42bf7835e6e50de3ca5e59e8ce9d2bedebb50eab9b85.json @@ -0,0 +1,66 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select p.id,\n p.tenant_id,\n source_id,\n s.name as source_name,\n destination_id,\n d.name as destination_name,\n replicator_id,\n p.config\n from app.pipelines p\n join app.sources s on p.source_id = s.id\n join app.destinations d on p.destination_id = d.id\n where p.tenant_id = $1\n and p.source_id = $2\n and p.config->>'publication_name' = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "tenant_id", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "source_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "source_name", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "destination_id", + "type_info": "Int8" + }, + { + "ordinal": 5, + "name": "destination_name", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "replicator_id", + "type_info": "Int8" + }, + { + "ordinal": 7, + "name": "config", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "8d3d991029f43c1ec0dd42bf7835e6e50de3ca5e59e8ce9d2bedebb50eab9b85" +} diff --git a/etl-api/src/db/pipelines.rs b/etl-api/src/db/pipelines.rs index 74c0c24a4..c0554cffc 100644 --- a/etl-api/src/db/pipelines.rs +++ b/etl-api/src/db/pipelines.rs @@ -318,6 +318,58 @@ where Ok(pipelines) } +pub async fn find_pipelines_by_publication<'c, E>( + executor: E, + tenant_id: &str, + source_id: i64, + publication_name: &str, +) -> Result, PipelinesDbError> +where + E: PgExecutor<'c>, +{ + let records = sqlx::query!( + r#" + select p.id, + p.tenant_id, + source_id, + s.name as source_name, + destination_id, + d.name as destination_name, + replicator_id, + p.config + from app.pipelines p + join app.sources s on p.source_id = s.id + join app.destinations d on p.destination_id = d.id + where p.tenant_id = $1 + and p.source_id = $2 + and p.config->>'publication_name' = $3 + "#, + tenant_id, + source_id, + publication_name, + ) + .fetch_all(executor) + .await?; + + let mut pipelines = Vec::with_capacity(records.len()); + for record in records { + let config = deserialize_from_value::(record.config.clone())?; + + pipelines.push(Pipeline { + id: record.id, + tenant_id: record.tenant_id, + source_id: record.source_id, + source_name: record.source_name, + destination_id: record.destination_id, + destination_name: record.destination_name, + replicator_id: record.replicator_id, + config, + }); + } + + Ok(pipelines) +} + pub async fn update_pipeline_config( txn: &mut PgTransaction<'_>, tenant_id: &str, diff --git a/etl-api/src/db/publications.rs b/etl-api/src/db/publications.rs index 5c01ac859..8cec69a7e 100644 --- a/etl-api/src/db/publications.rs +++ b/etl-api/src/db/publications.rs @@ -176,3 +176,21 @@ pub async fn read_all_publications(pool: &PgPool) -> Result, Pu Ok(publications) } + +pub async fn publication_exists( + pool: &PgPool, + publication_name: &str, +) -> Result { + let exists = sqlx::query_scalar!( + r#" + select exists( + select 1 from pg_publication where pubname = $1 + ) as "exists!" + "#, + publication_name + ) + .fetch_one(pool) + .await?; + + Ok(exists) +} diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs index f864da1d4..094e5adc1 100644 --- a/etl-api/src/routes/pipelines.rs +++ b/etl-api/src/routes/pipelines.rs @@ -18,6 +18,7 @@ use crate::db; use crate::db::destinations::{DestinationsDbError, destination_exists}; use crate::db::images::ImagesDbError; use crate::db::pipelines::{MAX_PIPELINES_PER_TENANT, PipelinesDbError, read_pipeline_components}; +use crate::db::publications::PublicationsDbError; use crate::db::replicators::ReplicatorsDbError; use crate::db::sources::{SourcesDbError, source_exists}; use crate::k8s::core::{ @@ -51,6 +52,9 @@ pub enum PipelineError { #[error("No default image was found")] NoDefaultImageFound, + #[error("The publication '{0}' was not found on the source database")] + PublicationNotFound(String), + #[error(transparent)] TenantId(#[from] TenantIdError), @@ -78,6 +82,9 @@ pub enum PipelineError { #[error(transparent)] DestinationsDb(#[from] DestinationsDbError), + #[error(transparent)] + PublicationsDb(#[from] PublicationsDbError), + #[error(transparent)] PipelinesDb(PipelinesDbError), @@ -128,6 +135,7 @@ impl PipelineError { // Do not expose internal database details in error messages PipelineError::SourcesDb(SourcesDbError::Database(_)) | PipelineError::DestinationsDb(DestinationsDbError::Database(_)) + | PipelineError::PublicationsDb(PublicationsDbError::Database(_)) | PipelineError::PipelinesDb(PipelinesDbError::Database(_)) | PipelineError::ReplicatorsDb(ReplicatorsDbError::Database(_)) | PipelineError::ImagesDb(ImagesDbError::Database(_)) @@ -147,6 +155,7 @@ impl ResponseError for PipelineError { | PipelineError::NoDefaultImageFound | PipelineError::SourcesDb(_) | PipelineError::DestinationsDb(_) + | PipelineError::PublicationsDb(_) | PipelineError::PipelinesDb(_) | PipelineError::ReplicatorsDb(_) | PipelineError::ImagesDb(_) @@ -162,9 +171,9 @@ impl ResponseError for PipelineError { | PipelineError::ImageIdNotDefault(_) | PipelineError::DestinationNotFound(_) | PipelineError::SourceNotFound(_) => StatusCode::NOT_FOUND, - PipelineError::TenantId(_) | PipelineError::NotRollbackable(_) => { - StatusCode::BAD_REQUEST - } + PipelineError::TenantId(_) + | PipelineError::NotRollbackable(_) + | PipelineError::PublicationNotFound(_) => StatusCode::BAD_REQUEST, PipelineError::DuplicatePipeline => StatusCode::CONFLICT, PipelineError::PipelineLimitReached { .. } => StatusCode::UNPROCESSABLE_ENTITY, } @@ -442,6 +451,47 @@ pub struct GetPipelineVersionResponse { pub new_version: Option, } +/// Validates pipeline inputs: checks source/destination existence and publication validity. +/// +/// Returns the source pool for the validated source. +async fn validate_pipeline_inputs( + txn: &mut sqlx::PgTransaction<'_>, + tenant_id: &str, + source_id: i64, + destination_id: i64, + publication_name: &str, + encryption_key: &EncryptionKey, +) -> Result { + // Check source exists + if !source_exists(txn.deref_mut(), tenant_id, source_id).await? { + return Err(PipelineError::SourceNotFound(source_id)); + } + + // Check destination exists + if !destination_exists(txn.deref_mut(), tenant_id, destination_id).await? { + return Err(PipelineError::DestinationNotFound(destination_id)); + } + + // Read source configuration + let source = db::sources::read_source(txn.deref_mut(), tenant_id, source_id, encryption_key) + .await? + .ok_or(PipelineError::SourceNotFound(source_id))?; + + // Connect to source database + let source_pool = + connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?; + + // Validate publication exists on source database + let exists = db::publications::publication_exists(&source_pool, publication_name).await?; + if !exists { + return Err(PipelineError::PublicationNotFound( + publication_name.to_string(), + )); + } + + Ok(source_pool) +} + #[utoipa::path( summary = "Create a pipeline", description = "Creates a pipeline linking a source to a destination.", @@ -460,19 +510,24 @@ pub struct GetPipelineVersionResponse { pub async fn create_pipeline( req: HttpRequest, pool: Data, + encryption_key: Data, pipeline: Json, ) -> Result { let tenant_id = extract_tenant_id(&req)?; let pipeline = pipeline.into_inner(); let mut txn = pool.begin().await?; - if !source_exists(txn.deref_mut(), tenant_id, pipeline.source_id).await? { - return Err(PipelineError::SourceNotFound(pipeline.source_id)); - } - if !destination_exists(txn.deref_mut(), tenant_id, pipeline.destination_id).await? { - return Err(PipelineError::DestinationNotFound(pipeline.destination_id)); - } + // Validate source, destination, and publication + let _source_pool = validate_pipeline_inputs( + &mut txn, + tenant_id, + pipeline.source_id, + pipeline.destination_id, + &pipeline.config.publication_name, + &encryption_key, + ) + .await?; let pipeline_count = db::pipelines::count_pipelines_for_tenant(txn.deref_mut(), tenant_id).await?; @@ -566,6 +621,7 @@ pub async fn read_pipeline( pub async fn update_pipeline( req: HttpRequest, pool: Data, + encryption_key: Data, pipeline_id: Path, pipeline: Json, ) -> Result { @@ -574,13 +630,17 @@ pub async fn update_pipeline( let pipeline = pipeline.into_inner(); let mut txn = pool.begin().await?; - if !source_exists(txn.deref_mut(), tenant_id, pipeline.source_id).await? { - return Err(PipelineError::SourceNotFound(pipeline.source_id)); - } - if !destination_exists(txn.deref_mut(), tenant_id, pipeline.destination_id).await? { - return Err(PipelineError::DestinationNotFound(pipeline.destination_id)); - } + // Validate source, destination, and publication + let _source_pool = validate_pipeline_inputs( + &mut txn, + tenant_id, + pipeline.source_id, + pipeline.destination_id, + &pipeline.config.publication_name, + &encryption_key, + ) + .await?; db::pipelines::update_pipeline( txn.deref_mut(), diff --git a/etl-api/src/routes/sources/publications.rs b/etl-api/src/routes/sources/publications.rs index f56a627e7..e096af33b 100644 --- a/etl-api/src/routes/sources/publications.rs +++ b/etl-api/src/routes/sources/publications.rs @@ -9,6 +9,7 @@ use sqlx::PgPool; use thiserror::Error; use utoipa::ToSchema; +use crate::db::pipelines::PipelinesDbError; use crate::db::publications::PublicationsDbError; use crate::routes::connect_to_source_database_with_defaults; use crate::{ @@ -25,6 +26,9 @@ enum PublicationError { #[error("The publication with name {0} was not found")] PublicationNotFound(String), + #[error("Cannot delete publication '{0}' - referenced by pipelines: {1}")] + PublicationInUse(String, String), + #[error(transparent)] TenantId(#[from] TenantIdError), @@ -34,6 +38,9 @@ enum PublicationError { #[error(transparent)] PublicationsDb(#[from] PublicationsDbError), + #[error(transparent)] + PipelinesDb(#[from] PipelinesDbError), + #[error("Database connection error: {0}")] Database(#[from] sqlx::Error), } @@ -43,7 +50,8 @@ impl PublicationError { match self { // Do not expose internal database details in error messages PublicationError::SourcesDb(SourcesDbError::Database(_)) - | PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => { + | PublicationError::PublicationsDb(PublicationsDbError::Database(_)) + | PublicationError::PipelinesDb(PipelinesDbError::Database(_)) => { "internal server error".to_string() } // Every other message is ok, as they do not divulge sensitive information @@ -57,10 +65,12 @@ impl ResponseError for PublicationError { match self { PublicationError::SourcesDb(_) | PublicationError::PublicationsDb(_) + | PublicationError::PipelinesDb(_) | PublicationError::Database(_) => StatusCode::INTERNAL_SERVER_ERROR, PublicationError::SourceNotFound(_) | PublicationError::PublicationNotFound(_) => { StatusCode::NOT_FOUND } + PublicationError::PublicationInUse(_, _) => StatusCode::CONFLICT, PublicationError::TenantId(_) => StatusCode::BAD_REQUEST, } } @@ -77,7 +87,7 @@ impl ResponseError for PublicationError { } } -#[derive(Deserialize, ToSchema)] +#[derive(Serialize, Deserialize, ToSchema)] pub struct CreatePublicationRequest { #[schema(example = "my_publication", required = true)] name: String, @@ -85,7 +95,7 @@ pub struct CreatePublicationRequest { tables: Vec, } -#[derive(Deserialize, ToSchema)] +#[derive(Serialize, Deserialize, ToSchema)] pub struct UpdatePublicationRequest { #[schema(required = true)] tables: Vec
, @@ -96,6 +106,19 @@ pub struct ReadPublicationsResponse { pub publications: Vec, } +#[derive(Serialize, Deserialize, ToSchema)] +pub struct PipelineInfo { + #[schema(example = 1)] + pub id: i64, + #[schema(example = "Pipeline 1")] + pub name: String, +} + +#[derive(Serialize, Deserialize, ToSchema)] +pub struct ListPipelinesForPublicationResponse { + pub pipelines: Vec, +} + #[utoipa::path( summary = "Create a publication", description = "Creates a publication on the given source with the specified tables.", @@ -229,6 +252,7 @@ pub async fn update_publication( responses( (status = 200, description = "Publication deleted successfully"), (status = 404, description = "Publication not found", body = ErrorMessage), + (status = 409, description = "Publication is referenced by pipelines", body = ErrorMessage), (status = 500, description = "Internal server error", body = ErrorMessage) ) )] @@ -247,6 +271,23 @@ pub async fn delete_publication( .map(|s| s.config) .ok_or(PublicationError::SourceNotFound(source_id))?; + // Check if any pipelines are using this publication + let pipelines = db::pipelines::find_pipelines_by_publication( + &**pool, + tenant_id, + source_id, + &publication_name, + ) + .await?; + + if !pipelines.is_empty() { + let pipeline_ids: Vec = pipelines.iter().map(|p| p.id.to_string()).collect(); + return Err(PublicationError::PublicationInUse( + publication_name, + pipeline_ids.join(", "), + )); + } + let source_pool = connect_to_source_database_with_defaults(&config.into_connection_config()).await?; db::publications::drop_publication(&publication_name, &source_pool).await?; @@ -288,3 +329,57 @@ pub async fn read_all_publications( Ok(Json(response)) } + +#[utoipa::path( + summary = "List pipelines using a publication", + description = "Returns all pipelines that reference the specified publication.", + tag = "Publications", + params( + ("source_id" = i64, Path, description = "Unique ID of the source"), + ("publication_name" = String, Path, description = "Publication name within the source"), + ), + responses( + (status = 200, description = "Pipelines listed successfully", body = ListPipelinesForPublicationResponse), + (status = 404, description = "Source not found", body = ErrorMessage), + (status = 500, description = "Internal server error", body = ErrorMessage) + ) +)] +#[get("/sources/{source_id}/publications/{publication_name}/pipelines")] +pub async fn list_pipelines_for_publication( + req: HttpRequest, + pool: Data, + encryption_key: Data, + source_id_and_pub_name: Path<(i64, String)>, +) -> Result { + let tenant_id = extract_tenant_id(&req)?; + let (source_id, publication_name) = source_id_and_pub_name.into_inner(); + + // Verify source exists + let _config = db::sources::read_source(&**pool, tenant_id, source_id, &encryption_key) + .await? + .map(|s| s.config) + .ok_or(PublicationError::SourceNotFound(source_id))?; + + // Find all pipelines using this publication + let pipelines = db::pipelines::find_pipelines_by_publication( + &**pool, + tenant_id, + source_id, + &publication_name, + ) + .await?; + + let pipeline_infos: Vec = pipelines + .into_iter() + .map(|p| PipelineInfo { + id: p.id, + name: format!("{} -> {}", p.source_name, p.destination_name), + }) + .collect(); + + let response = ListPipelinesForPublicationResponse { + pipelines: pipeline_infos, + }; + + Ok(Json(response)) +} diff --git a/etl-api/src/startup.rs b/etl-api/src/startup.rs index bf06e5d0d..a857c46ed 100644 --- a/etl-api/src/startup.rs +++ b/etl-api/src/startup.rs @@ -55,8 +55,10 @@ use crate::{ CreateSourceRequest, CreateSourceResponse, ReadSourceResponse, ReadSourcesResponse, UpdateSourceRequest, create_source, delete_source, publications::{ - CreatePublicationRequest, UpdatePublicationRequest, create_publication, - delete_publication, read_all_publications, read_publication, update_publication, + CreatePublicationRequest, ListPipelinesForPublicationResponse, PipelineInfo, + UpdatePublicationRequest, create_publication, delete_publication, + list_pipelines_for_publication, read_all_publications, read_publication, + update_publication, }, read_all_sources, read_source, tables::read_table_names, @@ -248,6 +250,8 @@ pub async fn run( CreatePublicationRequest, UpdatePublicationRequest, Publication, + PipelineInfo, + ListPipelinesForPublicationResponse, CreateDestinationRequest, CreateDestinationResponse, UpdateDestinationRequest, @@ -298,6 +302,7 @@ pub async fn run( crate::routes::sources::publications::update_publication, crate::routes::sources::publications::delete_publication, crate::routes::sources::publications::read_all_publications, + crate::routes::sources::publications::list_pipelines_for_publication, crate::routes::sources::tables::read_table_names, crate::routes::destinations::create_destination, crate::routes::destinations::read_destination, @@ -376,6 +381,7 @@ pub async fn run( .service(update_publication) .service(delete_publication) .service(read_all_publications) + .service(list_pipelines_for_publication) //images .service(create_image) .service(read_image) diff --git a/etl-api/tests/publications.rs b/etl-api/tests/publications.rs new file mode 100644 index 000000000..5b67b2988 --- /dev/null +++ b/etl-api/tests/publications.rs @@ -0,0 +1,476 @@ +use etl_api::routes::pipelines::{CreatePipelineRequest, CreatePipelineResponse}; +use etl_api::routes::sources::publications::ListPipelinesForPublicationResponse; +use etl_telemetry::tracing::init_test_tracing; +use reqwest::StatusCode; +use sqlx::PgPool; + +use crate::support::database::{ + create_test_source_database, run_etl_migrations_on_source_database, +}; +use crate::support::mocks::create_default_image; +use crate::support::mocks::destinations::create_destination; +use crate::support::mocks::pipelines::new_pipeline_config; +use crate::support::mocks::tenants::create_tenant; +use crate::support::test_app::spawn_test_app; + +mod support; + +async fn create_test_table(pool: &PgPool, table_name: &str) { + sqlx::query(&format!( + "create table if not exists {} (id serial primary key, data text)", + table_name + )) + .execute(pool) + .await + .expect("failed to create test table"); +} + +async fn create_publication_on_db(pool: &PgPool, publication_name: &str, table_name: &str) { + sqlx::query(&format!( + "create publication {} for table {}", + publication_name, table_name + )) + .execute(pool) + .await + .expect("failed to create publication"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn publication_not_referenced_by_pipeline_can_be_deleted() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create a test table and publication on the source database + create_test_table(&source_pool, "test_table").await; + create_publication_on_db(&source_pool, "test_publication", "test_table").await; + + // Act - Delete the publication (not referenced by any pipeline) + let response = app + .delete_publication(tenant_id, source_id, "test_publication") + .await; + + // Assert - Should succeed + assert!( + response.status().is_success(), + "Expected success, got: {}", + response.status() + ); + + // Verify publication was actually deleted from the database + let publication_exists: bool = + sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)") + .bind("test_publication") + .fetch_one(&source_pool) + .await + .expect("failed to check publication existence"); + + assert!(!publication_exists, "Publication should have been deleted"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn publication_referenced_by_pipeline_cannot_be_deleted() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + create_default_image(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create a test table and publication on the source database + create_test_table(&source_pool, "test_table").await; + create_publication_on_db(&source_pool, "my_publication", "test_table").await; + + // Create destination + let destination_id = create_destination(&app, tenant_id).await; + + // Create a pipeline that uses this publication + let mut pipeline_config = new_pipeline_config(); + pipeline_config.publication_name = "my_publication".to_string(); + + let pipeline = CreatePipelineRequest { + source_id, + destination_id, + config: pipeline_config, + }; + let pipeline_response = app.create_pipeline(tenant_id, &pipeline).await; + assert!( + pipeline_response.status().is_success(), + "Failed to create pipeline" + ); + let pipeline_response: CreatePipelineResponse = pipeline_response + .json() + .await + .expect("failed to deserialize response"); + let pipeline_id = pipeline_response.id; + + // Act - Try to delete the publication + let response = app + .delete_publication(tenant_id, source_id, "my_publication") + .await; + + // Assert - Should fail with 409 Conflict + assert_eq!( + response.status(), + StatusCode::CONFLICT, + "Expected 409 Conflict when deleting publication in use" + ); + + let error_body = response.text().await.expect("failed to read response body"); + assert!( + error_body.contains(&pipeline_id.to_string()), + "Error message should contain pipeline ID: {}", + error_body + ); + assert!( + error_body.contains("my_publication"), + "Error message should contain publication name" + ); + + // Verify publication still exists + let publication_exists: bool = + sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)") + .bind("my_publication") + .fetch_one(&source_pool) + .await + .expect("failed to check publication existence"); + + assert!(publication_exists, "Publication should still exist"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn publication_referenced_by_multiple_pipelines_cannot_be_deleted() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + create_default_image(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create a test table and publication on the source database + create_test_table(&source_pool, "test_table").await; + create_publication_on_db(&source_pool, "shared_publication", "test_table").await; + + // Create two destinations + let destination_id_1 = create_destination(&app, tenant_id).await; + let destination_id_2 = create_destination(&app, tenant_id).await; + + // Create two pipelines that use the same publication + let mut pipeline_config = new_pipeline_config(); + pipeline_config.publication_name = "shared_publication".to_string(); + + let pipeline1 = CreatePipelineRequest { + source_id, + destination_id: destination_id_1, + config: pipeline_config.clone(), + }; + let response1 = app.create_pipeline(tenant_id, &pipeline1).await; + assert!( + response1.status().is_success(), + "Failed to create pipeline 1" + ); + let response1: CreatePipelineResponse = response1.json().await.expect("failed to deserialize"); + let pipeline_id_1 = response1.id; + + let pipeline2 = CreatePipelineRequest { + source_id, + destination_id: destination_id_2, + config: pipeline_config, + }; + let response2 = app.create_pipeline(tenant_id, &pipeline2).await; + assert!( + response2.status().is_success(), + "Failed to create pipeline 2" + ); + let response2: CreatePipelineResponse = response2.json().await.expect("failed to deserialize"); + let pipeline_id_2 = response2.id; + + // Act - Try to delete the publication + let response = app + .delete_publication(tenant_id, source_id, "shared_publication") + .await; + + // Assert - Should fail with 409 Conflict + assert_eq!( + response.status(), + StatusCode::CONFLICT, + "Expected 409 Conflict when deleting publication used by multiple pipelines" + ); + + let error_body = response.text().await.expect("failed to read response body"); + assert!( + error_body.contains(&pipeline_id_1.to_string()), + "Error message should contain first pipeline ID" + ); + assert!( + error_body.contains(&pipeline_id_2.to_string()), + "Error message should contain second pipeline ID" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn cannot_create_pipeline_with_non_existent_publication() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + create_default_image(&app).await; + let (_source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create destination + let destination_id = create_destination(&app, tenant_id).await; + + // Act - Try to create pipeline with non-existent publication + let mut pipeline_config = new_pipeline_config(); + pipeline_config.publication_name = "non_existent_publication".to_string(); + + let pipeline = CreatePipelineRequest { + source_id, + destination_id, + config: pipeline_config, + }; + let response = app.create_pipeline(tenant_id, &pipeline).await; + + // Assert - Should fail with 400 Bad Request + assert_eq!( + response.status(), + StatusCode::BAD_REQUEST, + "Expected 400 Bad Request when creating pipeline with non-existent publication" + ); + + let error_body = response.text().await.expect("failed to read response body"); + assert!( + error_body.contains("non_existent_publication"), + "Error message should mention the publication name" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn can_create_pipeline_with_existing_publication() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + create_default_image(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create a test table and publication on the source database + create_test_table(&source_pool, "test_table").await; + create_publication_on_db(&source_pool, "valid_publication", "test_table").await; + + // Create destination + let destination_id = create_destination(&app, tenant_id).await; + + // Act - Create pipeline with existing publication + let mut pipeline_config = new_pipeline_config(); + pipeline_config.publication_name = "valid_publication".to_string(); + + let pipeline = CreatePipelineRequest { + source_id, + destination_id, + config: pipeline_config, + }; + let response = app.create_pipeline(tenant_id, &pipeline).await; + + // Assert - Should succeed + assert!( + response.status().is_success(), + "Expected success when creating pipeline with valid publication, got: {}", + response.status() + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn cannot_update_pipeline_to_non_existent_publication() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + create_default_image(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create a test table and publication on the source database + create_test_table(&source_pool, "test_table").await; + create_publication_on_db(&source_pool, "initial_publication", "test_table").await; + + // Create destination + let destination_id = create_destination(&app, tenant_id).await; + + // Create pipeline with initial publication + let mut pipeline_config = new_pipeline_config(); + pipeline_config.publication_name = "initial_publication".to_string(); + + let pipeline = CreatePipelineRequest { + source_id, + destination_id, + config: pipeline_config.clone(), + }; + let create_response = app.create_pipeline(tenant_id, &pipeline).await; + assert!(create_response.status().is_success()); + let create_response: CreatePipelineResponse = create_response + .json() + .await + .expect("failed to deserialize response"); + let pipeline_id = create_response.id; + + // Act - Try to update pipeline to non-existent publication + pipeline_config.publication_name = "non_existent_publication".to_string(); + + let update_request = etl_api::routes::pipelines::UpdatePipelineRequest { + source_id, + destination_id, + config: pipeline_config, + }; + let response = app + .update_pipeline(tenant_id, pipeline_id, &update_request) + .await; + + // Assert - Should fail with 400 Bad Request + assert_eq!( + response.status(), + StatusCode::BAD_REQUEST, + "Expected 400 Bad Request when updating pipeline to non-existent publication" + ); + + let error_body = response.text().await.expect("failed to read response body"); + assert!( + error_body.contains("non_existent_publication"), + "Error message should mention the publication name" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn list_pipelines_for_publication_returns_correct_pipelines() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + create_default_image(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create test tables and publications + create_test_table(&source_pool, "test_table_a").await; + create_test_table(&source_pool, "test_table_b").await; + create_publication_on_db(&source_pool, "publication_a", "test_table_a").await; + create_publication_on_db(&source_pool, "publication_b", "test_table_b").await; + + // Create destinations + let destination_id_1 = create_destination(&app, tenant_id).await; + let destination_id_2 = create_destination(&app, tenant_id).await; + let destination_id_3 = create_destination(&app, tenant_id).await; + + // Create pipelines: two with publication_a, one with publication_b + let mut pipeline_config_a = new_pipeline_config(); + pipeline_config_a.publication_name = "publication_a".to_string(); + + let mut pipeline_config_b = new_pipeline_config(); + pipeline_config_b.publication_name = "publication_b".to_string(); + + // Pipeline 1 with publication_a + let pipeline1 = CreatePipelineRequest { + source_id, + destination_id: destination_id_1, + config: pipeline_config_a.clone(), + }; + let response1 = app.create_pipeline(tenant_id, &pipeline1).await; + assert!(response1.status().is_success()); + let response1: CreatePipelineResponse = response1.json().await.expect("failed to deserialize"); + let pipeline_id_1 = response1.id; + + // Pipeline 2 with publication_a + let pipeline2 = CreatePipelineRequest { + source_id, + destination_id: destination_id_2, + config: pipeline_config_a, + }; + let response2 = app.create_pipeline(tenant_id, &pipeline2).await; + assert!(response2.status().is_success()); + let response2: CreatePipelineResponse = response2.json().await.expect("failed to deserialize"); + let pipeline_id_2 = response2.id; + + // Pipeline 3 with publication_b + let pipeline3 = CreatePipelineRequest { + source_id, + destination_id: destination_id_3, + config: pipeline_config_b, + }; + let response3 = app.create_pipeline(tenant_id, &pipeline3).await; + assert!(response3.status().is_success()); + + // Act - List pipelines for publication_a + let response = app + .list_pipelines_for_publication(tenant_id, source_id, "publication_a") + .await; + + // Assert - Should return only the two pipelines using publication_a + assert!(response.status().is_success()); + let response_body: ListPipelinesForPublicationResponse = response + .json() + .await + .expect("failed to deserialize response"); + + assert_eq!( + response_body.pipelines.len(), + 2, + "Should have exactly 2 pipelines for publication_a" + ); + + let pipeline_ids: Vec = response_body.pipelines.iter().map(|p| p.id).collect(); + assert!( + pipeline_ids.contains(&pipeline_id_1), + "Should include pipeline 1" + ); + assert!( + pipeline_ids.contains(&pipeline_id_2), + "Should include pipeline 2" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn list_pipelines_for_unused_publication_returns_empty() { + init_test_tracing(); + // Arrange + let app = spawn_test_app().await; + let tenant_id = &create_tenant(&app).await; + let (source_pool, source_id, source_db_config) = + create_test_source_database(&app, tenant_id).await; + run_etl_migrations_on_source_database(&source_db_config).await; + + // Create a test table and publication but no pipelines using it + create_test_table(&source_pool, "test_table").await; + create_publication_on_db(&source_pool, "unused_publication", "test_table").await; + + // Act - List pipelines for unused publication + let response = app + .list_pipelines_for_publication(tenant_id, source_id, "unused_publication") + .await; + + // Assert - Should return empty list + assert!(response.status().is_success()); + let response_body: ListPipelinesForPublicationResponse = response + .json() + .await + .expect("failed to deserialize response"); + + assert_eq!( + response_body.pipelines.len(), + 0, + "Should have no pipelines for unused publication" + ); +} diff --git a/etl-api/tests/support/test_app.rs b/etl-api/tests/support/test_app.rs index f27502eb9..1e61a6d18 100644 --- a/etl-api/tests/support/test_app.rs +++ b/etl-api/tests/support/test_app.rs @@ -10,6 +10,7 @@ use etl_api::routes::pipelines::{ CreatePipelineRequest, RollbackTableStateRequest, UpdatePipelineConfigRequest, UpdatePipelineRequest, UpdatePipelineVersionRequest, }; +use etl_api::routes::sources::publications::{CreatePublicationRequest, UpdatePublicationRequest}; use etl_api::routes::sources::{CreateSourceRequest, UpdateSourceRequest}; use etl_api::routes::tenants::{ CreateOrUpdateTenantRequest, CreateTenantRequest, UpdateTenantRequest, @@ -494,6 +495,104 @@ impl TestApp { .await .expect("failed to execute request") } + + pub async fn create_publication( + &self, + tenant_id: &str, + source_id: i64, + publication: &CreatePublicationRequest, + ) -> reqwest::Response { + self.post_authenticated(format!( + "{}/v1/sources/{}/publications", + &self.address, source_id + )) + .header("tenant_id", tenant_id) + .json(publication) + .send() + .await + .expect("Failed to execute request.") + } + + pub async fn read_publication( + &self, + tenant_id: &str, + source_id: i64, + publication_name: &str, + ) -> reqwest::Response { + self.get_authenticated(format!( + "{}/v1/sources/{}/publications/{}", + &self.address, source_id, publication_name + )) + .header("tenant_id", tenant_id) + .send() + .await + .expect("failed to execute request") + } + + pub async fn update_publication( + &self, + tenant_id: &str, + source_id: i64, + publication_name: &str, + publication: &UpdatePublicationRequest, + ) -> reqwest::Response { + self.post_authenticated(format!( + "{}/v1/sources/{}/publications/{}", + &self.address, source_id, publication_name + )) + .header("tenant_id", tenant_id) + .json(publication) + .send() + .await + .expect("failed to execute request") + } + + pub async fn delete_publication( + &self, + tenant_id: &str, + source_id: i64, + publication_name: &str, + ) -> reqwest::Response { + self.delete_authenticated(format!( + "{}/v1/sources/{}/publications/{}", + &self.address, source_id, publication_name + )) + .header("tenant_id", tenant_id) + .send() + .await + .expect("Failed to execute request.") + } + + pub async fn read_all_publications( + &self, + tenant_id: &str, + source_id: i64, + ) -> reqwest::Response { + self.get_authenticated(format!( + "{}/v1/sources/{}/publications", + &self.address, source_id + )) + .header("tenant_id", tenant_id) + .send() + .await + .expect("failed to execute request") + } + + pub async fn list_pipelines_for_publication( + &self, + tenant_id: &str, + source_id: i64, + publication_name: &str, + ) -> reqwest::Response { + self.get_authenticated(format!( + "{}/v1/sources/{}/publications/{}/pipelines", + &self.address, source_id, publication_name + )) + .header("tenant_id", tenant_id) + .send() + .await + .expect("failed to execute request") + } } impl Drop for TestApp { From 06710be617103b0af1bbf469af53c105542fecd3 Mon Sep 17 00:00:00 2001 From: aleksandarskrbic Date: Mon, 24 Nov 2025 11:26:35 +0100 Subject: [PATCH 2/3] feat(etl-api): handle case when publication is deleted pt.2 --- etl-api/src/routes/pipelines.rs | 81 +++++++++++++++------------------ 1 file changed, 36 insertions(+), 45 deletions(-) diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs index 094e5adc1..f28aeaaf1 100644 --- a/etl-api/src/routes/pipelines.rs +++ b/etl-api/src/routes/pipelines.rs @@ -451,47 +451,6 @@ pub struct GetPipelineVersionResponse { pub new_version: Option, } -/// Validates pipeline inputs: checks source/destination existence and publication validity. -/// -/// Returns the source pool for the validated source. -async fn validate_pipeline_inputs( - txn: &mut sqlx::PgTransaction<'_>, - tenant_id: &str, - source_id: i64, - destination_id: i64, - publication_name: &str, - encryption_key: &EncryptionKey, -) -> Result { - // Check source exists - if !source_exists(txn.deref_mut(), tenant_id, source_id).await? { - return Err(PipelineError::SourceNotFound(source_id)); - } - - // Check destination exists - if !destination_exists(txn.deref_mut(), tenant_id, destination_id).await? { - return Err(PipelineError::DestinationNotFound(destination_id)); - } - - // Read source configuration - let source = db::sources::read_source(txn.deref_mut(), tenant_id, source_id, encryption_key) - .await? - .ok_or(PipelineError::SourceNotFound(source_id))?; - - // Connect to source database - let source_pool = - connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?; - - // Validate publication exists on source database - let exists = db::publications::publication_exists(&source_pool, publication_name).await?; - if !exists { - return Err(PipelineError::PublicationNotFound( - publication_name.to_string(), - )); - } - - Ok(source_pool) -} - #[utoipa::path( summary = "Create a pipeline", description = "Creates a pipeline linking a source to a destination.", @@ -518,8 +477,7 @@ pub async fn create_pipeline( let mut txn = pool.begin().await?; - // Validate source, destination, and publication - let _source_pool = validate_pipeline_inputs( + validate_pipeline_inputs( &mut txn, tenant_id, pipeline.source_id, @@ -631,8 +589,7 @@ pub async fn update_pipeline( let mut txn = pool.begin().await?; - // Validate source, destination, and publication - let _source_pool = validate_pipeline_inputs( + validate_pipeline_inputs( &mut txn, tenant_id, pipeline.source_id, @@ -1285,3 +1242,37 @@ pub async fn update_pipeline_config( Ok(Json(response)) } + +/// Validates pipeline inputs: checks source/destination existence and publication validity. +async fn validate_pipeline_inputs( + txn: &mut sqlx::PgTransaction<'_>, + tenant_id: &str, + source_id: i64, + destination_id: i64, + publication_name: &str, + encryption_key: &EncryptionKey, +) -> Result<(), PipelineError> { + if !source_exists(txn.deref_mut(), tenant_id, source_id).await? { + return Err(PipelineError::SourceNotFound(source_id)); + } + + if !destination_exists(txn.deref_mut(), tenant_id, destination_id).await? { + return Err(PipelineError::DestinationNotFound(destination_id)); + } + + let source = db::sources::read_source(txn.deref_mut(), tenant_id, source_id, encryption_key) + .await? + .ok_or(PipelineError::SourceNotFound(source_id))?; + + let source_pool = + connect_to_source_database_with_defaults(&source.config.into_connection_config()).await?; + + let exists = db::publications::publication_exists(&source_pool, publication_name).await?; + if !exists { + return Err(PipelineError::PublicationNotFound( + publication_name.to_string(), + )); + } + + Ok(()) +} From 12062d986238ba8d7ae8acecc967faa03ceba675 Mon Sep 17 00:00:00 2001 From: aleksandarskrbic Date: Mon, 24 Nov 2025 11:33:13 +0100 Subject: [PATCH 3/3] feat(etl-api): handle case when publication is deleted pt.3 --- etl-api/src/routes/sources/publications.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/etl-api/src/routes/sources/publications.rs b/etl-api/src/routes/sources/publications.rs index e096af33b..23ddef560 100644 --- a/etl-api/src/routes/sources/publications.rs +++ b/etl-api/src/routes/sources/publications.rs @@ -271,7 +271,6 @@ pub async fn delete_publication( .map(|s| s.config) .ok_or(PublicationError::SourceNotFound(source_id))?; - // Check if any pipelines are using this publication let pipelines = db::pipelines::find_pipelines_by_publication( &**pool, tenant_id, @@ -354,13 +353,11 @@ pub async fn list_pipelines_for_publication( let tenant_id = extract_tenant_id(&req)?; let (source_id, publication_name) = source_id_and_pub_name.into_inner(); - // Verify source exists - let _config = db::sources::read_source(&**pool, tenant_id, source_id, &encryption_key) + db::sources::read_source(&**pool, tenant_id, source_id, &encryption_key) .await? - .map(|s| s.config) + .map(|_| ()) .ok_or(PublicationError::SourceNotFound(source_id))?; - // Find all pipelines using this publication let pipelines = db::pipelines::find_pipelines_by_publication( &**pool, tenant_id,