diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go
index 35eee16791..e408b62a79 100644
--- a/downstreamadapter/sink/cloudstorage/writer.go
+++ b/downstreamadapter/sink/cloudstorage/writer.go
@@ -16,6 +16,7 @@ package cloudstorage
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"path"
 	"strconv"
 	"sync/atomic"
@@ -30,6 +31,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/pdutil"
 	"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
 	"github.com/pingcap/ticdc/pkg/sink/codec/common"
+	"github.com/pingcap/ticdc/pkg/sink/failpointrecord"
 	"github.com/pingcap/ticdc/utils/chann"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/prometheus/client_golang/prometheus"
@@ -213,6 +215,90 @@ func (d *writer) ignoreTableTask(task *singleTableTask) {
 	}
 }
 
+// mutateMessageValueForFailpoint rewrites a non-primary-key column value in
+// canal-json encoded messages so that the multi-cluster-consistency-checker
+// sees the original row as "lost" and the mutated row as "redundant".
+//
+// canal-json messages in msg.Value are separated by CRLF ("\r\n"). For every
+// message we:
+//  1. Parse the JSON to extract "pkNames" and "data".
+//  2. Pick the first non-PK column in data[0] and replace its value with
+//     "MUTATED_BY_FAILPOINT".
+//  3. Re-marshal the whole message.
+//
+// This function is only called from within a failpoint.Inject block.
+func mutateMessageValueForFailpoint(msg *common.Message) {
+	if len(msg.Value) == 0 {
+		return
+	}
+	terminator := []byte("\r\n")
+	parts := bytes.Split(msg.Value, terminator)
+	for i, part := range parts {
+		if len(part) == 0 {
+			continue
+		}
+
+		// Decode the full message preserving all fields.
+		var m map[string]json.RawMessage
+		if err := json.Unmarshal(part, &m); err != nil {
+			continue
+		}
+
+		// Extract pkNames so we can skip PK columns.
+		var pkNames []string
+		if raw, ok := m["pkNames"]; ok {
+			_ = json.Unmarshal(raw, &pkNames)
+		}
+		pkSet := make(map[string]struct{}, len(pkNames))
+		for _, pk := range pkNames {
+			pkSet[pk] = struct{}{}
+		}
+
+		// Extract the "data" array.
+		rawData, ok := m["data"]
+		if !ok {
+			continue
+		}
+		var data []map[string]interface{}
+		if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 {
+			continue
+		}
+
+		// Find the first non-PK column and mutate it.
+		mutated := false
+		for _, row := range data {
+			for col := range row {
+				if _, isPK := pkSet[col]; isPK {
+					continue
+				}
+				row[col] = "MUTATED_BY_FAILPOINT"
+				mutated = true
+				break
+			}
+			if mutated {
+				break
+			}
+		}
+		if !mutated {
+			continue
+		}
+
+		// Write the mutated data back.
+		newData, err := json.Marshal(data)
+		if err != nil {
+			continue
+		}
+		m["data"] = json.RawMessage(newData)
+
+		newPart, err := json.Marshal(m)
+		if err != nil {
+			continue
+		}
+		parts[i] = newPart
+	}
+	msg.Value = bytes.Join(parts, terminator)
+}
+
 func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error {
 	var callbacks []func()
 	buf := bytes.NewBuffer(make([]byte, 0, task.size))
@@ -220,6 +306,34 @@ func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath
 	bytesCnt := int64(0)
 	// There is always only one message here in task.msgs
 	for _, msg := range task.msgs {
+		// Failpoint: drop this message to simulate data loss.
+		// Usage: failpoint.Enable(".../cloudStorageSinkDropMessage", "return")
+		//        failpoint.Enable(".../cloudStorageSinkDropMessage", "50%return")
+		failpoint.Inject("cloudStorageSinkDropMessage", func() {
+			log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss",
+				zap.Int("workerID", d.id),
+				zap.String("keyspace", d.changeFeedID.Keyspace()),
+				zap.Stringer("changefeed", d.changeFeedID.ID()),
+				zap.String("dataFilePath", dataFilePath),
+				zap.Any("logInfo", msg.LogInfo))
+			failpointrecord.Write("cloudStorageSinkDropMessage", logInfoToRowRecords(msg.LogInfo))
+			callbacks = append(callbacks, msg.Callback)
+			failpoint.Continue()
+		})
+
+		// Failpoint: mutate non-PK column data in the message.
+		// Usage: failpoint.Enable(".../cloudStorageSinkMutateValue", "return")
+		failpoint.Inject("cloudStorageSinkMutateValue", func() {
+			log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency",
+				zap.Int("workerID", d.id),
+				zap.String("keyspace", d.changeFeedID.Keyspace()),
+				zap.Stringer("changefeed", d.changeFeedID.ID()),
+				zap.String("dataFilePath", dataFilePath),
+				zap.Any("logInfo", msg.LogInfo))
+			failpointrecord.Write("cloudStorageSinkMutateValue", logInfoToRowRecords(msg.LogInfo))
+			mutateMessageValueForFailpoint(msg)
+		})
+
 		if msg.Key != nil && rowsCnt == 0 {
 			buf.Write(msg.Key)
 			bytesCnt += int64(len(msg.Key))
@@ -349,6 +463,26 @@ func (d *writer) close() {
 	}
 }
 
