From b61ee34fa372a54408e61908d94c4752f2d096d5 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Sat, 14 Feb 2026 13:42:49 +0800 Subject: [PATCH 1/3] add failpoint Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 49 +++++++++++++++++++ pkg/sink/mysql/mysql_writer_dml.go | 18 +++++++ 2 files changed, 67 insertions(+) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 35eee16791..1bf98708df 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -213,6 +213,31 @@ func (d *writer) ignoreTableTask(task *singleTableTask) { } } +// mutateMessageValueForFailpoint replaces the last CSV field of every row in +// msg.Value with a sentinel value so that the multi-cluster-consistency-checker +// sees the original row as "lost" and the mutated row as "redundant". +// This function is only called from within a failpoint.Inject block. +func mutateMessageValueForFailpoint(msg *common.Message) { + if len(msg.Value) == 0 { + return + } + copied := make([]byte, len(msg.Value)) + copy(copied, msg.Value) + lines := bytes.Split(copied, []byte("\n")) + for i, line := range lines { + if len(line) == 0 { + continue + } + // Replace the last comma-separated field with a sentinel value + // to corrupt a non-PK column. + lastComma := bytes.LastIndexByte(line, ',') + if lastComma >= 0 && lastComma < len(line)-1 { + lines[i] = append(line[:lastComma+1], []byte("\"MUTATED_BY_FAILPOINT\"")...) + } + } + msg.Value = bytes.Join(lines, []byte("\n")) +} + func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error { var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) @@ -220,6 +245,30 @@ func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath bytesCnt := int64(0) // There is always only one message here in task.msgs for _, msg := range task.msgs { + // Failpoint: drop this message to simulate data loss. + // Usage: failpoint.Enable(".../cloudStorageSinkDropMessage", "return") + // failpoint.Enable(".../cloudStorageSinkDropMessage", "50%return") + failpoint.Inject("cloudStorageSinkDropMessage", func() { + log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss", + zap.Int("workerID", d.id), + zap.String("keyspace", d.changeFeedID.Keyspace()), + zap.Stringer("changefeed", d.changeFeedID.ID()), + zap.String("dataFilePath", dataFilePath)) + callbacks = append(callbacks, msg.Callback) + failpoint.Continue() + }) + + // Failpoint: mutate non-PK column data in the message. + // Usage: failpoint.Enable(".../cloudStorageSinkMutateValue", "return") + failpoint.Inject("cloudStorageSinkMutateValue", func() { + log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency", + zap.Int("workerID", d.id), + zap.String("keyspace", d.changeFeedID.Keyspace()), + zap.Stringer("changefeed", d.changeFeedID.ID()), + zap.String("dataFilePath", dataFilePath)) + mutateMessageValueForFailpoint(msg) + }) + if msg.Key != nil && rowsCnt == 0 { buf.Write(msg.Key) bytesCnt += int64(len(msg.Key)) diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index a352de7b88..9d6b159dbf 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -16,8 +16,11 @@ package mysql import ( "sort" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "go.uber.org/zap" ) func groupEventsByTable(events []*commonEvent.DMLEvent) map[int64][][]*commonEvent.DMLEvent { @@ -113,6 +116,21 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err } func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}) { + // Failpoint: bypass the LWW UPSERT and fall back to normal SQL (REPLACE INTO + // or plain INSERT). This makes the downstream TiDB write the row without the + // LWW condition, creating a genuine LWW violation that naturally flows through + // the pipeline to S3. + // Usage: failpoint.Enable(".../mysqlSinkBypassLWW", "return") + // failpoint.Enable(".../mysqlSinkBypassLWW", "50%return") + failpoint.Inject("mysqlSinkBypassLWW", func() { + log.Warn("mysqlSinkBypassLWW: bypassing LWW SQL, using normal SQL path to create LWW violation", + zap.Int("writerID", w.id)) + if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) { + failpoint.Return(w.generateNormalSQLs(eventsInGroup)) + } + failpoint.Return(w.generateBatchSQL(eventsInGroup)) + }) + if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) { return w.generateActiveActiveNormalSQLs(eventsInGroup) } From 11b7416f07e02976d8b38ce9c44fa586dd520d52 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Sat, 14 Feb 2026 13:54:56 +0800 Subject: [PATCH 2/3] add failpoint Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 86 ++++++++++++++++--- 1 file changed, 73 insertions(+), 13 deletions(-) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 1bf98708df..7ead1a7c11 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -16,6 +16,7 @@ package cloudstorage import ( "bytes" "context" + "encoding/json" "path" "strconv" "sync/atomic" @@ -213,29 +214,88 @@ func (d *writer) ignoreTableTask(task *singleTableTask) { } } -// mutateMessageValueForFailpoint replaces the last CSV field of every row in -// msg.Value with a sentinel value so that the multi-cluster-consistency-checker +// mutateMessageValueForFailpoint rewrites a non-primary-key column value in +// canal-json encoded messages so that the multi-cluster-consistency-checker // sees the original row as "lost" and the mutated row as "redundant". +// +// canal-json messages in msg.Value are separated by CRLF ("\r\n"). For every +// message we: +// 1. Parse the JSON to extract "pkNames" and "data". +// 2. Pick the first non-PK column in data[0] and replace its value with +// "MUTATED_BY_FAILPOINT". +// 3. Re-marshal the whole message. +// // This function is only called from within a failpoint.Inject block. func mutateMessageValueForFailpoint(msg *common.Message) { if len(msg.Value) == 0 { return } - copied := make([]byte, len(msg.Value)) - copy(copied, msg.Value) - lines := bytes.Split(copied, []byte("\n")) - for i, line := range lines { - if len(line) == 0 { + terminator := []byte("\r\n") + parts := bytes.Split(msg.Value, terminator) + for i, part := range parts { + if len(part) == 0 { + continue + } + + // Decode the full message preserving all fields. + var m map[string]json.RawMessage + if err := json.Unmarshal(part, &m); err != nil { + continue + } + + // Extract pkNames so we can skip PK columns. + var pkNames []string + if raw, ok := m["pkNames"]; ok { + _ = json.Unmarshal(raw, &pkNames) + } + pkSet := make(map[string]struct{}, len(pkNames)) + for _, pk := range pkNames { + pkSet[pk] = struct{}{} + } + + // Extract the "data" array. + rawData, ok := m["data"] + if !ok { + continue + } + var data []map[string]interface{} + if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 { continue } - // Replace the last comma-separated field with a sentinel value - // to corrupt a non-PK column. - lastComma := bytes.LastIndexByte(line, ',') - if lastComma >= 0 && lastComma < len(line)-1 { - lines[i] = append(line[:lastComma+1], []byte("\"MUTATED_BY_FAILPOINT\"")...) + + // Find the first non-PK column and mutate it. + mutated := false + for _, row := range data { + for col := range row { + if _, isPK := pkSet[col]; isPK { + continue + } + row[col] = "MUTATED_BY_FAILPOINT" + mutated = true + break + } + if mutated { + break + } + } + if !mutated { + continue + } + + // Write the mutated data back. + newData, err := json.Marshal(data) + if err != nil { + continue + } + m["data"] = json.RawMessage(newData) + + newPart, err := json.Marshal(m) + if err != nil { + continue } + parts[i] = newPart } - msg.Value = bytes.Join(lines, []byte("\n")) + msg.Value = bytes.Join(parts, terminator) } func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error { From b56a3fa1bb0bbf95d3024efc62966869d570a3f0 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Fri, 20 Feb 2026 13:52:40 +0800 Subject: [PATCH 3/3] output file Signed-off-by: Jianjun Liao --- downstreamadapter/sink/cloudstorage/writer.go | 29 ++++- pkg/sink/failpointrecord/record.go | 108 ++++++++++++++++++ pkg/sink/mysql/mysql_writer_dml.go | 55 ++++++++- 3 files changed, 186 insertions(+), 6 deletions(-) create mode 100644 pkg/sink/failpointrecord/record.go diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 7ead1a7c11..e408b62a79 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/br/pkg/storage" "github.com/prometheus/client_golang/prometheus" @@ -313,7 +314,9 @@ func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath zap.Int("workerID", d.id), zap.String("keyspace", d.changeFeedID.Keyspace()), zap.Stringer("changefeed", d.changeFeedID.ID()), - zap.String("dataFilePath", dataFilePath)) + zap.String("dataFilePath", dataFilePath), + zap.Any("logInfo", msg.LogInfo)) + failpointrecord.Write("cloudStorageSinkDropMessage", logInfoToRowRecords(msg.LogInfo)) callbacks = append(callbacks, msg.Callback) failpoint.Continue() }) @@ -325,7 +328,9 @@ func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath zap.Int("workerID", d.id), zap.String("keyspace", d.changeFeedID.Keyspace()), zap.Stringer("changefeed", d.changeFeedID.ID()), - zap.String("dataFilePath", dataFilePath)) + zap.String("dataFilePath", dataFilePath), + zap.Any("logInfo", msg.LogInfo)) + failpointrecord.Write("cloudStorageSinkMutateValue", logInfoToRowRecords(msg.LogInfo)) mutateMessageValueForFailpoint(msg) }) @@ -458,6 +463,26 @@ func (d *writer) close() { } } +// logInfoToRowRecords converts a MessageLogInfo to failpointrecord.RowRecords +// for writing to the failpoint record file. +func logInfoToRowRecords(info *common.MessageLogInfo) []failpointrecord.RowRecord { + if info == nil || len(info.Rows) == 0 { + return nil + } + records := make([]failpointrecord.RowRecord, 0, len(info.Rows)) + for _, row := range info.Rows { + pks := make(map[string]any, len(row.PrimaryKeys)) + for _, pk := range row.PrimaryKeys { + pks[pk.Name] = pk.Value + } + records = append(records, failpointrecord.RowRecord{ + CommitTs: row.CommitTs, + PrimaryKeys: pks, + }) + } + return records +} + // batchedTask contains a set of singleTableTask. // We batch message of different tables together to reduce the overhead of calling external storage API. type batchedTask struct { diff --git a/pkg/sink/failpointrecord/record.go b/pkg/sink/failpointrecord/record.go new file mode 100644 index 0000000000..969edf65f6 --- /dev/null +++ b/pkg/sink/failpointrecord/record.go @@ -0,0 +1,108 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package failpointrecord provides a lightweight utility that records failpoint +// triggered events (PK, commit_ts, etc.) to a JSONL file so that external tools +// (e.g. the multi-cluster-consistency-checker) can easily consume them. +// +// The file path is controlled by the environment variable +// TICDC_FAILPOINT_RECORD_FILE. When the variable is empty or unset the +// recorder is a silent no-op, introducing zero overhead in production. +// +// Each line written to the file is a self-contained JSON object: +// +// {"time":"…","failpoint":"cloudStorageSinkDropMessage","rows":[{"commitTs":123,"primaryKeys":{"id":1}}]} +package failpointrecord + +import ( + "encoding/json" + "os" + "sync" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// envKey is the environment variable that controls the output file path. +const envKey = "TICDC_FAILPOINT_RECORD_FILE" + +// RowRecord captures the essential identity of a single affected row. +type RowRecord struct { + CommitTs uint64 `json:"commitTs"` + PrimaryKeys map[string]any `json:"primaryKeys"` +} + +// Record is one line written to the JSONL file. +type Record struct { + Time string `json:"time"` + Failpoint string `json:"failpoint"` + Rows []RowRecord `json:"rows"` +} + +var ( + initOnce sync.Once + mu sync.Mutex + file *os.File + disabled bool +) + +func ensureFile() { + initOnce.Do(func() { + path := os.Getenv(envKey) + if path == "" { + disabled = true + return + } + var err error + file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Warn("failed to open failpoint record file, recording disabled", + zap.String("path", path), zap.Error(err)) + disabled = true + return + } + log.Info("failpoint record file opened", zap.String("path", path)) + }) +} + +// Write persists one failpoint event to the JSONL file. +// It is safe for concurrent use. +// If the env var is not set the call is a no-op (zero allocation). +func Write(failpoint string, rows []RowRecord) { + if disabled { + return + } + ensureFile() + if file == nil { + return + } + + rec := Record{ + Time: time.Now().UTC().Format(time.RFC3339Nano), + Failpoint: failpoint, + Rows: rows, + } + data, err := json.Marshal(rec) + if err != nil { + log.Warn("failed to marshal failpoint record", zap.Error(err)) + return + } + data = append(data, '\n') + + mu.Lock() + defer mu.Unlock() + if _, err := file.Write(data); err != nil { + log.Warn("failed to write failpoint record", zap.Error(err)) + } +} diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index 9d6b159dbf..c3237e5aaa 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -17,10 +17,10 @@ import ( "sort" "github.com/pingcap/failpoint" - "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "go.uber.org/zap" + "github.com/pingcap/ticdc/pkg/sink/failpointrecord" + "github.com/pingcap/tidb/pkg/parser/mysql" ) func groupEventsByTable(events []*commonEvent.DMLEvent) map[int64][][]*commonEvent.DMLEvent { @@ -123,8 +123,8 @@ func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup [ // Usage: failpoint.Enable(".../mysqlSinkBypassLWW", "return") // failpoint.Enable(".../mysqlSinkBypassLWW", "50%return") failpoint.Inject("mysqlSinkBypassLWW", func() { - log.Warn("mysqlSinkBypassLWW: bypassing LWW SQL, using normal SQL path to create LWW violation", - zap.Int("writerID", w.id)) + rowRecords := dmlEventsToRowRecords(tableInfo, eventsInGroup) + failpointrecord.Write("mysqlSinkBypassLWW", rowRecords) if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) { failpoint.Return(w.generateNormalSQLs(eventsInGroup)) } @@ -159,6 +159,53 @@ func (w *Writer) shouldGenBatchSQL(hasPKOrNotNullUK bool, hasVirtualCols bool, e return allRowInSameSafeMode(w.cfg.SafeMode, events) } +// pkColInfo holds the index and name of a primary key column. +type pkColInfo struct { + index int + name string +} + +// findPKColumns returns the PK column indexes and names from a TableInfo. +func findPKColumns(tableInfo *common.TableInfo) []pkColInfo { + var cols []pkColInfo + for i, col := range tableInfo.GetColumns() { + if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) { + cols = append(cols, pkColInfo{index: i, name: col.Name.O}) + } + } + return cols +} + +// dmlEventsToRowRecords converts DMLEvents to failpointrecord.RowRecords for +// writing to the failpoint record file. +func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) []failpointrecord.RowRecord { + pkCols := findPKColumns(tableInfo) + var records []failpointrecord.RowRecord + for _, event := range events { + event.Rewind() + for { + rowChange, ok := event.GetNextRow() + if !ok { + break + } + r := &rowChange.Row + if rowChange.RowType == common.RowTypeDelete { + r = &rowChange.PreRow + } + pks := make(map[string]any, len(pkCols)) + for _, pk := range pkCols { + pks[pk.name] = common.ExtractColVal(r, tableInfo.GetColumns()[pk.index], pk.index) + } + records = append(records, failpointrecord.RowRecord{ + CommitTs: event.CommitTs, + PrimaryKeys: pks, + }) + } + event.Rewind() + } + return records +} + // allRowInSameSafeMode determines whether all DMLEvents in a batch have the same safe mode status. // Safe mode is either globally enabled via the safemode parameter, or determined per event // by comparing CommitTs and ReplicatingTs.