diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index c50442e95..e1c59424a 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -34,6 +34,18 @@ const (
 	_peerdb_batch_id Int64,
 	_peerdb_unchanged_toast_columns String
 	)`
+	walSinkColumns = `(
+		_peerdb_uid UUID,
+		_peerdb_timestamp Int64,
+		_peerdb_destination_table_name String,
+		_peerdb_data String,
+		_peerdb_record_type Int,
+		_peerdb_match_data String,
+		_peerdb_batch_id Int64,
+		_peerdb_unchanged_toast_columns String,
+		_peerdb_txid UInt64,
+		_peerdb_lsn Int64
+	)`
 	zooPathPrefix = "/clickhouse/tables/{uuid}/{shard}/{database}/"
 )
 
@@ -42,6 +54,51 @@ func (c *ClickHouseConnector) GetRawTableName(flowJobName string) string {
 	return "_peerdb_raw_" + shared.ReplaceIllegalCharactersWithUnderscores(flowJobName)
 }
 
+// GetWALSinkTableName returns the WAL sink table name for CDC v2.
+func (c *ClickHouseConnector) GetWALSinkTableName(flowJobName string) string {
+	return "_peerdb_wal_" + shared.ReplaceIllegalCharactersWithUnderscores(flowJobName)
+}
+
+func (c *ClickHouseConnector) CreateWALSinkTable(ctx context.Context, flowJobName string) error {
+	var walDistributedName string
+	walTableName := c.GetWALSinkTableName(flowJobName)
+	engine := "MergeTree()"
+	if c.Config.Replicated {
+		engine = fmt.Sprintf(
+			"ReplicatedMergeTree('%s%s','{replica}')",
+			zooPathPrefix,
+			peerdb_clickhouse.EscapeStr(walTableName),
+		)
+	}
+	onCluster := c.onCluster()
+	if onCluster != "" {
+		walDistributedName = walTableName
+		walTableName += "_shard"
+	}
+
+	createWALTableSQL := `CREATE TABLE IF NOT EXISTS %s%s %s ENGINE = %s ORDER BY (_peerdb_txid, _peerdb_lsn)`
+	if err := c.execWithLogging(ctx,
+		fmt.Sprintf(createWALTableSQL, peerdb_clickhouse.QuoteIdentifier(walTableName), onCluster, walSinkColumns, engine),
+	); err != nil {
+		return fmt.Errorf("unable to create WAL sink table: %w", err)
+	}
+
+	if onCluster != "" {
+		createWALDistributedSQL := `CREATE TABLE IF NOT EXISTS %s%s %s ENGINE = Distributed(%s,%s,%s,cityHash64(_peerdb_uid))`
+		if err := c.execWithLogging(ctx,
+			fmt.Sprintf(createWALDistributedSQL, peerdb_clickhouse.QuoteIdentifier(walDistributedName), onCluster,
+				walSinkColumns,
+				peerdb_clickhouse.QuoteIdentifier(c.Config.Cluster),
+				peerdb_clickhouse.QuoteIdentifier(c.Config.Database),
+				peerdb_clickhouse.QuoteIdentifier(walTableName)),
+		); err != nil {
+			return fmt.Errorf("unable to create distributed WAL sink table: %w", err)
+		}
+	}
+
+	return nil
+}
+
 func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) {
 	var result uint8
 	if err := c.queryRow(ctx,
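For reference, on a single-node peer this emits a statement of the following shape (sketch only; the flow name "orders" is hypothetical, the column list is walSinkColumns above):

    CREATE TABLE IF NOT EXISTS `_peerdb_wal_orders` (
        _peerdb_uid UUID, _peerdb_timestamp Int64, _peerdb_destination_table_name String,
        _peerdb_data String, _peerdb_record_type Int, _peerdb_match_data String,
        _peerdb_batch_id Int64, _peerdb_unchanged_toast_columns String,
        _peerdb_txid UInt64, _peerdb_lsn Int64
    ) ENGINE = MergeTree() ORDER BY (_peerdb_txid, _peerdb_lsn)

On clustered peers the rows land in `_peerdb_wal_orders_shard` (ReplicatedMergeTree when the peer is replicated) behind a Distributed table sharded on cityHash64(_peerdb_uid). The (_peerdb_txid, _peerdb_lsn) ordering key keeps each committed transaction's rows contiguous for the XID-filtered reads that normalize does later.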
@@ -96,10 +153,16 @@ func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
 }
 
 func (c *ClickHouseConnector) avroSyncMethod(flowJobName string, env map[string]string, version uint32) *ClickHouseAvroSyncMethod {
+	return c.avroSyncMethodForTable(flowJobName, c.GetRawTableName(flowJobName), env, version)
+}
+
+func (c *ClickHouseConnector) avroSyncMethodForTable(
+	flowJobName string, destTable string, env map[string]string, version uint32,
+) *ClickHouseAvroSyncMethod {
 	qrepConfig := &protos.QRepConfig{
 		StagingPath:                c.staging.BucketPath(),
 		FlowJobName:                flowJobName,
-		DestinationTableIdentifier: c.GetRawTableName(flowJobName),
+		DestinationTableIdentifier: destTable,
 		Env:                        env,
 		Version:                    version,
 	}
@@ -147,8 +210,68 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 	}, nil
 }
 
+// syncRecordsViaAvroWALSink stages a CDC v2 batch into the WAL sink table.
+// Records are written including _peerdb_txid and _peerdb_lsn; the normalize
+// step later filters by committed XID. Committed XIDs for the batch are
+// persisted in the catalog (cdc_v2_committed_xids) so normalize can read them
+// without depending on the source connection.
+func (c *ClickHouseConnector) syncRecordsViaAvroWALSink(
+	ctx context.Context,
+	req *model.SyncRecordsRequest[model.RecordItems],
+	syncBatchID int64,
+) (*model.SyncResponse, error) {
+	if err := c.CreateWALSinkTable(ctx, req.FlowJobName); err != nil {
+		return nil, fmt.Errorf("failed to ensure WAL sink table exists: %w", err)
+	}
+
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
+	unboundedNumericAsString, err := internal.PeerDBEnableClickHouseNumericAsString(ctx, req.Env)
+	if err != nil {
+		return nil, err
+	}
+	streamReq := model.NewRecordsToStreamRequest(
+		req.Records.GetRecords(), tableNameRowsMapping, syncBatchID, unboundedNumericAsString,
+		protos.DBType_CLICKHOUSE,
+	)
+	numericTruncator := model.NewStreamNumericTruncator(req.TableMappings, NumericDestinationTypes)
+	stream, err := utils.RecordsToWALSinkStream(streamReq, numericTruncator)
+	if err != nil {
+		return nil, fmt.Errorf("failed to convert records to WAL sink stream: %w", err)
+	}
+
+	avroSyncer := c.avroSyncMethodForTable(req.FlowJobName, c.GetWALSinkTableName(req.FlowJobName), req.Env, req.Version)
+	numRecords, err := avroSyncer.SyncRecords(ctx, req.Env, stream, req.FlowJobName, syncBatchID)
+	if err != nil {
+		return nil, err
+	}
+	warnings := numericTruncator.Warnings()
+
+	if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Flags); err != nil {
+		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
+	}
+
+	if err := c.SetCommittedXIDsForBatch(ctx, req.FlowJobName, syncBatchID, req.Records.CommittedXIDs()); err != nil {
+		return nil, fmt.Errorf("failed to persist committed XIDs for batch: %w", err)
+	}
+
+	return &model.SyncResponse{
+		LastSyncedCheckpoint: req.Records.GetLastCheckpoint(),
+		NumRecordsSynced:     numRecords,
+		CurrentSyncBatchID:   syncBatchID,
+		TableNameRowsMapping: tableNameRowsMapping,
+		TableSchemaDeltas:    req.Records.SchemaDeltas,
+		Warnings:             warnings,
+	}, nil
+}
+
 func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
-	res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
+	var res *model.SyncResponse
+	var err error
+	if req.Records.V2Active() {
+		res, err = c.syncRecordsViaAvroWALSink(ctx, req, req.SyncBatchID)
+	} else {
+		res, err = c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -371,24 +494,27 @@ func (c *ClickHouseConnector) RenameTables(
 }
 
 func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
-	// delete raw table if exists
 	rawTableIdentifier := c.GetRawTableName(jobName)
+	walTableIdentifier := c.GetWALSinkTableName(jobName)
 	onCluster := c.onCluster()
 	dropTableSQLWithCHSetting := dropTableIfExistsSQL +
 		chinternal.NewCHSettingsString(c.chVersion, chinternal.SettingMaxTableSizeToDrop, "0")
-	if err := c.execWithLogging(ctx,
-		fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster),
-	); err != nil {
-		return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
-	}
-	if onCluster != "" {
+	for _, base := range []string{rawTableIdentifier, walTableIdentifier} {
 		if err := c.execWithLogging(ctx,
-			fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier+"_shard"), onCluster),
+			fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(base), onCluster),
 		); err != nil {
-			return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
+			return fmt.Errorf("[clickhouse] unable to drop %s: %w", base, err)
+		}
+		if onCluster != "" {
+			if err := c.execWithLogging(ctx,
+				fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(base+"_shard"), onCluster),
+			); err != nil {
+				return fmt.Errorf("[clickhouse] unable to drop %s shard: %w", base, err)
+			}
 		}
 	}
-	c.logger.Info("successfully dropped raw table", slog.String("table", rawTableIdentifier))
+	c.logger.Info("successfully dropped raw and WAL sink tables",
+		slog.String("rawTable", rawTableIdentifier), slog.String("walTable", walTableIdentifier))
 	return nil
 }
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 3bf70d887..17805f53c 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -444,20 +444,44 @@ func (c *ClickHouseConnector) NormalizeRecords(
 	endBatchID := min(req.SyncBatchID, lastNormBatchID+groupBatches)
 
-	if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, lastNormBatchID, endBatchID, req.Env, req.Version); err != nil {
+	walSinkMode, err := internal.PeerDBCDCV2Enabled(ctx, req.Env)
+	if err != nil {
+		return model.NormalizeResponse{}, fmt.Errorf("failed to get CDC v2 WAL sink setting: %w", err)
+	}
+
+	stagingTable := c.GetRawTableName(req.FlowJobName)
+	if walSinkMode {
+		stagingTable = c.GetWALSinkTableName(req.FlowJobName)
+	}
+
+	if err := c.copyAvroStagesToDestination(
+		ctx, req.FlowJobName, stagingTable, lastNormBatchID, endBatchID, req.Env, req.Version,
+	); err != nil {
 		return model.NormalizeResponse{}, fmt.Errorf("failed to copy avro stages to destination: %w", err)
 	}
 
-	destinationTableNames, err := c.getDistinctTableNamesInBatch(
-		ctx,
-		req.FlowJobName,
-		endBatchID,
-		lastNormBatchID,
-		req.TableNameSchemaMapping,
-	)
-	if err != nil {
-		c.logger.Error("[clickhouse] error while getting distinct table names in batch", slog.Any("error", err))
-		return model.NormalizeResponse{}, err
+	var committedXIDs []int64
+	var destinationTableNames []string
+	if walSinkMode {
+		committedXIDs, err = c.GetCommittedXIDsForBatches(ctx, req.FlowJobName, lastNormBatchID, endBatchID)
+		if err != nil {
+			return model.NormalizeResponse{}, fmt.Errorf("failed to read committed XIDs: %w", err)
+		}
+		destinationTableNames, err = c.getDistinctTableNamesInWALSink(
+			ctx, req.FlowJobName, committedXIDs, req.TableNameSchemaMapping,
+		)
+		if err != nil {
+			c.logger.Error("[clickhouse] error while getting distinct table names in WAL sink", slog.Any("error", err))
+			return model.NormalizeResponse{}, err
+		}
+	} else {
+		destinationTableNames, err = c.getDistinctTableNamesInBatch(
+			ctx, req.FlowJobName, endBatchID, lastNormBatchID, req.TableNameSchemaMapping,
+		)
+		if err != nil {
+			c.logger.Error("[clickhouse] error while getting distinct table names in batch", slog.Any("error", err))
+			return model.NormalizeResponse{}, err
+		}
 	}
 
 	enablePrimaryUpdate, err := internal.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
@@ -497,7 +521,8 @@
 		lastNormBatchID int64
 	}
 	queriesCh := make(chan queryInfo)
-	rawTbl := c.GetRawTableName(req.FlowJobName)
+	// rawTbl is the source table for the normalize query: raw table in v1, WAL sink table in v2.
+	rawTbl := stagingTable
 
 	group, errCtx := errgroup.WithContext(ctx)
 	// create N=PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE goroutines to process requests from queriesCh
@@ -588,6 +613,8 @@
 				req.SoftDeleteColName,
 				req.Version,
 				req.Flags,
+				walSinkMode,
+				committedXIDs,
 			)
 			query, err := queryGenerator.BuildQuery(ctx)
 			if err != nil {
@@ -625,6 +652,16 @@
 			slog.Int64("batchID", endBatchID), slog.Any("error", err))
 		return model.NormalizeResponse{}, err
 	}
+	if walSinkMode {
+		if err := c.CleanupWALSink(ctx, req.FlowJobName, committedXIDs); err != nil {
+			c.logger.Warn("[clickhouse] failed to cleanup WAL sink table",
+				slog.Int64("endBatchID", endBatchID), slog.Any("error", err))
+		}
+		if err := c.CleanupCommittedXIDs(ctx, req.FlowJobName, endBatchID); err != nil {
+			c.logger.Warn("[clickhouse] failed to cleanup committed XIDs metadata",
+				slog.Int64("endBatchID", endBatchID), slog.Any("error", err))
+		}
+	}
 	return model.NormalizeResponse{
 		StartBatchID: lastNormBatchID + 1,
 		EndBatchID:   endBatchID,
@@ -670,14 +707,84 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
 	return tableNames, nil
 }
 
+func (c *ClickHouseConnector) getDistinctTableNamesInWALSink(
+	ctx context.Context,
+	flowJobName string,
+	committedXIDs []int64,
+	tableToSchema map[string]*protos.TableSchema,
+) ([]string, error) {
+	if len(committedXIDs) == 0 {
+		return nil, nil
+	}
+
+	walTbl := c.GetWALSinkTableName(flowJobName)
+	xidParts := make([]string, len(committedXIDs))
+	for i, xid := range committedXIDs {
+		xidParts[i] = strconv.FormatInt(xid, 10)
+	}
+	xidList := strings.Join(xidParts, ",")
+
+	q := fmt.Sprintf(
+		"SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_txid IN (%s)",
+		peerdb_clickhouse.QuoteIdentifier(walTbl), xidList)
+
+	rows, err := c.query(ctx, q)
+	if err != nil {
+		return nil, fmt.Errorf("error while querying WAL sink table for distinct table names: %w", err)
+	}
+	defer rows.Close()
+	var tableNames []string
+	for rows.Next() {
+		var tableName string
+		if err := rows.Scan(&tableName); err != nil {
+			return nil, fmt.Errorf("error while scanning table name: %w", err)
+		}
+		if _, ok := tableToSchema[tableName]; ok {
+			tableNames = append(tableNames, tableName)
+		} else {
+			c.logger.Warn("table not found in table to schema mapping", "table", tableName)
+		}
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("failed to read rows: %w", err)
+	}
+	return tableNames, nil
+}
+
+// CleanupWALSink deletes rows for the given committed XIDs from the WAL sink
+// table. XID-based (not LSN-based) so in-progress streamed transactions whose
+// segments share an LSN range with committed transactions are preserved.
+func (c *ClickHouseConnector) CleanupWALSink(ctx context.Context, flowJobName string, committedXIDs []int64) error {
+	if len(committedXIDs) == 0 {
+		return nil
+	}
+	walTbl := c.GetWALSinkTableName(flowJobName)
+	onCluster := c.onCluster()
+	if onCluster != "" {
+		// Neither mutations nor lightweight deletes can target a Distributed
+		// table, so run the delete against the local shard table
+		walTbl += "_shard"
+	}
+	xidParts := make([]string, len(committedXIDs))
+	for i, xid := range committedXIDs {
+		xidParts[i] = strconv.FormatInt(xid, 10)
+	}
+	q := fmt.Sprintf("ALTER TABLE %s DELETE WHERE _peerdb_txid IN (%s)",
+		peerdb_clickhouse.QuoteIdentifier(walTbl), strings.Join(xidParts, ","))
+	if err := c.execWithLogging(ctx, q); err != nil {
+		return fmt.Errorf("failed to cleanup WAL sink table: %w", err)
+	}
+	return nil
+}
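Concretely, with committedXIDs = [731, 732, 740] and the hypothetical flow "orders", the cleanup issues:

    ALTER TABLE `_peerdb_wal_orders` DELETE WHERE _peerdb_txid IN (731,732,740)

i.e. a mutation rather than a lightweight DELETE FROM; on clustered peers it targets `_peerdb_wal_orders_shard` because neither form can run against a Distributed table.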
+
 func (c *ClickHouseConnector) copyAvroStageToDestination(
 	ctx context.Context,
 	flowJobName string,
+	destTable string,
 	syncBatchID int64,
 	env map[string]string,
 	version uint32,
 ) error {
-	avroSyncMethod := c.avroSyncMethod(flowJobName, env, version)
+	avroSyncMethod := c.avroSyncMethodForTable(flowJobName, destTable, env, version)
 	avroFile, err := GetAvroStage(ctx, flowJobName, syncBatchID)
 	if err != nil {
 		return fmt.Errorf("failed to get avro stage: %w", err)
@@ -691,20 +798,22 @@ func (c *ClickHouseConnector) copyAvroStageToDestination(
 }
 
 func (c *ClickHouseConnector) copyAvroStagesToDestination(
-	ctx context.Context, flowJobName string, lastNormBatchID int64, endBatchID int64, env map[string]string, version uint32,
+	ctx context.Context, flowJobName string, destTable string,
+	lastNormBatchID int64, endBatchID int64, env map[string]string, version uint32,
 ) error {
-	// Skip batches already copied to raw table. This can happen if a previous normalization
-	// run failed after copying to raw table but before completing normalization.
+	// Skip batches already copied. This can happen if a previous normalization
+	// run failed after copying to the staging table but before completing normalization.
 	lastBatchIDInRawTable, err := c.GetLastBatchIDInRawTable(ctx, flowJobName)
 	if err != nil {
 		return fmt.Errorf("failed to get last batch id in raw table: %w", err)
 	}
 	lastCopiedBatchID := max(lastBatchIDInRawTable, lastNormBatchID)
-	c.logger.Info("[clickhouse] pushing s3 data to raw table",
+	c.logger.Info("[clickhouse] pushing s3 data to staging table",
+		slog.String("destTable", destTable),
 		slog.Int64("batchID", lastCopiedBatchID),
 		slog.Int64("endBatchID", endBatchID))
 	for batchID := lastCopiedBatchID + 1; batchID <= endBatchID; batchID++ {
-		if err := c.copyAvroStageToDestination(ctx, flowJobName, batchID, env, version); err != nil {
+		if err := c.copyAvroStageToDestination(ctx, flowJobName, destTable, batchID, env, version); err != nil {
 			return fmt.Errorf("failed to copy avro stage to destination: %w", err)
 		}
 		c.logger.Info("[clickhouse] setting last batch id in raw table",
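A worked example of the resume arithmetic: if a prior run copied batches up to 44 into the staging table and then crashed before normalizing (lastBatchIDInRawTable = 44, lastNormBatchID = 42), then lastCopiedBatchID = max(44, 42) = 44 and the retry copies only batches 45..endBatchID, never re-copying 43 and 44.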
diff --git a/flow/connectors/clickhouse/normalize_query.go b/flow/connectors/clickhouse/normalize_query.go
index 64d0cf404..c4034e508 100644
--- a/flow/connectors/clickhouse/normalize_query.go
+++ b/flow/connectors/clickhouse/normalize_query.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"slices"
+	"strconv"
 	"strings"
 
 	chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto"
@@ -19,23 +20,28 @@ import (
 type NormalizeQueryGenerator struct {
 	env                    map[string]string
-	flags                  []string
 	tableNameSchemaMapping map[string]*protos.TableSchema
 	chVersion              *chproto.Version
+	isDeletedColName       string
 	Query                  string
 	TableName              string
 	rawTableName           string
-	isDeletedColName       string
 	tableMappings          []*protos.TableMapping
+	flags                  []string
+	committedXIDs          []int64
 	lastNormBatchID        int64
 	endBatchID             int64
+	version                uint32
 	enablePrimaryUpdate    bool
 	sourceSchemaAsDestinationColumn bool
 	cluster                bool
-	version                uint32
+	walSinkMode            bool
 }
 
 // NewTableNormalizeQuery constructs a TableNormalizeQuery with required fields.
+// walSinkMode=true switches the FROM table to the WAL sink table and filters by
+// committedXIDs instead of batch_id range; rawTableName should then be the WAL
+// sink table name.
 func NewNormalizeQueryGenerator(
 	tableName string,
 	tableNameSchemaMapping map[string]*protos.TableSchema,
@@ -51,6 +57,8 @@ func NewNormalizeQueryGenerator(
 	configuredSoftDeleteColName string,
 	version uint32,
 	flags []string,
+	walSinkMode bool,
+	committedXIDs []int64,
 ) *NormalizeQueryGenerator {
 	isDeletedColumn := isDeletedColName
 	if configuredSoftDeleteColName != "" {
@@ -71,6 +79,8 @@
 		isDeletedColName: isDeletedColumn,
 		version:          version,
 		flags:            flags,
+		walSinkMode:      walSinkMode,
+		committedXIDs:    committedXIDs,
 	}
 }
 
@@ -286,14 +296,26 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
 	fmt.Fprintf(&projection, "intDiv(_peerdb_record_type, 2) AS %s,", peerdb_clickhouse.QuoteIdentifier(isDeletedColName))
 	fmt.Fprintf(&colSelector, "%s,", peerdb_clickhouse.QuoteIdentifier(isDeletedColName))
 
-	// add _peerdb_timestamp as _peerdb_version
-	fmt.Fprintf(&projection, "_peerdb_timestamp AS %s", peerdb_clickhouse.QuoteIdentifier(versionColName))
+	// In WAL sink mode, use _peerdb_lsn for strict WAL ordering; otherwise use _peerdb_timestamp
+	versionSource := "_peerdb_timestamp"
+	if t.walSinkMode {
+		versionSource = "_peerdb_lsn"
+	}
+	fmt.Fprintf(&projection, "%s AS %s", versionSource, peerdb_clickhouse.QuoteIdentifier(versionColName))
 	fmt.Fprintf(&colSelector, "%s) ", peerdb_clickhouse.QuoteIdentifier(versionColName))
 
 	selectQuery.WriteString(projection.String())
-	fmt.Fprintf(&selectQuery,
-		" FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_destination_table_name = %s",
-		peerdb_clickhouse.QuoteIdentifier(t.rawTableName), t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName))
+
+	// WHERE clause: WAL sink mode filters by committed XIDs, v1 mode filters by batch_id range
+	if t.walSinkMode {
+		fmt.Fprintf(&selectQuery,
+			" FROM %s WHERE _peerdb_txid IN (%s) AND _peerdb_destination_table_name = %s",
+			peerdb_clickhouse.QuoteIdentifier(t.rawTableName), t.committedXIDsSQL(), peerdb_clickhouse.QuoteLiteral(t.TableName))
+	} else {
+		fmt.Fprintf(&selectQuery,
+			" FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_destination_table_name = %s",
+			peerdb_clickhouse.QuoteIdentifier(t.rawTableName), t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName))
+	}
 
 	if t.enablePrimaryUpdate {
 		if t.sourceSchemaAsDestinationColumn {
@@ -304,15 +326,24 @@
 		fmt.Fprintf(&projectionUpdate, "1 AS %s,", peerdb_clickhouse.QuoteIdentifier(isDeletedColName))
 		// decrement timestamp by 1 so delete is ordered before latest data,
 		// could be same if deletion records were only generated when ordering updated
-		fmt.Fprintf(&projectionUpdate, "_peerdb_timestamp - 1 AS %s", peerdb_clickhouse.QuoteIdentifier(versionColName))
+		fmt.Fprintf(&projectionUpdate, "%s - 1 AS %s", versionSource, peerdb_clickhouse.QuoteIdentifier(versionColName))
 
 		selectQuery.WriteString(" UNION ALL SELECT ")
 		selectQuery.WriteString(projectionUpdate.String())
-		fmt.Fprintf(&selectQuery,
-			" FROM %s WHERE _peerdb_match_data != '' AND _peerdb_batch_id > %d AND _peerdb_batch_id <= %d"+
-				" AND _peerdb_destination_table_name = %s AND _peerdb_record_type = 1",
-			peerdb_clickhouse.QuoteIdentifier(t.rawTableName),
-			t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName))
+
+		if t.walSinkMode {
+			fmt.Fprintf(&selectQuery,
+				" FROM %s WHERE _peerdb_match_data != '' AND _peerdb_txid IN (%s)"+
+					" AND _peerdb_destination_table_name = %s AND _peerdb_record_type = 1",
+				peerdb_clickhouse.QuoteIdentifier(t.rawTableName),
+				t.committedXIDsSQL(), peerdb_clickhouse.QuoteLiteral(t.TableName))
+		} else {
+			fmt.Fprintf(&selectQuery,
+				" FROM %s WHERE _peerdb_match_data != '' AND _peerdb_batch_id > %d AND _peerdb_batch_id <= %d"+
+					" AND _peerdb_destination_table_name = %s AND _peerdb_record_type = 1",
+				peerdb_clickhouse.QuoteIdentifier(t.rawTableName),
+				t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName))
+		}
 	}
 
 	chSettings := clickhouse.NewCHSettings(t.chVersion)
@@ -333,6 +364,15 @@
 	return t.Query, nil
 }
 
+// committedXIDsSQL returns a comma-separated list of committed XIDs for use in SQL IN clause.
+func (t *NormalizeQueryGenerator) committedXIDsSQL() string {
+	parts := make([]string, len(t.committedXIDs))
+	for i, xid := range t.committedXIDs {
+		parts[i] = strconv.FormatInt(xid, 10)
+	}
+	return strings.Join(parts, ",")
+}
+
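Illustrative output of the two WHERE branches above (hypothetical names and values; quoting is done by the peerdb_clickhouse helpers):

    -- v1, batch-id range:
    FROM `_peerdb_raw_orders`
    WHERE _peerdb_batch_id > 41 AND _peerdb_batch_id <= 44
      AND _peerdb_destination_table_name = 'public.orders'

    -- v2, WAL sink plus committed XIDs:
    FROM `_peerdb_wal_orders`
    WHERE _peerdb_txid IN (731,732,740)
      AND _peerdb_destination_table_name = 'public.orders'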
 func extendedTimeToDateTime(jsonExtractExpr string, time64Supported bool) string {
 	if time64Supported {
 		return fmt.Sprintf("toDateTime64(toTime64OrNull(%s, 6), 6)", jsonExtractExpr)
diff --git a/flow/connectors/clickhouse/normalize_test.go b/flow/connectors/clickhouse/normalize_test.go
index bf7cf3257..944b85d93 100644
--- a/flow/connectors/clickhouse/normalize_test.go
+++ b/flow/connectors/clickhouse/normalize_test.go
@@ -260,6 +260,8 @@ func TestBuildQuery_Basic(t *testing.T) {
 		"",
 		shared.InternalVersion_Latest,
 		nil,
+		false,
+		nil,
 	)
 
 	query, err := g.BuildQuery(ctx)
@@ -315,6 +317,8 @@ func TestBuildQuery_WithPrimaryUpdate(t *testing.T) {
 		"",
 		shared.InternalVersion_Latest,
 		nil,
+		false,
+		nil,
 	)
 
 	query, err := g.BuildQuery(ctx)
@@ -367,6 +371,8 @@ func TestBuildQuery_WithSourceSchemaAsDestinationColumn(t *testing.T) {
 		"",
 		shared.InternalVersion_Latest,
 		nil,
+		false,
+		nil,
 	)
 
 	query, err := g.BuildQuery(ctx)
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index 23a4747d6..ad3a3ada8 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -286,6 +286,62 @@ func (p *PostgresMetadata) IsQRepPartitionSynced(ctx context.Context, req *proto
 	return exists, nil
 }
 
+// SetCommittedXIDsForBatch stores the set of committed transaction IDs for a CDC v2 batch.
+func (p *PostgresMetadata) SetCommittedXIDsForBatch(ctx context.Context, flowName string, batchID int64, xids []int64) error {
+	if _, err := p.pool.Exec(ctx,
+		`INSERT INTO cdc_v2_committed_xids (flow_name, batch_id, committed_xids)
+		 VALUES ($1, $2, $3)
+		 ON CONFLICT (flow_name, batch_id)
+		 DO UPDATE SET committed_xids = excluded.committed_xids`,
+		flowName, batchID, xids,
+	); err != nil {
+		p.logger.Error("failed to set committed xids for batch",
+			slog.String("flowName", flowName), slog.Int64("batchID", batchID), slog.Any("error", err))
+		return fmt.Errorf("failed to set committed xids for batch: %w", err)
+	}
+	return nil
+}
+
+// GetCommittedXIDsForBatches returns all committed XIDs for the given batch range.
+func (p *PostgresMetadata) GetCommittedXIDsForBatches(
+	ctx context.Context, flowName string, startBatchID int64, endBatchID int64,
+) ([]int64, error) {
+	rows, err := p.pool.Query(ctx,
+		`SELECT committed_xids FROM cdc_v2_committed_xids
+		 WHERE flow_name = $1 AND batch_id > $2 AND batch_id <= $3`,
+		flowName, startBatchID, endBatchID,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query committed xids: %w", err)
+	}
+	defer rows.Close()
+
+	var allXIDs []int64
+	for rows.Next() {
+		var batchXIDs []int64
+		if err := rows.Scan(&batchXIDs); err != nil {
+			return nil, fmt.Errorf("failed to scan committed xids: %w", err)
+		}
+		allXIDs = append(allXIDs, batchXIDs...)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("error iterating committed xids rows: %w", err)
+	}
+
+	return allXIDs, nil
+}
+
+// CleanupCommittedXIDs removes committed XID records up to the given batch ID.
+func (p *PostgresMetadata) CleanupCommittedXIDs(ctx context.Context, flowName string, upToBatchID int64) error {
+	if _, err := p.pool.Exec(ctx,
+		`DELETE FROM cdc_v2_committed_xids WHERE flow_name = $1 AND batch_id <= $2`,
+		flowName, upToBatchID,
+	); err != nil {
+		return fmt.Errorf("failed to cleanup committed xids: %w", err)
+	}
+	return nil
+}
+
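The migration for the catalog table is not part of this diff; from the three queries above its shape is assumed to be roughly:

    CREATE TABLE IF NOT EXISTS cdc_v2_committed_xids (
        flow_name      TEXT     NOT NULL,
        batch_id       BIGINT   NOT NULL,
        committed_xids BIGINT[] NOT NULL,
        PRIMARY KEY (flow_name, batch_id)
    );

(bigint[] because pgx maps []int64 to a Postgres bigint array; the upsert's ON CONFLICT clause requires a unique key on (flow_name, batch_id).)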
 func (p *PostgresMetadata) SyncFlowCleanup(ctx context.Context, jobName string) error {
 	tx, err := p.pool.Begin(ctx)
 	if err != nil {
@@ -313,5 +369,9 @@ func SyncFlowCleanupInTx(ctx context.Context, tx pgx.Tx, jobName string) error {
 		return err
 	}
 
+	if _, err := tx.Exec(ctx, `DELETE FROM cdc_v2_committed_xids WHERE flow_name = $1`, jobName); err != nil {
+		return err
+	}
+
 	return nil
 }
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 3262ca961..56a11877f 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -65,6 +65,14 @@ type PostgresCDCSource struct {
 	handleInheritanceForNonPartitionedTables bool
 	originMetadataAsDestinationColumn        bool
 	internalVersion                          uint32
+
+	// v2 protocol state
+	useV2Protocol bool
+	inStream      bool
+	// tracks XID -> list of sub-XIDs seen during streaming for that transaction
+	activeStreams map[uint32][]uint32
+	// set of committed XIDs (top-level + sub-XIDs) in current batch
+	committedXIDs map[uint32]struct{}
 }
 
 type PostgresCDCConfig struct {
@@ -77,10 +85,11 @@ type PostgresCDCConfig struct {
 	FlowJobName                              string
 	Slot                                     string
 	Publication                              string
+	InternalVersion                          uint32
 	HandleInheritanceForNonPartitionedTables bool
 	SourceSchemaAsDestinationColumn          bool
 	OriginMetaAsDestinationColumn            bool
-	InternalVersion                          uint32
+	UseV2Protocol                            bool
 }
 
 // Create a new PostgresCDCSource
@@ -136,6 +145,9 @@ func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig
 		handleInheritanceForNonPartitionedTables: cdcConfig.HandleInheritanceForNonPartitionedTables,
 		originMetadataAsDestinationColumn:        cdcConfig.OriginMetaAsDestinationColumn,
 		internalVersion:                          cdcConfig.InternalVersion,
+		useV2Protocol:                            cdcConfig.UseV2Protocol,
+		activeStreams:                            make(map[uint32][]uint32),
+		committedXIDs:                            make(map[uint32]struct{}),
 	}, nil
 }
 
@@ -522,7 +534,16 @@ func PullCdcRecords[Items model.Items](
 	var latestServerWALEnd, lastXLogDataServerWALEnd atomic.Int64
 	defer func() {
 		if totalRecords == 0 {
-			records.SignalAsEmpty()
+			// In v2 mode a batch may carry only a StreamCommit/Commit (no DML records)
+			// when a transaction's commit lands in a different batch than its inserts.
+			// Treat such batches as non-empty so the activity still calls SyncRecords
+			// and persists the committed XIDs; otherwise the prior batch's records
+			// in the WAL sink would never be marked committed and never normalize.
+			if p.useV2Protocol && len(p.committedXIDs) > 0 {
+				records.SignalAsNotEmpty()
+			} else {
+				records.SignalAsEmpty()
+			}
 		}
 		logger.Info("[finished] PullRecords",
 			slog.Int64("records", totalRecords),
@@ -624,50 +645,80 @@
 			}
 		}
 
-		if p.commitLock == nil {
+		if p.useV2Protocol {
+			// In v2 mode batch boundaries are not gated on whole-transaction commit;
+			// huge transactions may span batches. But we still avoid returning while
+			// a streamed transaction is in flight (StreamStart seen, no StreamCommit/
+			// Abort yet) so its commit XID is recorded alongside its inserts whenever
+			// possible. Without this guard a small streamed txn whose Commit arrives
+			// shortly after StreamStop could be split across two batches, leaving the
+			// records orphaned in the WAL sink with no committedXID to normalize.
+			inProgressStreams := p.inStream || len(p.activeStreams) > 0
 			if totalRecords >= int64(req.MaxBatchSize) {
-				logger.Info("batch filled, returning currently accumulated records",
+				logger.Info("v2: batch filled, returning currently accumulated records",
 					slog.Int64("records", totalRecords),
 					slog.Int64("bytes", totalFetchedBytes.Load()),
-					slog.Int("channelLen", records.ChannelLen()),
 					slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
 				return nil
 			}
-
-			if waitingForCommit {
-				logger.Info("commit received, returning currently accumulated records",
-					slog.Int64("records", totalRecords),
-					slog.Int64("bytes", totalFetchedBytes.Load()),
-					slog.Int("channelLen", records.ChannelLen()),
-					slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
-				return nil
+			if time.Now().After(nextStandbyMessageDeadline) {
+				if totalRecords != 0 && !inProgressStreams {
+					logger.Info("v2: standby deadline reached, returning records",
+						slog.Int64("records", totalRecords),
+						slog.Int64("bytes", totalFetchedBytes.Load()),
+						slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
+					return nil
+				}
+				logger.Info("standby deadline reached, no records accumulated or streams in flight, continuing to wait",
+					slog.Bool("inProgressStreams", inProgressStreams))
+				nextStandbyMessageDeadline = time.Now().Add(req.IdleTimeout)
 			}
-		}
-
-		// if we are past the next standby deadline (?)
-		if time.Now().After(nextStandbyMessageDeadline) {
-			if totalRecords != 0 {
-				logger.Info("standby deadline reached", slog.Int64("records", totalRecords))
-
-				if p.commitLock == nil {
-					logger.Info("no commit lock, returning currently accumulated records",
+		} else {
+			if p.commitLock == nil {
+				if totalRecords >= int64(req.MaxBatchSize) {
+					logger.Info("batch filled, returning currently accumulated records",
 						slog.Int64("records", totalRecords),
 						slog.Int64("bytes", totalFetchedBytes.Load()),
 						slog.Int("channelLen", records.ChannelLen()),
 						slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
 					return nil
-				} else {
-					logger.Info("commit lock, waiting for commit to return records",
+				}
+
+				if waitingForCommit {
+					logger.Info("commit received, returning currently accumulated records",
 						slog.Int64("records", totalRecords),
 						slog.Int64("bytes", totalFetchedBytes.Load()),
 						slog.Int("channelLen", records.ChannelLen()),
 						slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
-					waitingForCommit = true
+					return nil
 				}
-			} else {
-				logger.Info(("standby deadline reached, no records accumulated, continuing to wait"))
 			}
-			nextStandbyMessageDeadline = time.Now().Add(req.IdleTimeout)
+
+			// if we are past the next standby deadline (?)
+			if time.Now().After(nextStandbyMessageDeadline) {
+				if totalRecords != 0 {
+					logger.Info("standby deadline reached", slog.Int64("records", totalRecords))
+
+					if p.commitLock == nil {
+						logger.Info("no commit lock, returning currently accumulated records",
+							slog.Int64("records", totalRecords),
+							slog.Int64("bytes", totalFetchedBytes.Load()),
+							slog.Int("channelLen", records.ChannelLen()),
+							slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
+						return nil
+					} else {
+						logger.Info("commit lock, waiting for commit to return records",
+							slog.Int64("records", totalRecords),
+							slog.Int64("bytes", totalFetchedBytes.Load()),
+							slog.Int("channelLen", records.ChannelLen()),
+							slog.Float64("elapsedMinutes", time.Since(pullStart).Minutes()))
+						waitingForCommit = true
+					}
+				} else {
+					logger.Info("standby deadline reached, no records accumulated, continuing to wait")
+				}
+				nextStandbyMessageDeadline = time.Now().Add(req.IdleTimeout)
+			}
 		}
 
 		var receiveCtx context.Context
@@ -736,7 +787,12 @@ func PullCdcRecords[Items model.Items](
 		logger.Debug("XLogData",
 			slog.Any("WALStart", xld.WALStart), slog.Any("ServerWALEnd", xld.ServerWALEnd),
 			slog.Time("ServerTime", xld.ServerTime))
-		rec, err := processMessage(ctx, p, records, xld, clientXLogPos, processor)
+		var rec model.Record[Items]
+		if p.useV2Protocol {
+			rec, err = processMessageV2(ctx, p, records, xld, clientXLogPos, processor)
+		} else {
+			rec, err = processMessage(ctx, p, records, xld, clientXLogPos, processor)
+		}
 		if err != nil {
 			return exceptions.NewPostgresLogicalMessageProcessingError(err)
 		}
@@ -925,11 +981,11 @@ func processMessage[Items model.Items](
 		logger.Debug("BeginMessage", slog.Any("FinalLSN", msg.FinalLSN), slog.Uint64("XID", uint64(msg.Xid)))
 		p.commitLock = msg
 	case *pglogrepl.InsertMessage:
-		return processInsertMessage(p, xld.WALStart, msg, processor, customTypeMapping)
+		return processInsertMessage(p, xld.WALStart, msg, p.baseRecord(xld.WALStart), processor, customTypeMapping)
 	case *pglogrepl.UpdateMessage:
-		return processUpdateMessage(p, xld.WALStart, msg, processor, customTypeMapping)
+		return processUpdateMessage(p, xld.WALStart, msg, p.baseRecord(xld.WALStart), processor, customTypeMapping)
 	case *pglogrepl.DeleteMessage:
-		return processDeleteMessage(p, xld.WALStart, msg, processor, customTypeMapping)
+		return processDeleteMessage(p, xld.WALStart, msg, p.baseRecord(xld.WALStart), processor, customTypeMapping)
 	case *pglogrepl.CommitMessage:
 		// for a commit message, update the last checkpoint id for the record batch.
 		logger.Debug("CommitMessage",
@@ -991,10 +1047,189 @@
 	return nil, nil
 }
 
+// baseRecordV2 creates a BaseRecord for v2 protocol DML messages.
+// For streamed transactions xid is the in-stream XID and commitLock is nil.
+// For non-streamed v2 transactions the wrapper carries xid==0 and the XID
+// (and CommitTime) come from the surrounding Begin/Commit pair via commitLock,
+// matching v1 baseRecord semantics.
+func (p *PostgresCDCSource) baseRecordV2(lsn pglogrepl.LSN, xid uint32) model.BaseRecord {
+	var nano int64
+	transactionID := uint64(xid)
+	if p.commitLock != nil {
+		nano = p.commitLock.CommitTime.UnixNano()
+		if transactionID == 0 {
+			transactionID = uint64(p.commitLock.Xid)
+		}
+	}
+	return model.BaseRecord{
+		CheckpointID:   int64(lsn),
+		CommitTimeNano: nano,
+		TransactionID:  transactionID,
+	}
+}
+
+func processMessageV2[Items model.Items](
+	ctx context.Context,
+	p *PostgresCDCSource,
+	batch *model.CDCStream[Items],
+	xld pglogrepl.XLogData,
+	currentClientXlogPos pglogrepl.LSN,
+	processor replProcessor[Items],
+) (model.Record[Items], error) {
+	logger := internal.LoggerFromCtx(ctx)
+	logicalMsg, err := pglogrepl.ParseV2(xld.WALData, p.inStream)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing v2 logical message: %w", err)
+	}
+	customTypeMapping, err := p.fetchCustomTypeMapping(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	switch msg := logicalMsg.(type) {
+	case *pglogrepl.StreamStartMessageV2:
+		logger.Debug("StreamStartMessageV2",
+			slog.Uint64("XID", uint64(msg.Xid)),
+			slog.Uint64("FirstSegment", uint64(msg.FirstSegment)))
+		p.inStream = true
+		if msg.FirstSegment == 1 {
+			// First segment of a new streaming transaction
+			p.activeStreams[msg.Xid] = nil
+		}
+
+	case *pglogrepl.StreamStopMessageV2:
+		logger.Debug("StreamStopMessageV2")
+		p.inStream = false
+
+	case *pglogrepl.StreamCommitMessageV2:
+		logger.Debug("StreamCommitMessageV2",
+			slog.Uint64("XID", uint64(msg.Xid)),
+			slog.Any("CommitLSN", msg.CommitLSN))
+		// Mark top-level XID as committed
+		p.committedXIDs[msg.Xid] = struct{}{}
+		// Mark all sub-XIDs for this transaction as committed
+		if subXIDs, ok := p.activeStreams[msg.Xid]; ok {
+			for _, subXID := range subXIDs {
+				p.committedXIDs[subXID] = struct{}{}
+			}
+			delete(p.activeStreams, msg.Xid)
+		}
+		batch.UpdateLatestCheckpointID(int64(msg.CommitLSN))
+		p.otelManager.Metrics.ReceivedCommitLSNGauge.Record(ctx, int64(msg.CommitLSN))
+		p.otelManager.Metrics.CommitLagGauge.Record(ctx, time.Now().UTC().Sub(msg.CommitTime).Microseconds())
+
+	case *pglogrepl.StreamAbortMessageV2:
+		logger.Debug("StreamAbortMessageV2",
+			slog.Uint64("XID", uint64(msg.Xid)),
+			slog.Uint64("SubXID", uint64(msg.SubXid)))
+		// If SubXid != Xid, only the subtransaction is aborted
+		if msg.SubXid == msg.Xid {
+			delete(p.activeStreams, msg.Xid)
+		} else {
+			// Remove the specific sub-XID from tracking
+			if subXIDs, ok := p.activeStreams[msg.Xid]; ok {
+				filtered := subXIDs[:0]
+				for _, sid := range subXIDs {
+					if sid != msg.SubXid {
+						filtered = append(filtered, sid)
+					}
+				}
+				p.activeStreams[msg.Xid] = filtered
+			}
+		}
+
+	// Non-streaming transactions still use Begin/Commit
	case *pglogrepl.BeginMessage:
+		logger.Debug("BeginMessage", slog.Any("FinalLSN", msg.FinalLSN), slog.Uint64("XID", uint64(msg.Xid)))
+		p.commitLock = msg
+	case *pglogrepl.CommitMessage:
+		logger.Debug("CommitMessage",
+			slog.Any("CommitLSN", msg.CommitLSN),
+			slog.Any("TransactionEndLSN", msg.TransactionEndLSN))
+		batch.UpdateLatestCheckpointID(int64(msg.CommitLSN))
+		p.otelManager.Metrics.ReceivedCommitLSNGauge.Record(ctx, int64(msg.CommitLSN))
+		p.otelManager.Metrics.CommitLagGauge.Record(ctx, time.Now().UTC().Sub(msg.CommitTime).Microseconds())
+		// Mark the non-streaming transaction as committed
+		if p.commitLock != nil {
+			p.committedXIDs[p.commitLock.Xid] = struct{}{}
+		}
+		p.commitLock = nil
+
+	// Streamed DML messages (v2 wrappers around v1 messages). For streamed
+	// transactions msg.Xid carries the XID; for non-streamed v2 transactions
+	// msg.Xid is 0 and baseRecordV2 falls back to commitLock.
+	case *pglogrepl.InsertMessageV2:
+		return processInsertMessageV2(p, xld.WALStart, msg, processor, customTypeMapping)
+	case *pglogrepl.UpdateMessageV2:
+		return processUpdateMessageV2(p, xld.WALStart, msg, processor, customTypeMapping)
+	case *pglogrepl.DeleteMessageV2:
+		return processDeleteMessageV2(p, xld.WALStart, msg, processor, customTypeMapping)
+
+	case *pglogrepl.RelationMessageV2:
+		// Schema changes applied eagerly regardless of transaction commit status
+		rmsg := &msg.RelationMessage
+		originalRelID := rmsg.RelationID
+		var parentRelKind byte
+		rmsg.RelationID, parentRelKind, err = p.checkIfUnknownTableInherits(ctx, rmsg.RelationID)
+		if err != nil {
+			return nil, err
+		}
+		if _, exists := p.srcTableIDNameMapping[rmsg.RelationID]; !exists {
+			return nil, nil
+		}
+		if originalRelID != rmsg.RelationID && parentRelKind == 'p' && p.publishViaPartitionRoot {
+			return nil, nil
+		}
+		logger.Info("processing RelationMessageV2",
+			slog.Any("LSN", currentClientXlogPos),
+			slog.Uint64("RelationID", uint64(rmsg.RelationID)),
+			slog.String("Namespace", rmsg.Namespace),
+			slog.String("RelationName", rmsg.RelationName),
+			slog.Any("Columns", rmsg.Columns))
+		return processRelationMessage[Items](ctx, p, currentClientXlogPos, rmsg)
+
+	case *pglogrepl.LogicalDecodingMessageV2:
+		logger.Debug("LogicalDecodingMessageV2",
+			slog.Bool("Transactional", msg.Transactional),
+			slog.String("Prefix", msg.Prefix),
+			slog.String("LSN", msg.LSN.String()))
+		if !msg.Transactional {
+			batch.UpdateLatestCheckpointID(int64(msg.LSN))
+		}
+		return &model.MessageRecord[Items]{
+			BaseRecord: p.baseRecordV2(msg.LSN, msg.Xid),
+			Prefix:     msg.Prefix,
+			Content:    string(msg.Content),
+		}, nil
+
+	default:
+		if _, ok := p.hushWarnUnhandledMessageType[msg.Type()]; !ok {
+			logger.Warn(fmt.Sprintf("Unhandled v2 message type: %T", msg))
+			p.hushWarnUnhandledMessageType[msg.Type()] = struct{}{}
+		}
+	}
+
+	return nil, nil
+}
+
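For orientation, the pgoutput message sequence processMessageV2 sees for one streamed transaction looks roughly like this (illustrative XID and LSN values):

    StreamStart(Xid=731, FirstSegment=1)   -> activeStreams[731] registered
      InsertMessageV2(Xid=731), ...        -> records staged with _peerdb_txid=731
    StreamStop                             -> inStream = false
    StreamStart(Xid=731, FirstSegment=0)   -> later segment of the same txn
      UpdateMessageV2(Xid=731), ...
    StreamStop
    StreamCommit(Xid=731)                  -> 731 (plus tracked sub-XIDs) marked committed

A StreamAbort with SubXid == Xid instead of the commit drops 731 from activeStreams, and its staged WAL sink rows are simply never normalized.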
+// GetAndResetCommittedXIDs returns the current set of committed XIDs and resets it.
+// XIDs are widened to int64 at the boundary so downstream code (catalog,
+// connectors) can stay in a single integer type, while in-memory tracking
+// uses the native postgres uint32 representation.
+func (p *PostgresCDCSource) GetAndResetCommittedXIDs() []int64 {
+	xids := make([]int64, 0, len(p.committedXIDs))
+	for xid := range p.committedXIDs {
+		xids = append(xids, int64(xid))
+	}
+	clear(p.committedXIDs)
+	return xids
+}
+
 func processInsertMessage[Items model.Items](
 	p *PostgresCDCSource,
 	lsn pglogrepl.LSN,
 	msg *pglogrepl.InsertMessage,
+	baseRecord model.BaseRecord,
 	processor replProcessor[Items],
 	customTypeMapping map[uint32]shared.CustomDataType,
 ) (model.Record[Items], error) {
@@ -1018,7 +1253,6 @@
 		return nil, err
 	}
 
-	baseRecord := p.baseRecord(lsn)
 	items, _, err := processTuple(processor, p, msg.Tuple, rel, p.tableNameMapping[tableName], customTypeMapping, schemaName, baseRecord)
 	if err != nil {
 		return nil, fmt.Errorf("failed to process insert message for table %s: %w", tableName, err)
@@ -1037,6 +1271,7 @@ func processUpdateMessage[Items model.Items](
 	p *PostgresCDCSource,
 	lsn pglogrepl.LSN,
 	msg *pglogrepl.UpdateMessage,
+	baseRecord model.BaseRecord,
 	processor replProcessor[Items],
 	customTypeMapping map[uint32]shared.CustomDataType,
 ) (model.Record[Items], error) {
@@ -1065,7 +1300,6 @@
 		return nil, fmt.Errorf("failed to process update message (OldTuple) for table %s: %w", tableName, err)
 	}
 
-	baseRecord := p.baseRecord(lsn)
 	newItems, unchangedToastColumns, err := processTuple(
 		processor, p, msg.NewTuple, rel, p.tableNameMapping[tableName], customTypeMapping, schemaName, baseRecord)
 	if err != nil {
@@ -1100,6 +1334,7 @@ func processDeleteMessage[Items model.Items](
 	p *PostgresCDCSource,
 	lsn pglogrepl.LSN,
 	msg *pglogrepl.DeleteMessage,
+	baseRecord model.BaseRecord,
 	processor replProcessor[Items],
 	customTypeMapping map[uint32]shared.CustomDataType,
 ) (model.Record[Items], error) {
@@ -1123,7 +1358,6 @@
 		return nil, err
 	}
 
-	baseRecord := p.baseRecord(lsn)
 	items, _, err := processTuple(processor, p, msg.OldTuple, rel, p.tableNameMapping[tableName], customTypeMapping, schemaName, baseRecord)
 	if err != nil {
 		return nil, fmt.Errorf("failed to process delete message for table %s: %w", tableName, err)
@@ -1137,6 +1371,40 @@
 	}, nil
 }
 
+// processInsertMessageV2 / processUpdateMessageV2 / processDeleteMessageV2 wrap
+// the v1 processors with a v2-aware BaseRecord. In streamed transactions the
+// XID is carried on the wire-level wrapper (msg.Xid) rather than via Begin/Commit,
+// so we cannot rely on commitLock for it.
+func processInsertMessageV2[Items model.Items](
+	p *PostgresCDCSource,
+	lsn pglogrepl.LSN,
+	msg *pglogrepl.InsertMessageV2,
+	processor replProcessor[Items],
+	customTypeMapping map[uint32]shared.CustomDataType,
+) (model.Record[Items], error) {
+	return processInsertMessage(p, lsn, &msg.InsertMessage, p.baseRecordV2(lsn, msg.Xid), processor, customTypeMapping)
+}
+
+func processUpdateMessageV2[Items model.Items](
+	p *PostgresCDCSource,
+	lsn pglogrepl.LSN,
+	msg *pglogrepl.UpdateMessageV2,
+	processor replProcessor[Items],
+	customTypeMapping map[uint32]shared.CustomDataType,
+) (model.Record[Items], error) {
+	return processUpdateMessage(p, lsn, &msg.UpdateMessage, p.baseRecordV2(lsn, msg.Xid), processor, customTypeMapping)
+}
+
+func processDeleteMessageV2[Items model.Items](
+	p *PostgresCDCSource,
+	lsn pglogrepl.LSN,
+	msg *pglogrepl.DeleteMessageV2,
+	processor replProcessor[Items],
+	customTypeMapping map[uint32]shared.CustomDataType,
+) (model.Record[Items], error) {
+	return processDeleteMessage(p, lsn, &msg.DeleteMessage, p.baseRecordV2(lsn, msg.Xid), processor, customTypeMapping)
+}
+
 // processRelationMessage processes a RelationMessage and returns a TableSchemaDelta
 func processRelationMessage[Items model.Items](
 	ctx context.Context,
diff --git a/flow/connectors/postgres/cdc_v2_test.go b/flow/connectors/postgres/cdc_v2_test.go
new file mode 100644
index 000000000..a25852bdf
--- /dev/null
+++ b/flow/connectors/postgres/cdc_v2_test.go
@@ -0,0 +1,40 @@
+package connpostgres
+
+import (
+	"testing"
+
+	"github.com/jackc/pglogrepl"
+	"github.com/stretchr/testify/require"
+)
+
+// TestBaseRecordV2 covers the two cases the v2 DML dispatch hits:
+// streaming (commitLock nil, wrapper Xid set) and non-streaming v2
+// (commitLock from Begin, wrapper Xid zero so we fall back to it).
+func TestBaseRecordV2(t *testing.T) {
+	t.Run("streaming uses wrapper Xid", func(t *testing.T) {
+		p := &PostgresCDCSource{}
+		rec := p.baseRecordV2(pglogrepl.LSN(0xdeadbeef), 12345)
+		require.Equal(t, int64(0xdeadbeef), rec.CheckpointID)
+		require.Equal(t, uint64(12345), rec.TransactionID)
+		require.Equal(t, int64(0), rec.CommitTimeNano)
+	})
+
+	t.Run("non-streaming falls back to commitLock Xid", func(t *testing.T) {
+		p := &PostgresCDCSource{
+			commitLock: &pglogrepl.BeginMessage{Xid: 67890},
+		}
+		rec := p.baseRecordV2(pglogrepl.LSN(42), 0)
+		require.Equal(t, int64(42), rec.CheckpointID)
+		require.Equal(t, uint64(67890), rec.TransactionID, "should fall back to commitLock.Xid when wrapper Xid is 0")
+	})
+
+	t.Run("wrapper Xid wins over commitLock when both present", func(t *testing.T) {
+		// shouldn't happen in practice (StreamStartMessageV2 implies no Begin in flight),
+		// but the precedence matters: wrapper carries the streaming subtransaction XID.
+		p := &PostgresCDCSource{
+			commitLock: &pglogrepl.BeginMessage{Xid: 67890},
+		}
+		rec := p.baseRecordV2(pglogrepl.LSN(42), 12345)
+		require.Equal(t, uint64(12345), rec.TransactionID)
+	})
+}
diff --git a/flow/connectors/postgres/postgres_source.go b/flow/connectors/postgres/postgres_source.go
index 24eeeea83..a16e76a3b 100644
--- a/flow/connectors/postgres/postgres_source.go
+++ b/flow/connectors/postgres/postgres_source.go
@@ -101,6 +101,8 @@ func (c *PostgresConnector) MaybeStartReplication(
 	publicationName string,
 	lastOffset int64,
 	pgVersion shared.PGVersion,
+	useV2Protocol bool,
+	env map[string]string,
 ) error {
 	if c.replState != nil &&
 		(c.replState.Offset != lastOffset ||
 			c.replState.Slot != slotName ||
@@ -113,7 +115,7 @@
 	}
 
 	if c.replState == nil {
-		replicationOpts, err := c.replicationOptions(publicationName, pgVersion)
+		replicationOpts, err := c.replicationOptions(publicationName, pgVersion, useV2Protocol)
 		if err != nil {
 			return fmt.Errorf("error getting replication options: %w", err)
 		}
@@ -126,6 +128,20 @@
 		c.replLock.Lock()
 		defer c.replLock.Unlock()
+
+		// Apply walsender-scoped GUC overrides before START_REPLICATION enters
+		// CopyBoth mode (after which the connection won't accept SQL).
+		if debugStream, err := internal.PeerDBPgDebugLogicalReplicationStreaming(ctx, env); err != nil {
+			return fmt.Errorf("error reading debug_logical_replication_streaming setting: %w", err)
+		} else if debugStream != "" {
+			c.logger.Warn("forcing debug_logical_replication_streaming on replication connection",
+				slog.String("value", debugStream))
+			if _, err := c.replConn.Exec(ctx,
+				"SET debug_logical_replication_streaming = "+utils.QuoteLiteral(debugStream)); err != nil {
+				return fmt.Errorf("failed to set debug_logical_replication_streaming: %w", err)
+			}
+		}
+
 		if err := pglogrepl.StartReplication(
 			ctx, c.replConn.PgConn(), common.QuoteIdentifier(slotName), startLSN, replicationOpts); err != nil {
 			c.logger.Error("error starting replication", slog.Any("error", err))
@@ -144,9 +160,13 @@
 	return nil
 }
 
-func (c *PostgresConnector) replicationOptions(publicationName string, pgVersion shared.PGVersion,
+func (c *PostgresConnector) replicationOptions(publicationName string, pgVersion shared.PGVersion, useV2Protocol bool,
 ) (pglogrepl.StartReplicationOptions, error) {
-	pluginArguments := append(make([]string, 0, 3), "proto_version '1'")
+	protoVersion := "1"
+	if useV2Protocol {
+		protoVersion = "2"
+	}
+	pluginArguments := append(make([]string, 0, 5), "proto_version '"+protoVersion+"'")
 
 	if publicationName != "" {
 		pubOpt := "publication_names " + utils.QuoteLiteral(publicationName)
@@ -159,6 +179,10 @@
 		pluginArguments = append(pluginArguments, "messages 'true'")
 	}
 
+	if useV2Protocol {
+		pluginArguments = append(pluginArguments, "streaming 'on'")
+	}
+
 	return pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil
 }
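With useV2Protocol set, the plugin arguments handed to START_REPLICATION therefore come out as (publication name hypothetical; messages 'true' only on server versions that support it):

    proto_version '2', publication_names 'pub_orders', messages 'true', streaming 'on'

streaming 'on' is what makes pgoutput emit the StreamStart/StreamStop wrappers that processMessageV2 handles; the v1 path keeps proto_version '1' and no streaming argument.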
@@ -189,7 +213,11 @@ func pullCore[Items model.Items](
 	req *model.PullRecordsRequest[Items],
 	processor replProcessor[Items],
 ) error {
+	var cdc *PostgresCDCSource
 	defer func() {
+		if cdc != nil && cdc.useV2Protocol {
+			req.RecordStream.SetCommittedXIDs(cdc.GetAndResetCommittedXIDs())
+		}
 		req.RecordStream.Close()
 		if c.replState != nil {
 			c.replState.Offset = req.RecordStream.GetLastCheckpoint().ID
@@ -229,7 +257,22 @@ func pullCore[Items model.Items](
 	if err != nil {
 		return err
 	}
-	if err := c.MaybeStartReplication(ctx, slotName, publicationName, req.LastOffset.ID, pgVersion); err != nil {
+	useV2Protocol, err := internal.PeerDBCDCV2Enabled(ctx, req.Env)
+	if err != nil {
+		return fmt.Errorf("failed to get CDC v2 WAL sink setting: %w", err)
+	}
+	// v2 protocol requires PostgreSQL >= 14
+	if useV2Protocol && pgVersion < shared.POSTGRES_14 {
+		c.logger.Warn("CDC v2 WAL sink requires PostgreSQL >= 14, falling back to v1")
+		useV2Protocol = false
+	}
+	// Set v2 active on the stream before pull starts: the sync goroutine reads
+	// it concurrently and routes records based on this flag.
+	req.RecordStream.SetV2Active(useV2Protocol)
+
+	if err := c.MaybeStartReplication(
+		ctx, slotName, publicationName, req.LastOffset.ID, pgVersion, useV2Protocol, req.Env,
+	); err != nil {
 		c.logger.Error("error starting replication", slog.Any("error", err))
 		return err
 	}
@@ -246,7 +289,7 @@ func pullCore[Items model.Items](
 		return fmt.Errorf("failed to get setting for originMetaAsDestinationColumn: %w", err)
 	}
 
-	cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
+	cdc, err = c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
 		CatalogPool:           catalogPool,
 		OtelManager:           otelManager,
 		SrcTableIDNameMapping: req.SrcTableIDNameMapping,
@@ -260,6 +303,7 @@
 		SourceSchemaAsDestinationColumn: sourceSchemaAsDestinationColumn,
 		OriginMetaAsDestinationColumn:   originMetaAsDestinationColumn,
 		InternalVersion:                 req.InternalVersion,
+		UseV2Protocol:                   useV2Protocol,
 	})
 	if err != nil {
 		c.logger.Error("error creating cdc source", slog.Any("error", err))
diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go
index 01b2fe34c..b35bbf10d 100644
--- a/flow/connectors/utils/stream.go
+++ b/flow/connectors/utils/stream.go
@@ -146,6 +146,113 @@ func recordToQRecordOrError(
 	return entries[:], nil
 }
 
+// RecordsToWALSinkStream converts records to a stream suitable for the WAL sink table (v2).
+// Includes _peerdb_txid and _peerdb_lsn columns from the record's BaseRecord.
+func RecordsToWALSinkStream(
+	req *model.RecordsToStreamRequest[model.RecordItems], numericTruncator model.StreamNumericTruncator,
+) (*model.QRecordStream, error) {
+	recordStream := model.NewQRecordStream(1024)
+	recordStream.SetSchema(types.QRecordSchema{
+		Fields: []types.QField{
+			{Name: "_peerdb_uid", Type: types.QValueKindString, Nullable: false},
+			{Name: "_peerdb_timestamp", Type: types.QValueKindInt64, Nullable: false},
+			{Name: "_peerdb_destination_table_name", Type: types.QValueKindString, Nullable: false},
+			{Name: "_peerdb_data", Type: types.QValueKindString, Nullable: false},
+			{Name: "_peerdb_record_type", Type: types.QValueKindInt64, Nullable: true},
+			{Name: "_peerdb_match_data", Type: types.QValueKindString, Nullable: true},
+			{Name: "_peerdb_batch_id", Type: types.QValueKindInt64, Nullable: true},
+			{Name: "_peerdb_unchanged_toast_columns", Type: types.QValueKindString, Nullable: true},
+			{Name: "_peerdb_txid", Type: types.QValueKindInt64, Nullable: false},
+			{Name: "_peerdb_lsn", Type: types.QValueKindInt64, Nullable: false},
+		},
+	})
+
+	go func() {
+		for record := range req.GetRecords() {
+			record.PopulateCountMap(req.TableMapping)
+			qRecord, err := recordToWALSinkQRecordOrError(
+				req.BatchID, record, req.TargetDWH, req.UnboundedNumericAsString, numericTruncator,
+			)
+			if err != nil {
+				recordStream.Close(err)
+				return
+			} else if qRecord != nil {
+				recordStream.Records <- qRecord
+			}
+		}
+
+		close(recordStream.Records)
+	}()
+	return recordStream, nil
+}
+
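The ten slots in the function below line up with the schema fields just defined; per record type the variable slots are filled as follows (mirroring the v1 raw-table convention):

    _peerdb_record_type: 0 = insert, 1 = update, 2 = delete
    insert: _peerdb_data = items,     _peerdb_match_data = "",        toast = ""
    update: _peerdb_data = new items, _peerdb_match_data = old items, toast = unchanged columns
    delete: _peerdb_data = items,     _peerdb_match_data = items,     toast = unchanged columns

_peerdb_txid and _peerdb_lsn come from the record's TransactionID and CheckpointID, which is what ties each staged row back to the committed-XID bookkeeping.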
entries[5] = types.QValueString{Val: itemsJSON} + entries[7] = types.QValueString{Val: KeysToString(typedRecord.UnchangedToastColumns)} + + case *model.MessageRecord[model.RecordItems]: + return nil, nil + + default: + return nil, fmt.Errorf("unknown record type: %T", typedRecord) + } + + entries[0] = types.QValueUUID{Val: uuid.New()} + entries[1] = types.QValueInt64{Val: time.Now().UnixNano()} + entries[2] = types.QValueString{Val: record.GetDestinationTableName()} + entries[6] = types.QValueInt64{Val: batchID} + entries[8] = types.QValueInt64{Val: int64(record.GetTransactionID())} + entries[9] = types.QValueInt64{Val: record.GetCheckpointID()} + + return entries[:], nil +} + func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts { tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps)) for _, mapping := range tableMaps { diff --git a/flow/e2e/clickhouse_test.go b/flow/e2e/clickhouse_test.go index 29fc1d536..a91738b5c 100644 --- a/flow/e2e/clickhouse_test.go +++ b/flow/e2e/clickhouse_test.go @@ -24,6 +24,7 @@ import ( connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" "github.com/PeerDB-io/peerdb/flow/e2eshared" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/model" "github.com/PeerDB-io/peerdb/flow/model/qvalue" "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse" @@ -3576,3 +3577,130 @@ func (s ClickHouseSuite) Test_Composite_PKey() { env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) } + +// Test_CDC_V2_Protocol exercises PEERDB_CDC_V2_ENABLED end-to-end. +// On the source it switches the Postgres replication plugin to proto_version '2' +// with streaming 'on'. On the destination it routes records into a separate +// _peerdb_wal_ sink table (instead of the v1 _peerdb_raw_) and +// makes the normalize step filter by committed XIDs read from catalog table +// cdc_v2_committed_xids. +// +// Asserts: +// - row equality across initial, single insert, multi-statement tx, and delete +// (all non-streamed: small enough to fit under logical_decoding_work_mem). +// - WAL sink table populated with non-zero _peerdb_txid (v2 sink fired). +// - v1 raw table unused. +// - cdc_v2_committed_xids cleaned up post-normalize. +// - Streamed-transaction path: forced via +// PEERDB_PG_DEBUG_LOGICAL_REPLICATION_STREAMING=immediate (PG14+, scoped to +// this slot's walsender — does not leak to parallel v1 tests). With this +// GUC PG streams every change rather than waiting for the in-progress +// transaction to exceed logical_decoding_work_mem. 
+func (s ClickHouseSuite) Test_CDC_V2_Protocol() {
+	pgSource, ok := s.source.(*PostgresSource)
+	if !ok {
+		s.t.Skip("v2 protocol only applies to postgres source")
+	}
+
+	srcTableName := "test_v2_protocol"
+	srcFullName := s.attachSchemaSuffix(srcTableName)
+	dstTableName := "test_v2_protocol_dst"
+
+	require.NoError(s.t, pgSource.Exec(s.t.Context(), fmt.Sprintf(`
+	CREATE TABLE IF NOT EXISTS %s (
+		id SERIAL PRIMARY KEY,
+		"key" TEXT NOT NULL,
+		val TEXT
+	);
+	`, srcFullName)))
+
+	require.NoError(s.t, pgSource.Exec(s.t.Context(),
+		fmt.Sprintf(`INSERT INTO %s ("key", val) VALUES ('init','a')`, srcFullName)))
+
+	flowJobName := s.attachSuffix("ch_v2_protocol")
+	connectionGen := FlowConnectionGenerationConfig{
+		FlowJobName:      flowJobName,
+		TableNameMapping: map[string]string{srcFullName: dstTableName},
+		Destination:      s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.Env = map[string]string{
+		"PEERDB_CDC_V2_ENABLED": "true",
+		// Force streaming for every change so a moderate-sized transaction
+		// (below logical_decoding_work_mem) still exercises StreamStart/Commit.
+		"PEERDB_PG_DEBUG_LOGICAL_REPLICATION_STREAMING": "immediate",
+	}
+
+	tc := NewTemporalClient(s.t)
+	env := ExecutePeerflow(s.t, tc, flowConnConfig)
+	SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+
+	EnvWaitForEqualTablesWithNames(env, s, "v2 initial", srcTableName, dstTableName, `id,"key",val`)
+
+	require.NoError(s.t, pgSource.Exec(s.t.Context(),
+		fmt.Sprintf(`INSERT INTO %s ("key", val) VALUES ('cdc1','b')`, srcFullName)))
+	EnvWaitForEqualTablesWithNames(env, s, "v2 single insert", srcTableName, dstTableName, `id,"key",val`)
+
+	require.NoError(s.t, pgSource.Exec(s.t.Context(), fmt.Sprintf(
+		`BEGIN; INSERT INTO %[1]s ("key", val) VALUES ('tx1','c'); INSERT INTO %[1]s ("key", val) VALUES ('tx2','d');`+
+			` UPDATE %[1]s SET val='upd' WHERE "key"='init'; COMMIT;`, srcFullName)))
+	EnvWaitForEqualTablesWithNames(env, s, "v2 multi-statement tx", srcTableName, dstTableName, `id,"key",val`)
+
+	require.NoError(s.t, pgSource.Exec(s.t.Context(),
+		fmt.Sprintf(`DELETE FROM %s WHERE "key"='tx1'`, srcFullName)))
+	EnvWaitForEqualTablesWithNames(env, s, "v2 delete", srcTableName, dstTableName, `id,"key",val`)
+
+	// Verify streaming actually happened. With debug_logical_replication_streaming
+	// = 'immediate', every change above streams via StreamStart/InsertMessageV2/
+	// StreamCommit rather than buffered Begin/Commit. Walsender accounts these
+	// in pg_stat_replication_slots.stream_txns.
+	slotName := "peerflow_slot_" + flowJobName
+	var streamTxns int64
+	require.NoError(s.t, pgSource.PostgresConnector.Conn().QueryRow(s.t.Context(),
+		"SELECT stream_txns FROM pg_stat_replication_slots WHERE slot_name = $1",
+		slotName,
+	).Scan(&streamTxns))
+	require.Positive(s.t, streamTxns,
+		"expected at least one streamed transaction with debug_logical_replication_streaming=immediate; got %d", streamTxns)
+
+	// Verify the v2 pipeline actually fired rather than silently falling back to v1.
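+	// The name literals below must stay in sync with the connector's raw/WAL
+	// sink table naming; both apply the same underscore substitution to the
+	// flow job name.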
+	flowSlug := shared.ReplaceIllegalCharactersWithUnderscores(flowJobName)
+	walSinkTable := "_peerdb_wal_" + flowSlug
+	rawTable := "_peerdb_raw_" + flowSlug
+
+	ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
+	require.NoError(s.t, err)
+	defer ch.Close()
+
+	var walRows uint64
+	require.NoError(s.t, ch.QueryRow(s.t.Context(),
+		"SELECT count() FROM "+clickhouse.QuoteIdentifier(walSinkTable)+
+			" WHERE _peerdb_txid > 0 SETTINGS use_query_cache = false",
+	).Scan(&walRows),
+		"WAL sink table should exist when v2 mode is active")
+
+	var rawRows uint64
+	if err := ch.QueryRow(s.t.Context(),
+		"SELECT count() FROM "+clickhouse.QuoteIdentifier(rawTable)+
+			" SETTINGS use_query_cache = false",
+	).Scan(&rawRows); err == nil {
+		require.Zero(s.t, rawRows, "v1 raw table should be empty when v2 pipeline is active")
+	}
+
+	// After normalize, the catalog's cdc_v2_committed_xids rows for normalized
+	// batches should be cleaned up.
+	catalog, err := internal.GetCatalogConnectionPoolFromEnv(s.t.Context())
+	require.NoError(s.t, err)
+	var leftoverXIDBatches int64
+	require.NoError(s.t, catalog.QueryRow(s.t.Context(),
+		`SELECT count(*) FROM cdc_v2_committed_xids
+		WHERE flow_name = $1
+		AND batch_id <= (SELECT normalize_batch_id FROM metadata_last_sync_state WHERE job_name = $1)`,
+		flowJobName,
+	).Scan(&leftoverXIDBatches))
+	require.Zero(s.t, leftoverXIDBatches,
+		"cdc_v2_committed_xids should be cleaned up for normalized batches")
+
+	env.Cancel(s.t.Context())
+	RequireEnvCanceled(s.t, env)
+}
diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go
index 351f83457..ae68b84eb 100644
--- a/flow/internal/dynamicconf.go
+++ b/flow/internal/dynamicconf.go
@@ -411,6 +411,26 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
 		TargetForSetting: protos.DynconfTarget_ALL,
 	},
+	{
+		Name:             "PEERDB_CDC_V2_ENABLED",
+		Description:      "Enable CDC v2 protocol for ClickHouse destinations",
+		DefaultValue:     "false",
+		ValueType:        protos.DynconfValueType_BOOL,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
+		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
+	},
+	{
+		// Test/debug only. When set to "immediate" PG streams every change
+		// regardless of logical_decoding_work_mem (PG16+, PGC_USERSET, used
+		// by PG's own logical-replication test suite). Applied via SET on
+		// the per-flow replication connection so blast radius is one slot.
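+		// The empty default applies no override at all, so PG keeps its
+		// buffered behavior and production mirrors are unaffected.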
+		Name:             "PEERDB_PG_DEBUG_LOGICAL_REPLICATION_STREAMING",
+		Description:      "Override debug_logical_replication_streaming on the replication connection (buffered | immediate)",
+		DefaultValue:     "",
+		ValueType:        protos.DynconfValueType_STRING,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
+		TargetForSetting: protos.DynconfTarget_ALL,
+	},
 	{
 		Name:        "PEERDB_POSTGRES_ENABLE_FAILOVER_SLOTS",
 		Description: "Create slots with failover enabled when possible",
@@ -784,6 +804,14 @@ func PeerDBForceInternalVersion(ctx context.Context, env map[string]string) (uin
 	return dynamicConfUnsigned[uint32](ctx, env, "PEERDB_FORCE_INTERNAL_VERSION")
 }
 
+func PeerDBCDCV2Enabled(ctx context.Context, env map[string]string) (bool, error) {
+	return dynamicConfBool(ctx, env, "PEERDB_CDC_V2_ENABLED")
+}
+
+func PeerDBPgDebugLogicalReplicationStreaming(ctx context.Context, env map[string]string) (string, error) {
+	return dynLookup(ctx, env, "PEERDB_PG_DEBUG_LOGICAL_REPLICATION_STREAMING")
+}
+
 func PeerDBPostgresEnableFailoverSlots(ctx context.Context, env map[string]string) (bool, error) {
 	return dynamicConfBool(ctx, env, "PEERDB_POSTGRES_ENABLE_FAILOVER_SLOTS")
 }
diff --git a/flow/model/cdc_stream.go b/flow/model/cdc_stream.go
index 26c0da3c8..38b1d2703 100644
--- a/flow/model/cdc_stream.go
+++ b/flow/model/cdc_stream.go
@@ -17,12 +17,17 @@ type CDCStream[T Items] struct {
 	lastCheckpointText string
 	// Schema changes from slot
 	SchemaDeltas []*protos.TableSchemaDelta
+	// CDC v2: XIDs whose commit was fully observed in this batch.
+	// Set by source after PullRecords drains, before Close; read by sync.
+	committedXIDs []int64
 	// lastCheckpointID is the last ID of the commit that corresponds to this batch.
 	lastCheckpointID  int64
 	lastCheckpointSet bool
 	needsNormalize    bool
-	empty             bool
-	emptySet          bool
+	// CDC v2 protocol was actually used on the source side for this batch
+	v2Active bool
+	empty    bool
+	emptySet bool
 }
 
 type CdcCheckpoint struct {
@@ -129,3 +134,30 @@ func (r *CDCStream[T]) AddSchemaDelta(
 func (r *CDCStream[T]) NeedsNormalize() bool {
 	return r.needsNormalize
 }
+
+// SetV2Active is called by the source at the start of pull, before any DML
+// records arrive. The sync path needs to know v2 mode before it picks an Avro
+// destination table, which happens concurrently with pull; deferring this to
+// a post-pull step races against the sync goroutine and can route records to
+// the v1 raw table while normalize reads from the v2 WAL sink.
+func (r *CDCStream[T]) SetV2Active(active bool) {
+	r.v2Active = active
+}
+
+// SetCommittedXIDs is called by the source after pull drains. Receiving any
+// committed XID forces a normalize pass even with zero DML records in this
+// batch: prior batches' WAL sink rows still need promotion.
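+// Callers must invoke SetV2Active first: the needsNormalize escalation here
+// only fires once v2 is already marked active.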
+func (r *CDCStream[T]) SetCommittedXIDs(committedXIDs []int64) {
+	r.committedXIDs = committedXIDs
+	if r.v2Active && len(committedXIDs) > 0 {
+		r.needsNormalize = true
+	}
+}
+
+func (r *CDCStream[T]) V2Active() bool {
+	return r.v2Active
+}
+
+func (r *CDCStream[T]) CommittedXIDs() []int64 {
+	return r.committedXIDs
+}
diff --git a/nexus/catalog/migrations/V54__cdc_v2_committed_xids.sql b/nexus/catalog/migrations/V54__cdc_v2_committed_xids.sql
new file mode 100644
index 000000000..9a7c2308b
--- /dev/null
+++ b/nexus/catalog/migrations/V54__cdc_v2_committed_xids.sql
@@ -0,0 +1,7 @@
+CREATE TABLE IF NOT EXISTS cdc_v2_committed_xids (
+    flow_name TEXT NOT NULL,
+    batch_id BIGINT NOT NULL,
+    committed_xids BIGINT[] NOT NULL,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    PRIMARY KEY (flow_name, batch_id)
+);
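+-- One row per (flow_name, batch_id) lets normalize delete exactly the batches
+-- it has already promoted; keeping the XIDs as an array means a batch's
+-- commits travel together with it.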