+// logInfoToRowRecords converts a MessageLogInfo to failpointrecord.RowRecords
+// for writing to the failpoint record file.
+func logInfoToRowRecords(info *common.MessageLogInfo) []failpointrecord.RowRecord {
+	if info == nil || len(info.Rows) == 0 {
+		return nil
+	}
+	records := make([]failpointrecord.RowRecord, 0, len(info.Rows))
+	for _, row := range info.Rows {
+		pks := make(map[string]any, len(row.PrimaryKeys))
+		for _, pk := range row.PrimaryKeys {
+			pks[pk.Name] = pk.Value
+		}
+		records = append(records, failpointrecord.RowRecord{
+			CommitTs:    row.CommitTs,
+			PrimaryKeys: pks,
+		})
+	}
+	return records
+}
+
 // batchedTask contains a set of singleTableTask.
 // We batch message of different tables together to reduce the overhead of calling external storage API.
 type batchedTask struct {
diff --git a/pkg/sink/failpointrecord/record.go b/pkg/sink/failpointrecord/record.go
new file mode 100644
index 0000000000..969edf65f6
--- /dev/null
+++ b/pkg/sink/failpointrecord/record.go
@@ -0,0 +1,109 @@
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package failpointrecord provides a lightweight utility that records failpoint
+// triggered events (PK, commit_ts, etc.) to a JSONL file so that external tools
+// (e.g. the multi-cluster-consistency-checker) can easily consume them.
+//
+// The file path is controlled by the environment variable
+// TICDC_FAILPOINT_RECORD_FILE. When the variable is empty or unset the
+// recorder is a silent no-op, introducing zero overhead in production.
+//
+// Each line written to the file is a self-contained JSON object:
+//
+//	{"time":"…","failpoint":"cloudStorageSinkDropMessage","rows":[{"commitTs":123,"primaryKeys":{"id":1}}]}
+package failpointrecord
+
+import (
+	"encoding/json"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+)
+
+// envKey is the environment variable that controls the output file path.
+const envKey = "TICDC_FAILPOINT_RECORD_FILE"
+
+// RowRecord captures the essential identity of a single affected row.
+type RowRecord struct {
+	CommitTs    uint64         `json:"commitTs"`
+	PrimaryKeys map[string]any `json:"primaryKeys"`
+}
+
+// Record is one line written to the JSONL file.
+type Record struct {
+	Time      string      `json:"time"`
+	Failpoint string      `json:"failpoint"`
+	Rows      []RowRecord `json:"rows"`
+}
+
+var (
+	initOnce sync.Once
+	mu       sync.Mutex
+	file     *os.File
+	disabled bool
+)
+
+// ensureFile lazily opens the record file exactly once. When the env var is
+// unset or the open fails, it marks the recorder disabled instead of erroring.
+func ensureFile() {
+	initOnce.Do(func() {
+		path := os.Getenv(envKey)
+		if path == "" {
+			disabled = true
+			return
+		}
+		var err error
+		file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+		if err != nil {
+			log.Warn("failed to open failpoint record file, recording disabled",
+				zap.String("path", path), zap.Error(err))
+			disabled = true
+			return
+		}
+		log.Info("failpoint record file opened", zap.String("path", path))
+	})
+}
+
+// Write persists one failpoint event to the JSONL file.
+// It is safe for concurrent use.
+// If the env var is not set the call is a no-op (zero allocation).
+func Write(failpoint string, rows []RowRecord) {
+	if disabled {
+		return
+	}
+	ensureFile()
+	if file == nil {
+		return
+	}
+
+	rec := Record{
+		Time:      time.Now().UTC().Format(time.RFC3339Nano),
+		Failpoint: failpoint,
+		Rows:      rows,
+	}
+	data, err := json.Marshal(rec)
+	if err != nil {
+		log.Warn("failed to marshal failpoint record", zap.Error(err))
+		return
+	}
+	data = append(data, '\n')
+
+	mu.Lock()
+	defer mu.Unlock()
+	if _, err := file.Write(data); err != nil {
+		log.Warn("failed to write failpoint record", zap.Error(err))
+	}
+}
diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go
index a352de7b88..c3237e5aaa 100644
--- a/pkg/sink/mysql/mysql_writer_dml.go
+++ b/pkg/sink/mysql/mysql_writer_dml.go
@@ -16,8 +16,11 @@ package mysql
 import (
 	"sort"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/ticdc/pkg/common"
 	commonEvent "github.com/pingcap/ticdc/pkg/common/event"
+	"github.com/pingcap/ticdc/pkg/sink/failpointrecord"
+	"github.com/pingcap/tidb/pkg/parser/mysql"
 )
 
 func groupEventsByTable(events []*commonEvent.DMLEvent) map[int64][][]*commonEvent.DMLEvent {
@@ -113,6 +116,21 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err
 }
 
 func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}) {
+	// Failpoint: bypass the LWW UPSERT and fall back to normal SQL (REPLACE INTO
+	// or plain INSERT). This makes the downstream TiDB write the row without the
+	// LWW condition, creating a genuine LWW violation that naturally flows through
+	// the pipeline to S3.
+	// Usage: failpoint.Enable(".../mysqlSinkBypassLWW", "return")
+	//        failpoint.Enable(".../mysqlSinkBypassLWW", "50%return")
+	failpoint.Inject("mysqlSinkBypassLWW", func() {
+		rowRecords := dmlEventsToRowRecords(tableInfo, eventsInGroup)
+		failpointrecord.Write("mysqlSinkBypassLWW", rowRecords)
+		if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) {
+			failpoint.Return(w.generateNormalSQLs(eventsInGroup))
+		}
+		failpoint.Return(w.generateBatchSQL(eventsInGroup))
+	})
+
 	if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) {
 		return w.generateActiveActiveNormalSQLs(eventsInGroup)
 	}
