Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions downstreamadapter/sink/cloudstorage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cloudstorage
import (
"bytes"
"context"
"encoding/json"
"path"
"strconv"
"sync/atomic"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/sink/failpointrecord"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -213,13 +215,125 @@ func (d *writer) ignoreTableTask(task *singleTableTask) {
}
}

// mutateMessageValueForFailpoint rewrites one non-primary-key column value in
// canal-json encoded messages so that the multi-cluster-consistency-checker
// sees the original row as "lost" and the mutated row as "redundant".
//
// msg.Value holds one or more canal-json messages separated by CRLF ("\r\n").
// Each message is decoded, the primary-key columns listed in "pkNames" are
// skipped, and the first other column found in "data" is overwritten with
// "MUTATED_BY_FAILPOINT" before the message is re-marshaled. Messages that
// cannot be parsed or contain no mutable column are left untouched.
//
// This function is only called from within a failpoint.Inject block.
func mutateMessageValueForFailpoint(msg *common.Message) {
	if len(msg.Value) == 0 {
		return
	}
	crlf := []byte("\r\n")
	segments := bytes.Split(msg.Value, crlf)
	for idx, segment := range segments {
		if rewritten, ok := mutateCanalJSONSegment(segment); ok {
			segments[idx] = rewritten
		}
	}
	msg.Value = bytes.Join(segments, crlf)
}

// mutateCanalJSONSegment mutates a single canal-json message. It returns the
// rewritten bytes and true on success, or nil and false when the segment is
// empty, unparsable, or has no non-PK column to rewrite.
func mutateCanalJSONSegment(segment []byte) ([]byte, bool) {
	if len(segment) == 0 {
		return nil, false
	}

	// Decode the full message preserving all fields.
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(segment, &fields); err != nil {
		return nil, false
	}

	// Collect pkNames so PK columns can be skipped below.
	pkSet := make(map[string]struct{})
	if raw, ok := fields["pkNames"]; ok {
		var pkNames []string
		_ = json.Unmarshal(raw, &pkNames)
		for _, pk := range pkNames {
			pkSet[pk] = struct{}{}
		}
	}

	// Extract the "data" array.
	rawData, ok := fields["data"]
	if !ok {
		return nil, false
	}
	var data []map[string]interface{}
	if err := json.Unmarshal(rawData, &data); err != nil || len(data) == 0 {
		return nil, false
	}

	// Overwrite the first non-PK column found across the rows.
	mutated := false
search:
	for _, row := range data {
		for col := range row {
			if _, isPK := pkSet[col]; !isPK {
				row[col] = "MUTATED_BY_FAILPOINT"
				mutated = true
				break search
			}
		}
	}
	if !mutated {
		return nil, false
	}

	// Write the mutated data back and re-marshal the whole message.
	newData, err := json.Marshal(data)
	if err != nil {
		return nil, false
	}
	fields["data"] = json.RawMessage(newData)

	newSegment, err := json.Marshal(fields)
	if err != nil {
		return nil, false
	}
	return newSegment, true
}

