150 changes: 138 additions & 12 deletions flow/connectors/clickhouse/cdc.go
@@ -34,6 +34,18 @@ const (
_peerdb_batch_id Int64,
_peerdb_unchanged_toast_columns String
)`
walSinkColumns = `(
_peerdb_uid UUID,
_peerdb_timestamp Int64,
_peerdb_destination_table_name String,
_peerdb_data String,
_peerdb_record_type Int,
_peerdb_match_data String,
_peerdb_batch_id Int64,
_peerdb_unchanged_toast_columns String,
_peerdb_txid UInt64,
_peerdb_lsn Int64
)`
zooPathPrefix = "/clickhouse/tables/{uuid}/{shard}/{database}/"
)

@@ -42,6 +54,51 @@ func (c *ClickHouseConnector) GetRawTableName(flowJobName string) string {
return "_peerdb_raw_" + shared.ReplaceIllegalCharactersWithUnderscores(flowJobName)
}

// GetWALSinkTableName returns the WAL sink table name for CDC v2.
func (c *ClickHouseConnector) GetWALSinkTableName(flowJobName string) string {
return "_peerdb_wal_" + shared.ReplaceIllegalCharactersWithUnderscores(flowJobName)
}

func (c *ClickHouseConnector) CreateWALSinkTable(ctx context.Context, flowJobName string) error {
var walDistributedName string
walTableName := c.GetWALSinkTableName(flowJobName)
engine := "MergeTree()"
if c.Config.Replicated {
engine = fmt.Sprintf(
"ReplicatedMergeTree('%s%s','{replica}')",
zooPathPrefix,
peerdb_clickhouse.EscapeStr(walTableName),
)
}
onCluster := c.onCluster()
if onCluster != "" {
walDistributedName = walTableName
walTableName += "_shard"
}

createWALTableSQL := `CREATE TABLE IF NOT EXISTS %s%s %s ENGINE = %s ORDER BY (_peerdb_txid, _peerdb_lsn)`
if err := c.execWithLogging(ctx,
fmt.Sprintf(createWALTableSQL, peerdb_clickhouse.QuoteIdentifier(walTableName), onCluster, walSinkColumns, engine),
); err != nil {
return fmt.Errorf("unable to create WAL sink table: %w", err)
}

if onCluster != "" {
createWALDistributedSQL := `CREATE TABLE IF NOT EXISTS %s%s %s ENGINE = Distributed(%s,%s,%s,cityHash64(_peerdb_uid))`
if err := c.execWithLogging(ctx,
fmt.Sprintf(createWALDistributedSQL, peerdb_clickhouse.QuoteIdentifier(walDistributedName), onCluster,
walSinkColumns,
peerdb_clickhouse.QuoteIdentifier(c.Config.Cluster),
peerdb_clickhouse.QuoteIdentifier(c.Config.Database),
peerdb_clickhouse.QuoteIdentifier(walTableName)),
); err != nil {
return fmt.Errorf("unable to create distributed WAL sink table: %w", err)
}
}

return nil
}
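
For reference, on a single-node peer (no cluster, `Replicated` unset) the statement built by `CreateWALSinkTable` above reduces to roughly the following; the flow job name `my_flow` is illustrative only:

```sql
-- Sketch of the DDL generated for a non-replicated, non-clustered peer;
-- "my_flow" is a placeholder flow job name.
CREATE TABLE IF NOT EXISTS `_peerdb_wal_my_flow` (
    _peerdb_uid UUID,
    _peerdb_timestamp Int64,
    _peerdb_destination_table_name String,
    _peerdb_data String,
    _peerdb_record_type Int,
    _peerdb_match_data String,
    _peerdb_batch_id Int64,
    _peerdb_unchanged_toast_columns String,
    _peerdb_txid UInt64,
    _peerdb_lsn Int64
) ENGINE = MergeTree() ORDER BY (_peerdb_txid, _peerdb_lsn)
```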

func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) {
var result uint8
if err := c.queryRow(ctx,
@@ -96,10 +153,16 @@ func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
}

func (c *ClickHouseConnector) avroSyncMethod(flowJobName string, env map[string]string, version uint32) *ClickHouseAvroSyncMethod {
return c.avroSyncMethodForTable(flowJobName, c.GetRawTableName(flowJobName), env, version)
}

func (c *ClickHouseConnector) avroSyncMethodForTable(
flowJobName string, destTable string, env map[string]string, version uint32,
) *ClickHouseAvroSyncMethod {
qrepConfig := &protos.QRepConfig{
StagingPath: c.staging.BucketPath(),
FlowJobName: flowJobName,
-DestinationTableIdentifier: c.GetRawTableName(flowJobName),
+DestinationTableIdentifier: destTable,
Env: env,
Version: version,
}
@@ -147,8 +210,68 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
}, nil
}

// syncRecordsViaAvroWALSink stages a CDC v2 batch into the WAL sink table.
// Each record is written with its _peerdb_txid and _peerdb_lsn; the normalize
// step later filters by committed XID. Committed XIDs for the batch are
// persisted in the catalog (cdc_v2_committed_xids) so normalize can read them
// without depending on the source connection.
func (c *ClickHouseConnector) syncRecordsViaAvroWALSink(
ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
syncBatchID int64,
) (*model.SyncResponse, error) {
if err := c.CreateWALSinkTable(ctx, req.FlowJobName); err != nil {
return nil, fmt.Errorf("failed to ensure WAL sink table exists: %w", err)
}

tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
unboundedNumericAsString, err := internal.PeerDBEnableClickHouseNumericAsString(ctx, req.Env)
if err != nil {
return nil, err
}
streamReq := model.NewRecordsToStreamRequest(
req.Records.GetRecords(), tableNameRowsMapping, syncBatchID, unboundedNumericAsString,
protos.DBType_CLICKHOUSE,
)
numericTruncator := model.NewStreamNumericTruncator(req.TableMappings, NumericDestinationTypes)
stream, err := utils.RecordsToWALSinkStream(streamReq, numericTruncator)
if err != nil {
return nil, fmt.Errorf("failed to convert records to WAL sink stream: %w", err)
}

avroSyncer := c.avroSyncMethodForTable(req.FlowJobName, c.GetWALSinkTableName(req.FlowJobName), req.Env, req.Version)
numRecords, err := avroSyncer.SyncRecords(ctx, req.Env, stream, req.FlowJobName, syncBatchID)
if err != nil {
return nil, err
}
warnings := numericTruncator.Warnings()

if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Flags); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

if err := c.SetCommittedXIDsForBatch(ctx, req.FlowJobName, syncBatchID, req.Records.CommittedXIDs()); err != nil {
return nil, fmt.Errorf("failed to persist committed XIDs for batch: %w", err)
}

return &model.SyncResponse{
LastSyncedCheckpoint: req.Records.GetLastCheckpoint(),
NumRecordsSynced: numRecords,
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
Warnings: warnings,
}, nil
}
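
To make the role of `_peerdb_txid` and `_peerdb_lsn` concrete, here is a hedged sketch of the kind of read the normalize step can run against the WAL sink table, keeping only rows from transactions whose XIDs were persisted for the batch. It is illustrative only: the flow name, batch id, and XID values are placeholders, and the actual normalize query is not part of this file.

```sql
-- Illustrative only, not the actual normalize query. Assumes flow "my_flow";
-- the batch id and XID list are placeholders that normalize would read from
-- the catalog table cdc_v2_committed_xids.
SELECT _peerdb_destination_table_name, _peerdb_data, _peerdb_record_type
FROM `_peerdb_wal_my_flow`
WHERE _peerdb_batch_id = 7
  AND _peerdb_txid IN (1203001, 1203002)
ORDER BY _peerdb_txid, _peerdb_lsn
```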

func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
-res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
+var res *model.SyncResponse
+var err error
+if req.Records.V2Active() {
+res, err = c.syncRecordsViaAvroWALSink(ctx, req, req.SyncBatchID)
+} else {
+res, err = c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
+}
if err != nil {
return nil, err
}
@@ -371,24 +494,27 @@ func (c *ClickHouseConnector) RenameTables(
}

func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
// delete raw table if exists
rawTableIdentifier := c.GetRawTableName(jobName)
walTableIdentifier := c.GetWALSinkTableName(jobName)
onCluster := c.onCluster()
dropTableSQLWithCHSetting := dropTableIfExistsSQL +
chinternal.NewCHSettingsString(c.chVersion, chinternal.SettingMaxTableSizeToDrop, "0")
-if err := c.execWithLogging(ctx,
-fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster),
-); err != nil {
-return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
-}
-if onCluster != "" {
+for _, base := range []string{rawTableIdentifier, walTableIdentifier} {
if err := c.execWithLogging(ctx,
-fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier+"_shard"), onCluster),
+fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(base), onCluster),
); err != nil {
-return fmt.Errorf("[clickhouse] unable to drop raw table: %w", err)
+return fmt.Errorf("[clickhouse] unable to drop %s: %w", base, err)
}
+if onCluster != "" {
+if err := c.execWithLogging(ctx,
+fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(base+"_shard"), onCluster),
+); err != nil {
+return fmt.Errorf("[clickhouse] unable to drop %s shard: %w", base, err)
+}
+}
}
-c.logger.Info("successfully dropped raw table", slog.String("table", rawTableIdentifier))
+c.logger.Info("successfully dropped raw and WAL sink tables",
+slog.String("rawTable", rawTableIdentifier), slog.String("walTable", walTableIdentifier))

return nil
}
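
For illustration, on a clustered peer the cleanup above issues one drop for each base table and one for its `_shard` counterpart. A hedged sketch of the resulting statements, assuming placeholder flow and cluster names and that `dropTableIfExistsSQL` expands to a plain `DROP TABLE IF EXISTS` with the `max_table_size_to_drop` override appended by `NewCHSettingsString`:

```sql
-- Hedged sketch; names are placeholders and the exact template comes from
-- dropTableIfExistsSQL / NewCHSettingsString elsewhere in the package.
DROP TABLE IF EXISTS `_peerdb_raw_my_flow` ON CLUSTER `my_cluster` SETTINGS max_table_size_to_drop = 0;
DROP TABLE IF EXISTS `_peerdb_raw_my_flow_shard` ON CLUSTER `my_cluster` SETTINGS max_table_size_to_drop = 0;
DROP TABLE IF EXISTS `_peerdb_wal_my_flow` ON CLUSTER `my_cluster` SETTINGS max_table_size_to_drop = 0;
DROP TABLE IF EXISTS `_peerdb_wal_my_flow_shard` ON CLUSTER `my_cluster` SETTINGS max_table_size_to_drop = 0;
```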