@@ -141,6 +159,53 @@ func (w *Writer) shouldGenBatchSQL(hasPKOrNotNullUK bool, hasVirtualCols bool, e
 	return allRowInSameSafeMode(w.cfg.SafeMode, events)
 }
 
+// pkColInfo holds the index and name of a primary key column.
+type pkColInfo struct {
+	index int
+	name  string
+}
+
+// findPKColumns returns the PK column indexes and names from a TableInfo.
+func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
+	var cols []pkColInfo
+	for i, col := range tableInfo.GetColumns() {
+		if col != nil && mysql.HasPriKeyFlag(col.GetFlag()) {
+			cols = append(cols, pkColInfo{index: i, name: col.Name.O})
+		}
+	}
+	return cols
+}
+
+// dmlEventsToRowRecords converts DMLEvents to failpointrecord.RowRecords for
+// writing to the failpoint record file.
+func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) []failpointrecord.RowRecord {
+	pkCols := findPKColumns(tableInfo)
+	var records []failpointrecord.RowRecord
+	for _, event := range events {
+		event.Rewind()
+		for {
+			rowChange, ok := event.GetNextRow()
+			if !ok {
+				break
+			}
+			// For deletes the current Row is empty; the PK values live in PreRow.
+			r := &rowChange.Row
+			if rowChange.RowType == common.RowTypeDelete {
+				r = &rowChange.PreRow
+			}
+			pks := make(map[string]any, len(pkCols))
+			for _, pk := range pkCols {
+				pks[pk.name] = common.ExtractColVal(r, tableInfo.GetColumns()[pk.index], pk.index)
+			}
+			records = append(records, failpointrecord.RowRecord{
+				CommitTs:    event.CommitTs,
+				PrimaryKeys: pks,
+			})
+		}
+		// Rewind again so later consumers of the event can iterate from the start.
+		event.Rewind()
+	}
+	return records
+}
+
 // allRowInSameSafeMode determines whether all DMLEvents in a batch have the same safe mode status.
 // Safe mode is either globally enabled via the safemode parameter, or determined per event
 // by comparing CommitTs and ReplicatingTs.