From e04b4fe7d826fc5af6b55189b4ae6c23abebd1e3 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 May 2026 13:27:14 +0530 Subject: [PATCH 01/21] feat: add activity for schema migration --- flow/activities/flowable.go | 58 ++++++++ flow/connectors/postgres/pgdump_schema.go | 165 ++++++++++++++++++++++ flow/go.sum | 4 +- flow/workflows/setup_flow.go | 38 +++++ protos/flow.proto | 7 + stacks/flow.Dockerfile | 2 +- 6 files changed, 271 insertions(+), 3 deletions(-) create mode 100644 flow/connectors/postgres/pgdump_schema.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1b29b3785..33a1ebfed 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -2141,3 +2141,61 @@ func (a *FlowableActivity) MigratePostgresTableOIDs( return nil } + +func (a *FlowableActivity) RunPgDumpSchema( + ctx context.Context, + input *protos.RunPgDumpSchemaInput, +) error { + logger := internal.LoggerFromCtx(ctx) + ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowName) + + srcPeer, err := connectors.LoadPeer(ctx, a.CatalogPool, input.SourceName) + if err != nil { + return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("failed to load source peer: %w", err)) + } + + dstPeer, err := connectors.LoadPeer(ctx, a.CatalogPool, input.DestinationName) + if err != nil { + return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("failed to load destination peer: %w", err)) + } + + srcPgConfig, ok := srcPeer.Config.(*protos.Peer_PostgresConfig) + if !ok { + return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("source peer %s is not a PostgreSQL peer", input.SourceName)) + } + + dstPgConfig, ok := dstPeer.Config.(*protos.Peer_PostgresConfig) + if !ok { + return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("destination peer %s is not a PostgreSQL peer", input.DestinationName)) + } + + // skip schema migration for peers using SSH tunnels + if srcPgConfig.PostgresConfig.SshConfig != nil { + logger.Info("skipping pg_dump schema migration: source peer uses SSH tunnel") + return nil + } + if dstPgConfig.PostgresConfig.SshConfig != nil { + logger.Info("skipping pg_dump schema migration: destination peer uses SSH tunnel") + return nil + } + + // skip schema migration for non-password auth (e.g. IAM) + if srcPgConfig.PostgresConfig.AuthType != protos.PostgresAuthType_POSTGRES_PASSWORD { + logger.Info("skipping pg_dump schema migration: source peer uses non-password auth") + return nil + } + if dstPgConfig.PostgresConfig.AuthType != protos.PostgresAuthType_POSTGRES_PASSWORD { + logger.Info("skipping pg_dump schema migration: destination peer uses non-password auth") + return nil + } + + logger.Info("running pg_dump schema migration from source to destination", + slog.String("source", input.SourceName), slog.String("destination", input.DestinationName)) + + if err := connpostgres.RunPgDumpSchema(ctx, srcPgConfig.PostgresConfig, dstPgConfig.PostgresConfig); err != nil { + return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("pg_dump schema migration failed: %w", err)) + } + + logger.Info("pg_dump schema migration completed successfully") + return nil +} diff --git a/flow/connectors/postgres/pgdump_schema.go b/flow/connectors/postgres/pgdump_schema.go new file mode 100644 index 000000000..ee2a49462 --- /dev/null +++ b/flow/connectors/postgres/pgdump_schema.go @@ -0,0 +1,165 @@ +package connpostgres + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "strconv" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +// RunPgDumpSchema first migrates roles via pg_dumpall --roles-only, then streams +// a schema-only pg_dump from source directly into psql on the destination, +// piping stdout into stdin without intermediate files. +func RunPgDumpSchema(ctx context.Context, srcConfig *protos.PostgresConfig, dstConfig *protos.PostgresConfig) error { + // Step 1: migrate roles from source to destination + if err := pipeCommand(ctx, srcConfig, dstConfig, "pg_dumpall", buildPgDumpAllArgs(srcConfig)); err != nil { + return fmt.Errorf("pg_dumpall roles migration failed: %w", err) + } + + // Step 2: migrate schema from source to destination + if err := pipeCommand(ctx, srcConfig, dstConfig, "pg_dump", buildPgDumpArgs(srcConfig)); err != nil { + return fmt.Errorf("pg_dump schema migration failed: %w", err) + } + + return nil +} + +// pipeCommand runs srcBinary with the given args, piping its stdout into psql on the destination. +func pipeCommand( + ctx context.Context, + srcConfig *protos.PostgresConfig, + dstConfig *protos.PostgresConfig, + srcBinary string, + srcArgs []string, +) error { + psqlArgs := buildPsqlArgs(dstConfig) + + srcCmd := exec.CommandContext(ctx, srcBinary, srcArgs...) + psqlCmd := exec.CommandContext(ctx, "psql", psqlArgs...) + + // set PGPASSWORD for each command via separate env slices + srcCmd.Env = append(os.Environ(), "PGPASSWORD="+srcConfig.Password) + psqlCmd.Env = append(os.Environ(), "PGPASSWORD="+dstConfig.Password) + + // handle TLS env vars + appendTLSEnv(srcCmd, srcConfig) + appendTLSEnv(psqlCmd, dstConfig) + + // pipe source command stdout -> psql stdin + pipe, err := srcCmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to create %s stdout pipe: %w", srcBinary, err) + } + psqlCmd.Stdin = pipe + + var srcStderr, psqlStderr bytes.Buffer + srcCmd.Stderr = &srcStderr + psqlCmd.Stderr = &psqlStderr + + // start psql first so it's ready to read + if err := psqlCmd.Start(); err != nil { + return fmt.Errorf("failed to start psql: %w", err) + } + + // then start source command which writes to the pipe + if err := srcCmd.Start(); err != nil { + // kill psql since source command failed to start + _ = psqlCmd.Process.Kill() + _ = psqlCmd.Wait() + return fmt.Errorf("failed to start %s: %w", srcBinary, err) + } + + // wait for source command to finish (closes the pipe, signaling EOF to psql) + srcErr := srcCmd.Wait() + psqlErr := psqlCmd.Wait() + + if srcErr != nil { + return fmt.Errorf("%s failed: %w\nstderr: %s", srcBinary, srcErr, srcStderr.String()) + } + if psqlErr != nil { + return fmt.Errorf("psql failed: %w\nstderr: %s", psqlErr, psqlStderr.String()) + } + + return nil +} + +func buildPgDumpAllArgs(config *protos.PostgresConfig) []string { + port := config.Port + if port == 0 { + port = 5432 + } + + args := []string{ + "--roles-only", + "-h", config.Host, + "-p", strconv.FormatUint(uint64(port), 10), + } + if config.User != "" { + args = append(args, "-U", config.User) + } + return args +} + +func buildPgDumpArgs(config *protos.PostgresConfig) []string { + port := config.Port + if port == 0 { + port = 5432 + } + + args := []string{ + "--schema-only", + "-h", config.Host, + "-p", strconv.FormatUint(uint64(port), 10), + "-d", config.Database, + } + if config.User != "" { + args = append(args, "-U", config.User) + } + return args +} + +func buildPsqlArgs(config *protos.PostgresConfig) []string { + port := config.Port + if port == 0 { + port = 5432 + } + + args := []string{ + "-h", config.Host, + "-p", strconv.FormatUint(uint64(port), 10), + "-d", config.Database, + } + if config.User != "" { + args = append(args, "-U", config.User) + } + return args +} + +func appendTLSEnv(cmd *exec.Cmd, config *protos.PostgresConfig) { + if config.RequireTls { + cmd.Env = append(cmd.Env, "PGSSLMODE=require") + + if config.RootCa != nil && *config.RootCa != "" { + // write root CA to a temp file + tmpFile, err := os.CreateTemp("", "peerdb-root-ca-*.pem") + if err != nil { + slog.Warn("failed to create temp file for root CA, skipping sslrootcert", slog.Any("error", err)) + return + } + if _, err := tmpFile.WriteString(*config.RootCa); err != nil { + slog.Warn("failed to write root CA to temp file", slog.Any("error", err)) + tmpFile.Close() + os.Remove(tmpFile.Name()) + return + } + tmpFile.Close() + cmd.Env = append(cmd.Env, "PGSSLROOTCERT="+tmpFile.Name()) + // note: temp file is cleaned up when the process exits + } + } +} diff --git a/flow/go.sum b/flow/go.sum index 52fe9f0cf..6f40e5361 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -437,8 +437,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= -github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 0f4aa64ec..f12566173 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -236,6 +236,37 @@ func (s *SetupFlowExecution) createNormalizedTables( s.Error("failed to create normalized tables", slog.Any("error", err)) return fmt.Errorf("failed to create normalized tables: %w", err) } + + return nil +} + +// runPgDumpSchema runs pg_dump --schema-only on the source and pipes the output +// into psql on the destination, streaming the schema directly. +// This is only used for PG type system (PG-to-PG mirrors). +func (s *SetupFlowExecution) runPgDumpSchema( + ctx workflow.Context, + config *protos.FlowConnectionConfigsCore, +) error { + s.Info("running pg_dump schema migration from source to destination") + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Hour, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, + }) + + input := &protos.RunPgDumpSchemaInput{ + SourceName: config.SourceName, + DestinationName: config.DestinationName, + FlowName: config.FlowJobName, + Env: config.Env, + } + + if err := workflow.ExecuteActivity(ctx, flowable.RunPgDumpSchema, input).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to run pg_dump schema migration: %w", err) + } + return nil } @@ -264,6 +295,13 @@ func (s *SetupFlowExecution) executeSetupFlow( } } + // for PG type system (PG-to-PG mirrors), run pg_dump schema migration before setting up normalized tables + if config.System == protos.TypeSystem_PG { + if err := s.runPgDumpSchema(ctx, config); err != nil { + return nil, fmt.Errorf("failed to run pg_dump schema migration: %w", err) + } + } + if err := s.setupTableSchema(ctx, config); err != nil { return nil, fmt.Errorf("failed to fetch table schema: %w", err) } diff --git a/protos/flow.proto b/protos/flow.proto index ff2124510..eb53bf4f9 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -671,3 +671,10 @@ message GetFlowInfoToCancelFromCatalogOutput { string workflow_id = 2; peerdb_peers.DBType source_peer_type = 3; } + +message RunPgDumpSchemaInput { + string source_name = 1; + string destination_name = 2; + string flow_name = 3; + map env = 4; +} diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 3c13ee793..c9847e575 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -33,7 +33,7 @@ RUN --mount=type=cache,target="/root/.cache/go-build" if [[ "$DEBUG_BUILD" = "1" FROM alpine:3.23@sha256:5b10f432ef3da1b8d4c7eb6c487f2f5a8f096bc91145e68878dd4a5019afde11 AS flow-base ENV TZ=UTC ADD --checksum=sha256:e5bb2084ccf45087bda1c9bffdea0eb15ee67f0b91646106e466714f9de3c7e3 https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem /usr/local/share/ca-certificates/global-aws-rds-bundle.pem -RUN apk add --no-cache ca-certificates geos && \ +RUN apk add --no-cache ca-certificates geos postgresql-client && \ update-ca-certificates && \ adduser -s /bin/sh -D peerdb USER peerdb From 75baabe4118eb09f81743f09676390e5783f8980 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 May 2026 17:16:59 +0530 Subject: [PATCH 02/21] boilerplate --- flow/activities/flowable.go | 4 + flow/connectors/postgres/validate.go | 4 + flow/e2e/pg_schema_dump_test.go | 336 +++++++++++++++++++++++++++ flow/internal/dynamicconf.go | 4 + flow/internal/test_env.go | 10 + flow/workflows/setup_flow.go | 35 ++- 6 files changed, 387 insertions(+), 6 deletions(-) create mode 100644 flow/e2e/pg_schema_dump_test.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 33a1ebfed..18c32c677 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -2142,6 +2142,10 @@ func (a *FlowableActivity) MigratePostgresTableOIDs( return nil } +func (a *FlowableActivity) PeerDBPGAutomatedSchemaDump(ctx context.Context, env map[string]string) (bool, error) { + return internal.PeerDBPGAutomatedSchemaDump(ctx, env) +} + func (a *FlowableActivity) RunPgDumpSchema( ctx context.Context, input *protos.RunPgDumpSchemaInput, diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 4d942747a..dcec932bb 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -278,6 +278,10 @@ func (c *PostgresConnector) ValidateMirrorDestination( return nil // no need to validate schema for resync, as we will create or replace the tables } + if cfg.System == protos.TypeSystem_PG && cfg.Env["PEERDB_PG_AUTOMATED_SCHEMA_DUMP"] == "true" { + return nil // pg_dump will create the schema and tables on the destination + } + // Validate that all source columns exist in destination tables checkedSchemas := make(map[string]struct{}) for _, tableMapping := range cfg.TableMappings { diff --git a/flow/e2e/pg_schema_dump_test.go b/flow/e2e/pg_schema_dump_test.go new file mode 100644 index 000000000..433b9667f --- /dev/null +++ b/flow/e2e/pg_schema_dump_test.go @@ -0,0 +1,336 @@ +package e2e + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" + "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" +) + +func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { + srcSchema := "e2e_test_" + s.suffix + dstDBName := "e2e_pgdump_" + s.suffix + + // create destination database on the same PG instance + _, err := s.Conn().Exec(s.t.Context(), "CREATE DATABASE "+dstDBName) + require.NoError(s.t, err) + s.t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + cfg := internal.GetAncillaryPostgresConfigFromEnv() + connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres", + cfg.Host, cfg.Port, cfg.User, cfg.Password) + dropConn, err := pgx.Connect(ctx, connStr) + if err != nil { + s.t.Logf("failed to connect for cleanup: %v", err) + return + } + defer dropConn.Close(ctx) + _, _ = dropConn.Exec(ctx, fmt.Sprintf( + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='%s' AND pid <> pg_backend_pid()", dstDBName)) + if _, err := dropConn.Exec(ctx, "DROP DATABASE IF EXISTS "+dstDBName); err != nil { + s.t.Logf("failed to drop destination database %s: %v", dstDBName, err) + } + }) + + // connect to destination database for verification later + dstCfg := internal.GetAncillaryPostgresConfigFromEnv() + dstConnStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", + dstCfg.Host, dstCfg.Port, dstCfg.User, dstCfg.Password, dstDBName) + dstConn, err := pgx.Connect(s.t.Context(), dstConnStr) + require.NoError(s.t, err) + s.t.Cleanup(func() { dstConn.Close(s.t.Context()) }) + + // create the source schema in the destination database (pg_dump will create it, but we need it for the peer) + // Actually, pg_dump --schema-only will create the schema, so we don't need to pre-create it. + + // create a destination peer pointing to the new database + dstPeerCfg := internal.GetAncillaryPostgresConfigFromEnv() + dstPeerCfg.Database = dstDBName + dstPeerName := "pgdump_dst_" + s.suffix + dstPeer := &protos.Peer{ + Name: dstPeerName, + Type: protos.DBType_POSTGRES, + Config: &protos.Peer_PostgresConfig{ + PostgresConfig: dstPeerCfg, + }, + } + CreatePeer(s.t, dstPeer) + + // --- set up rich schema on source --- + + // create custom enum type in the source schema + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "CREATE TYPE %s.color AS ENUM ('red', 'green', 'blue')", srcSchema)) + require.NoError(s.t, err) + + // create parent table with various column types + parentTable := srcSchema + ".parent_tbl" + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE %s ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + color %s.color NOT NULL DEFAULT 'red', + score NUMERIC(10,2), + metadata JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + )`, parentTable, srcSchema)) + require.NoError(s.t, err) + + // create unique index on parent + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "CREATE UNIQUE INDEX idx_parent_name ON %s (name)", parentTable)) + require.NoError(s.t, err) + + // create child table with foreign key referencing parent + childTable := srcSchema + ".child_tbl" + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE %s ( + id SERIAL PRIMARY KEY, + parent_id INT NOT NULL REFERENCES %s(id), + value TEXT, + tags TEXT[] + )`, childTable, parentTable)) + require.NoError(s.t, err) + + // create btree index on child + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "CREATE INDEX idx_child_parent ON %s (parent_id)", childTable)) + require.NoError(s.t, err) + + // insert initial data for snapshot + for i := 1; i <= 5; i++ { + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "INSERT INTO %s (name, color, score, metadata) VALUES ($1, $2, $3, $4)", + parentTable), + fmt.Sprintf("item_%d", i), + []string{"red", "green", "blue"}[i%3], + float64(i)*10.5, + fmt.Sprintf(`{"key": "val_%d"}`, i), + ) + require.NoError(s.t, err) + } + + for i := 1; i <= 10; i++ { + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "INSERT INTO %s (parent_id, value, tags) VALUES ($1, $2, $3)", + childTable), + (i%5)+1, + fmt.Sprintf("child_val_%d", i), + fmt.Sprintf("{tag_%d,common}", i), + ) + require.NoError(s.t, err) + } + + // source and dest table identifiers — use same schema-qualified names + // since pg_dump recreates the schema on the destination + srcParent := parentTable + dstParent := parentTable + srcChild := childTable + dstChild := childTable + + config := &protos.FlowConnectionConfigs{ + FlowJobName: s.attachSuffix("test_pgdump"), + DestinationName: dstPeerName, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcParent, + DestinationTableIdentifier: dstParent, + }, + { + SourceTableIdentifier: srcChild, + DestinationTableIdentifier: dstChild, + }, + }, + SourceName: GeneratePostgresPeer(s.t).Name, + MaxBatchSize: 100, + DoInitialSnapshot: true, + System: protos.TypeSystem_PG, + SoftDeleteColName: "", + SyncedAtColName: "", + Env: map[string]string{ + "PEERDB_PG_AUTOMATED_SCHEMA_DUMP": "true", + }, + } + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, config) + SetupCDCFlowStatusQuery(s.t, env, config) + + // wait for initial snapshot to complete + EnvWaitFor(s.t, env, 3*time.Minute, "initial load parent", func() bool { + var count int64 + err := dstConn.QueryRow(s.t.Context(), + fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&count) + return err == nil && count == 5 + }) + EnvWaitFor(s.t, env, 3*time.Minute, "initial load child", func() bool { + var count int64 + err := dstConn.QueryRow(s.t.Context(), + fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&count) + return err == nil && count == 10 + }) + + // --- verify schema objects on destination --- + + // verify enum type exists on destination + var enumExists bool + err = dstConn.QueryRow(s.t.Context(), ` + SELECT EXISTS ( + SELECT 1 FROM pg_type t + JOIN pg_namespace n ON n.oid = t.typnamespace + WHERE t.typname = 'color' AND n.nspname = $1 + )`, srcSchema).Scan(&enumExists) + require.NoError(s.t, err) + require.True(s.t, enumExists, "enum type 'color' should exist on destination") + + // verify unique index on parent table + var idxParentExists bool + err = dstConn.QueryRow(s.t.Context(), ` + SELECT EXISTS ( + SELECT 1 FROM pg_indexes + WHERE schemaname = $1 AND tablename = 'parent_tbl' AND indexname = 'idx_parent_name' + )`, srcSchema).Scan(&idxParentExists) + require.NoError(s.t, err) + require.True(s.t, idxParentExists, "unique index idx_parent_name should exist on destination") + + // verify btree index on child table + var idxChildExists bool + err = dstConn.QueryRow(s.t.Context(), ` + SELECT EXISTS ( + SELECT 1 FROM pg_indexes + WHERE schemaname = $1 AND tablename = 'child_tbl' AND indexname = 'idx_child_parent' + )`, srcSchema).Scan(&idxChildExists) + require.NoError(s.t, err) + require.True(s.t, idxChildExists, "btree index idx_child_parent should exist on destination") + + // verify foreign key constraint on child table + var fkExists bool + err = dstConn.QueryRow(s.t.Context(), ` + SELECT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE constraint_type = 'FOREIGN KEY' + AND table_schema = $1 + AND table_name = 'child_tbl' + )`, srcSchema).Scan(&fkExists) + require.NoError(s.t, err) + require.True(s.t, fkExists, "foreign key on child_tbl should exist on destination") + + // --- CDC test: insert more rows and verify replication --- + + // insert more parent rows + for i := 6; i <= 8; i++ { + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "INSERT INTO %s (name, color, score, metadata) VALUES ($1, $2, $3, $4)", + parentTable), + fmt.Sprintf("item_%d", i), + []string{"red", "green", "blue"}[i%3], + float64(i)*10.5, + fmt.Sprintf(`{"key": "val_%d"}`, i), + ) + EnvNoError(s.t, env, err) + } + + // insert more child rows + for i := 11; i <= 15; i++ { + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "INSERT INTO %s (parent_id, value, tags) VALUES ($1, $2, $3)", + childTable), + (i%5)+1, + fmt.Sprintf("child_val_%d", i), + fmt.Sprintf("{tag_%d,common}", i), + ) + EnvNoError(s.t, env, err) + } + + // wait for CDC to replicate the new rows + EnvWaitFor(s.t, env, 3*time.Minute, "cdc parent rows", func() bool { + var count int64 + err := dstConn.QueryRow(s.t.Context(), + fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&count) + return err == nil && count == 8 + }) + EnvWaitFor(s.t, env, 3*time.Minute, "cdc child rows", func() bool { + var count int64 + err := dstConn.QueryRow(s.t.Context(), + fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&count) + return err == nil && count == 15 + }) + + // verify data integrity: compare actual row content + // query source and destination and compare + var srcParentCount, dstParentCount int64 + err = s.Conn().QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcParent)).Scan(&srcParentCount) + require.NoError(s.t, err) + err = dstConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&dstParentCount) + require.NoError(s.t, err) + require.Equal(s.t, srcParentCount, dstParentCount, "parent table row counts should match") + + var srcChildCount, dstChildCount int64 + err = s.Conn().QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcChild)).Scan(&srcChildCount) + require.NoError(s.t, err) + err = dstConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&dstChildCount) + require.NoError(s.t, err) + require.Equal(s.t, srcChildCount, dstChildCount, "child table row counts should match") + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +// Test_PG_Schema_Dump_Role_Migration verifies that pg_dumpall --roles-only +// propagates a role from a source PG cluster to a separate destination PG +// cluster. Roles are global objects per cluster, so this requires two clusters +// (peerdb-postgres + peerdb-postgres2). +func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_Role_Migration() { + srcCfg := internal.GetAncillaryPostgresConfigFromEnv() + dstCfg := internal.GetSecondaryPostgresConfigFromEnv() + + roleName := "peerdb_test_role_" + s.suffix + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + dstConnStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", + dstCfg.Host, dstCfg.Port, dstCfg.User, dstCfg.Password, dstCfg.Database) + dstConn, err := pgx.Connect(ctx, dstConnStr) + require.NoError(s.t, err, "failed to connect to secondary postgres on %s:%d (is the postgres2 tilt resource running?)", dstCfg.Host, dstCfg.Port) + defer dstConn.Close(ctx) + + // sanity: role must not pre-exist on destination + var preExists bool + require.NoError(s.t, dstConn.QueryRow(ctx, + "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&preExists)) + require.False(s.t, preExists, "role %s unexpectedly exists on destination before test", roleName) + + // create role on source + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD 'pw'", roleName)) + require.NoError(s.t, err) + s.t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cleanupCancel() + _, _ = s.Conn().Exec(cleanupCtx, "DROP ROLE IF EXISTS "+roleName) + + dropConn, err := pgx.Connect(cleanupCtx, dstConnStr) + if err != nil { + s.t.Logf("failed to connect to destination for role cleanup: %v", err) + return + } + defer dropConn.Close(cleanupCtx) + if _, err := dropConn.Exec(cleanupCtx, "DROP ROLE IF EXISTS "+roleName); err != nil { + s.t.Logf("failed to drop destination role %s: %v", roleName, err) + } + }) + + require.NoError(s.t, connpostgres.RunPgDumpSchema(ctx, srcCfg, dstCfg)) + + var postExists bool + require.NoError(s.t, dstConn.QueryRow(ctx, + "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&postExists)) + require.True(s.t, postExists, "role %s should have been migrated to destination cluster", roleName) +} diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index 351f83457..a88c414d6 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -799,3 +799,7 @@ func PeerDBMetricsRecordAggregatesEnabled(ctx context.Context, env map[string]st func PeerDBPostgresApplyCtidBlockPartitioning(ctx context.Context, env map[string]string) (bool, error) { return dynamicConfBool(ctx, env, "PEERDB_POSTGRES_APPLY_CTID_BLOCK_PARTITIONING_OVERRIDE") } + +func PeerDBPGAutomatedSchemaDump(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_PG_AUTOMATED_SCHEMA_DUMP") +} diff --git a/flow/internal/test_env.go b/flow/internal/test_env.go index 4055295f6..e4cac6e55 100644 --- a/flow/internal/test_env.go +++ b/flow/internal/test_env.go @@ -30,6 +30,16 @@ func GetAncillaryPostgresConfigFromEnv() *protos.PostgresConfig { } } +func GetSecondaryPostgresConfigFromEnv() *protos.PostgresConfig { + return &protos.PostgresConfig{ + Host: GetEnvString("PG2_HOST", "localhost"), + Port: uint32(getEnvUint[uint16]("PG2_PORT", 5437)), + User: GetEnvString("PG2_USER", "postgres"), + Password: GetEnvString("PG2_PASSWORD", "postgres"), + Database: GetEnvString("PG2_DATABASE", "postgres"), + } +} + func PostgresToxiproxyUpstreamHostWithFallback(fallback string) string { return GetEnvString("TOXIPROXY_POSTGRES_HOST", fallback) } diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f12566173..917450401 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -232,9 +232,13 @@ func (s *SetupFlowExecution) createNormalizedTables( Flags: flowConnectionConfigs.Flags, } - if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { - s.Error("failed to create normalized tables", slog.Any("error", err)) - return fmt.Errorf("failed to create normalized tables: %w", err) + if !skipCreateTables { + if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { + s.Error("failed to create normalized tables", slog.Any("error", err)) + return fmt.Errorf("failed to create normalized tables: %w", err) + } + } else { + s.Info("skipping normalized table creation, pg_dump already created tables") } return nil @@ -270,6 +274,21 @@ func (s *SetupFlowExecution) runPgDumpSchema( return nil } +// getPGAutomatedSchemaDump checks the PEERDB_PG_AUTOMATED_SCHEMA_DUMP env flag via an activity. +func (s *SetupFlowExecution) getPGAutomatedSchemaDump(ctx workflow.Context, env map[string]string) bool { + checkCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + }) + + var enabled bool + future := workflow.ExecuteActivity(checkCtx, flowable.PeerDBPGAutomatedSchemaDump, env) + if err := future.Get(checkCtx, &enabled); err != nil { + s.Warn("failed to check PEERDB_PG_AUTOMATED_SCHEMA_DUMP, defaulting to false", slog.Any("error", err)) + return false + } + return enabled +} + // executeSetupFlow executes the setup flow. func (s *SetupFlowExecution) executeSetupFlow( ctx workflow.Context, @@ -295,10 +314,14 @@ func (s *SetupFlowExecution) executeSetupFlow( } } - // for PG type system (PG-to-PG mirrors), run pg_dump schema migration before setting up normalized tables + // for PG type system (PG-to-PG mirrors), run pg_dump schema migration if enabled + enablePgSchemaDump := false if config.System == protos.TypeSystem_PG { - if err := s.runPgDumpSchema(ctx, config); err != nil { - return nil, fmt.Errorf("failed to run pg_dump schema migration: %w", err) + enablePgSchemaDump = s.getPGAutomatedSchemaDump(ctx, config.Env) + if enablePgSchemaDump { + if err := s.runPgDumpSchema(ctx, config); err != nil { + return nil, fmt.Errorf("failed to run pg_dump schema migration: %w", err) + } } } From 83ed4697a044041bd0ca686af5e8c426a60e6f30 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 10:36:55 +0530 Subject: [PATCH 03/21] rebase fix: go sum --- flow/go.sum | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/go.sum b/flow/go.sum index 6f40e5361..52fe9f0cf 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -437,8 +437,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= -github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= +github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= From 44c0c395e2b16a08eead9b1b7a55c7f54e5bf8ae Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 13:14:07 +0530 Subject: [PATCH 04/21] replace pgdump_all with --no-owner and --no-privielges --- flow/connectors/postgres/pgdump_schema.go | 30 +---- flow/e2e/pg_schema_dump_test.go | 136 +++++++++++++++++----- 2 files changed, 112 insertions(+), 54 deletions(-) diff --git a/flow/connectors/postgres/pgdump_schema.go b/flow/connectors/postgres/pgdump_schema.go index ee2a49462..8bf2efd9a 100644 --- a/flow/connectors/postgres/pgdump_schema.go +++ b/flow/connectors/postgres/pgdump_schema.go @@ -12,16 +12,9 @@ import ( "github.com/PeerDB-io/peerdb/flow/generated/protos" ) -// RunPgDumpSchema first migrates roles via pg_dumpall --roles-only, then streams -// a schema-only pg_dump from source directly into psql on the destination, -// piping stdout into stdin without intermediate files. +// RunPgDumpSchema streams a schema-only pg_dump from source directly into psql +// on the destination, piping stdout into stdin without intermediate files. func RunPgDumpSchema(ctx context.Context, srcConfig *protos.PostgresConfig, dstConfig *protos.PostgresConfig) error { - // Step 1: migrate roles from source to destination - if err := pipeCommand(ctx, srcConfig, dstConfig, "pg_dumpall", buildPgDumpAllArgs(srcConfig)); err != nil { - return fmt.Errorf("pg_dumpall roles migration failed: %w", err) - } - - // Step 2: migrate schema from source to destination if err := pipeCommand(ctx, srcConfig, dstConfig, "pg_dump", buildPgDumpArgs(srcConfig)); err != nil { return fmt.Errorf("pg_dump schema migration failed: %w", err) } @@ -88,23 +81,6 @@ func pipeCommand( return nil } -func buildPgDumpAllArgs(config *protos.PostgresConfig) []string { - port := config.Port - if port == 0 { - port = 5432 - } - - args := []string{ - "--roles-only", - "-h", config.Host, - "-p", strconv.FormatUint(uint64(port), 10), - } - if config.User != "" { - args = append(args, "-U", config.User) - } - return args -} - func buildPgDumpArgs(config *protos.PostgresConfig) []string { port := config.Port if port == 0 { @@ -113,6 +89,8 @@ func buildPgDumpArgs(config *protos.PostgresConfig) []string { args := []string{ "--schema-only", + "--no-owner", + "--no-privileges", "-h", config.Host, "-p", strconv.FormatUint(uint64(port), 10), "-d", config.Database, diff --git a/flow/e2e/pg_schema_dump_test.go b/flow/e2e/pg_schema_dump_test.go index 433b9667f..ebcadeff8 100644 --- a/flow/e2e/pg_schema_dump_test.go +++ b/flow/e2e/pg_schema_dump_test.go @@ -8,7 +8,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" ) @@ -283,54 +282,135 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { RequireEnvCanceled(s.t, env) } -// Test_PG_Schema_Dump_Role_Migration verifies that pg_dumpall --roles-only -// propagates a role from a source PG cluster to a separate destination PG -// cluster. Roles are global objects per cluster, so this requires two clusters -// (peerdb-postgres + peerdb-postgres2). -func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_Role_Migration() { - srcCfg := internal.GetAncillaryPostgresConfigFromEnv() +// Test_PG_Schema_Dump_No_Owner_No_Privileges verifies that the schema dump does +// not emit owner or grant statements that reference roles. We create a role on +// the source, give it ownership and grants on a table, then dump into a +// secondary cluster where that role does NOT exist. With --no-owner and +// --no-privileges the dump must succeed; without them it would fail on +// ALTER TABLE ... OWNER TO / GRANT ... TO . +// +// Also verifies initial load + CDC into the dumped table on the destination +// cluster, so we know the table is usable end-to-end. +func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { dstCfg := internal.GetSecondaryPostgresConfigFromEnv() - roleName := "peerdb_test_role_" + s.suffix - - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() + srcSchema := "e2e_test_" + s.suffix + roleName := "peerdb_owner_role_" + s.suffix + tableName := "owned_tbl" + qualified := fmt.Sprintf("%s.%s", srcSchema, tableName) + // destination connection (for assertions + role-absence sanity) dstConnStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", dstCfg.Host, dstCfg.Port, dstCfg.User, dstCfg.Password, dstCfg.Database) - dstConn, err := pgx.Connect(ctx, dstConnStr) + dstConn, err := pgx.Connect(s.t.Context(), dstConnStr) require.NoError(s.t, err, "failed to connect to secondary postgres on %s:%d (is the postgres2 tilt resource running?)", dstCfg.Host, dstCfg.Port) - defer dstConn.Close(ctx) + s.t.Cleanup(func() { dstConn.Close(s.t.Context()) }) - // sanity: role must not pre-exist on destination - var preExists bool - require.NoError(s.t, dstConn.QueryRow(ctx, - "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&preExists)) - require.False(s.t, preExists, "role %s unexpectedly exists on destination before test", roleName) + // sanity: role must not exist on destination + var roleExistsOnDst bool + require.NoError(s.t, dstConn.QueryRow(s.t.Context(), + "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&roleExistsOnDst)) + require.False(s.t, roleExistsOnDst, "role %s unexpectedly exists on destination", roleName) - // create role on source + // create role + owned/granted table on source with seed rows _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD 'pw'", roleName)) require.NoError(s.t, err) + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + "CREATE TABLE %s (id SERIAL PRIMARY KEY, val TEXT)", qualified)) + require.NoError(s.t, err) + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("ALTER TABLE %s OWNER TO %s", qualified, roleName)) + require.NoError(s.t, err) + _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("GRANT SELECT, INSERT ON %s TO %s", qualified, roleName)) + require.NoError(s.t, err) + + for i := 1; i <= 5; i++ { + _, err = s.Conn().Exec(s.t.Context(), + fmt.Sprintf("INSERT INTO %s (val) VALUES ($1)", qualified), + fmt.Sprintf("snap_%d", i)) + require.NoError(s.t, err) + } + s.t.Cleanup(func() { cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) defer cleanupCancel() + _, _ = s.Conn().Exec(cleanupCtx, fmt.Sprintf("DROP TABLE IF EXISTS %s", qualified)) _, _ = s.Conn().Exec(cleanupCtx, "DROP ROLE IF EXISTS "+roleName) dropConn, err := pgx.Connect(cleanupCtx, dstConnStr) if err != nil { - s.t.Logf("failed to connect to destination for role cleanup: %v", err) + s.t.Logf("failed to connect to destination for cleanup: %v", err) return } defer dropConn.Close(cleanupCtx) - if _, err := dropConn.Exec(cleanupCtx, "DROP ROLE IF EXISTS "+roleName); err != nil { - s.t.Logf("failed to drop destination role %s: %v", roleName, err) - } + _, _ = dropConn.Exec(cleanupCtx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", srcSchema)) }) - require.NoError(s.t, connpostgres.RunPgDumpSchema(ctx, srcCfg, dstCfg)) + // register destination peer pointing at postgres2 + dstPeerName := "pgdump_noowner_dst_" + s.suffix + CreatePeer(s.t, &protos.Peer{ + Name: dstPeerName, + Type: protos.DBType_POSTGRES, + Config: &protos.Peer_PostgresConfig{PostgresConfig: dstCfg}, + }) - var postExists bool - require.NoError(s.t, dstConn.QueryRow(ctx, - "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&postExists)) - require.True(s.t, postExists, "role %s should have been migrated to destination cluster", roleName) + config := &protos.FlowConnectionConfigs{ + FlowJobName: s.attachSuffix("test_pgdump_noowner"), + DestinationName: dstPeerName, + TableMappings: []*protos.TableMapping{{ + SourceTableIdentifier: qualified, + DestinationTableIdentifier: qualified, + }}, + SourceName: GeneratePostgresPeer(s.t).Name, + MaxBatchSize: 100, + DoInitialSnapshot: true, + System: protos.TypeSystem_PG, + Env: map[string]string{ + "PEERDB_PG_AUTOMATED_SCHEMA_DUMP": "true", + }, + } + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, config) + SetupCDCFlowStatusQuery(s.t, env, config) + + // initial load: pg_dump must succeed despite missing owner/grantee role, + // then snapshot copies the 5 seed rows. + EnvWaitFor(s.t, env, 3*time.Minute, "initial load owned_tbl", func() bool { + var count int64 + err := dstConn.QueryRow(s.t.Context(), + fmt.Sprintf("SELECT COUNT(*) FROM %s", qualified)).Scan(&count) + return err == nil && count == 5 + }) + + // CDC: insert more rows on source and wait for them on dst + for i := 6; i <= 10; i++ { + _, err = s.Conn().Exec(s.t.Context(), + fmt.Sprintf("INSERT INTO %s (val) VALUES ($1)", qualified), + fmt.Sprintf("cdc_%d", i)) + EnvNoError(s.t, env, err) + } + + EnvWaitFor(s.t, env, 3*time.Minute, "cdc owned_tbl", func() bool { + var count int64 + err := dstConn.QueryRow(s.t.Context(), + fmt.Sprintf("SELECT COUNT(*) FROM %s", qualified)).Scan(&count) + return err == nil && count == 10 + }) + + // owner on dst should be the connecting user, not the (missing) source role + var dstOwner string + require.NoError(s.t, dstConn.QueryRow(s.t.Context(), + "SELECT tableowner FROM pg_tables WHERE schemaname=$1 AND tablename=$2", + srcSchema, tableName).Scan(&dstOwner)) + require.NotEqual(s.t, roleName, dstOwner, "destination table should not be owned by the source-only role") + + // no grants should reference the missing role on dst + var grantCount int + require.NoError(s.t, dstConn.QueryRow(s.t.Context(), + "SELECT COUNT(*) FROM information_schema.table_privileges WHERE table_schema=$1 AND table_name=$2 AND grantee=$3", + srcSchema, tableName, roleName).Scan(&grantCount)) + require.Zero(s.t, grantCount, "no privileges should be granted to the source-only role on destination") + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) } From a87896ac7672f7bb454f5d7a3b801eb322f0c4b7 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 14:05:58 +0530 Subject: [PATCH 05/21] use with force --- flow/e2e/pg_schema_dump_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flow/e2e/pg_schema_dump_test.go b/flow/e2e/pg_schema_dump_test.go index ebcadeff8..02144ee6c 100644 --- a/flow/e2e/pg_schema_dump_test.go +++ b/flow/e2e/pg_schema_dump_test.go @@ -31,9 +31,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { return } defer dropConn.Close(ctx) - _, _ = dropConn.Exec(ctx, fmt.Sprintf( - "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='%s' AND pid <> pg_backend_pid()", dstDBName)) - if _, err := dropConn.Exec(ctx, "DROP DATABASE IF EXISTS "+dstDBName); err != nil { + if _, err := dropConn.Exec(ctx, "DROP DATABASE IF EXISTS "+dstDBName+" WITH (FORCE)"); err != nil { s.t.Logf("failed to drop destination database %s: %v", dstDBName, err) } }) From 6a812dbf44f3fd9707e9ab0c2ff7fe109aa54c12 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 15:12:58 +0530 Subject: [PATCH 06/21] fix(test): use dedicated source db --- flow/e2e/pg_schema_dump_test.go | 112 ++++++++++++++++++++++++-------- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/flow/e2e/pg_schema_dump_test.go b/flow/e2e/pg_schema_dump_test.go index 02144ee6c..dcf015a43 100644 --- a/flow/e2e/pg_schema_dump_test.go +++ b/flow/e2e/pg_schema_dump_test.go @@ -3,6 +3,7 @@ package e2e import ( "context" "fmt" + "testing" "time" "github.com/jackc/pgx/v5" @@ -12,8 +13,66 @@ import ( "github.com/PeerDB-io/peerdb/flow/internal" ) +// setupDedicatedPgDumpSource creates a fresh database on the primary PG instance +// to serve as the source for schema-dump tests. pg_dump dumps a whole database, +// so sharing the source DB with other parallel tests caused the dump to include +// every concurrent test's e2e_test_ schema, blowing past the workflow +// SETUP timeout. A dedicated source DB keeps the dump scoped to this test. +// +// Returns a connection to the new DB, the registered source peer name, and the +// schema name to use within it. All resources are cleaned up via t.Cleanup. +func setupDedicatedPgDumpSource(t *testing.T, suffix string) (*pgx.Conn, string, string) { + t.Helper() + + srcCfg := internal.GetAncillaryPostgresConfigFromEnv() + srcDBName := "e2e_pgdump_src_" + suffix + srcSchema := "e2e_test_" + suffix + + bootstrapStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", + srcCfg.Host, srcCfg.Port, srcCfg.User, srcCfg.Password, srcCfg.Database) + bootstrap, err := pgx.Connect(t.Context(), bootstrapStr) + require.NoError(t, err) + _, err = bootstrap.Exec(t.Context(), "CREATE DATABASE "+srcDBName) + require.NoError(t, err) + bootstrap.Close(t.Context()) + + srcConnStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", + srcCfg.Host, srcCfg.Port, srcCfg.User, srcCfg.Password, srcDBName) + srcConn, err := pgx.Connect(t.Context(), srcConnStr) + require.NoError(t, err) + _, err = srcConn.Exec(t.Context(), "CREATE SCHEMA "+srcSchema) + require.NoError(t, err) + + srcPeerCfg := internal.GetAncillaryPostgresConfigFromEnv() + srcPeerCfg.Database = srcDBName + srcPeerName := "pgdump_src_" + suffix + CreatePeer(t, &protos.Peer{ + Name: srcPeerName, + Type: protos.DBType_POSTGRES, + Config: &protos.Peer_PostgresConfig{PostgresConfig: srcPeerCfg}, + }) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + srcConn.Close(ctx) + dropConn, err := pgx.Connect(ctx, bootstrapStr) + if err != nil { + t.Logf("failed to connect for source DB cleanup: %v", err) + return + } + defer dropConn.Close(ctx) + if _, err := dropConn.Exec(ctx, "DROP DATABASE IF EXISTS "+srcDBName+" WITH (FORCE)"); err != nil { + t.Logf("failed to drop source database %s: %v", srcDBName, err) + } + }) + + return srcConn, srcPeerName, srcSchema +} + func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { - srcSchema := "e2e_test_" + s.suffix + srcConn, srcPeerName, srcSchema := setupDedicatedPgDumpSource(s.t, s.suffix) + dstDBName := "e2e_pgdump_" + s.suffix // create destination database on the same PG instance @@ -44,9 +103,6 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { require.NoError(s.t, err) s.t.Cleanup(func() { dstConn.Close(s.t.Context()) }) - // create the source schema in the destination database (pg_dump will create it, but we need it for the peer) - // Actually, pg_dump --schema-only will create the schema, so we don't need to pre-create it. - // create a destination peer pointing to the new database dstPeerCfg := internal.GetAncillaryPostgresConfigFromEnv() dstPeerCfg.Database = dstDBName @@ -63,13 +119,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { // --- set up rich schema on source --- // create custom enum type in the source schema - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "CREATE TYPE %s.color AS ENUM ('red', 'green', 'blue')", srcSchema)) require.NoError(s.t, err) // create parent table with various column types parentTable := srcSchema + ".parent_tbl" - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, @@ -81,13 +137,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { require.NoError(s.t, err) // create unique index on parent - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "CREATE UNIQUE INDEX idx_parent_name ON %s (name)", parentTable)) require.NoError(s.t, err) // create child table with foreign key referencing parent childTable := srcSchema + ".child_tbl" - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(` + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, parent_id INT NOT NULL REFERENCES %s(id), @@ -97,13 +153,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { require.NoError(s.t, err) // create btree index on child - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "CREATE INDEX idx_child_parent ON %s (parent_id)", childTable)) require.NoError(s.t, err) // insert initial data for snapshot for i := 1; i <= 5; i++ { - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "INSERT INTO %s (name, color, score, metadata) VALUES ($1, $2, $3, $4)", parentTable), fmt.Sprintf("item_%d", i), @@ -115,7 +171,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { } for i := 1; i <= 10; i++ { - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "INSERT INTO %s (parent_id, value, tags) VALUES ($1, $2, $3)", childTable), (i%5)+1, @@ -145,7 +201,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { DestinationTableIdentifier: dstChild, }, }, - SourceName: GeneratePostgresPeer(s.t).Name, + SourceName: srcPeerName, MaxBatchSize: 100, DoInitialSnapshot: true, System: protos.TypeSystem_PG, @@ -223,7 +279,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { // insert more parent rows for i := 6; i <= 8; i++ { - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "INSERT INTO %s (name, color, score, metadata) VALUES ($1, $2, $3, $4)", parentTable), fmt.Sprintf("item_%d", i), @@ -236,7 +292,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { // insert more child rows for i := 11; i <= 15; i++ { - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "INSERT INTO %s (parent_id, value, tags) VALUES ($1, $2, $3)", childTable), (i%5)+1, @@ -263,14 +319,14 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { // verify data integrity: compare actual row content // query source and destination and compare var srcParentCount, dstParentCount int64 - err = s.Conn().QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcParent)).Scan(&srcParentCount) + err = srcConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcParent)).Scan(&srcParentCount) require.NoError(s.t, err) err = dstConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&dstParentCount) require.NoError(s.t, err) require.Equal(s.t, srcParentCount, dstParentCount, "parent table row counts should match") var srcChildCount, dstChildCount int64 - err = s.Conn().QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcChild)).Scan(&srcChildCount) + err = srcConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcChild)).Scan(&srcChildCount) require.NoError(s.t, err) err = dstConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&dstChildCount) require.NoError(s.t, err) @@ -290,9 +346,10 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { // Also verifies initial load + CDC into the dumped table on the destination // cluster, so we know the table is usable end-to-end. func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { + srcConn, srcPeerName, srcSchema := setupDedicatedPgDumpSource(s.t, s.suffix) + dstCfg := internal.GetSecondaryPostgresConfigFromEnv() - srcSchema := "e2e_test_" + s.suffix roleName := "peerdb_owner_role_" + s.suffix tableName := "owned_tbl" qualified := fmt.Sprintf("%s.%s", srcSchema, tableName) @@ -310,28 +367,31 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { "SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&roleExistsOnDst)) require.False(s.t, roleExistsOnDst, "role %s unexpectedly exists on destination", roleName) - // create role + owned/granted table on source with seed rows - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD 'pw'", roleName)) + // create role + owned/granted table on source with seed rows. + // the role is cluster-wide; the table lives in the dedicated source DB + // and will go away when that DB is dropped during cleanup. + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf("CREATE ROLE %s LOGIN PASSWORD 'pw'", roleName)) require.NoError(s.t, err) - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf( + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf( "CREATE TABLE %s (id SERIAL PRIMARY KEY, val TEXT)", qualified)) require.NoError(s.t, err) - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("ALTER TABLE %s OWNER TO %s", qualified, roleName)) + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf("ALTER TABLE %s OWNER TO %s", qualified, roleName)) require.NoError(s.t, err) - _, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf("GRANT SELECT, INSERT ON %s TO %s", qualified, roleName)) + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf("GRANT SELECT, INSERT ON %s TO %s", qualified, roleName)) require.NoError(s.t, err) for i := 1; i <= 5; i++ { - _, err = s.Conn().Exec(s.t.Context(), + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s (val) VALUES ($1)", qualified), fmt.Sprintf("snap_%d", i)) require.NoError(s.t, err) } + // role is cluster-wide so it outlives the dedicated source DB; drop it + // against the shared source connection after the source DB is gone. s.t.Cleanup(func() { cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) defer cleanupCancel() - _, _ = s.Conn().Exec(cleanupCtx, fmt.Sprintf("DROP TABLE IF EXISTS %s", qualified)) _, _ = s.Conn().Exec(cleanupCtx, "DROP ROLE IF EXISTS "+roleName) dropConn, err := pgx.Connect(cleanupCtx, dstConnStr) @@ -358,7 +418,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { SourceTableIdentifier: qualified, DestinationTableIdentifier: qualified, }}, - SourceName: GeneratePostgresPeer(s.t).Name, + SourceName: srcPeerName, MaxBatchSize: 100, DoInitialSnapshot: true, System: protos.TypeSystem_PG, @@ -382,7 +442,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { // CDC: insert more rows on source and wait for them on dst for i := 6; i <= 10; i++ { - _, err = s.Conn().Exec(s.t.Context(), + _, err = srcConn.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s (val) VALUES ($1)", qualified), fmt.Sprintf("cdc_%d", i)) EnvNoError(s.t, env, err) From 565a9844af141db5e331965e23dfb78bf1acb380 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 15:16:00 +0530 Subject: [PATCH 07/21] chore: lint --- flow/connectors/postgres/pgdump_schema.go | 10 +++++----- flow/e2e/pg_schema_dump_test.go | 23 ++++++++++++----------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/flow/connectors/postgres/pgdump_schema.go b/flow/connectors/postgres/pgdump_schema.go index 8bf2efd9a..723b68c35 100644 --- a/flow/connectors/postgres/pgdump_schema.go +++ b/flow/connectors/postgres/pgdump_schema.go @@ -40,8 +40,8 @@ func pipeCommand( psqlCmd.Env = append(os.Environ(), "PGPASSWORD="+dstConfig.Password) // handle TLS env vars - appendTLSEnv(srcCmd, srcConfig) - appendTLSEnv(psqlCmd, dstConfig) + appendTLSEnv(ctx, srcCmd, srcConfig) + appendTLSEnv(ctx, psqlCmd, dstConfig) // pipe source command stdout -> psql stdin pipe, err := srcCmd.StdoutPipe() @@ -118,7 +118,7 @@ func buildPsqlArgs(config *protos.PostgresConfig) []string { return args } -func appendTLSEnv(cmd *exec.Cmd, config *protos.PostgresConfig) { +func appendTLSEnv(ctx context.Context, cmd *exec.Cmd, config *protos.PostgresConfig) { if config.RequireTls { cmd.Env = append(cmd.Env, "PGSSLMODE=require") @@ -126,11 +126,11 @@ func appendTLSEnv(cmd *exec.Cmd, config *protos.PostgresConfig) { // write root CA to a temp file tmpFile, err := os.CreateTemp("", "peerdb-root-ca-*.pem") if err != nil { - slog.Warn("failed to create temp file for root CA, skipping sslrootcert", slog.Any("error", err)) + slog.WarnContext(ctx, "failed to create temp file for root CA, skipping sslrootcert", slog.Any("error", err)) return } if _, err := tmpFile.WriteString(*config.RootCa); err != nil { - slog.Warn("failed to write root CA to temp file", slog.Any("error", err)) + slog.WarnContext(ctx, "failed to write root CA to temp file", slog.Any("error", err)) tmpFile.Close() os.Remove(tmpFile.Name()) return diff --git a/flow/e2e/pg_schema_dump_test.go b/flow/e2e/pg_schema_dump_test.go index dcf015a43..5d0ccf938 100644 --- a/flow/e2e/pg_schema_dump_test.go +++ b/flow/e2e/pg_schema_dump_test.go @@ -220,13 +220,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { EnvWaitFor(s.t, env, 3*time.Minute, "initial load parent", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&count) + "SELECT COUNT(*) FROM " + dstParent).Scan(&count) return err == nil && count == 5 }) EnvWaitFor(s.t, env, 3*time.Minute, "initial load child", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&count) + "SELECT COUNT(*) FROM " + dstChild).Scan(&count) return err == nil && count == 10 }) @@ -306,29 +306,29 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { EnvWaitFor(s.t, env, 3*time.Minute, "cdc parent rows", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&count) + "SELECT COUNT(*) FROM " + dstParent).Scan(&count) return err == nil && count == 8 }) EnvWaitFor(s.t, env, 3*time.Minute, "cdc child rows", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&count) + "SELECT COUNT(*) FROM " + dstChild).Scan(&count) return err == nil && count == 15 }) // verify data integrity: compare actual row content // query source and destination and compare var srcParentCount, dstParentCount int64 - err = srcConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcParent)).Scan(&srcParentCount) + err = srcConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + srcParent).Scan(&srcParentCount) require.NoError(s.t, err) - err = dstConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", dstParent)).Scan(&dstParentCount) + err = dstConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + dstParent).Scan(&dstParentCount) require.NoError(s.t, err) require.Equal(s.t, srcParentCount, dstParentCount, "parent table row counts should match") var srcChildCount, dstChildCount int64 - err = srcConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", srcChild)).Scan(&srcChildCount) + err = srcConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + srcChild).Scan(&srcChildCount) require.NoError(s.t, err) - err = dstConn.QueryRow(s.t.Context(), fmt.Sprintf("SELECT COUNT(*) FROM %s", dstChild)).Scan(&dstChildCount) + err = dstConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + dstChild).Scan(&dstChildCount) require.NoError(s.t, err) require.Equal(s.t, srcChildCount, dstChildCount, "child table row counts should match") @@ -358,7 +358,8 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { dstConnStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s", dstCfg.Host, dstCfg.Port, dstCfg.User, dstCfg.Password, dstCfg.Database) dstConn, err := pgx.Connect(s.t.Context(), dstConnStr) - require.NoError(s.t, err, "failed to connect to secondary postgres on %s:%d (is the postgres2 tilt resource running?)", dstCfg.Host, dstCfg.Port) + require.NoError(s.t, err, "failed to connect to secondary postgres on %s:%d (postgres2 tilt resource running?)", + dstCfg.Host, dstCfg.Port) s.t.Cleanup(func() { dstConn.Close(s.t.Context()) }) // sanity: role must not exist on destination @@ -436,7 +437,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { EnvWaitFor(s.t, env, 3*time.Minute, "initial load owned_tbl", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - fmt.Sprintf("SELECT COUNT(*) FROM %s", qualified)).Scan(&count) + "SELECT COUNT(*) FROM " + qualified).Scan(&count) return err == nil && count == 5 }) @@ -451,7 +452,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { EnvWaitFor(s.t, env, 3*time.Minute, "cdc owned_tbl", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - fmt.Sprintf("SELECT COUNT(*) FROM %s", qualified)).Scan(&count) + "SELECT COUNT(*) FROM " + qualified).Scan(&count) return err == nil && count == 10 }) From db40b989d926d0139372967c77f73c2c88ca8049 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 15:52:33 +0530 Subject: [PATCH 08/21] feat(dynconf): add postgres target and default --- flow/internal/dynamicconf.go | 8 ++++++++ protos/flow.proto | 1 + 2 files changed, 9 insertions(+) diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index a88c414d6..75fc51341 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -443,6 +443,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_PG_AUTOMATED_SCHEMA_DUMP", + Description: "For PG-to-PG mirrors, run pg_dump --schema-only from source into psql on destination during setup so destination schema/tables/indexes match the source.", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, + TargetForSetting: protos.DynconfTarget_POSTGRES, + }, } var DynamicIndex = func() map[string]int { diff --git a/protos/flow.proto b/protos/flow.proto index eb53bf4f9..006a588a4 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -583,6 +583,7 @@ enum DynconfTarget { SNOWFLAKE = 2; CLICKHOUSE = 3; QUEUES = 4; + POSTGRES = 5; } message DropFlowActivityInput { From d4d901192eaf17f336ae9cbad4760dfd89b209b0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 16:42:22 +0530 Subject: [PATCH 09/21] fix: lint and test --- flow/e2e/pg_schema_dump_test.go | 20 ++++++++++---------- flow/internal/dynamicconf.go | 5 +++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/flow/e2e/pg_schema_dump_test.go b/flow/e2e/pg_schema_dump_test.go index 5d0ccf938..bcc587d0f 100644 --- a/flow/e2e/pg_schema_dump_test.go +++ b/flow/e2e/pg_schema_dump_test.go @@ -220,13 +220,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { EnvWaitFor(s.t, env, 3*time.Minute, "initial load parent", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - "SELECT COUNT(*) FROM " + dstParent).Scan(&count) + "SELECT COUNT(*) FROM "+dstParent).Scan(&count) return err == nil && count == 5 }) EnvWaitFor(s.t, env, 3*time.Minute, "initial load child", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - "SELECT COUNT(*) FROM " + dstChild).Scan(&count) + "SELECT COUNT(*) FROM "+dstChild).Scan(&count) return err == nil && count == 10 }) @@ -306,29 +306,29 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_And_CDC() { EnvWaitFor(s.t, env, 3*time.Minute, "cdc parent rows", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - "SELECT COUNT(*) FROM " + dstParent).Scan(&count) + "SELECT COUNT(*) FROM "+dstParent).Scan(&count) return err == nil && count == 8 }) EnvWaitFor(s.t, env, 3*time.Minute, "cdc child rows", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - "SELECT COUNT(*) FROM " + dstChild).Scan(&count) + "SELECT COUNT(*) FROM "+dstChild).Scan(&count) return err == nil && count == 15 }) // verify data integrity: compare actual row content // query source and destination and compare var srcParentCount, dstParentCount int64 - err = srcConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + srcParent).Scan(&srcParentCount) + err = srcConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM "+srcParent).Scan(&srcParentCount) require.NoError(s.t, err) - err = dstConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + dstParent).Scan(&dstParentCount) + err = dstConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM "+dstParent).Scan(&dstParentCount) require.NoError(s.t, err) require.Equal(s.t, srcParentCount, dstParentCount, "parent table row counts should match") var srcChildCount, dstChildCount int64 - err = srcConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + srcChild).Scan(&srcChildCount) + err = srcConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM "+srcChild).Scan(&srcChildCount) require.NoError(s.t, err) - err = dstConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM " + dstChild).Scan(&dstChildCount) + err = dstConn.QueryRow(s.t.Context(), "SELECT COUNT(*) FROM "+dstChild).Scan(&dstChildCount) require.NoError(s.t, err) require.Equal(s.t, srcChildCount, dstChildCount, "child table row counts should match") @@ -437,7 +437,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { EnvWaitFor(s.t, env, 3*time.Minute, "initial load owned_tbl", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - "SELECT COUNT(*) FROM " + qualified).Scan(&count) + "SELECT COUNT(*) FROM "+qualified).Scan(&count) return err == nil && count == 5 }) @@ -452,7 +452,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_Schema_Dump_No_Owner_No_Privileges() { EnvWaitFor(s.t, env, 3*time.Minute, "cdc owned_tbl", func() bool { var count int64 err := dstConn.QueryRow(s.t.Context(), - "SELECT COUNT(*) FROM " + qualified).Scan(&count) + "SELECT COUNT(*) FROM "+qualified).Scan(&count) return err == nil && count == 10 }) diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index 75fc51341..0702f4ddb 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -444,8 +444,9 @@ var DynamicSettings = [...]*protos.DynamicSetting{ TargetForSetting: protos.DynconfTarget_ALL, }, { - Name: "PEERDB_PG_AUTOMATED_SCHEMA_DUMP", - Description: "For PG-to-PG mirrors, run pg_dump --schema-only from source into psql on destination during setup so destination schema/tables/indexes match the source.", + Name: "PEERDB_PG_AUTOMATED_SCHEMA_DUMP", + Description: "For PG-to-PG mirrors, run pg_dump --schema-only from source into psql on destination " + + "during setup so destination schema/tables/indexes match the source.", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, From 3a0cd358e1ea98a1024033b416fb9617b028727f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 21:56:09 +0530 Subject: [PATCH 10/21] revert stacks, go --- flow/go.sum | 4 ++-- stacks/flow.Dockerfile | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/go.sum b/flow/go.sum index 52fe9f0cf..36acb1e31 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -865,8 +865,8 @@ golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa h1:efT73AJZfAAUV7SOip6 golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa/go.mod h1:kHjTxDEnAu6/Nl9lDkzjWpR+bmKfxeiRuSDlsMb70gE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= -golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= +golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index c9847e575..3c13ee793 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -33,7 +33,7 @@ RUN --mount=type=cache,target="/root/.cache/go-build" if [[ "$DEBUG_BUILD" = "1" FROM alpine:3.23@sha256:5b10f432ef3da1b8d4c7eb6c487f2f5a8f096bc91145e68878dd4a5019afde11 AS flow-base ENV TZ=UTC ADD --checksum=sha256:e5bb2084ccf45087bda1c9bffdea0eb15ee67f0b91646106e466714f9de3c7e3 https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem /usr/local/share/ca-certificates/global-aws-rds-bundle.pem -RUN apk add --no-cache ca-certificates geos postgresql-client && \ +RUN apk add --no-cache ca-certificates geos && \ update-ca-certificates && \ adduser -s /bin/sh -D peerdb USER peerdb From cdab47329067cb4a4e244510f924608e356cd9a2 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 22:42:55 +0530 Subject: [PATCH 11/21] remove unused function --- flow/internal/test_env.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/flow/internal/test_env.go b/flow/internal/test_env.go index e4cac6e55..4055295f6 100644 --- a/flow/internal/test_env.go +++ b/flow/internal/test_env.go @@ -30,16 +30,6 @@ func GetAncillaryPostgresConfigFromEnv() *protos.PostgresConfig { } } -func GetSecondaryPostgresConfigFromEnv() *protos.PostgresConfig { - return &protos.PostgresConfig{ - Host: GetEnvString("PG2_HOST", "localhost"), - Port: uint32(getEnvUint[uint16]("PG2_PORT", 5437)), - User: GetEnvString("PG2_USER", "postgres"), - Password: GetEnvString("PG2_PASSWORD", "postgres"), - Database: GetEnvString("PG2_DATABASE", "postgres"), - } -} - func PostgresToxiproxyUpstreamHostWithFallback(fallback string) string { return GetEnvString("TOXIPROXY_POSTGRES_HOST", fallback) } From 29575c562212ab85067752356901b11f8ca7ab28 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 May 2026 22:43:24 +0530 Subject: [PATCH 12/21] Revert "remove unused function" This reverts commit 71af23abc341bbb7243d3a694698356916d9c8d5. --- flow/internal/test_env.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flow/internal/test_env.go b/flow/internal/test_env.go index 4055295f6..e4cac6e55 100644 --- a/flow/internal/test_env.go +++ b/flow/internal/test_env.go @@ -30,6 +30,16 @@ func GetAncillaryPostgresConfigFromEnv() *protos.PostgresConfig { } } +func GetSecondaryPostgresConfigFromEnv() *protos.PostgresConfig { + return &protos.PostgresConfig{ + Host: GetEnvString("PG2_HOST", "localhost"), + Port: uint32(getEnvUint[uint16]("PG2_PORT", 5437)), + User: GetEnvString("PG2_USER", "postgres"), + Password: GetEnvString("PG2_PASSWORD", "postgres"), + Database: GetEnvString("PG2_DATABASE", "postgres"), + } +} + func PostgresToxiproxyUpstreamHostWithFallback(fallback string) string { return GetEnvString("TOXIPROXY_POSTGRES_HOST", fallback) } From 071b4e36895804a3fd4492a9d73cdbb4d1a56965 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 May 2026 18:30:14 +0530 Subject: [PATCH 13/21] cleaner --- flow/activities/flowable.go | 22 +++++------ .../postgres/postgres_destination.go | 1 - flow/workflows/setup_flow.go | 38 +++++++++++-------- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 18c32c677..1693f9e4a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -2149,57 +2149,57 @@ func (a *FlowableActivity) PeerDBPGAutomatedSchemaDump(ctx context.Context, env func (a *FlowableActivity) RunPgDumpSchema( ctx context.Context, input *protos.RunPgDumpSchemaInput, -) error { +) (bool, error) { logger := internal.LoggerFromCtx(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowName) srcPeer, err := connectors.LoadPeer(ctx, a.CatalogPool, input.SourceName) if err != nil { - return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("failed to load source peer: %w", err)) + return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("failed to load source peer: %w", err)) } dstPeer, err := connectors.LoadPeer(ctx, a.CatalogPool, input.DestinationName) if err != nil { - return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("failed to load destination peer: %w", err)) + return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("failed to load destination peer: %w", err)) } srcPgConfig, ok := srcPeer.Config.(*protos.Peer_PostgresConfig) if !ok { - return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("source peer %s is not a PostgreSQL peer", input.SourceName)) + return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("source peer %s is not a PostgreSQL peer", input.SourceName)) } dstPgConfig, ok := dstPeer.Config.(*protos.Peer_PostgresConfig) if !ok { - return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("destination peer %s is not a PostgreSQL peer", input.DestinationName)) + return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("destination peer %s is not a PostgreSQL peer", input.DestinationName)) } // skip schema migration for peers using SSH tunnels if srcPgConfig.PostgresConfig.SshConfig != nil { logger.Info("skipping pg_dump schema migration: source peer uses SSH tunnel") - return nil + return false, nil } if dstPgConfig.PostgresConfig.SshConfig != nil { logger.Info("skipping pg_dump schema migration: destination peer uses SSH tunnel") - return nil + return false, nil } // skip schema migration for non-password auth (e.g. IAM) if srcPgConfig.PostgresConfig.AuthType != protos.PostgresAuthType_POSTGRES_PASSWORD { logger.Info("skipping pg_dump schema migration: source peer uses non-password auth") - return nil + return false, nil } if dstPgConfig.PostgresConfig.AuthType != protos.PostgresAuthType_POSTGRES_PASSWORD { logger.Info("skipping pg_dump schema migration: destination peer uses non-password auth") - return nil + return false, nil } logger.Info("running pg_dump schema migration from source to destination", slog.String("source", input.SourceName), slog.String("destination", input.DestinationName)) if err := connpostgres.RunPgDumpSchema(ctx, srcPgConfig.PostgresConfig, dstPgConfig.PostgresConfig); err != nil { - return a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("pg_dump schema migration failed: %w", err)) + return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("pg_dump schema migration failed: %w", err)) } logger.Info("pg_dump schema migration completed successfully") - return nil + return true, nil } diff --git a/flow/connectors/postgres/postgres_destination.go b/flow/connectors/postgres/postgres_destination.go index 2b87fc0aa..9314d798d 100644 --- a/flow/connectors/postgres/postgres_destination.go +++ b/flow/connectors/postgres/postgres_destination.go @@ -329,7 +329,6 @@ func (c *PostgresConnector) normalizeBatch( if _, err := tx.Exec(ctx, setSessionReplicaRoleSQL); err != nil { return 0, fmt.Errorf("failed to set session_replication_role to replica: %w", err) } - c.logger.Info("set session_replication_role to replica for PG type system normalize") break } } diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 917450401..f7cc162f0 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -247,10 +247,13 @@ func (s *SetupFlowExecution) createNormalizedTables( // runPgDumpSchema runs pg_dump --schema-only on the source and pipes the output // into psql on the destination, streaming the schema directly. // This is only used for PG type system (PG-to-PG mirrors). +// Returns true only if the dump activity actually ran (it skips for SSH tunnel +// or non-password auth peers); callers must use this to decide whether the +// destination tables were created. func (s *SetupFlowExecution) runPgDumpSchema( ctx workflow.Context, config *protos.FlowConnectionConfigsCore, -) error { +) (bool, error) { s.Info("running pg_dump schema migration from source to destination") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -267,11 +270,12 @@ func (s *SetupFlowExecution) runPgDumpSchema( Env: config.Env, } - if err := workflow.ExecuteActivity(ctx, flowable.RunPgDumpSchema, input).Get(ctx, nil); err != nil { - return fmt.Errorf("failed to run pg_dump schema migration: %w", err) + var ran bool + if err := workflow.ExecuteActivity(ctx, flowable.RunPgDumpSchema, input).Get(ctx, &ran); err != nil { + return false, fmt.Errorf("failed to run pg_dump schema migration: %w", err) } - return nil + return ran, nil } // getPGAutomatedSchemaDump checks the PEERDB_PG_AUTOMATED_SCHEMA_DUMP env flag via an activity. @@ -314,22 +318,24 @@ func (s *SetupFlowExecution) executeSetupFlow( } } - // for PG type system (PG-to-PG mirrors), run pg_dump schema migration if enabled - enablePgSchemaDump := false - if config.System == protos.TypeSystem_PG { - enablePgSchemaDump = s.getPGAutomatedSchemaDump(ctx, config.Env) - if enablePgSchemaDump { - if err := s.runPgDumpSchema(ctx, config); err != nil { - return nil, fmt.Errorf("failed to run pg_dump schema migration: %w", err) - } - } - } - if err := s.setupTableSchema(ctx, config); err != nil { return nil, fmt.Errorf("failed to fetch table schema: %w", err) } - if err := s.createNormalizedTables(ctx, config); err != nil { + // pg_dump silently no-ops for SSH tunnel / non-password-auth peers, so we + // only skip CreateNormalizedTable when the activity reports it actually ran. + skipCreateTables := false + if config.System == protos.TypeSystem_PG && s.getPGAutomatedSchemaDump(ctx, config.Env) { + ran, err := s.runPgDumpSchema(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to run pg_dump schema migration: %w", err) + } + skipCreateTables = ran + } + + if skipCreateTables { + s.Info("skipping normalized table creation, pg_dump already created tables") + } else if err := s.createNormalizedTables(ctx, config); err != nil { return nil, fmt.Errorf("failed to create normalized tables: %w", err) } From ffadce8a75d8364c40d917fc74c51ff60524db2a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 May 2026 20:03:22 +0530 Subject: [PATCH 14/21] fix: rebase fix --- flow/workflows/setup_flow.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f7cc162f0..098797cd3 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -232,15 +232,10 @@ func (s *SetupFlowExecution) createNormalizedTables( Flags: flowConnectionConfigs.Flags, } - if !skipCreateTables { - if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { - s.Error("failed to create normalized tables", slog.Any("error", err)) - return fmt.Errorf("failed to create normalized tables: %w", err) - } - } else { - s.Info("skipping normalized table creation, pg_dump already created tables") + if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { + s.Error("failed to create normalized tables", slog.Any("error", err)) + return fmt.Errorf("failed to create normalized tables: %w", err) } - return nil } From d52d41636001255adba12acaabd5a5a6377d9589 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 8 May 2026 20:04:10 +0530 Subject: [PATCH 15/21] chore: lint --- flow/activities/flowable.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1693f9e4a..46a770b2d 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -2170,7 +2170,8 @@ func (a *FlowableActivity) RunPgDumpSchema( dstPgConfig, ok := dstPeer.Config.(*protos.Peer_PostgresConfig) if !ok { - return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("destination peer %s is not a PostgreSQL peer", input.DestinationName)) + return false, a.Alerter.LogFlowError(ctx, input.FlowName, + fmt.Errorf("destination peer %s is not a PostgreSQL peer", input.DestinationName)) } // skip schema migration for peers using SSH tunnels From edb9f9462519c7e32c8bf55c6267b9d990bedcb9 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 11 May 2026 21:23:34 +0530 Subject: [PATCH 16/21] fix: use os.Pipe and add test --- flow/connectors/postgres/pgdump_schema.go | 85 ++++-- .../connectors/postgres/pgdump_schema_test.go | 250 ++++++++++++++++++ 2 files changed, 313 insertions(+), 22 deletions(-) create mode 100644 flow/connectors/postgres/pgdump_schema_test.go diff --git a/flow/connectors/postgres/pgdump_schema.go b/flow/connectors/postgres/pgdump_schema.go index 723b68c35..35a21d935 100644 --- a/flow/connectors/postgres/pgdump_schema.go +++ b/flow/connectors/postgres/pgdump_schema.go @@ -43,41 +43,82 @@ func pipeCommand( appendTLSEnv(ctx, srcCmd, srcConfig) appendTLSEnv(ctx, psqlCmd, dstConfig) - // pipe source command stdout -> psql stdin - pipe, err := srcCmd.StdoutPipe() + return runPipeline(srcCmd, psqlCmd, srcBinary, "psql") +} + +// runPipeline wires srcCmd's stdout into dstCmd's stdin and waits for both. +func runPipeline(srcCmd, dstCmd *exec.Cmd, srcName, dstName string) error { + pr, pw, err := os.Pipe() if err != nil { - return fmt.Errorf("failed to create %s stdout pipe: %w", srcBinary, err) + return fmt.Errorf("create pipe: %w", err) } - psqlCmd.Stdin = pipe + srcCmd.Stdout = pw + dstCmd.Stdin = pr - var srcStderr, psqlStderr bytes.Buffer + var srcStderr, dstStderr bytes.Buffer srcCmd.Stderr = &srcStderr - psqlCmd.Stderr = &psqlStderr + dstCmd.Stderr = &dstStderr - // start psql first so it's ready to read - if err := psqlCmd.Start(); err != nil { - return fmt.Errorf("failed to start psql: %w", err) + // Start dst first so it's ready to read. + if err := dstCmd.Start(); err != nil { + pr.Close() + pw.Close() + return fmt.Errorf("start %s: %w", dstName, err) } + // dst now owns the read end in its child process. + pr.Close() - // then start source command which writes to the pipe if err := srcCmd.Start(); err != nil { - // kill psql since source command failed to start - _ = psqlCmd.Process.Kill() - _ = psqlCmd.Wait() - return fmt.Errorf("failed to start %s: %w", srcBinary, err) + pw.Close() + _ = dstCmd.Process.Kill() + _ = dstCmd.Wait() + return fmt.Errorf("start %s: %w", srcName, err) + } + // src now owns the write end in its child process. + pw.Close() + + srcDone := make(chan error, 1) + dstDone := make(chan error, 1) + go func() { srcDone <- srcCmd.Wait() }() + go func() { dstDone <- dstCmd.Wait() }() + + var ( + srcErr, dstErr error + srcKilled, dstKilled bool + ) + for range 2 { + select { + case err := <-srcDone: + srcErr = err + if err != nil && dstCmd.ProcessState == nil { + _ = dstCmd.Process.Kill() + dstKilled = true + } + case err := <-dstDone: + dstErr = err + if srcCmd.ProcessState == nil { + // dst exited (success or failure) while src is still running; + // kill src so it doesn't block on a pipe with no reader. + _ = srcCmd.Process.Kill() + srcKilled = true + } + } } - // wait for source command to finish (closes the pipe, signaling EOF to psql) - srcErr := srcCmd.Wait() - psqlErr := psqlCmd.Wait() - + // Report the original cause, not the side we killed in response. + if dstErr != nil && !dstKilled { + return fmt.Errorf("%s failed: %w\nstderr:\n%s", dstName, dstErr, dstStderr.String()) + } + if srcErr != nil && !srcKilled { + return fmt.Errorf("%s failed: %w\nstderr:\n%s", srcName, srcErr, srcStderr.String()) + } + // Fallback: both sides killed (e.g. ctx cancel) — surface whichever error we have. if srcErr != nil { - return fmt.Errorf("%s failed: %w\nstderr: %s", srcBinary, srcErr, srcStderr.String()) + return fmt.Errorf("%s failed: %w\nstderr:\n%s", srcName, srcErr, srcStderr.String()) } - if psqlErr != nil { - return fmt.Errorf("psql failed: %w\nstderr: %s", psqlErr, psqlStderr.String()) + if dstErr != nil { + return fmt.Errorf("%s failed: %w\nstderr:\n%s", dstName, dstErr, dstStderr.String()) } - return nil } diff --git a/flow/connectors/postgres/pgdump_schema_test.go b/flow/connectors/postgres/pgdump_schema_test.go new file mode 100644 index 000000000..240b39dfe --- /dev/null +++ b/flow/connectors/postgres/pgdump_schema_test.go @@ -0,0 +1,250 @@ +package connpostgres + +import ( + "bytes" + "context" + "errors" + "os/exec" + "runtime" + "strings" + "testing" + "time" +) + +// requireUnix skips the test on platforms without the shell utilities used here. +func requireUnix(t *testing.T) { + t.Helper() + if runtime.GOOS == "windows" { + t.Skip("requires unix shell utilities") + } +} + +func TestRunPipeline_HappyPath(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + src := exec.CommandContext(ctx, "sh", "-c", "printf 'hello world'") + var dstOut bytes.Buffer + dst := exec.CommandContext(ctx, "cat") + dst.Stdout = &dstOut + + if err := runPipeline(src, dst, "src", "dst"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := dstOut.String(); got != "hello world" { + t.Fatalf("dst stdout = %q, want %q", got, "hello world") + } +} + +func TestRunPipeline_SrcStartFails(t *testing.T) { + ctx := t.Context() + + src := exec.CommandContext(ctx, "/nonexistent/peerdb-test-binary") + dst := exec.CommandContext(ctx, "cat") + + err := runPipeline(src, dst, "src", "dst") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "start src") { + t.Fatalf("error %q does not mention src start failure", err) + } + // dst should have been killed and reaped; ProcessState should be set. + if dst.ProcessState == nil { + t.Fatal("dst was not reaped after src start failure") + } +} + +func TestRunPipeline_DstStartFails(t *testing.T) { + ctx := t.Context() + + src := exec.CommandContext(ctx, "echo", "hi") + dst := exec.CommandContext(ctx, "/nonexistent/peerdb-test-binary") + + err := runPipeline(src, dst, "src", "dst") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "start dst") { + t.Fatalf("error %q does not mention dst start failure", err) + } + // src must not have been started. + if src.ProcessState != nil { + t.Fatal("src should not have been started when dst failed to start") + } +} + +func TestRunPipeline_SrcExitsNonZero(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + // write some output then exit with error + src := exec.CommandContext(ctx, "sh", "-c", "echo partial; exit 7") + dst := exec.CommandContext(ctx, "cat") + dst.Stdout = &bytes.Buffer{} + + err := runPipeline(src, dst, "src", "dst") + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "src failed") { + t.Fatalf("error %q does not mention src failure", err) + } +} + +func TestRunPipeline_DstExitsNonZero(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + src := exec.CommandContext(ctx, "sh", "-c", "echo hi") + // exit 3 immediately, ignoring stdin + dst := exec.CommandContext(ctx, "sh", "-c", "exit 3") + + err := runPipeline(src, dst, "src", "dst") + if err == nil { + t.Fatal("expected error, got nil") + } + // src succeeded so error must be from dst + if !strings.Contains(err.Error(), "dst failed") { + t.Fatalf("error %q does not mention dst failure", err) + } +} + +// TestRunPipeline_SrcFailsWhileDstSlow verifies the deadlock-prevention fix: +// if src exits non-zero while dst is still reading slowly, dst is killed so +// runPipeline returns promptly instead of waiting for dst to finish its work. +func TestRunPipeline_SrcFailsWhileDstSlow(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + // src writes a small amount (fits in pipe buffer, no blocking) then exits non-zero. + src := exec.CommandContext(ctx, "sh", "-c", "echo hi; exit 9") + // dst is a single process (no shell-spawned children) that doesn't read stdin + // and won't exit on its own. We expect runPipeline to kill it after src fails. + // Note: we deliberately avoid `sh -c "sleep 30; cat"` here -- when sh forks a + // child, that child inherits sh's stderr fd, and Go's exec.Wait blocks + // draining stderr until the inherited fd is closed (i.e. for the full sleep). + // psql doesn't fork children, so this matches real behavior. + dst := exec.CommandContext(ctx, "sleep", "30") + + start := time.Now() + done := make(chan error, 1) + go func() { done <- runPipeline(src, dst, "src", "dst") }() + + select { + case err := <-done: + if err == nil { + t.Fatal("expected error from src failure") + } + if !strings.Contains(err.Error(), "src failed") { + t.Fatalf("expected src failure, got %v", err) + } + if elapsed := time.Since(start); elapsed > 5*time.Second { + t.Fatalf("runPipeline took %v -- dst was not killed promptly after src failure", elapsed) + } + case <-time.After(10 * time.Second): + t.Fatal("runPipeline hung -- dst was not killed after src failure") + } +} + +// TestRunPipeline_DstExitsWhileSrcWriting verifies the inverse: if dst exits +// early while src is producing lots of data, src is killed so it doesn't hang +// forever blocked on a write to a closed pipe (would normally get SIGPIPE, +// but we explicitly kill to be safe / to surface the failure quickly). +func TestRunPipeline_DstExitsWhileSrcWriting(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + // src tries to stream a lot of data + src := exec.CommandContext(ctx, "sh", "-c", "yes peerdb | head -c 10000000") + // dst exits immediately without reading + dst := exec.CommandContext(ctx, "sh", "-c", "exit 2") + + start := time.Now() + done := make(chan error, 1) + go func() { done <- runPipeline(src, dst, "src", "dst") }() + + select { + case err := <-done: + if err == nil { + t.Fatal("expected error from dst failure") + } + // We prefer dst's error since src's failure is just a downstream symptom. + if !strings.Contains(err.Error(), "dst failed") { + t.Fatalf("expected dst failure, got %v", err) + } + if elapsed := time.Since(start); elapsed > 5*time.Second { + t.Fatalf("runPipeline took %v -- src was not killed promptly after dst exit", elapsed) + } + case <-time.After(10 * time.Second): + t.Fatal("runPipeline hung -- src was not killed after dst exited") + } +} + +// TestRunPipeline_LargeStream verifies that streaming more than the kernel +// pipe buffer (typically 64KB on Linux) works without deadlock. +func TestRunPipeline_LargeStream(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + const size = 2 * 1024 * 1024 // 2 MiB + src := exec.CommandContext(ctx, "sh", "-c", "yes a | head -c "+itoa(size)) + var out bytes.Buffer + dst := exec.CommandContext(ctx, "cat") + dst.Stdout = &out + + if err := runPipeline(src, dst, "src", "dst"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if out.Len() != size { + t.Fatalf("dst received %d bytes, want %d", out.Len(), size) + } +} + +func TestRunPipeline_ContextCancel(t *testing.T) { + requireUnix(t) + ctx, cancel := context.WithCancel(t.Context()) + + // long-running src that ignores stdin + src := exec.CommandContext(ctx, "sh", "-c", "sleep 30") + dst := exec.CommandContext(ctx, "cat") + dst.Stdout = &bytes.Buffer{} + + done := make(chan error, 1) + go func() { done <- runPipeline(src, dst, "src", "dst") }() + + // give them a moment to start + time.Sleep(100 * time.Millisecond) + cancel() + + select { + case err := <-done: + if err == nil { + t.Fatal("expected error after context cancel") + } + // CommandContext kills the process; just ensure we got back. + var exitErr *exec.ExitError + if !errors.As(err, &exitErr) && !strings.Contains(err.Error(), "killed") && + !strings.Contains(err.Error(), "signal") { + // any non-nil error is acceptable here; we're mostly checking we don't hang + t.Logf("got error after cancel: %v", err) + } + case <-time.After(10 * time.Second): + t.Fatal("runPipeline did not return after context cancel") + } +} + +// itoa avoids importing strconv just for one call site in tests. +func itoa(n int) string { + if n == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + return string(buf[i:]) +} From 377a6e326d1b6dd7ca8ff6ddb7c702b16d1ff78c Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 11 May 2026 22:08:05 +0530 Subject: [PATCH 17/21] run in transaction and fix test --- flow/connectors/postgres/pgdump_schema.go | 10 ++++++++++ flow/connectors/postgres/pgdump_schema_test.go | 9 +++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/pgdump_schema.go b/flow/connectors/postgres/pgdump_schema.go index 35a21d935..525ced4db 100644 --- a/flow/connectors/postgres/pgdump_schema.go +++ b/flow/connectors/postgres/pgdump_schema.go @@ -152,6 +152,16 @@ func buildPsqlArgs(config *protos.PostgresConfig) []string { "-h", config.Host, "-p", strconv.FormatUint(uint64(port), 10), "-d", config.Database, + // Wrap the entire dump in a single transaction so partial failures + // roll back cleanly (makes the activity safely retryable) and avoid + // per-statement autocommit overhead on high-latency links. + "--single-transaction", + // Without this, psql logs errors to stderr but exits 0, so a half- + // applied schema would be reported as success. ON_ERROR_STOP=1 makes + // psql exit non-zero on the first failed statement. + "-v", "ON_ERROR_STOP=1", + // Quiet informational chatter; errors still go to stderr. + "--quiet", } if config.User != "" { args = append(args, "-U", config.User) diff --git a/flow/connectors/postgres/pgdump_schema_test.go b/flow/connectors/postgres/pgdump_schema_test.go index 240b39dfe..42c3fcfd8 100644 --- a/flow/connectors/postgres/pgdump_schema_test.go +++ b/flow/connectors/postgres/pgdump_schema_test.go @@ -205,8 +205,13 @@ func TestRunPipeline_ContextCancel(t *testing.T) { requireUnix(t) ctx, cancel := context.WithCancel(t.Context()) - // long-running src that ignores stdin - src := exec.CommandContext(ctx, "sh", "-c", "sleep 30") + // Use exec'd binaries directly (not `sh -c "..."`). When sh is run with + // a single argument, many shells fork a child for the command rather than + // exec-replacing themselves. That child inherits sh's stderr fd, and Go's + // exec.Wait blocks draining stderr until every fd holder closes it -- so + // CommandContext killing sh isn't enough; the child keeps stderr open and + // Wait hangs. Using a single-process command avoids the inheritance. + src := exec.CommandContext(ctx, "sleep", "30") dst := exec.CommandContext(ctx, "cat") dst.Stdout = &bytes.Buffer{} From a9b50b5c758ac0dd4c10e2f8f68db9cd7d2cc63d Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 11 May 2026 22:32:08 +0530 Subject: [PATCH 18/21] fix: lint --- flow/connectors/postgres/pgdump_schema_test.go | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/flow/connectors/postgres/pgdump_schema_test.go b/flow/connectors/postgres/pgdump_schema_test.go index 42c3fcfd8..8f6b8c172 100644 --- a/flow/connectors/postgres/pgdump_schema_test.go +++ b/flow/connectors/postgres/pgdump_schema_test.go @@ -188,7 +188,8 @@ func TestRunPipeline_LargeStream(t *testing.T) { ctx := t.Context() const size = 2 * 1024 * 1024 // 2 MiB - src := exec.CommandContext(ctx, "sh", "-c", "yes a | head -c "+itoa(size)) + // #nosec G204 -- test-only, constant arguments + src := exec.CommandContext(ctx, "sh", "-c", "yes a | head -c 2097152") var out bytes.Buffer dst := exec.CommandContext(ctx, "cat") dst.Stdout = &out @@ -238,18 +239,3 @@ func TestRunPipeline_ContextCancel(t *testing.T) { t.Fatal("runPipeline did not return after context cancel") } } - -// itoa avoids importing strconv just for one call site in tests. -func itoa(n int) string { - if n == 0 { - return "0" - } - var buf [20]byte - i := len(buf) - for n > 0 { - i-- - buf[i] = byte('0' + n%10) - n /= 10 - } - return string(buf[i:]) -} From b37fa20dd6f206e4d79f99892c556d7b5ae75e48 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 14 May 2026 00:07:44 +0530 Subject: [PATCH 19/21] fix: filter out set transaction timeout --- flow/activities/flowable.go | 8 +- flow/connectors/postgres/pgdump_schema.go | 136 ++++++++++++++++-- .../connectors/postgres/pgdump_schema_test.go | 47 ++++-- 3 files changed, 167 insertions(+), 24 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 46a770b2d..a71484f2f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -2196,11 +2196,17 @@ func (a *FlowableActivity) RunPgDumpSchema( logger.Info("running pg_dump schema migration from source to destination", slog.String("source", input.SourceName), slog.String("destination", input.DestinationName)) + a.Alerter.LogFlowInfo(ctx, input.FlowName, + fmt.Sprintf("starting pg_dump schema migration from %s to %s", input.SourceName, input.DestinationName)) + start := time.Now() if err := connpostgres.RunPgDumpSchema(ctx, srcPgConfig.PostgresConfig, dstPgConfig.PostgresConfig); err != nil { return false, a.Alerter.LogFlowError(ctx, input.FlowName, fmt.Errorf("pg_dump schema migration failed: %w", err)) } - logger.Info("pg_dump schema migration completed successfully") + elapsed := time.Since(start).Round(time.Millisecond) + logger.Info("pg_dump schema migration completed successfully", slog.Duration("elapsed", elapsed)) + a.Alerter.LogFlowInfo(ctx, input.FlowName, + fmt.Sprintf("pg_dump schema migration completed successfully in %s", elapsed)) return true, nil } diff --git a/flow/connectors/postgres/pgdump_schema.go b/flow/connectors/postgres/pgdump_schema.go index 525ced4db..7c77560e2 100644 --- a/flow/connectors/postgres/pgdump_schema.go +++ b/flow/connectors/postgres/pgdump_schema.go @@ -1,17 +1,31 @@ package connpostgres import ( + "bufio" "bytes" "context" "fmt" + "io" "log/slog" "os" "os/exec" + "regexp" "strconv" "github.com/PeerDB-io/peerdb/flow/generated/protos" ) +// pg_dump from newer Postgres versions emits statements that older +// destinations don't recognize: +// - SET transaction_timeout = 0; (PG17+ session GUC) +// - \restrict / \unrestrict (pg_dump 17.6+ psql meta-commands +// that gate replay against an unrelated psql session; older psql treats +// them as unknown backslash commands and aborts under ON_ERROR_STOP) +// +// These are session/replay housekeeping and safe to drop on the wire so we +// keep ON_ERROR_STOP=1 for genuine DDL failures while remaining cross-version. +var incompatibleLineRE = regexp.MustCompile(`^(SET\s+transaction_timeout\s*=|\\(?:un)?restrict(\s|$))`) + // RunPgDumpSchema streams a schema-only pg_dump from source directly into psql // on the destination, piping stdout into stdin without intermediate files. func RunPgDumpSchema(ctx context.Context, srcConfig *protos.PostgresConfig, dstConfig *protos.PostgresConfig) error { @@ -43,17 +57,74 @@ func pipeCommand( appendTLSEnv(ctx, srcCmd, srcConfig) appendTLSEnv(ctx, psqlCmd, dstConfig) - return runPipeline(srcCmd, psqlCmd, srcBinary, "psql") + return runPipeline(ctx, srcCmd, psqlCmd, srcBinary, "psql", filterIncompatibleLines) +} + +// filterIncompatibleLines copies r->w line by line, dropping statements that +// are valid in newer pg_dump output but rejected by older psql/destinations. +func filterIncompatibleLines(ctx context.Context, r io.Reader, w io.Writer) error { + br := bufio.NewReaderSize(r, 64*1024) + for { + line, err := br.ReadBytes('\n') + if len(line) > 0 { + if !incompatibleLineRE.Match(line) { + if _, werr := w.Write(line); werr != nil { + return werr + } + } else { + slog.DebugContext(ctx, "dropping incompatible line from pg_dump stream", + slog.String("line", string(bytes.TrimRight(line, "\n")))) + } + } + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } } -// runPipeline wires srcCmd's stdout into dstCmd's stdin and waits for both. -func runPipeline(srcCmd, dstCmd *exec.Cmd, srcName, dstName string) error { - pr, pw, err := os.Pipe() +// runPipeline wires srcCmd's stdout into dstCmd's stdin (optionally through a +// filter goroutine) and waits for both processes. +// +// Pipe topology: +// +// without filter: src.stdout -> srcW |--pipe--| srcR -> dst.stdin +// with filter: src.stdout -> srcW |--pipe--| srcR -> filter -> dstW |--pipe--| dstR -> dst.stdin +// +// File descriptor ownership matters here -- if the parent keeps a write end +// open after the child consumer dies, the producer can hang forever on a +// blocked write. We close each fd as soon as the child or filter goroutine +// owns it. +func runPipeline( + ctx context.Context, + srcCmd, dstCmd *exec.Cmd, + srcName, dstName string, + filter func(context.Context, io.Reader, io.Writer) error, +) error { + srcR, srcW, err := os.Pipe() if err != nil { - return fmt.Errorf("create pipe: %w", err) + return fmt.Errorf("create src pipe: %w", err) + } + srcCmd.Stdout = srcW + + var ( + dstR, dstW *os.File + filterDone chan error + ) + if filter == nil { + dstCmd.Stdin = srcR + } else { + dstR, dstW, err = os.Pipe() + if err != nil { + srcR.Close() + srcW.Close() + return fmt.Errorf("create dst pipe: %w", err) + } + dstCmd.Stdin = dstR + filterDone = make(chan error, 1) } - srcCmd.Stdout = pw - dstCmd.Stdin = pr var srcStderr, dstStderr bytes.Buffer srcCmd.Stderr = &srcStderr @@ -61,21 +132,48 @@ func runPipeline(srcCmd, dstCmd *exec.Cmd, srcName, dstName string) error { // Start dst first so it's ready to read. if err := dstCmd.Start(); err != nil { - pr.Close() - pw.Close() + srcR.Close() + srcW.Close() + if dstW != nil { + dstR.Close() + dstW.Close() + } return fmt.Errorf("start %s: %w", dstName, err) } - // dst now owns the read end in its child process. - pr.Close() + // dst owns its stdin fd in its child; close our copy. + if filter == nil { + srcR.Close() + } else { + dstR.Close() + } if err := srcCmd.Start(); err != nil { - pw.Close() + srcW.Close() + if dstW != nil { + // filter never started; close its writer so dst sees EOF. + dstW.Close() + // and the read side we still hold if filter==nil path wasn't taken. + if filter != nil { + srcR.Close() + } + } _ = dstCmd.Process.Kill() _ = dstCmd.Wait() return fmt.Errorf("start %s: %w", srcName, err) } - // src now owns the write end in its child process. - pw.Close() + // src owns its stdout fd in its child; close our copy. + srcW.Close() + + // Run the filter goroutine if configured. It bridges srcR -> dstW. + if filter != nil { + go func() { + err := filter(ctx, srcR, dstW) + // Always close both ends so the producer/consumer unblock. + srcR.Close() + dstW.Close() + filterDone <- err + }() + } srcDone := make(chan error, 1) dstDone := make(chan error, 1) @@ -105,6 +203,13 @@ func runPipeline(srcCmd, dstCmd *exec.Cmd, srcName, dstName string) error { } } + // Wait for the filter to finish so we surface any I/O error and so the + // goroutine doesn't outlive this function. + var filterErr error + if filterDone != nil { + filterErr = <-filterDone + } + // Report the original cause, not the side we killed in response. if dstErr != nil && !dstKilled { return fmt.Errorf("%s failed: %w\nstderr:\n%s", dstName, dstErr, dstStderr.String()) @@ -112,6 +217,9 @@ func runPipeline(srcCmd, dstCmd *exec.Cmd, srcName, dstName string) error { if srcErr != nil && !srcKilled { return fmt.Errorf("%s failed: %w\nstderr:\n%s", srcName, srcErr, srcStderr.String()) } + if filterErr != nil { + return fmt.Errorf("filter failed: %w", filterErr) + } // Fallback: both sides killed (e.g. ctx cancel) — surface whichever error we have. if srcErr != nil { return fmt.Errorf("%s failed: %w\nstderr:\n%s", srcName, srcErr, srcStderr.String()) diff --git a/flow/connectors/postgres/pgdump_schema_test.go b/flow/connectors/postgres/pgdump_schema_test.go index 8f6b8c172..86d4bd5af 100644 --- a/flow/connectors/postgres/pgdump_schema_test.go +++ b/flow/connectors/postgres/pgdump_schema_test.go @@ -28,7 +28,7 @@ func TestRunPipeline_HappyPath(t *testing.T) { dst := exec.CommandContext(ctx, "cat") dst.Stdout = &dstOut - if err := runPipeline(src, dst, "src", "dst"); err != nil { + if err := runPipeline(ctx, src, dst, "src", "dst", nil); err != nil { t.Fatalf("unexpected error: %v", err) } if got := dstOut.String(); got != "hello world" { @@ -42,7 +42,7 @@ func TestRunPipeline_SrcStartFails(t *testing.T) { src := exec.CommandContext(ctx, "/nonexistent/peerdb-test-binary") dst := exec.CommandContext(ctx, "cat") - err := runPipeline(src, dst, "src", "dst") + err := runPipeline(ctx, src, dst, "src", "dst", nil) if err == nil { t.Fatal("expected error, got nil") } @@ -61,7 +61,7 @@ func TestRunPipeline_DstStartFails(t *testing.T) { src := exec.CommandContext(ctx, "echo", "hi") dst := exec.CommandContext(ctx, "/nonexistent/peerdb-test-binary") - err := runPipeline(src, dst, "src", "dst") + err := runPipeline(ctx, src, dst, "src", "dst", nil) if err == nil { t.Fatal("expected error, got nil") } @@ -83,7 +83,7 @@ func TestRunPipeline_SrcExitsNonZero(t *testing.T) { dst := exec.CommandContext(ctx, "cat") dst.Stdout = &bytes.Buffer{} - err := runPipeline(src, dst, "src", "dst") + err := runPipeline(ctx, src, dst, "src", "dst", nil) if err == nil { t.Fatal("expected error, got nil") } @@ -100,7 +100,7 @@ func TestRunPipeline_DstExitsNonZero(t *testing.T) { // exit 3 immediately, ignoring stdin dst := exec.CommandContext(ctx, "sh", "-c", "exit 3") - err := runPipeline(src, dst, "src", "dst") + err := runPipeline(ctx, src, dst, "src", "dst", nil) if err == nil { t.Fatal("expected error, got nil") } @@ -129,7 +129,7 @@ func TestRunPipeline_SrcFailsWhileDstSlow(t *testing.T) { start := time.Now() done := make(chan error, 1) - go func() { done <- runPipeline(src, dst, "src", "dst") }() + go func() { done <- runPipeline(ctx, src, dst, "src", "dst", nil) }() select { case err := <-done: @@ -162,7 +162,7 @@ func TestRunPipeline_DstExitsWhileSrcWriting(t *testing.T) { start := time.Now() done := make(chan error, 1) - go func() { done <- runPipeline(src, dst, "src", "dst") }() + go func() { done <- runPipeline(ctx, src, dst, "src", "dst", nil) }() select { case err := <-done: @@ -194,7 +194,7 @@ func TestRunPipeline_LargeStream(t *testing.T) { dst := exec.CommandContext(ctx, "cat") dst.Stdout = &out - if err := runPipeline(src, dst, "src", "dst"); err != nil { + if err := runPipeline(ctx, src, dst, "src", "dst", nil); err != nil { t.Fatalf("unexpected error: %v", err) } if out.Len() != size { @@ -217,7 +217,7 @@ func TestRunPipeline_ContextCancel(t *testing.T) { dst.Stdout = &bytes.Buffer{} done := make(chan error, 1) - go func() { done <- runPipeline(src, dst, "src", "dst") }() + go func() { done <- runPipeline(ctx, src, dst, "src", "dst", nil) }() // give them a moment to start time.Sleep(100 * time.Millisecond) @@ -239,3 +239,32 @@ func TestRunPipeline_ContextCancel(t *testing.T) { t.Fatal("runPipeline did not return after context cancel") } } + +// TestRunPipeline_FilterStripsLines verifies the filter goroutine drops +// matching lines and forwards the rest. Covers SET transaction_timeout (PG17+) +// and \restrict / \unrestrict psql meta-commands (pg_dump 17.6+). +func TestRunPipeline_FilterStripsLines(t *testing.T) { + requireUnix(t) + ctx := t.Context() + + input := "SELECT 1;\n" + + "SET transaction_timeout = 0;\n" + + "\\restrict abc123\n" + + "CREATE TABLE t(id int);\n" + + "\\unrestrict abc123\n" + + "SELECT 2;\n" + src := exec.CommandContext(ctx, "printf", "%s", input) + var out bytes.Buffer + dst := exec.CommandContext(ctx, "cat") + dst.Stdout = &out + + if err := runPipeline(ctx, src, dst, "src", "dst", filterIncompatibleLines); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got := out.String() + want := "SELECT 1;\nCREATE TABLE t(id int);\nSELECT 2;\n" + if got != want { + t.Fatalf("filtered output = %q, want %q", got, want) + } +} From 9471f04420e2e61032df342e09a43cfb1d867bd3 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 14 May 2026 15:48:06 +0530 Subject: [PATCH 20/21] fix: go sum after rebase --- flow/go.sum | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/go.sum b/flow/go.sum index 36acb1e31..52fe9f0cf 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -865,8 +865,8 @@ golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa h1:efT73AJZfAAUV7SOip6 golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa/go.mod h1:kHjTxDEnAu6/Nl9lDkzjWpR+bmKfxeiRuSDlsMb70gE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= -golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= +golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= +golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= From d467ed2dc8bab82eab5720674447273e9f675c5f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 14 May 2026 16:21:15 +0530 Subject: [PATCH 21/21] fix: skip for table addition and resync --- flow/workflows/setup_flow.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 098797cd3..31d164a14 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -5,6 +5,7 @@ import ( "log/slog" "maps" "slices" + "strings" "time" "go.temporal.io/sdk/log" @@ -273,6 +274,17 @@ func (s *SetupFlowExecution) runPgDumpSchema( return ran, nil } +// isTableAdditionChild reports whether this SetupFlow was launched as part of a +// table-addition child CDC flow. Such workflows are spawned with a parent +// workflow ID prefixed by "additional-cdc-flow-". +func isTableAdditionChild(ctx workflow.Context) bool { + parent := workflow.GetInfo(ctx).ParentWorkflowExecution + if parent == nil { + return false + } + return strings.HasPrefix(parent.ID, "additional-cdc-flow-") +} + // getPGAutomatedSchemaDump checks the PEERDB_PG_AUTOMATED_SCHEMA_DUMP env flag via an activity. func (s *SetupFlowExecution) getPGAutomatedSchemaDump(ctx workflow.Context, env map[string]string) bool { checkCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -319,8 +331,11 @@ func (s *SetupFlowExecution) executeSetupFlow( // pg_dump silently no-ops for SSH tunnel / non-password-auth peers, so we // only skip CreateNormalizedTable when the activity reports it actually ran. + // Skip pg_dump for resync (tables get _resync suffix and are swapped) and for + // table-addition child workflows (parent workflow ID prefix "additional-cdc-flow-"). skipCreateTables := false - if config.System == protos.TypeSystem_PG && s.getPGAutomatedSchemaDump(ctx, config.Env) { + if config.System == protos.TypeSystem_PG && !config.Resync && !isTableAdditionChild(ctx) && + s.getPGAutomatedSchemaDump(ctx, config.Env) { ran, err := s.runPgDumpSchema(ctx, config) if err != nil { return nil, fmt.Errorf("failed to run pg_dump schema migration: %w", err)