From 3c8cbf4b46fda8f579a489512d75e5177747e857 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Fri, 20 Feb 2026 18:18:17 +0800 Subject: [PATCH 1/9] add failpoint Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 133 ++++++++++++++++++ pkg/sink/failpointrecord/record.go | 108 ++++++++++++++ pkg/sink/mysql/mysql_writer_dml.go | 65 +++++++++ 3 files changed, 306 insertions(+) create mode 100644 pkg/sink/failpointrecord/record.go diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 35eee16791..ba6811ee47 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -16,6 +16,7 @@ package cloudstorage import ( "bytes" "context" + "encoding/json" "path" "strconv" "sync/atomic" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/br/pkg/storage" "github.com/prometheus/client_golang/prometheus" @@ -213,6 +215,89 @@ func (d *writer) ignoreTableTask(task *singleTableTask) { } } +// mutateMessageValueForFailpoint rewrites a non-primary-key column value in +// canal-json encoded messages so that the multi-cluster-consistency-checker +// sees the original row as "lost" and the mutated row as "redundant". +// +// canal-json messages in msg.Value are separated by CRLF ("\r\n"). For every +// message we: +// 1. Parse the JSON to extract "pkNames" and "data". +// 2. Pick the first non-PK column in data[0] and replace its value with nil. +// 3. Re-marshal the whole message. +// +// This function is only called from within a failpoint.Inject block. +func mutateMessageValueForFailpoint(msg *common.Message) { + if len(msg.Value) == 0 { + return + } + terminator := []byte("\r\n") + parts := bytes.Split(msg.Value, terminator) + for i, part := range parts { + if len(part) == 0 { + continue + } + + // Decode the full message preserving all fields. + var m map[string]json.RawMessage + if err := json.Unmarshal(part, &m); err != nil { + continue + } + + // Extract pkNames so we can skip PK columns. + var pkNames []string + if raw, ok := m["pkNames"]; ok { + _ = json.Unmarshal(raw, &pkNames) + } + pkSet := make(map[string]struct{}, len(pkNames)) + for _, pk := range pkNames { + pkSet[pk] = struct{}{} + } + + // Extract the "data" array. + rawData, ok := m["data"] + if !ok { + continue + } + var data []map[string]interface{} + if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 { + continue + } + + // Find the first non-PK column and mutate it to nil. + mutated := false + for _, row := range data { + for col := range row { + if _, isPK := pkSet[col]; isPK { + continue + } + row[col] = nil + mutated = true + break + } + if mutated { + break + } + } + if !mutated { + continue + } + + // Write the mutated data back. 
+ newData, err := json.Marshal(data) + if err != nil { + continue + } + m["data"] = json.RawMessage(newData) + + newPart, err := json.Marshal(m) + if err != nil { + continue + } + parts[i] = newPart + } + msg.Value = bytes.Join(parts, terminator) +} + func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error { var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) @@ -220,6 +305,34 @@ func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath bytesCnt := int64(0) // There is always only one message here in task.msgs for _, msg := range task.msgs { + // Failpoint: drop this message to simulate data loss. + // Usage: failpoint.Enable(".../cloudStorageSinkDropMessage", "return") + // failpoint.Enable(".../cloudStorageSinkDropMessage", "50%return") + failpoint.Inject("cloudStorageSinkDropMessage", func() { + log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss", + zap.Int("workerID", d.id), + zap.String("keyspace", d.changeFeedID.Keyspace()), + zap.Stringer("changefeed", d.changeFeedID.ID()), + zap.String("dataFilePath", dataFilePath), + zap.Any("logInfo", msg.LogInfo)) + failpointrecord.Write("cloudStorageSinkDropMessage", logInfoToRowRecords(msg.LogInfo)) + callbacks = append(callbacks, msg.Callback) + failpoint.Continue() + }) + + // Failpoint: mutate non-PK column data in the message. + // Usage: failpoint.Enable(".../cloudStorageSinkMutateValue", "return") + failpoint.Inject("cloudStorageSinkMutateValue", func() { + log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", + zap.Int("workerID", d.id), + zap.String("keyspace", d.changeFeedID.Keyspace()), + zap.Stringer("changefeed", d.changeFeedID.ID()), + zap.String("dataFilePath", dataFilePath), + zap.Any("logInfo", msg.LogInfo)) + failpointrecord.Write("cloudStorageSinkMutateValue", logInfoToRowRecords(msg.LogInfo)) + mutateMessageValueForFailpoint(msg) + }) + if msg.Key != nil && rowsCnt == 0 { buf.Write(msg.Key) bytesCnt += int64(len(msg.Key)) @@ -349,6 +462,26 @@ func (d *writer) close() { } } +// logInfoToRowRecords converts a MessageLogInfo to failpointrecord.RowRecords +// for writing to the failpoint record file. +func logInfoToRowRecords(info *common.MessageLogInfo) []failpointrecord.RowRecord { + if info == nil || len(info.Rows) == 0 { + return nil + } + records := make([]failpointrecord.RowRecord, 0, len(info.Rows)) + for _, row := range info.Rows { + pks := make(map[string]any, len(row.PrimaryKeys)) + for _, pk := range row.PrimaryKeys { + pks[pk.Name] = pk.Value + } + records = append(records, failpointrecord.RowRecord{ + CommitTs: row.CommitTs, + PrimaryKeys: pks, + }) + } + return records +} + // batchedTask contains a set of singleTableTask. // We batch message of different tables together to reduce the overhead of calling external storage API. type batchedTask struct { diff --git a/pkg/sink/failpointrecord/record.go b/pkg/sink/failpointrecord/record.go new file mode 100644 index 0000000000..969edf65f6 --- /dev/null +++ b/pkg/sink/failpointrecord/record.go @@ -0,0 +1,108 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package failpointrecord provides a lightweight utility that records failpoint +// triggered events (PK, commit_ts, etc.) to a JSONL file so that external tools +// (e.g. the multi-cluster-consistency-checker) can easily consume them. +// +// The file path is controlled by the environment variable +// TICDC_FAILPOINT_RECORD_FILE. When the variable is empty or unset the +// recorder is a silent no-op, introducing zero overhead in production. +// +// Each line written to the file is a self-contained JSON object: +// +// {"time":"…","failpoint":"cloudStorageSinkDropMessage","rows":[{"commitTs":123,"primaryKeys":{"id":1}}]} +package failpointrecord + +import ( + "encoding/json" + "os" + "sync" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// envKey is the environment variable that controls the output file path. +const envKey = "TICDC_FAILPOINT_RECORD_FILE" + +// RowRecord captures the essential identity of a single affected row. +type RowRecord struct { + CommitTs uint64 `json:"commitTs"` + PrimaryKeys map[string]any `json:"primaryKeys"` +} + +// Record is one line written to the JSONL file. +type Record struct { + Time string `json:"time"` + Failpoint string `json:"failpoint"` + Rows []RowRecord `json:"rows"` +} + +var ( + initOnce sync.Once + mu sync.Mutex + file *os.File + disabled bool +) + +func ensureFile() { + initOnce.Do(func() { + path := os.Getenv(envKey) + if path == "" { + disabled = true + return + } + var err error + file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Warn("failed to open failpoint record file, recording disabled", + zap.String("path", path), zap.Error(err)) + disabled = true + return + } + log.Info("failpoint record file opened", zap.String("path", path)) + }) +} + +// Write persists one failpoint event to the JSONL file. +// It is safe for concurrent use. +// If the env var is not set the call is a no-op (zero allocation). 
+func Write(failpoint string, rows []RowRecord) { + if disabled { + return + } + ensureFile() + if file == nil { + return + } + + rec := Record{ + Time: time.Now().UTC().Format(time.RFC3339Nano), + Failpoint: failpoint, + Rows: rows, + } + data, err := json.Marshal(rec) + if err != nil { + log.Warn("failed to marshal failpoint record", zap.Error(err)) + return + } + data = append(data, '\n') + + mu.Lock() + defer mu.Unlock() + if _, err := file.Write(data); err != nil { + log.Warn("failed to write failpoint record", zap.Error(err)) + } +} diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index a3100d6dc5..ab4864b069 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -16,8 +16,11 @@ package mysql import ( "sort" + "github.com/pingcap/failpoint" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" + "github.com/pingcap/tidb/pkg/parser/mysql" ) func groupEventsByTable(events []*commonEvent.DMLEvent) map[int64][][]*commonEvent.DMLEvent { @@ -113,6 +116,21 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err } func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}) { + // Failpoint: bypass the LWW UPSERT and fall back to normal SQL (REPLACE INTO + // or plain INSERT). This makes the downstream TiDB write the row without the + // LWW condition, creating a genuine LWW violation that naturally flows through + // the pipeline to S3. + // Usage: failpoint.Enable(".../mysqlSinkBypassLWW", "return") + // failpoint.Enable(".../mysqlSinkBypassLWW", "50%return") + failpoint.Inject("mysqlSinkBypassLWW", func() { + rowRecords := dmlEventsToRowRecords(tableInfo, eventsInGroup) + failpointrecord.Write("mysqlSinkBypassLWW", rowRecords) + if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) { + failpoint.Return(w.generateNormalSQLs(eventsInGroup)) + } + failpoint.Return(w.generateBatchSQL(eventsInGroup)) + }) + if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) { return w.generateActiveActiveNormalSQLs(eventsInGroup) } @@ -156,6 +174,53 @@ func (w *Writer) shouldGenBatchSQL(tableInfo *common.TableInfo, events []*common return allRowInSameSafeMode(w.cfg.SafeMode, events) } +// pkColInfo holds the index and name of a primary key column. +type pkColInfo struct { + index int + name string +} + +// findPKColumns returns the PK column indexes and names from a TableInfo. +func findPKColumns(tableInfo *common.TableInfo) []pkColInfo { + var cols []pkColInfo + for i, col := range tableInfo.GetColumns() { + if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) { + cols = append(cols, pkColInfo{index: i, name: col.Name.O}) + } + } + return cols +} + +// dmlEventsToRowRecords converts DMLEvents to failpointrecord.RowRecords for +// writing to the failpoint record file. 
+func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) []failpointrecord.RowRecord { + pkCols := findPKColumns(tableInfo) + var records []failpointrecord.RowRecord + for _, event := range events { + event.Rewind() + for { + rowChange, ok := event.GetNextRow() + if !ok { + break + } + r := &rowChange.Row + if rowChange.RowType == common.RowTypeDelete { + r = &rowChange.PreRow + } + pks := make(map[string]any, len(pkCols)) + for _, pk := range pkCols { + pks[pk.name] = common.ExtractColVal(r, tableInfo.GetColumns()[pk.index], pk.index) + } + records = append(records, failpointrecord.RowRecord{ + CommitTs: event.CommitTs, + PrimaryKeys: pks, + }) + } + event.Rewind() + } + return records +} + // allRowInSameSafeMode determines whether all DMLEvents in a batch have the same safe mode status. // Safe mode is either globally enabled via the safemode parameter, or determined per event // by comparing CommitTs and ReplicatingTs. From bb8ce03365bb946704c2e96fb2e16403dfe55d80 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Tue, 24 Feb 2026 10:21:28 +0800 Subject: [PATCH 2/9] move failpoint Signed-off-by: Jianjun Liao --- .../sink/cloudstorage/encoding_group.go | 96 +++++++++++++++++++ downstreamadapter/sink/cloudstorage/writer.go | 49 ---------- 2 files changed, 96 insertions(+), 49 deletions(-) diff --git a/downstreamadapter/sink/cloudstorage/encoding_group.go b/downstreamadapter/sink/cloudstorage/encoding_group.go index bcc37663e7..2b2ad05026 100644 --- a/downstreamadapter/sink/cloudstorage/encoding_group.go +++ b/downstreamadapter/sink/cloudstorage/encoding_group.go @@ -16,12 +16,17 @@ package cloudstorage import ( "context" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" commonType "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/utils/chann" "go.uber.org/atomic" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -90,6 +95,15 @@ func (eg *encodingGroup) runEncoder(ctx context.Context) error { return err } frag.encodedMsgs = encoder.Build() + // Global switch for cloudstorage sink message failpoints. 
+ // Usage: + // failpoint.Enable(".../cloudStorageSinkMessageFailpointSwitch", "return(false)") // disable + // failpoint.Enable(".../cloudStorageSinkMessageFailpointSwitch", "return(true)") // enable + failpoint.Inject("cloudStorageSinkMessageFailpointSwitch", func(val failpoint.Value) { + if enabled, ok := val.(bool); ok && enabled { + eg.applyFailpointsOnEncodedMessages(frag) + } + }) select { case <-ctx.Done(): @@ -103,3 +117,85 @@ func (eg *encodingGroup) runEncoder(ctx context.Context) error { func (eg *encodingGroup) close() { eg.closed.Store(true) } + +func (eg *encodingGroup) applyFailpointsOnEncodedMessages(frag eventFragment) { + rowRecordsByMsg := splitRowRecordsByMessages(frag.encodedMsgs, dmlEventToRowRecords(frag.event)) + for idx, msg := range frag.encodedMsgs { + var rowRecords []failpointrecord.RowRecord + if idx < len(rowRecordsByMsg) { + rowRecords = rowRecordsByMsg[idx] + } + failpoint.Inject("cloudStorageSinkDropMessage", func() { + log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss", + zap.String("keyspace", eg.changeFeedID.Keyspace()), + zap.Stringer("changefeed", eg.changeFeedID.ID()), + zap.Any("rows", rowRecords)) + failpointrecord.Write("cloudStorageSinkDropMessage", rowRecords) + // Keep callback flow unchanged while dropping data payload. + msg.Key = nil + msg.Value = nil + msg.SetRowsCount(0) + }) + failpoint.Inject("cloudStorageSinkMutateValue", func() { + log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", + zap.String("keyspace", eg.changeFeedID.Keyspace()), + zap.Stringer("changefeed", eg.changeFeedID.ID()), + zap.Any("rows", rowRecords)) + failpointrecord.Write("cloudStorageSinkMutateValue", rowRecords) + mutateMessageValueForFailpoint(msg) + }) + } +} + +func splitRowRecordsByMessages(messages []*common.Message, rows []failpointrecord.RowRecord) [][]failpointrecord.RowRecord { + if len(messages) == 0 { + return nil + } + ret := make([][]failpointrecord.RowRecord, 0, len(messages)) + rowIdx := 0 + for _, msg := range messages { + rowsNeeded := msg.GetRowsCount() + if rowsNeeded <= 0 || rowIdx >= len(rows) { + ret = append(ret, nil) + continue + } + end := rowIdx + rowsNeeded + if end > len(rows) { + end = len(rows) + } + ret = append(ret, rows[rowIdx:end]) + rowIdx = end + } + return ret +} + +func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowRecord { + if event == nil || event.TableInfo == nil { + return nil + } + indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn() + rowRecords := make([]failpointrecord.RowRecord, 0, event.Len()) + for { + row, ok := event.GetNextRow() + if !ok { + event.Rewind() + break + } + pks := make(map[string]any, len(columns)) + for i, col := range columns { + if col == nil { + continue + } + rowData := row.Row + if row.RowType == commonType.RowTypeDelete { + rowData = row.PreRow + } + pks[col.Name.String()] = commonType.ExtractColVal(&rowData, col, indexes[i]) + } + rowRecords = append(rowRecords, failpointrecord.RowRecord{ + CommitTs: event.CommitTs, + PrimaryKeys: pks, + }) + } + return rowRecords +} diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index ba6811ee47..a82536d170 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" 
"github.com/pingcap/ticdc/pkg/sink/codec/common" - "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/br/pkg/storage" "github.com/prometheus/client_golang/prometheus" @@ -305,34 +304,6 @@ func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath bytesCnt := int64(0) // There is always only one message here in task.msgs for _, msg := range task.msgs { - // Failpoint: drop this message to simulate data loss. - // Usage: failpoint.Enable(".../cloudStorageSinkDropMessage", "return") - // failpoint.Enable(".../cloudStorageSinkDropMessage", "50%return") - failpoint.Inject("cloudStorageSinkDropMessage", func() { - log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss", - zap.Int("workerID", d.id), - zap.String("keyspace", d.changeFeedID.Keyspace()), - zap.Stringer("changefeed", d.changeFeedID.ID()), - zap.String("dataFilePath", dataFilePath), - zap.Any("logInfo", msg.LogInfo)) - failpointrecord.Write("cloudStorageSinkDropMessage", logInfoToRowRecords(msg.LogInfo)) - callbacks = append(callbacks, msg.Callback) - failpoint.Continue() - }) - - // Failpoint: mutate non-PK column data in the message. - // Usage: failpoint.Enable(".../cloudStorageSinkMutateValue", "return") - failpoint.Inject("cloudStorageSinkMutateValue", func() { - log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", - zap.Int("workerID", d.id), - zap.String("keyspace", d.changeFeedID.Keyspace()), - zap.Stringer("changefeed", d.changeFeedID.ID()), - zap.String("dataFilePath", dataFilePath), - zap.Any("logInfo", msg.LogInfo)) - failpointrecord.Write("cloudStorageSinkMutateValue", logInfoToRowRecords(msg.LogInfo)) - mutateMessageValueForFailpoint(msg) - }) - if msg.Key != nil && rowsCnt == 0 { buf.Write(msg.Key) bytesCnt += int64(len(msg.Key)) @@ -462,26 +433,6 @@ func (d *writer) close() { } } -// logInfoToRowRecords converts a MessageLogInfo to failpointrecord.RowRecords -// for writing to the failpoint record file. -func logInfoToRowRecords(info *common.MessageLogInfo) []failpointrecord.RowRecord { - if info == nil || len(info.Rows) == 0 { - return nil - } - records := make([]failpointrecord.RowRecord, 0, len(info.Rows)) - for _, row := range info.Rows { - pks := make(map[string]any, len(row.PrimaryKeys)) - for _, pk := range row.PrimaryKeys { - pks[pk.Name] = pk.Value - } - records = append(records, failpointrecord.RowRecord{ - CommitTs: row.CommitTs, - PrimaryKeys: pks, - }) - } - return records -} - // batchedTask contains a set of singleTableTask. // We batch message of different tables together to reduce the overhead of calling external storage API. 
type batchedTask struct { From 888a12f4dadb6bb75bca5fefa9ce733e2d6a806e Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 25 Feb 2026 10:25:08 +0800 Subject: [PATCH 3/9] update Signed-off-by: Jianjun Liao --- .../sink/cloudstorage/encoding_group.go | 10 ++- downstreamadapter/sink/cloudstorage/writer.go | 84 ++++++++++++++++--- .../sink/cloudstorage/writer_test.go | 37 ++++++++ 3 files changed, 117 insertions(+), 14 deletions(-) diff --git a/downstreamadapter/sink/cloudstorage/encoding_group.go b/downstreamadapter/sink/cloudstorage/encoding_group.go index 2b2ad05026..0869bd6a88 100644 --- a/downstreamadapter/sink/cloudstorage/encoding_group.go +++ b/downstreamadapter/sink/cloudstorage/encoding_group.go @@ -135,14 +135,20 @@ func (eg *encodingGroup) applyFailpointsOnEncodedMessages(frag eventFragment) { msg.Key = nil msg.Value = nil msg.SetRowsCount(0) + failpoint.Continue() }) failpoint.Inject("cloudStorageSinkMutateValue", func() { log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", zap.String("keyspace", eg.changeFeedID.Keyspace()), zap.Stringer("changefeed", eg.changeFeedID.ID()), zap.Any("rows", rowRecords)) - failpointrecord.Write("cloudStorageSinkMutateValue", rowRecords) - mutateMessageValueForFailpoint(msg) + mutatedRows, originTsMutatedRows := mutateMessageValueForFailpoint(msg, rowRecords) + if len(mutatedRows) > 0 { + failpointrecord.Write("cloudStorageSinkMutateValue", mutatedRows) + } + if len(originTsMutatedRows) > 0 { + failpointrecord.Write("cloudStorageSinkMutateValueTidbOriginTs", originTsMutatedRows) + } }) } } diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index a82536d170..9aed85de14 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -16,7 +16,9 @@ package cloudstorage import ( "bytes" "context" + cryptorand "crypto/rand" "encoding/json" + "math/big" "path" "strconv" "sync/atomic" @@ -31,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/br/pkg/storage" "github.com/prometheus/client_golang/prometheus" @@ -225,12 +228,19 @@ func (d *writer) ignoreTableTask(task *singleTableTask) { // 3. Re-marshal the whole message. // // This function is only called from within a failpoint.Inject block. -func mutateMessageValueForFailpoint(msg *common.Message) { +// It returns mutated row records grouped by whether `tidb_origin_ts` is mutated. +func mutateMessageValueForFailpoint( + msg *common.Message, + rowRecords []failpointrecord.RowRecord, +) ([]failpointrecord.RowRecord, []failpointrecord.RowRecord) { if len(msg.Value) == 0 { - return + return nil, nil } terminator := []byte("\r\n") parts := bytes.Split(msg.Value, terminator) + mutatedOffsets := make([]int, 0) + originTsMutatedOffsets := make([]int, 0) + rowOffset := 0 for i, part := range parts { if len(part) == 0 { continue @@ -257,44 +267,94 @@ func mutateMessageValueForFailpoint(msg *common.Message) { if !ok { continue } - var data []map[string]interface{} + var data []map[string]any if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 { continue } - // Find the first non-PK column and mutate it to nil. + // Find the first row that has a non-PK column and mutate it to nil. 
mutated := false - for _, row := range data { - for col := range row { - if _, isPK := pkSet[col]; isPK { - continue - } - row[col] = nil - mutated = true - break + mutatedRowOffset := 0 + mutatedColumn := "" + for rowIdx, row := range data { + col, ok := selectColumnToMutate(row, pkSet) + if !ok { + continue } + row[col] = nil + mutated = true + mutatedRowOffset = rowIdx + mutatedColumn = col if mutated { break } } if !mutated { + rowOffset += len(data) continue } // Write the mutated data back. newData, err := json.Marshal(data) if err != nil { + rowOffset += len(data) continue } m["data"] = json.RawMessage(newData) newPart, err := json.Marshal(m) if err != nil { + rowOffset += len(data) continue } parts[i] = newPart + + if mutatedColumn == "tidb_origin_ts" { + originTsMutatedOffsets = append(originTsMutatedOffsets, rowOffset+mutatedRowOffset) + } else { + mutatedOffsets = append(mutatedOffsets, rowOffset+mutatedRowOffset) + } + rowOffset += len(data) } msg.Value = bytes.Join(parts, terminator) + return extractMutatedRowRecordsByOffset(rowRecords, mutatedOffsets), + extractMutatedRowRecordsByOffset(rowRecords, originTsMutatedOffsets) +} + +func selectColumnToMutate(row map[string]any, pkSet map[string]struct{}) (string, bool) { + columns := make([]string, 0, len(row)) + for col := range row { + if _, isPK := pkSet[col]; isPK { + continue + } + columns = append(columns, col) + } + if len(columns) == 0 { + return "", false + } + idx, err := cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns)))) + if err != nil { + // Best-effort fallback in failpoint path. + return columns[0], true + } + return columns[idx.Int64()], true +} + +func extractMutatedRowRecordsByOffset( + rowRecords []failpointrecord.RowRecord, + offsets []int, +) []failpointrecord.RowRecord { + if len(offsets) == 0 || len(rowRecords) == 0 { + return nil + } + ret := make([]failpointrecord.RowRecord, 0, len(offsets)) + for _, offset := range offsets { + if offset < 0 || offset >= len(rowRecords) { + continue + } + ret = append(ret, rowRecords[offset]) + } + return ret } func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error { diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index 0be8a78640..a4af45ca5e 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -14,6 +14,7 @@ package cloudstorage import ( + "bytes" "context" "fmt" "net/url" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/chann" timodel "github.com/pingcap/tidb/pkg/meta/model" @@ -130,3 +132,38 @@ func TestWriterRun(t *testing.T) { d.close() wg.Wait() } + +func TestMutateMessageValueForFailpointRecordClassification(t *testing.T) { + t.Parallel() + + msg := &common.Message{ + Value: []byte( + `{"pkNames":["id"],"data":[{"id":"1","c2":"v1"}]}` + + "\r\n" + + `{"pkNames":["id"],"data":[{"id":"2","tidb_origin_ts":"100"}]}`, + ), + } + rowRecords := []failpointrecord.RowRecord{ + { + CommitTs: 101, + PrimaryKeys: map[string]any{"id": "1"}, + }, + { + CommitTs: 102, + PrimaryKeys: map[string]any{"id": "2"}, + }, + } + + mutatedRows, originTsMutatedRows := mutateMessageValueForFailpoint(msg, rowRecords) + + require.Len(t, mutatedRows, 
1) + require.Equal(t, uint64(101), mutatedRows[0].CommitTs) + require.Equal(t, "1", mutatedRows[0].PrimaryKeys["id"]) + + require.Len(t, originTsMutatedRows, 1) + require.Equal(t, uint64(102), originTsMutatedRows[0].CommitTs) + require.Equal(t, "2", originTsMutatedRows[0].PrimaryKeys["id"]) + + require.True(t, bytes.Contains(msg.Value, []byte(`"tidb_origin_ts":null`))) + require.True(t, bytes.Contains(msg.Value, []byte(`"c2":null`))) +} From 0c86f950532584501fc01ff9740f679fe3844c8a Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 25 Feb 2026 11:10:28 +0800 Subject: [PATCH 4/9] update Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 55 ++++++++++++++++++- .../sink/cloudstorage/writer_test.go | 4 +- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 9aed85de14..57c36b01de 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -228,7 +228,7 @@ func (d *writer) ignoreTableTask(task *singleTableTask) { // 3. Re-marshal the whole message. // // This function is only called from within a failpoint.Inject block. -// It returns mutated row records grouped by whether `tidb_origin_ts` is mutated. +// It returns mutated row records grouped by whether `_tidb_origin_ts` is mutated. func mutateMessageValueForFailpoint( msg *common.Message, rowRecords []failpointrecord.RowRecord, @@ -281,7 +281,15 @@ func mutateMessageValueForFailpoint( if !ok { continue } - row[col] = nil + if col == "_tidb_origin_ts" { + nextValue, ok := incrementOriginTSValue(row[col]) + if !ok { + continue + } + row[col] = nextValue + } else { + row[col] = nil + } mutated = true mutatedRowOffset = rowIdx mutatedColumn = col @@ -309,7 +317,7 @@ func mutateMessageValueForFailpoint( } parts[i] = newPart - if mutatedColumn == "tidb_origin_ts" { + if mutatedColumn == "_tidb_origin_ts" { originTsMutatedOffsets = append(originTsMutatedOffsets, rowOffset+mutatedRowOffset) } else { mutatedOffsets = append(mutatedOffsets, rowOffset+mutatedRowOffset) @@ -357,6 +365,47 @@ func extractMutatedRowRecordsByOffset( return ret } +func incrementOriginTSValue(v any) (any, bool) { + switch value := v.(type) { + case string: + originTS, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return nil, false + } + return strconv.FormatUint(originTS+1, 10), true + case float64: + return value + 1, true + case json.Number: + originTS, err := value.Int64() + if err != nil { + return nil, false + } + return json.Number(strconv.FormatInt(originTS+1, 10)), true + case int: + return value + 1, true + case int8: + return value + 1, true + case int16: + return value + 1, true + case int32: + return value + 1, true + case int64: + return value + 1, true + case uint: + return value + 1, true + case uint8: + return value + 1, true + case uint16: + return value + 1, true + case uint32: + return value + 1, true + case uint64: + return value + 1, true + default: + return nil, false + } +} + func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error { var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index a4af45ca5e..dc973bdde1 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -140,7 +140,7 @@ 
func TestMutateMessageValueForFailpointRecordClassification(t *testing.T) { Value: []byte( `{"pkNames":["id"],"data":[{"id":"1","c2":"v1"}]}` + "\r\n" + - `{"pkNames":["id"],"data":[{"id":"2","tidb_origin_ts":"100"}]}`, + `{"pkNames":["id"],"data":[{"id":"2","_tidb_origin_ts":"100"}]}`, ), } rowRecords := []failpointrecord.RowRecord{ @@ -164,6 +164,6 @@ func TestMutateMessageValueForFailpointRecordClassification(t *testing.T) { require.Equal(t, uint64(102), originTsMutatedRows[0].CommitTs) require.Equal(t, "2", originTsMutatedRows[0].PrimaryKeys["id"]) - require.True(t, bytes.Contains(msg.Value, []byte(`"tidb_origin_ts":null`))) + require.True(t, bytes.Contains(msg.Value, []byte(`"_tidb_origin_ts":"101"`))) require.True(t, bytes.Contains(msg.Value, []byte(`"c2":null`))) } From b7504568eb80e24e023db37cfdbb7f3e3223cbc9 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 25 Feb 2026 13:40:57 +0800 Subject: [PATCH 5/9] update Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 11 +++++++++++ .../sink/cloudstorage/writer_test.go | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 57c36b01de..ac68435f7c 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink/metrics" commonType "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" pmetrics "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/pdutil" @@ -331,12 +332,22 @@ func mutateMessageValueForFailpoint( func selectColumnToMutate(row map[string]any, pkSet map[string]struct{}) (string, bool) { columns := make([]string, 0, len(row)) + originTsNilColumns := make([]string, 0, 1) for col := range row { if _, isPK := pkSet[col]; isPK { continue } + // If _tidb_origin_ts is nil, prefer mutating another non-PK column. + // This avoids selecting a value that cannot be incremented. 
+ if col == commonEvent.OriginTsColumn && row[col] == nil { + originTsNilColumns = append(originTsNilColumns, col) + continue + } columns = append(columns, col) } + if len(columns) == 0 { + columns = originTsNilColumns + } if len(columns) == 0 { return "", false } diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index dc973bdde1..2659104577 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -167,3 +167,22 @@ func TestMutateMessageValueForFailpointRecordClassification(t *testing.T) { require.True(t, bytes.Contains(msg.Value, []byte(`"_tidb_origin_ts":"101"`))) require.True(t, bytes.Contains(msg.Value, []byte(`"c2":null`))) } + +func TestSelectColumnToMutateSkipNilOriginTsWhenPossible(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + commonEvent.OriginTsColumn: nil, + "c2": "v1", + } + pkSet := map[string]struct{}{ + "id": {}, + } + + for i := 0; i < 20; i++ { + col, ok := selectColumnToMutate(row, pkSet) + require.True(t, ok) + require.Equal(t, "c2", col) + } +} From ddb1a011fb7c181435f1c6de77d843fb2b11aa9b Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 25 Feb 2026 13:56:10 +0800 Subject: [PATCH 6/9] update Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 21 ++++++++++--------- .../sink/cloudstorage/writer_test.go | 19 +++++++++++++++++ 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index ac68435f7c..e59bc1c060 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -282,7 +282,7 @@ func mutateMessageValueForFailpoint( if !ok { continue } - if col == "_tidb_origin_ts" { + if col == commonEvent.OriginTsColumn { nextValue, ok := incrementOriginTSValue(row[col]) if !ok { continue @@ -318,7 +318,7 @@ func mutateMessageValueForFailpoint( } parts[i] = newPart - if mutatedColumn == "_tidb_origin_ts" { + if mutatedColumn == commonEvent.OriginTsColumn { originTsMutatedOffsets = append(originTsMutatedOffsets, rowOffset+mutatedRowOffset) } else { mutatedOffsets = append(mutatedOffsets, rowOffset+mutatedRowOffset) @@ -331,23 +331,24 @@ func mutateMessageValueForFailpoint( } func selectColumnToMutate(row map[string]any, pkSet map[string]struct{}) (string, bool) { + // Prefer mutating _tidb_origin_ts when it exists and is non-NULL. + // Otherwise, mutate other non-PK columns. + if _, isPK := pkSet[commonEvent.OriginTsColumn]; !isPK { + if originTs, ok := row[commonEvent.OriginTsColumn]; ok && originTs != nil { + return commonEvent.OriginTsColumn, true + } + } + columns := make([]string, 0, len(row)) - originTsNilColumns := make([]string, 0, 1) for col := range row { if _, isPK := pkSet[col]; isPK { continue } - // If _tidb_origin_ts is nil, prefer mutating another non-PK column. - // This avoids selecting a value that cannot be incremented. 
- if col == commonEvent.OriginTsColumn && row[col] == nil { - originTsNilColumns = append(originTsNilColumns, col) + if col == commonEvent.OriginTsColumn { continue } columns = append(columns, col) } - if len(columns) == 0 { - columns = originTsNilColumns - } if len(columns) == 0 { return "", false } diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index 2659104577..251ae7c263 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -186,3 +186,22 @@ func TestSelectColumnToMutateSkipNilOriginTsWhenPossible(t *testing.T) { require.Equal(t, "c2", col) } } + +func TestSelectColumnToMutatePreferNonNilOriginTs(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + commonEvent.OriginTsColumn: "100", + "c2": "v1", + } + pkSet := map[string]struct{}{ + "id": {}, + } + + for i := 0; i < 20; i++ { + col, ok := selectColumnToMutate(row, pkSet) + require.True(t, ok) + require.Equal(t, commonEvent.OriginTsColumn, col) + } +} From b67f722c12c4dc6273729e2efaf678966334846b Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 25 Feb 2026 14:29:19 +0800 Subject: [PATCH 7/9] update Signed-off-by: Jianjun Liao --- .../sink/cloudstorage/encoding_group.go | 18 +++-- pkg/sink/failpointrecord/record.go | 65 +++++++++++++++++++ pkg/sink/mysql/mysql_writer_dml.go | 9 +++ 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/downstreamadapter/sink/cloudstorage/encoding_group.go b/downstreamadapter/sink/cloudstorage/encoding_group.go index 0869bd6a88..ff7e90fa16 100644 --- a/downstreamadapter/sink/cloudstorage/encoding_group.go +++ b/downstreamadapter/sink/cloudstorage/encoding_group.go @@ -180,6 +180,8 @@ func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowReco return nil } indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn() + originTsCol, hasOriginTsCol := event.TableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn) + originTsOffset, hasOriginTsOffset := event.TableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn) rowRecords := make([]failpointrecord.RowRecord, 0, event.Len()) for { row, ok := event.GetNextRow() @@ -187,19 +189,27 @@ func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowReco event.Rewind() break } + rowData := row.Row + if row.RowType == commonType.RowTypeDelete { + rowData = row.PreRow + } + pks := make(map[string]any, len(columns)) for i, col := range columns { if col == nil { continue } - rowData := row.Row - if row.RowType == commonType.RowTypeDelete { - rowData = row.PreRow - } pks[col.Name.String()] = commonType.ExtractColVal(&rowData, col, indexes[i]) } + originTs := uint64(0) + if hasOriginTsCol && hasOriginTsOffset { + originTs = failpointrecord.NormalizeOriginTs( + commonType.ExtractColVal(&rowData, originTsCol, originTsOffset), + ) + } rowRecords = append(rowRecords, failpointrecord.RowRecord{ CommitTs: event.CommitTs, + OriginTs: originTs, PrimaryKeys: pks, }) } diff --git a/pkg/sink/failpointrecord/record.go b/pkg/sink/failpointrecord/record.go index 969edf65f6..e8c668047e 100644 --- a/pkg/sink/failpointrecord/record.go +++ b/pkg/sink/failpointrecord/record.go @@ -27,6 +27,7 @@ package failpointrecord import ( "encoding/json" "os" + "strconv" "sync" "time" @@ -40,9 +41,73 @@ const envKey = "TICDC_FAILPOINT_RECORD_FILE" // RowRecord captures the essential identity of a single affected row. 
type RowRecord struct { CommitTs uint64 `json:"commitTs"` + OriginTs uint64 `json:"originTs"` PrimaryKeys map[string]any `json:"primaryKeys"` } +// NormalizeOriginTs converts `_tidb_origin_ts` values from row payloads into uint64. +// It returns 0 for nil/invalid values. +func NormalizeOriginTs(v any) uint64 { + switch value := v.(type) { + case nil: + return 0 + case uint64: + return value + case uint: + return uint64(value) + case uint32: + return uint64(value) + case uint16: + return uint64(value) + case uint8: + return uint64(value) + case int64: + if value < 0 { + return 0 + } + return uint64(value) + case int: + if value < 0 { + return 0 + } + return uint64(value) + case int32: + if value < 0 { + return 0 + } + return uint64(value) + case int16: + if value < 0 { + return 0 + } + return uint64(value) + case int8: + if value < 0 { + return 0 + } + return uint64(value) + case float64: + if value < 0 { + return 0 + } + return uint64(value) + case json.Number: + i, err := value.Int64() + if err != nil || i < 0 { + return 0 + } + return uint64(i) + case string: + parsed, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return 0 + } + return parsed + default: + return 0 + } +} + // Record is one line written to the JSONL file. type Record struct { Time string `json:"time"` diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index ab4864b069..716dca4dc2 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -195,6 +195,8 @@ func findPKColumns(tableInfo *common.TableInfo) []pkColInfo { // writing to the failpoint record file. func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) []failpointrecord.RowRecord { pkCols := findPKColumns(tableInfo) + originTsCol, hasOriginTsCol := tableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn) + originTsOffset, hasOriginTsOffset := tableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn) var records []failpointrecord.RowRecord for _, event := range events { event.Rewind() @@ -211,8 +213,15 @@ func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DM for _, pk := range pkCols { pks[pk.name] = common.ExtractColVal(r, tableInfo.GetColumns()[pk.index], pk.index) } + originTs := uint64(0) + if hasOriginTsCol && hasOriginTsOffset { + originTs = failpointrecord.NormalizeOriginTs( + common.ExtractColVal(r, originTsCol, originTsOffset), + ) + } records = append(records, failpointrecord.RowRecord{ CommitTs: event.CommitTs, + OriginTs: originTs, PrimaryKeys: pks, }) } From de52baa33ebdb233ec6def95a0f0516489e24e4e Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 25 Feb 2026 19:31:38 +0800 Subject: [PATCH 8/9] update Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 4 +++ .../sink/cloudstorage/writer_test.go | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index e59bc1c060..efbcb8de98 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -347,6 +347,10 @@ func selectColumnToMutate(row map[string]any, pkSet map[string]struct{}) (string if col == commonEvent.OriginTsColumn { continue } + // Keep the failpoint mutation meaningful: skip columns that are already NULL. 
+ if row[col] == nil { + continue + } columns = append(columns, col) } if len(columns) == 0 { diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index 251ae7c263..e4a9ff9168 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -205,3 +205,39 @@ func TestSelectColumnToMutatePreferNonNilOriginTs(t *testing.T) { require.Equal(t, commonEvent.OriginTsColumn, col) } } + +func TestSelectColumnToMutateSkipNilNonPKColumns(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + "c1": nil, + "c2": "v2", + } + pkSet := map[string]struct{}{ + "id": {}, + } + + for i := 0; i < 20; i++ { + col, ok := selectColumnToMutate(row, pkSet) + require.True(t, ok) + require.Equal(t, "c2", col) + } +} + +func TestSelectColumnToMutateNoCandidateWhenAllNonPKColumnsNil(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + commonEvent.OriginTsColumn: nil, + "c1": nil, + } + pkSet := map[string]struct{}{ + "id": {}, + } + + col, ok := selectColumnToMutate(row, pkSet) + require.False(t, ok) + require.Empty(t, col) +} From 99fd167d67cc42d53379486dd8e00354e0fd4327 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Thu, 26 Feb 2026 14:19:28 +0800 Subject: [PATCH 9/9] clean codes Signed-off-by: Jianjun Liao --- .../sink/cloudstorage/encoding_group.go | 104 +--- .../sink/cloudstorage/failpoint.go | 467 ++++++++++++++++++ .../sink/cloudstorage/failpoint_test.go | 132 +++++ downstreamadapter/sink/cloudstorage/writer.go | 209 -------- .../sink/cloudstorage/writer_test.go | 111 ----- pkg/sink/failpointrecord/record.go | 173 ------- pkg/sink/mysql/mysql_writer_dml.go | 74 --- 7 files changed, 600 insertions(+), 670 deletions(-) create mode 100644 downstreamadapter/sink/cloudstorage/failpoint.go create mode 100644 downstreamadapter/sink/cloudstorage/failpoint_test.go delete mode 100644 pkg/sink/failpointrecord/record.go diff --git a/downstreamadapter/sink/cloudstorage/encoding_group.go b/downstreamadapter/sink/cloudstorage/encoding_group.go index ff7e90fa16..8dd9881f01 100644 --- a/downstreamadapter/sink/cloudstorage/encoding_group.go +++ b/downstreamadapter/sink/cloudstorage/encoding_group.go @@ -17,16 +17,12 @@ import ( "context" "github.com/pingcap/failpoint" - "github.com/pingcap/log" commonType "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec" "github.com/pingcap/ticdc/pkg/sink/codec/common" - "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/utils/chann" "go.uber.org/atomic" - "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -101,7 +97,7 @@ func (eg *encodingGroup) runEncoder(ctx context.Context) error { // failpoint.Enable(".../cloudStorageSinkMessageFailpointSwitch", "return(true)") // enable failpoint.Inject("cloudStorageSinkMessageFailpointSwitch", func(val failpoint.Value) { if enabled, ok := val.(bool); ok && enabled { - eg.applyFailpointsOnEncodedMessages(frag) + applyFailpointsOnEncodedMessages(frag) } }) @@ -117,101 +113,3 @@ func (eg *encodingGroup) runEncoder(ctx context.Context) error { func (eg *encodingGroup) close() { eg.closed.Store(true) } - -func (eg *encodingGroup) applyFailpointsOnEncodedMessages(frag eventFragment) { - rowRecordsByMsg := splitRowRecordsByMessages(frag.encodedMsgs, dmlEventToRowRecords(frag.event)) - for idx, msg := range frag.encodedMsgs { - var rowRecords 
[]failpointrecord.RowRecord - if idx < len(rowRecordsByMsg) { - rowRecords = rowRecordsByMsg[idx] - } - failpoint.Inject("cloudStorageSinkDropMessage", func() { - log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss", - zap.String("keyspace", eg.changeFeedID.Keyspace()), - zap.Stringer("changefeed", eg.changeFeedID.ID()), - zap.Any("rows", rowRecords)) - failpointrecord.Write("cloudStorageSinkDropMessage", rowRecords) - // Keep callback flow unchanged while dropping data payload. - msg.Key = nil - msg.Value = nil - msg.SetRowsCount(0) - failpoint.Continue() - }) - failpoint.Inject("cloudStorageSinkMutateValue", func() { - log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", - zap.String("keyspace", eg.changeFeedID.Keyspace()), - zap.Stringer("changefeed", eg.changeFeedID.ID()), - zap.Any("rows", rowRecords)) - mutatedRows, originTsMutatedRows := mutateMessageValueForFailpoint(msg, rowRecords) - if len(mutatedRows) > 0 { - failpointrecord.Write("cloudStorageSinkMutateValue", mutatedRows) - } - if len(originTsMutatedRows) > 0 { - failpointrecord.Write("cloudStorageSinkMutateValueTidbOriginTs", originTsMutatedRows) - } - }) - } -} - -func splitRowRecordsByMessages(messages []*common.Message, rows []failpointrecord.RowRecord) [][]failpointrecord.RowRecord { - if len(messages) == 0 { - return nil - } - ret := make([][]failpointrecord.RowRecord, 0, len(messages)) - rowIdx := 0 - for _, msg := range messages { - rowsNeeded := msg.GetRowsCount() - if rowsNeeded <= 0 || rowIdx >= len(rows) { - ret = append(ret, nil) - continue - } - end := rowIdx + rowsNeeded - if end > len(rows) { - end = len(rows) - } - ret = append(ret, rows[rowIdx:end]) - rowIdx = end - } - return ret -} - -func dmlEventToRowRecords(event *commonEvent.DMLEvent) []failpointrecord.RowRecord { - if event == nil || event.TableInfo == nil { - return nil - } - indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn() - originTsCol, hasOriginTsCol := event.TableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn) - originTsOffset, hasOriginTsOffset := event.TableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn) - rowRecords := make([]failpointrecord.RowRecord, 0, event.Len()) - for { - row, ok := event.GetNextRow() - if !ok { - event.Rewind() - break - } - rowData := row.Row - if row.RowType == commonType.RowTypeDelete { - rowData = row.PreRow - } - - pks := make(map[string]any, len(columns)) - for i, col := range columns { - if col == nil { - continue - } - pks[col.Name.String()] = commonType.ExtractColVal(&rowData, col, indexes[i]) - } - originTs := uint64(0) - if hasOriginTsCol && hasOriginTsOffset { - originTs = failpointrecord.NormalizeOriginTs( - commonType.ExtractColVal(&rowData, originTsCol, originTsOffset), - ) - } - rowRecords = append(rowRecords, failpointrecord.RowRecord{ - CommitTs: event.CommitTs, - OriginTs: originTs, - PrimaryKeys: pks, - }) - } - return rowRecords -} diff --git a/downstreamadapter/sink/cloudstorage/failpoint.go b/downstreamadapter/sink/cloudstorage/failpoint.go new file mode 100644 index 0000000000..689951a29f --- /dev/null +++ b/downstreamadapter/sink/cloudstorage/failpoint.go @@ -0,0 +1,467 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "bytes" + cryptorand "crypto/rand" + "encoding/json" + "math/big" + "os" + "strconv" + "sync" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + commonType "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + "go.uber.org/zap" +) + +func applyFailpointsOnEncodedMessages(frag eventFragment) { + rowRecordsByMsg := splitRowRecordsByMessages(frag.encodedMsgs, dmlEventToRowRecords(frag.event)) + for idx, msg := range frag.encodedMsgs { + var rowRecords []RowRecord + if idx < len(rowRecordsByMsg) { + rowRecords = rowRecordsByMsg[idx] + } + failpoint.Inject("cloudStorageSinkDropMessage", func() { + log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss", + zap.Any("rows", rowRecords)) + Write("cloudStorageSinkDropMessage", rowRecords) + // Keep callback flow unchanged while dropping data payload. + msg.Key = nil + msg.Value = nil + msg.SetRowsCount(0) + failpoint.Continue() + }) + failpoint.Inject("cloudStorageSinkMutateValue", func() { + log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", + zap.Any("rows", rowRecords)) + mutatedRows, originTsMutatedRows := mutateMessageValueForFailpoint(msg, rowRecords) + if len(mutatedRows) > 0 { + Write("cloudStorageSinkMutateValue", mutatedRows) + } + if len(originTsMutatedRows) > 0 { + Write("cloudStorageSinkMutateValueTidbOriginTs", originTsMutatedRows) + } + }) + } +} + +func splitRowRecordsByMessages(messages []*common.Message, rows []RowRecord) [][]RowRecord { + if len(messages) == 0 { + return nil + } + ret := make([][]RowRecord, 0, len(messages)) + rowIdx := 0 + for _, msg := range messages { + rowsNeeded := msg.GetRowsCount() + if rowsNeeded <= 0 || rowIdx >= len(rows) { + ret = append(ret, nil) + continue + } + end := rowIdx + rowsNeeded + if end > len(rows) { + end = len(rows) + } + ret = append(ret, rows[rowIdx:end]) + rowIdx = end + } + return ret +} + +func dmlEventToRowRecords(event *commonEvent.DMLEvent) []RowRecord { + if event == nil || event.TableInfo == nil { + return nil + } + indexes, columns := (&commonEvent.RowEvent{TableInfo: event.TableInfo}).PrimaryKeyColumn() + originTsCol, hasOriginTsCol := event.TableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn) + originTsOffset, hasOriginTsOffset := event.TableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn) + rowRecords := make([]RowRecord, 0, event.Len()) + for { + row, ok := event.GetNextRow() + if !ok { + event.Rewind() + break + } + rowData := row.Row + if row.RowType == commonType.RowTypeDelete { + rowData = row.PreRow + } + + pks := make(map[string]any, len(columns)) + for i, col := range columns { + if col == nil { + continue + } + pks[col.Name.String()] = commonType.ExtractColVal(&rowData, col, indexes[i]) + } + originTs := uint64(0) + if hasOriginTsCol && hasOriginTsOffset { + originTs = NormalizeOriginTs( + commonType.ExtractColVal(&rowData, originTsCol, originTsOffset), + ) + } + rowRecords = append(rowRecords, RowRecord{ + CommitTs: event.CommitTs, + OriginTs: originTs, + PrimaryKeys: pks, + }) + } + return 
rowRecords
+}
+
+// mutateMessageValueForFailpoint rewrites a non-primary-key column value in
+// canal-json encoded messages so that the multi-cluster-consistency-checker
+// sees the original row as "lost" and the mutated row as "redundant".
+//
+// canal-json messages in msg.Value are separated by CRLF ("\r\n"). For every
+// message we:
+// 1. Parse the JSON to extract "pkNames" and "data".
+// 2. Pick a non-PK column via selectColumnToMutate and mutate it (a non-NULL _tidb_origin_ts is incremented; any other column is set to NULL).
+// 3. Re-marshal the whole message.
+//
+// This function is only called from within a failpoint.Inject block.
+// It returns mutated row records grouped by whether `_tidb_origin_ts` is mutated.
+func mutateMessageValueForFailpoint(
+	msg *common.Message,
+	rowRecords []RowRecord,
+) ([]RowRecord, []RowRecord) {
+	if len(msg.Value) == 0 {
+		return nil, nil
+	}
+	terminator := []byte("\r\n")
+	parts := bytes.Split(msg.Value, terminator)
+	mutatedOffsets := make([]int, 0)
+	originTsMutatedOffsets := make([]int, 0)
+	rowOffset := 0
+	for i, part := range parts {
+		if len(part) == 0 {
+			continue
+		}
+
+		// Decode the full message preserving all fields.
+		var m map[string]json.RawMessage
+		if err := json.Unmarshal(part, &m); err != nil {
+			continue
+		}
+
+		// Extract pkNames so we can skip PK columns.
+		var pkNames []string
+		if raw, ok := m["pkNames"]; ok {
+			_ = json.Unmarshal(raw, &pkNames)
+		}
+		pkSet := make(map[string]struct{}, len(pkNames))
+		for _, pk := range pkNames {
+			pkSet[pk] = struct{}{}
+		}
+
+		// Extract the "data" array.
+		rawData, ok := m["data"]
+		if !ok {
+			continue
+		}
+		var data []map[string]any
+		if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 {
+			continue
+		}
+
+		// Find the first row with a mutable non-PK column and mutate it.
+		mutated := false
+		mutatedRowOffset := 0
+		mutatedColumn := ""
+		for rowIdx, row := range data {
+			col, ok := selectColumnToMutate(row, pkSet)
+			if !ok {
+				continue
+			}
+			if col == commonEvent.OriginTsColumn {
+				nextValue, ok := incrementOriginTSValue(row[col])
+				if !ok {
+					continue
+				}
+				row[col] = nextValue
+			} else {
+				row[col] = nil
+			}
+			mutated = true
+			mutatedRowOffset = rowIdx
+			mutatedColumn = col
+			if mutated {
+				break
+			}
+		}
+		if !mutated {
+			rowOffset += len(data)
+			continue
+		}
+
+		// Write the mutated data back.
+		newData, err := json.Marshal(data)
+		if err != nil {
+			rowOffset += len(data)
+			continue
+		}
+		m["data"] = json.RawMessage(newData)
+
+		newPart, err := json.Marshal(m)
+		if err != nil {
+			rowOffset += len(data)
+			continue
+		}
+		parts[i] = newPart
+
+		if mutatedColumn == commonEvent.OriginTsColumn {
+			originTsMutatedOffsets = append(originTsMutatedOffsets, rowOffset+mutatedRowOffset)
+		} else {
+			mutatedOffsets = append(mutatedOffsets, rowOffset+mutatedRowOffset)
+		}
+		rowOffset += len(data)
+	}
+	msg.Value = bytes.Join(parts, terminator)
+	return extractMutatedRowRecordsByOffset(rowRecords, mutatedOffsets),
+		extractMutatedRowRecordsByOffset(rowRecords, originTsMutatedOffsets)
+}
+
+func selectColumnToMutate(row map[string]any, pkSet map[string]struct{}) (string, bool) {
+	// Prefer mutating _tidb_origin_ts when it exists and is non-NULL.
+	// Otherwise, mutate other non-PK columns.
+ if _, isPK := pkSet[commonEvent.OriginTsColumn]; !isPK { + if originTs, ok := row[commonEvent.OriginTsColumn]; ok && originTs != nil { + return commonEvent.OriginTsColumn, true + } + } + + columns := make([]string, 0, len(row)) + for col := range row { + if _, isPK := pkSet[col]; isPK { + continue + } + if col == commonEvent.OriginTsColumn { + continue + } + // Keep the failpoint mutation meaningful: skip columns that are already NULL. + if row[col] == nil { + continue + } + columns = append(columns, col) + } + if len(columns) == 0 { + return "", false + } + idx, err := cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns)))) + if err != nil { + // Best-effort fallback in failpoint path. + return columns[0], true + } + return columns[idx.Int64()], true +} + +func extractMutatedRowRecordsByOffset( + rowRecords []RowRecord, + offsets []int, +) []RowRecord { + if len(offsets) == 0 || len(rowRecords) == 0 { + return nil + } + ret := make([]RowRecord, 0, len(offsets)) + for _, offset := range offsets { + if offset < 0 || offset >= len(rowRecords) { + continue + } + ret = append(ret, rowRecords[offset]) + } + return ret +} + +func incrementOriginTSValue(v any) (any, bool) { + switch value := v.(type) { + case string: + originTS, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return nil, false + } + return strconv.FormatUint(originTS+1, 10), true + case float64: + return value + 1, true + case json.Number: + originTS, err := value.Int64() + if err != nil { + return nil, false + } + return json.Number(strconv.FormatInt(originTS+1, 10)), true + case int: + return value + 1, true + case int8: + return value + 1, true + case int16: + return value + 1, true + case int32: + return value + 1, true + case int64: + return value + 1, true + case uint: + return value + 1, true + case uint8: + return value + 1, true + case uint16: + return value + 1, true + case uint32: + return value + 1, true + case uint64: + return value + 1, true + default: + return nil, false + } +} + +// envKey is the environment variable that controls the output file path. +const envKey = "TICDC_FAILPOINT_RECORD_FILE" + +// RowRecord captures the essential identity of a single affected row. +type RowRecord struct { + CommitTs uint64 `json:"commitTs"` + OriginTs uint64 `json:"originTs"` + PrimaryKeys map[string]any `json:"primaryKeys"` +} + +// NormalizeOriginTs converts `_tidb_origin_ts` values from row payloads into uint64. +// It returns 0 for nil/invalid values. +func NormalizeOriginTs(v any) uint64 { + switch value := v.(type) { + case nil: + return 0 + case uint64: + return value + case uint: + return uint64(value) + case uint32: + return uint64(value) + case uint16: + return uint64(value) + case uint8: + return uint64(value) + case int64: + if value < 0 { + return 0 + } + return uint64(value) + case int: + if value < 0 { + return 0 + } + return uint64(value) + case int32: + if value < 0 { + return 0 + } + return uint64(value) + case int16: + if value < 0 { + return 0 + } + return uint64(value) + case int8: + if value < 0 { + return 0 + } + return uint64(value) + case float64: + if value < 0 { + return 0 + } + return uint64(value) + case json.Number: + i, err := value.Int64() + if err != nil || i < 0 { + return 0 + } + return uint64(i) + case string: + parsed, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return 0 + } + return parsed + default: + return 0 + } +} + +// Record is one line written to the JSONL file. 
+type Record struct { + Time string `json:"time"` + Failpoint string `json:"failpoint"` + Rows []RowRecord `json:"rows"` +} + +var ( + initOnce sync.Once + mu sync.Mutex + file *os.File + disabled bool +) + +func ensureFile() { + initOnce.Do(func() { + path := os.Getenv(envKey) + if path == "" { + disabled = true + return + } + var err error + file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Warn("failed to open failpoint record file, recording disabled", + zap.String("path", path), zap.Error(err)) + disabled = true + return + } + log.Info("failpoint record file opened", zap.String("path", path)) + }) +} + +// Write persists one failpoint event to the JSONL file. +// It is safe for concurrent use. +// If the env var is not set the call is a no-op (zero allocation). +func Write(failpoint string, rows []RowRecord) { + if disabled { + return + } + ensureFile() + if file == nil { + return + } + + rec := Record{ + Time: time.Now().UTC().Format(time.RFC3339Nano), + Failpoint: failpoint, + Rows: rows, + } + data, err := json.Marshal(rec) + if err != nil { + log.Warn("failed to marshal failpoint record", zap.Error(err)) + return + } + data = append(data, '\n') + + mu.Lock() + defer mu.Unlock() + if _, err := file.Write(data); err != nil { + log.Warn("failed to write failpoint record", zap.Error(err)) + } +} diff --git a/downstreamadapter/sink/cloudstorage/failpoint_test.go b/downstreamadapter/sink/cloudstorage/failpoint_test.go new file mode 100644 index 0000000000..672e77afbd --- /dev/null +++ b/downstreamadapter/sink/cloudstorage/failpoint_test.go @@ -0,0 +1,132 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cloudstorage + +import ( + "bytes" + "testing" + + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/stretchr/testify/require" +) + +func TestMutateMessageValueForFailpointRecordClassification(t *testing.T) { + t.Parallel() + + msg := &common.Message{ + Value: []byte( + `{"pkNames":["id"],"data":[{"id":"1","c2":"v1"}]}` + + "\r\n" + + `{"pkNames":["id"],"data":[{"id":"2","_tidb_origin_ts":"100"}]}`, + ), + } + rowRecords := []RowRecord{ + { + CommitTs: 101, + PrimaryKeys: map[string]any{"id": "1"}, + }, + { + CommitTs: 102, + PrimaryKeys: map[string]any{"id": "2"}, + }, + } + + mutatedRows, originTsMutatedRows := mutateMessageValueForFailpoint(msg, rowRecords) + + require.Len(t, mutatedRows, 1) + require.Equal(t, uint64(101), mutatedRows[0].CommitTs) + require.Equal(t, "1", mutatedRows[0].PrimaryKeys["id"]) + + require.Len(t, originTsMutatedRows, 1) + require.Equal(t, uint64(102), originTsMutatedRows[0].CommitTs) + require.Equal(t, "2", originTsMutatedRows[0].PrimaryKeys["id"]) + + require.True(t, bytes.Contains(msg.Value, []byte(`"_tidb_origin_ts":"101"`))) + require.True(t, bytes.Contains(msg.Value, []byte(`"c2":null`))) +} + +func TestSelectColumnToMutateSkipNilOriginTsWhenPossible(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + commonEvent.OriginTsColumn: nil, + "c2": "v1", + } + pkSet := map[string]struct{}{ + "id": {}, + } + + for i := 0; i < 20; i++ { + col, ok := selectColumnToMutate(row, pkSet) + require.True(t, ok) + require.Equal(t, "c2", col) + } +} + +func TestSelectColumnToMutatePreferNonNilOriginTs(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + commonEvent.OriginTsColumn: "100", + "c2": "v1", + } + pkSet := map[string]struct{}{ + "id": {}, + } + + for i := 0; i < 20; i++ { + col, ok := selectColumnToMutate(row, pkSet) + require.True(t, ok) + require.Equal(t, commonEvent.OriginTsColumn, col) + } +} + +func TestSelectColumnToMutateSkipNilNonPKColumns(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + "c1": nil, + "c2": "v2", + } + pkSet := map[string]struct{}{ + "id": {}, + } + + for i := 0; i < 20; i++ { + col, ok := selectColumnToMutate(row, pkSet) + require.True(t, ok) + require.Equal(t, "c2", col) + } +} + +func TestSelectColumnToMutateNoCandidateWhenAllNonPKColumnsNil(t *testing.T) { + t.Parallel() + + row := map[string]any{ + "id": "1", + commonEvent.OriginTsColumn: nil, + "c1": nil, + } + pkSet := map[string]struct{}{ + "id": {}, + } + + col, ok := selectColumnToMutate(row, pkSet) + require.False(t, ok) + require.Empty(t, col) +} diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index efbcb8de98..35eee16791 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -16,9 +16,6 @@ package cloudstorage import ( "bytes" "context" - cryptorand "crypto/rand" - "encoding/json" - "math/big" "path" "strconv" "sync/atomic" @@ -28,13 +25,11 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink/metrics" commonType "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" pmetrics "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" - "github.com/pingcap/ticdc/pkg/sink/failpointrecord" 
"github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/br/pkg/storage" "github.com/prometheus/client_golang/prometheus" @@ -218,210 +213,6 @@ func (d *writer) ignoreTableTask(task *singleTableTask) { } } -// mutateMessageValueForFailpoint rewrites a non-primary-key column value in -// canal-json encoded messages so that the multi-cluster-consistency-checker -// sees the original row as "lost" and the mutated row as "redundant". -// -// canal-json messages in msg.Value are separated by CRLF ("\r\n"). For every -// message we: -// 1. Parse the JSON to extract "pkNames" and "data". -// 2. Pick the first non-PK column in data[0] and replace its value with nil. -// 3. Re-marshal the whole message. -// -// This function is only called from within a failpoint.Inject block. -// It returns mutated row records grouped by whether `_tidb_origin_ts` is mutated. -func mutateMessageValueForFailpoint( - msg *common.Message, - rowRecords []failpointrecord.RowRecord, -) ([]failpointrecord.RowRecord, []failpointrecord.RowRecord) { - if len(msg.Value) == 0 { - return nil, nil - } - terminator := []byte("\r\n") - parts := bytes.Split(msg.Value, terminator) - mutatedOffsets := make([]int, 0) - originTsMutatedOffsets := make([]int, 0) - rowOffset := 0 - for i, part := range parts { - if len(part) == 0 { - continue - } - - // Decode the full message preserving all fields. - var m map[string]json.RawMessage - if err := json.Unmarshal(part, &m); err != nil { - continue - } - - // Extract pkNames so we can skip PK columns. - var pkNames []string - if raw, ok := m["pkNames"]; ok { - _ = json.Unmarshal(raw, &pkNames) - } - pkSet := make(map[string]struct{}, len(pkNames)) - for _, pk := range pkNames { - pkSet[pk] = struct{}{} - } - - // Extract the "data" array. - rawData, ok := m["data"] - if !ok { - continue - } - var data []map[string]any - if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 { - continue - } - - // Find the first row that has a non-PK column and mutate it to nil. - mutated := false - mutatedRowOffset := 0 - mutatedColumn := "" - for rowIdx, row := range data { - col, ok := selectColumnToMutate(row, pkSet) - if !ok { - continue - } - if col == commonEvent.OriginTsColumn { - nextValue, ok := incrementOriginTSValue(row[col]) - if !ok { - continue - } - row[col] = nextValue - } else { - row[col] = nil - } - mutated = true - mutatedRowOffset = rowIdx - mutatedColumn = col - if mutated { - break - } - } - if !mutated { - rowOffset += len(data) - continue - } - - // Write the mutated data back. - newData, err := json.Marshal(data) - if err != nil { - rowOffset += len(data) - continue - } - m["data"] = json.RawMessage(newData) - - newPart, err := json.Marshal(m) - if err != nil { - rowOffset += len(data) - continue - } - parts[i] = newPart - - if mutatedColumn == commonEvent.OriginTsColumn { - originTsMutatedOffsets = append(originTsMutatedOffsets, rowOffset+mutatedRowOffset) - } else { - mutatedOffsets = append(mutatedOffsets, rowOffset+mutatedRowOffset) - } - rowOffset += len(data) - } - msg.Value = bytes.Join(parts, terminator) - return extractMutatedRowRecordsByOffset(rowRecords, mutatedOffsets), - extractMutatedRowRecordsByOffset(rowRecords, originTsMutatedOffsets) -} - -func selectColumnToMutate(row map[string]any, pkSet map[string]struct{}) (string, bool) { - // Prefer mutating _tidb_origin_ts when it exists and is non-NULL. - // Otherwise, mutate other non-PK columns. 
- if _, isPK := pkSet[commonEvent.OriginTsColumn]; !isPK { - if originTs, ok := row[commonEvent.OriginTsColumn]; ok && originTs != nil { - return commonEvent.OriginTsColumn, true - } - } - - columns := make([]string, 0, len(row)) - for col := range row { - if _, isPK := pkSet[col]; isPK { - continue - } - if col == commonEvent.OriginTsColumn { - continue - } - // Keep the failpoint mutation meaningful: skip columns that are already NULL. - if row[col] == nil { - continue - } - columns = append(columns, col) - } - if len(columns) == 0 { - return "", false - } - idx, err := cryptorand.Int(cryptorand.Reader, big.NewInt(int64(len(columns)))) - if err != nil { - // Best-effort fallback in failpoint path. - return columns[0], true - } - return columns[idx.Int64()], true -} - -func extractMutatedRowRecordsByOffset( - rowRecords []failpointrecord.RowRecord, - offsets []int, -) []failpointrecord.RowRecord { - if len(offsets) == 0 || len(rowRecords) == 0 { - return nil - } - ret := make([]failpointrecord.RowRecord, 0, len(offsets)) - for _, offset := range offsets { - if offset < 0 || offset >= len(rowRecords) { - continue - } - ret = append(ret, rowRecords[offset]) - } - return ret -} - -func incrementOriginTSValue(v any) (any, bool) { - switch value := v.(type) { - case string: - originTS, err := strconv.ParseUint(value, 10, 64) - if err != nil { - return nil, false - } - return strconv.FormatUint(originTS+1, 10), true - case float64: - return value + 1, true - case json.Number: - originTS, err := value.Int64() - if err != nil { - return nil, false - } - return json.Number(strconv.FormatInt(originTS+1, 10)), true - case int: - return value + 1, true - case int8: - return value + 1, true - case int16: - return value + 1, true - case int32: - return value + 1, true - case int64: - return value + 1, true - case uint: - return value + 1, true - case uint8: - return value + 1, true - case uint16: - return value + 1, true - case uint32: - return value + 1, true - case uint64: - return value + 1, true - default: - return nil, false - } -} - func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error { var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index e4a9ff9168..0be8a78640 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -14,7 +14,6 @@ package cloudstorage import ( - "bytes" "context" "fmt" "net/url" @@ -32,7 +31,6 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" - "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/chann" timodel "github.com/pingcap/tidb/pkg/meta/model" @@ -132,112 +130,3 @@ func TestWriterRun(t *testing.T) { d.close() wg.Wait() } - -func TestMutateMessageValueForFailpointRecordClassification(t *testing.T) { - t.Parallel() - - msg := &common.Message{ - Value: []byte( - `{"pkNames":["id"],"data":[{"id":"1","c2":"v1"}]}` + - "\r\n" + - `{"pkNames":["id"],"data":[{"id":"2","_tidb_origin_ts":"100"}]}`, - ), - } - rowRecords := []failpointrecord.RowRecord{ - { - CommitTs: 101, - PrimaryKeys: map[string]any{"id": "1"}, - }, - { - CommitTs: 102, - PrimaryKeys: map[string]any{"id": "2"}, - }, - } - - mutatedRows, originTsMutatedRows := 
mutateMessageValueForFailpoint(msg, rowRecords) - - require.Len(t, mutatedRows, 1) - require.Equal(t, uint64(101), mutatedRows[0].CommitTs) - require.Equal(t, "1", mutatedRows[0].PrimaryKeys["id"]) - - require.Len(t, originTsMutatedRows, 1) - require.Equal(t, uint64(102), originTsMutatedRows[0].CommitTs) - require.Equal(t, "2", originTsMutatedRows[0].PrimaryKeys["id"]) - - require.True(t, bytes.Contains(msg.Value, []byte(`"_tidb_origin_ts":"101"`))) - require.True(t, bytes.Contains(msg.Value, []byte(`"c2":null`))) -} - -func TestSelectColumnToMutateSkipNilOriginTsWhenPossible(t *testing.T) { - t.Parallel() - - row := map[string]any{ - "id": "1", - commonEvent.OriginTsColumn: nil, - "c2": "v1", - } - pkSet := map[string]struct{}{ - "id": {}, - } - - for i := 0; i < 20; i++ { - col, ok := selectColumnToMutate(row, pkSet) - require.True(t, ok) - require.Equal(t, "c2", col) - } -} - -func TestSelectColumnToMutatePreferNonNilOriginTs(t *testing.T) { - t.Parallel() - - row := map[string]any{ - "id": "1", - commonEvent.OriginTsColumn: "100", - "c2": "v1", - } - pkSet := map[string]struct{}{ - "id": {}, - } - - for i := 0; i < 20; i++ { - col, ok := selectColumnToMutate(row, pkSet) - require.True(t, ok) - require.Equal(t, commonEvent.OriginTsColumn, col) - } -} - -func TestSelectColumnToMutateSkipNilNonPKColumns(t *testing.T) { - t.Parallel() - - row := map[string]any{ - "id": "1", - "c1": nil, - "c2": "v2", - } - pkSet := map[string]struct{}{ - "id": {}, - } - - for i := 0; i < 20; i++ { - col, ok := selectColumnToMutate(row, pkSet) - require.True(t, ok) - require.Equal(t, "c2", col) - } -} - -func TestSelectColumnToMutateNoCandidateWhenAllNonPKColumnsNil(t *testing.T) { - t.Parallel() - - row := map[string]any{ - "id": "1", - commonEvent.OriginTsColumn: nil, - "c1": nil, - } - pkSet := map[string]struct{}{ - "id": {}, - } - - col, ok := selectColumnToMutate(row, pkSet) - require.False(t, ok) - require.Empty(t, col) -} diff --git a/pkg/sink/failpointrecord/record.go b/pkg/sink/failpointrecord/record.go deleted file mode 100644 index e8c668047e..0000000000 --- a/pkg/sink/failpointrecord/record.go +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package failpointrecord provides a lightweight utility that records failpoint -// triggered events (PK, commit_ts, etc.) to a JSONL file so that external tools -// (e.g. the multi-cluster-consistency-checker) can easily consume them. -// -// The file path is controlled by the environment variable -// TICDC_FAILPOINT_RECORD_FILE. When the variable is empty or unset the -// recorder is a silent no-op, introducing zero overhead in production. -// -// Each line written to the file is a self-contained JSON object: -// -// {"time":"…","failpoint":"cloudStorageSinkDropMessage","rows":[{"commitTs":123,"primaryKeys":{"id":1}}]} -package failpointrecord - -import ( - "encoding/json" - "os" - "strconv" - "sync" - "time" - - "github.com/pingcap/log" - "go.uber.org/zap" -) - -// envKey is the environment variable that controls the output file path. 
-const envKey = "TICDC_FAILPOINT_RECORD_FILE" - -// RowRecord captures the essential identity of a single affected row. -type RowRecord struct { - CommitTs uint64 `json:"commitTs"` - OriginTs uint64 `json:"originTs"` - PrimaryKeys map[string]any `json:"primaryKeys"` -} - -// NormalizeOriginTs converts `_tidb_origin_ts` values from row payloads into uint64. -// It returns 0 for nil/invalid values. -func NormalizeOriginTs(v any) uint64 { - switch value := v.(type) { - case nil: - return 0 - case uint64: - return value - case uint: - return uint64(value) - case uint32: - return uint64(value) - case uint16: - return uint64(value) - case uint8: - return uint64(value) - case int64: - if value < 0 { - return 0 - } - return uint64(value) - case int: - if value < 0 { - return 0 - } - return uint64(value) - case int32: - if value < 0 { - return 0 - } - return uint64(value) - case int16: - if value < 0 { - return 0 - } - return uint64(value) - case int8: - if value < 0 { - return 0 - } - return uint64(value) - case float64: - if value < 0 { - return 0 - } - return uint64(value) - case json.Number: - i, err := value.Int64() - if err != nil || i < 0 { - return 0 - } - return uint64(i) - case string: - parsed, err := strconv.ParseUint(value, 10, 64) - if err != nil { - return 0 - } - return parsed - default: - return 0 - } -} - -// Record is one line written to the JSONL file. -type Record struct { - Time string `json:"time"` - Failpoint string `json:"failpoint"` - Rows []RowRecord `json:"rows"` -} - -var ( - initOnce sync.Once - mu sync.Mutex - file *os.File - disabled bool -) - -func ensureFile() { - initOnce.Do(func() { - path := os.Getenv(envKey) - if path == "" { - disabled = true - return - } - var err error - file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - log.Warn("failed to open failpoint record file, recording disabled", - zap.String("path", path), zap.Error(err)) - disabled = true - return - } - log.Info("failpoint record file opened", zap.String("path", path)) - }) -} - -// Write persists one failpoint event to the JSONL file. -// It is safe for concurrent use. -// If the env var is not set the call is a no-op (zero allocation). 
-func Write(failpoint string, rows []RowRecord) { - if disabled { - return - } - ensureFile() - if file == nil { - return - } - - rec := Record{ - Time: time.Now().UTC().Format(time.RFC3339Nano), - Failpoint: failpoint, - Rows: rows, - } - data, err := json.Marshal(rec) - if err != nil { - log.Warn("failed to marshal failpoint record", zap.Error(err)) - return - } - data = append(data, '\n') - - mu.Lock() - defer mu.Unlock() - if _, err := file.Write(data); err != nil { - log.Warn("failed to write failpoint record", zap.Error(err)) - } -} diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index 716dca4dc2..a3100d6dc5 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -16,11 +16,8 @@ package mysql import ( "sort" - "github.com/pingcap/failpoint" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/sink/failpointrecord" - "github.com/pingcap/tidb/pkg/parser/mysql" ) func groupEventsByTable(events []*commonEvent.DMLEvent) map[int64][][]*commonEvent.DMLEvent { @@ -116,21 +113,6 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err } func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}) { - // Failpoint: bypass the LWW UPSERT and fall back to normal SQL (REPLACE INTO - // or plain INSERT). This makes the downstream TiDB write the row without the - // LWW condition, creating a genuine LWW violation that naturally flows through - // the pipeline to S3. - // Usage: failpoint.Enable(".../mysqlSinkBypassLWW", "return") - // failpoint.Enable(".../mysqlSinkBypassLWW", "50%return") - failpoint.Inject("mysqlSinkBypassLWW", func() { - rowRecords := dmlEventsToRowRecords(tableInfo, eventsInGroup) - failpointrecord.Write("mysqlSinkBypassLWW", rowRecords) - if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) { - failpoint.Return(w.generateNormalSQLs(eventsInGroup)) - } - failpoint.Return(w.generateBatchSQL(eventsInGroup)) - }) - if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) { return w.generateActiveActiveNormalSQLs(eventsInGroup) } @@ -174,62 +156,6 @@ func (w *Writer) shouldGenBatchSQL(tableInfo *common.TableInfo, events []*common return allRowInSameSafeMode(w.cfg.SafeMode, events) } -// pkColInfo holds the index and name of a primary key column. -type pkColInfo struct { - index int - name string -} - -// findPKColumns returns the PK column indexes and names from a TableInfo. -func findPKColumns(tableInfo *common.TableInfo) []pkColInfo { - var cols []pkColInfo - for i, col := range tableInfo.GetColumns() { - if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) { - cols = append(cols, pkColInfo{index: i, name: col.Name.O}) - } - } - return cols -} - -// dmlEventsToRowRecords converts DMLEvents to failpointrecord.RowRecords for -// writing to the failpoint record file. 
-func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) []failpointrecord.RowRecord { - pkCols := findPKColumns(tableInfo) - originTsCol, hasOriginTsCol := tableInfo.GetColumnInfoByName(commonEvent.OriginTsColumn) - originTsOffset, hasOriginTsOffset := tableInfo.GetColumnOffsetByName(commonEvent.OriginTsColumn) - var records []failpointrecord.RowRecord - for _, event := range events { - event.Rewind() - for { - rowChange, ok := event.GetNextRow() - if !ok { - break - } - r := &rowChange.Row - if rowChange.RowType == common.RowTypeDelete { - r = &rowChange.PreRow - } - pks := make(map[string]any, len(pkCols)) - for _, pk := range pkCols { - pks[pk.name] = common.ExtractColVal(r, tableInfo.GetColumns()[pk.index], pk.index) - } - originTs := uint64(0) - if hasOriginTsCol && hasOriginTsOffset { - originTs = failpointrecord.NormalizeOriginTs( - common.ExtractColVal(r, originTsCol, originTsOffset), - ) - } - records = append(records, failpointrecord.RowRecord{ - CommitTs: event.CommitTs, - OriginTs: originTs, - PrimaryKeys: pks, - }) - } - event.Rewind() - } - return records -} - // allRowInSameSafeMode determines whether all DMLEvents in a batch have the same safe mode status. // Safe mode is either globally enabled via the safemode parameter, or determined per event // by comparing CommitTs and ReplicatingTs.