func (d *writer) writeDataFile(ctx context.Context, dataFilePath, indexFilePath string, task *singleTableTask) error {
var callbacks []func()
buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
bytesCnt := int64(0)
// There is always only one message here in task.msgs
for _, msg := range task.msgs {
// Failpoint: drop this message to simulate data loss.
// Usage: failpoint.Enable(".../cloudStorageSinkDropMessage", "return")
// failpoint.Enable(".../cloudStorageSinkDropMessage", "50%return")
failpoint.Inject("cloudStorageSinkDropMessage", func() {
log.Warn("cloudStorageSinkDropMessage: dropping message to simulate data loss",
zap.Int("workerID", d.id),
zap.String("keyspace", d.changeFeedID.Keyspace()),
zap.Stringer("changefeed", d.changeFeedID.ID()),
zap.String("dataFilePath", dataFilePath),
zap.Any("logInfo", msg.LogInfo))
failpointrecord.Write("cloudStorageSinkDropMessage", logInfoToRowRecords(msg.LogInfo))
callbacks = append(callbacks, msg.Callback)
failpoint.Continue()
})

// Failpoint: mutate non-PK column data in the message.
// Usage: failpoint.Enable(".../cloudStorageSinkMutateValue", "return")
failpoint.Inject("cloudStorageSinkMutateValue", func() {
log.Warn("cloudStorageSinkMutateValue: mutating message value to simulate data inconsistency",
zap.Int("workerID", d.id),
zap.String("keyspace", d.changeFeedID.Keyspace()),
zap.Stringer("changefeed", d.changeFeedID.ID()),
zap.String("dataFilePath", dataFilePath),
zap.Any("logInfo", msg.LogInfo))
failpointrecord.Write("cloudStorageSinkMutateValue", logInfoToRowRecords(msg.LogInfo))
mutateMessageValueForFailpoint(msg)
})

if msg.Key != nil && rowsCnt == 0 {
buf.Write(msg.Key)
bytesCnt += int64(len(msg.Key))
Expand Down Expand Up @@ -349,6 +463,26 @@ func (d *writer) close() {
}
}

// logInfoToRowRecords converts a MessageLogInfo into the
// failpointrecord.RowRecord slice that is persisted to the failpoint record
// file. It returns nil when there is no row information to convert.
func logInfoToRowRecords(info *common.MessageLogInfo) []failpointrecord.RowRecord {
	if info == nil || len(info.Rows) == 0 {
		return nil
	}
	out := make([]failpointrecord.RowRecord, 0, len(info.Rows))
	for i := range info.Rows {
		row := &info.Rows[i]
		keys := make(map[string]any, len(row.PrimaryKeys))
		for _, pk := range row.PrimaryKeys {
			keys[pk.Name] = pk.Value
		}
		out = append(out, failpointrecord.RowRecord{
			CommitTs:    row.CommitTs,
			PrimaryKeys: keys,
		})
	}
	return out
}

// batchedTask contains a set of singleTableTask.
// We batch message of different tables together to reduce the overhead of calling external storage API.
type batchedTask struct {
Expand Down
108 changes: 108 additions & 0 deletions pkg/sink/failpointrecord/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package failpointrecord provides a lightweight utility that records failpoint
// triggered events (PK, commit_ts, etc.) to a JSONL file so that external tools
// (e.g. the multi-cluster-consistency-checker) can easily consume them.
//
// The file path is controlled by the environment variable
// TICDC_FAILPOINT_RECORD_FILE. When the variable is empty or unset the
// recorder is a silent no-op, introducing zero overhead in production.
//
// Each line written to the file is a self-contained JSON object:
//
// {"time":"…","failpoint":"cloudStorageSinkDropMessage","rows":[{"commitTs":123,"primaryKeys":{"id":1}}]}
package failpointrecord

import (
"encoding/json"
"os"
"sync"
"time"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// envKey is the environment variable that controls the output file path.
const envKey = "TICDC_FAILPOINT_RECORD_FILE"

// RowRecord captures the essential identity of a single affected row.
type RowRecord struct {
	// CommitTs is the commit timestamp of the transaction the row belongs to.
	CommitTs uint64 `json:"commitTs"`
	// PrimaryKeys maps primary-key column names to their values.
	PrimaryKeys map[string]any `json:"primaryKeys"`
}

// Record is one line written to the JSONL file.
type Record struct {
	// Time is the event time, formatted as RFC3339Nano in UTC.
	Time string `json:"time"`
	// Failpoint is the name of the failpoint that fired.
	Failpoint string `json:"failpoint"`
	// Rows identifies the rows affected when the failpoint fired.
	Rows []RowRecord `json:"rows"`
}

// Package-level recorder state.
var (
	initOnce sync.Once  // guards the one-time setup performed in ensureFile
	mu       sync.Mutex // serializes appends to file
	file     *os.File   // destination JSONL file; nil while recording is disabled
	disabled bool       // set by ensureFile when the env var is unset or the open fails
)

// ensureFile lazily opens the record file exactly once. When the environment
// variable is unset, or the file cannot be opened, recording is marked
// disabled and file stays nil.
func ensureFile() {
	initOnce.Do(func() {
		target := os.Getenv(envKey)
		if target == "" {
			disabled = true
			return
		}
		f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
		if err != nil {
			log.Warn("failed to open failpoint record file, recording disabled",
				zap.String("path", target), zap.Error(err))
			disabled = true
			return
		}
		file = f
		log.Info("failpoint record file opened", zap.String("path", target))
	})
}

// Write persists one failpoint event to the JSONL file.
// It is safe for concurrent use.
// When the TICDC_FAILPOINT_RECORD_FILE environment variable is not set the
// call is a cheap no-op.
func Write(failpoint string, rows []RowRecord) {
	// Run the one-time initialization before reading any package-level state.
	// The previous code read `disabled` before ensureFile had run, which is a
	// data race with the write inside initOnce.Do; calling ensureFile first
	// lets sync.Once establish the happens-before edge for the reads below.
	ensureFile()
	if disabled || file == nil {
		return
	}

	rec := Record{
		Time:      time.Now().UTC().Format(time.RFC3339Nano),
		Failpoint: failpoint,
		Rows:      rows,
	}
	data, err := json.Marshal(rec)
	if err != nil {
		log.Warn("failed to marshal failpoint record", zap.Error(err))
		return
	}
	data = append(data, '\n')

	// file is append-only; the mutex keeps concurrent records on separate lines.
	mu.Lock()
	defer mu.Unlock()
	if _, err := file.Write(data); err != nil {
		log.Warn("failed to write failpoint record", zap.Error(err))
	}
}
65 changes: 65 additions & 0 deletions pkg/sink/mysql/mysql_writer_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package mysql
import (
"sort"

"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/sink/failpointrecord"
"github.com/pingcap/tidb/pkg/parser/mysql"
)

func groupEventsByTable(events []*commonEvent.DMLEvent) map[int64][][]*commonEvent.DMLEvent {
Expand Down Expand Up @@ -113,6 +116,21 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err
}

func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}) {
// Failpoint: bypass the LWW UPSERT and fall back to normal SQL (REPLACE INTO
// or plain INSERT). This makes the downstream TiDB write the row without the
// LWW condition, creating a genuine LWW violation that naturally flows through
// the pipeline to S3.
// Usage: failpoint.Enable(".../mysqlSinkBypassLWW", "return")
// failpoint.Enable(".../mysqlSinkBypassLWW", "50%return")
failpoint.Inject("mysqlSinkBypassLWW", func() {
rowRecords := dmlEventsToRowRecords(tableInfo, eventsInGroup)
failpointrecord.Write("mysqlSinkBypassLWW", rowRecords)
if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) {
failpoint.Return(w.generateNormalSQLs(eventsInGroup))
}
failpoint.Return(w.generateBatchSQL(eventsInGroup))
})

if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) {
return w.generateActiveActiveNormalSQLs(eventsInGroup)
}
Expand Down Expand Up @@ -141,6 +159,53 @@ func (w *Writer) shouldGenBatchSQL(hasPKOrNotNullUK bool, hasVirtualCols bool, e
return allRowInSameSafeMode(w.cfg.SafeMode, events)
}

// pkColInfo holds the index and name of a primary key column.
type pkColInfo struct {
	index int    // position of the column within tableInfo.GetColumns()
	name  string // original-case column name (col.Name.O)
}

// findPKColumns returns the PK column indexes and names from a TableInfo.
// Nil column entries and columns without the primary-key flag are skipped.
func findPKColumns(tableInfo *common.TableInfo) []pkColInfo {
	var pkCols []pkColInfo
	columns := tableInfo.GetColumns()
	for idx := range columns {
		col := columns[idx]
		if col == nil || !mysql.HasPriKeyFlag(col.GetFlag()) {
			continue
		}
		pkCols = append(pkCols, pkColInfo{index: idx, name: col.Name.O})
	}
	return pkCols
}

// dmlEventsToRowRecords converts DMLEvents to failpointrecord.RowRecords for
// writing to the failpoint record file.
//
// For delete events the PK values are taken from PreRow (the row being
// removed); all other row types use Row. Each event is rewound before and
// after iteration so later consumers still see every row.
func dmlEventsToRowRecords(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) []failpointrecord.RowRecord {
	pkCols := findPKColumns(tableInfo)
	// Hoist the column slice out of the per-row/per-PK loops: it is invariant
	// for the whole conversion, so there is no need to re-fetch it for every
	// primary-key column of every row.
	columns := tableInfo.GetColumns()
	var records []failpointrecord.RowRecord
	for _, event := range events {
		event.Rewind()
		for {
			rowChange, ok := event.GetNextRow()
			if !ok {
				break
			}
			// Deletes carry their values in PreRow; everything else in Row.
			r := &rowChange.Row
			if rowChange.RowType == common.RowTypeDelete {
				r = &rowChange.PreRow
			}
			pks := make(map[string]any, len(pkCols))
			for _, pk := range pkCols {
				pks[pk.name] = common.ExtractColVal(r, columns[pk.index], pk.index)
			}
			records = append(records, failpointrecord.RowRecord{
				CommitTs:    event.CommitTs,
				PrimaryKeys: pks,
			})
		}
		// Leave the event rewound for downstream consumers.
		event.Rewind()
	}
	return records
}

// allRowInSameSafeMode determines whether all DMLEvents in a batch have the same safe mode status.
// Safe mode is either globally enabled via the safemode parameter, or determined per event
// by comparing CommitTs and ReplicatingTs.
Expand Down