diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 1af6b01c81..e1db0b7eff 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -17,7 +17,6 @@ import ( "fmt" "math/rand" "sync" - "sync/atomic" "time" "github.com/pingcap/failpoint" @@ -31,6 +30,7 @@ import ( "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tidb/pkg/parser/mysql" tidbTypes "github.com/pingcap/tidb/pkg/types" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -168,7 +168,7 @@ type BasicDispatcher struct { sink sink.Sink // the max resolvedTs received by the dispatcher - resolvedTs uint64 + resolvedTs atomic.Uint64 // blockEventStatus is used to store the current pending ddl/sync point event and its block status. blockEventStatus BlockEventStatus @@ -246,7 +246,6 @@ func NewBasicDispatcher( sharedInfo: sharedInfo, sink: sink, componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing), - resolvedTs: startTs, isRemoving: atomic.Bool{}, duringHandleEvents: atomic.Bool{}, blockEventStatus: BlockEventStatus{blockPendingEvent: nil}, @@ -258,13 +257,15 @@ func NewBasicDispatcher( mode: mode, BootstrapState: BootstrapFinished, } + dispatcher.resolvedTs.Store(startTs) return dispatcher } -// AddDMLEventsToSink filters events for special tables and only returns true when -// at least one event remains to be written to the downstream sink. -func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent) bool { +// AddDMLEventsToSink filters events for special tables, registers batch wake +// callbacks, and returns true when at least one event remains to be written to +// the downstream sink. +func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent, wakeCallback func()) bool { // Normal DML dispatch: most tables just pass through this function unchanged. 
// Active-active or soft-delete tables are processed by FilterDMLEvent before // being handed over to the sink (delete rows dropped; soft-delete transitions may @@ -288,9 +289,18 @@ func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent) boo return false } + var remaining atomic.Int64 + remaining.Store(int64(len(filteredEvents))) + for _, event := range filteredEvents { + event.AddPostEnqueueFunc(func() { + if remaining.Dec() == 0 { + wakeCallback() + } + }) + } + // for one batch events, we need to add all them in table progress first, then add them to sink - // because we need to ensure the wakeCallback only will be called when - // all these events are flushed to downstream successfully + // to avoid checkpoint jumping while events are being enqueued/flushed. for _, event := range filteredEvents { d.tableProgress.Add(event) } @@ -329,20 +339,38 @@ func (d *BasicDispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) erro if event.GetType() == commonEvent.TypeDDLEvent { ddl := event.(*commonEvent.DDLEvent) // If NotSync is true, it means the DDL should not be sent to downstream. - // So we just call PassBlockEventToSink to update the table progress and call the postFlush func. + // So we just call PassBlockEventToSink to finish local bookkeeping: + // mark it passed in tableProgress and trigger flush callbacks to unblock + // dispatcher progress, without sending this DDL to sink. if ddl.NotSync { log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.Any("ddl", ddl)) - d.PassBlockEventToSink(event) - return nil + return d.PassBlockEventToSink(event) } } + // Keep block-event write order with prior DML. For storage sink this may wait + // until prior DML are enqueued/flushed to the sink pipeline; for non-storage + // sinks it is usually a no-op. 
+ if err := d.sink.FlushDMLBeforeBlock(event); err != nil { + return err + } + d.tableProgress.Add(event) return d.sink.WriteBlockEvent(event) } -func (d *BasicDispatcher) PassBlockEventToSink(event commonEvent.BlockEvent) { +// PassBlockEventToSink is used when block event handling result is "pass" +// (for example maintainer action=Pass or DDL NotSync). +// +// It intentionally does not call sink.WriteBlockEvent. Instead, it updates +// local progress as if the event had been handled, then fires PostFlush +// callbacks so wake/checkpoint logic can continue with consistent ordering. +func (d *BasicDispatcher) PassBlockEventToSink(event commonEvent.BlockEvent) error { + if err := d.sink.FlushDMLBeforeBlock(event); err != nil { + return err + } d.tableProgress.Pass(event) event.PostFlush() + return nil } // ensureActiveActiveTableInfo validates the table schema requirements for active-active mode. @@ -473,7 +501,7 @@ func (d *BasicDispatcher) GetHeartBeatInfo(h *HeartBeatInfo) { } func (d *BasicDispatcher) GetResolvedTs() uint64 { - return atomic.LoadUint64(&d.resolvedTs) + return d.resolvedTs.Load() } func (d *BasicDispatcher) GetLastSyncedTs() uint64 { @@ -537,15 +565,16 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC return false } -// handleEvents can batch handle events about resolvedTs Event and DML Event. -// While for DDLEvent and SyncPointEvent, they should be handled separately, -// because they are block events. -// We ensure we only will receive one event when it's ddl event or sync point event -// by setting them with different event types in DispatcherEventsHandler.GetType -// When we handle events, we don't have any previous events still in sink. +// handleEvents processes one batch for one dispatcher. +// the next batch of events can only be handled after the current batch is enqueued or flushed. +// A batch may mix DML and resolved-ts events; Block events are expected to be handled one by one. 
+// - Block events, such as DDL / Syncpoint, is sent to the sink synchronously, +// the dispatcher is blocked until the block event is flushed. +// - DML events is sent to the sink asynchronously. +// - Storage sink, the dispatcher wake up after all DML events is guaranteed enqueued. +// - Non-storage sink, the dispatche wake up after all DML events is guaranteed flushed. // -// wakeCallback is used to wake the dynamic stream to handle the next batch events. -// It will be called when all the events are flushed to downstream successfully. +// Return true if should block the dispatcher. func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool { if d.GetRemovingStatus() { log.Warn("dispatcher is removing", zap.Any("id", d.id)) @@ -557,7 +586,6 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC // Only return false when all events are resolvedTs Event. block := false - dmlWakeOnce := &sync.Once{} dmlEvents := make([]*commonEvent.DMLEvent, 0, len(dispatcherEvents)) latestResolvedTs := uint64(0) // Dispatcher is ready, handle the events @@ -622,14 +650,6 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC block = true dml.ReplicatingTs = d.creationPDTs - dml.AddPostFlushFunc(func() { - // Considering dml event in sink may be written to downstream not in order, - // thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely - // and wake dynamic stream to handle the next events. - if d.tableProgress.Empty() { - dmlWakeOnce.Do(wakeCallback) - } - }) dmlEvents = append(dmlEvents, dml) case commonEvent.TypeDDLEvent: if len(dispatcherEvents) != 1 { @@ -718,14 +738,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC // the checkpointTs may be incorrect set as the new resolvedTs, // due to the tableProgress is empty before dml events add into sink. 
if len(dmlEvents) > 0 { - hasDMLToFlush := d.AddDMLEventsToSink(dmlEvents) + hasDMLToFlush := d.AddDMLEventsToSink(dmlEvents, wakeCallback) if !hasDMLToFlush { // All DML events were filtered out, so they no longer block dispatcher progress. block = false } } if latestResolvedTs > 0 { - atomic.StoreUint64(&d.resolvedTs, latestResolvedTs) + d.resolvedTs.Store(latestResolvedTs) } return block } @@ -737,9 +757,10 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC // 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream. // 2. If the action is a pass, we just need to pass the event // -// For Action_Write, writing the block event may involve IO (e.g. executing DDL). To avoid blocking the -// dispatcher status dynamic stream handler, we execute the write asynchronously and return await=true. -// The status path will be waked up after the write finishes. +// For block actions (write/pass), execution may involve downstream IO because we flush prior DML first. +// To avoid blocking the dispatcher status dynamic stream handler, we execute the action asynchronously +// and return await=true. +// The status path will be waked up after the action finishes. func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) (await bool) { log.Debug("dispatcher handle dispatcher status", zap.Any("dispatcherStatus", dispatcherStatus), @@ -786,17 +807,18 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D // 3. 
clear blockEventStatus(should be the old pending event, but clear the new one) d.blockEventStatus.clear() }) + actionCommitTs := action.CommitTs + actionIsSyncPoint := action.IsSyncPoint if action.Action == heartbeatpb.Action_Write { - actionCommitTs := action.CommitTs - actionIsSyncPoint := action.IsSyncPoint d.sharedInfo.GetBlockEventExecutor().Submit(d, func() { d.ExecuteBlockEventDDL(pendingEvent, actionCommitTs, actionIsSyncPoint) }) return true } else { - failpoint.Inject("BlockOrWaitBeforePass", nil) - d.PassBlockEventToSink(pendingEvent) - failpoint.Inject("BlockAfterPass", nil) + d.sharedInfo.GetBlockEventExecutor().Submit(d, func() { + d.PassBlockEvent(pendingEvent, actionCommitTs, actionIsSyncPoint) + }) + return true } } else { ts, ok := d.blockEventStatus.getEventCommitTs() @@ -837,7 +859,25 @@ func (d *BasicDispatcher) ExecuteBlockEventDDL(pendingEvent commonEvent.BlockEve return } failpoint.Inject("BlockOrWaitReportAfterWrite", nil) + d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint) +} + +// PassBlockEvent executes maintainer Action_Pass: +// It relies on PassBlockEventToSink to preserve ordering and mark the event passed. +func (d *BasicDispatcher) PassBlockEvent(pendingEvent commonEvent.BlockEvent, actionCommitTs uint64, actionIsSyncPoint bool) { + failpoint.Inject("BlockOrWaitBeforePass", nil) + err := d.PassBlockEventToSink(pendingEvent) + if err != nil { + d.HandleError(err) + return + } + failpoint.Inject("BlockAfterPass", nil) + d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint) +} +// reportBlockedEventDone sends DONE status and wakes dispatcher-status stream path +// so the next status for this dispatcher can be handled. 
+func (d *BasicDispatcher) reportBlockedEventDone(actionCommitTs uint64, actionIsSyncPoint bool) { d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ ID: d.id.ToPB(), State: &heartbeatpb.State{ @@ -966,7 +1006,6 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { d.holdBlockEvent(event) return } - d.reportBlockedEventToMaintainer(event) } diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 7f53f70a3b..8f95a93b5b 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -15,7 +15,6 @@ package dispatcher import ( "testing" - "github.com/pingcap/ticdc/downstreamadapter/sink" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" @@ -147,7 +146,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive blockStatuses, errCh, ) - dispatcherSink := sink.NewMockSink(sinkType) + dispatcherSink := newDispatcherTestSink(t, sinkType) tableSpan := &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte{0}, EndKey: []byte{1}} dispatcher := NewBasicDispatcher( common.NewDispatcherID(), @@ -159,7 +158,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive false, 200, common.DefaultMode, - dispatcherSink, + dispatcherSink.Sink(), sharedInfo, ) return dispatcher diff --git a/downstreamadapter/dispatcher/basic_dispatcher_info.go b/downstreamadapter/dispatcher/basic_dispatcher_info.go index 426da3968d..19a0980461 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_info.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_info.go @@ -234,7 +234,7 @@ func (d *BasicDispatcher) IsTableTriggerDispatcher() bool { // SetStartTs only be called after the dispatcher is created func (d *BasicDispatcher) 
SetStartTs(startTs uint64) { atomic.StoreUint64(&d.startTs, startTs) - atomic.StoreUint64(&d.resolvedTs, startTs) + d.resolvedTs.Store(startTs) } func (d *BasicDispatcher) SetCurrentPDTs(currentPDTs uint64) { diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 897182c951..3c574917d1 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -14,6 +14,7 @@ package dispatcher import ( + "fmt" "math" "sync/atomic" "testing" @@ -124,10 +125,10 @@ func TestDispatcherHandleEvents(t *testing.T) { tableInfo := dmlEvent.TableInfo - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newDispatcherForTest(sink, tableSpan) + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) require.Equal(t, uint64(0), dispatcher.GetCheckpointTs()) require.Equal(t, uint64(0), dispatcher.GetResolvedTs()) tableProgress := dispatcher.tableProgress @@ -140,7 +141,7 @@ func TestDispatcherHandleEvents(t *testing.T) { nodeID := node.NewID() block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, dmlEvent)}, callback) require.Equal(t, true, block) - require.Equal(t, 1, len(sink.GetDMLs())) + require.Equal(t, 1, len(testSink.GetDMLs())) checkpointTs, isEmpty = tableProgress.GetCheckpointTs() require.Equal(t, false, isEmpty) @@ -148,8 +149,8 @@ func TestDispatcherHandleEvents(t *testing.T) { require.Equal(t, int32(0), count.Load()) // flush - sink.FlushDMLs() - require.Equal(t, 0, len(sink.GetDMLs())) + testSink.FlushDMLs() + require.Equal(t, 0, len(testSink.GetDMLs())) checkpointTs, isEmpty = tableProgress.GetCheckpointTs() require.Equal(t, true, isEmpty) require.Equal(t, uint64(1), checkpointTs) @@ -168,7 +169,7 @@ func TestDispatcherHandleEvents(t *testing.T) { 
block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent)}, callback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // no pending event blockPendingEvent, blockStage := dispatcher.blockEventStatus.getEventAndStage() @@ -195,7 +196,7 @@ func TestDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent21)}, callback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // no pending event blockPendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -236,7 +237,7 @@ func TestDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent2)}, callback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // no pending event blockPendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -286,7 +287,7 @@ func TestDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent3)}, callback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // pending event blockPendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -350,7 +351,7 @@ func TestDispatcherHandleEvents(t *testing.T) { block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, syncPointEvent)}, callback) require.Equal(t, true, block) time.Sleep(5 * time.Second) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) // pending event blockPendingEvent, blockStage = 
dispatcher.blockEventStatus.getEventAndStage() require.NotNil(t, blockPendingEvent) @@ -384,10 +385,13 @@ func TestDispatcherHandleEvents(t *testing.T) { }, } dispatcher.HandleDispatcherStatus(dispatcherStatus) - checkpointTs, isEmpty = tableProgress.GetCheckpointTs() - require.Equal(t, true, isEmpty) - require.Equal(t, uint64(5), checkpointTs) - require.Equal(t, int32(6), count.Load()) + require.Eventually(t, func() bool { + checkpointTs, isEmpty = tableProgress.GetCheckpointTs() + if !isEmpty || checkpointTs != uint64(5) { + return false + } + return count.Load() == int32(6) + }, 5*time.Second, 10*time.Millisecond) // ===== resolved event ===== checkpointTs = dispatcher.GetCheckpointTs() @@ -397,7 +401,7 @@ func TestDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, resolvedEvent)}, callback) require.Equal(t, false, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) require.Equal(t, uint64(7), dispatcher.GetResolvedTs()) checkpointTs = dispatcher.GetCheckpointTs() require.Equal(t, uint64(7), checkpointTs) @@ -413,9 +417,9 @@ func TestUncompeleteTableSpanDispatcherHandleEvents(t *testing.T) { ddlJob := helper.DDL2Job("create table t(id int primary key, v int)") require.NotNil(t, ddlJob) - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan := getUncompleteTableSpan() - dispatcher := newDispatcherForTest(sink, tableSpan) + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) dmlEvent := helper.DML2Event("test", "t", "insert into t values(1, 1)") require.NotNil(t, dmlEvent) @@ -483,8 +487,8 @@ func TestTableTriggerEventDispatcherInMysql(t *testing.T) { count.Swap(0) ddlTableSpan := common.KeyspaceDDLSpan(common.DefaultKeyspaceID) - sink := sink.NewMockSink(common.MysqlSinkType) - tableTriggerEventDispatcher := newDispatcherForTest(sink, ddlTableSpan) + testSink := 
newDispatcherTestSink(t, common.MysqlSinkType) + tableTriggerEventDispatcher := newDispatcherForTest(testSink.Sink(), ddlTableSpan) require.Nil(t, tableTriggerEventDispatcher.tableSchemaStore) ok, err := tableTriggerEventDispatcher.InitializeTableSchemaStore([]*heartbeatpb.SchemaInfo{}) @@ -567,8 +571,8 @@ func TestTableTriggerEventDispatcherInKafka(t *testing.T) { count.Swap(0) ddlTableSpan := common.KeyspaceDDLSpan(common.DefaultKeyspaceID) - sink := sink.NewMockSink(common.KafkaSinkType) - tableTriggerEventDispatcher := newDispatcherForTest(sink, ddlTableSpan) + testSink := newDispatcherTestSink(t, common.KafkaSinkType) + tableTriggerEventDispatcher := newDispatcherForTest(testSink.Sink(), ddlTableSpan) require.Nil(t, tableTriggerEventDispatcher.tableSchemaStore) ok, err := tableTriggerEventDispatcher.InitializeTableSchemaStore([]*heartbeatpb.SchemaInfo{}) @@ -662,10 +666,10 @@ func TestDispatcherClose(t *testing.T) { dmlEvent.Length = 1 { - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newDispatcherForTest(sink, tableSpan) + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) // ===== dml event ===== nodeID := node.NewID() @@ -675,7 +679,7 @@ func TestDispatcherClose(t *testing.T) { require.Equal(t, false, ok) // flush - sink.FlushDMLs() + testSink.FlushDMLs() watermark, ok := dispatcher.TryClose() require.Equal(t, true, ok) @@ -685,10 +689,10 @@ func TestDispatcherClose(t *testing.T) { // test sink is not normal { - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newDispatcherForTest(sink, tableSpan) + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) // ===== dml event ===== nodeID := node.NewID() @@ -697,7 +701,7 
@@ func TestDispatcherClose(t *testing.T) { _, ok := dispatcher.TryClose() require.Equal(t, false, ok) - sink.SetIsNormal(false) + testSink.SetIsNormal(false) watermark, ok := dispatcher.TryClose() require.Equal(t, true, ok) @@ -732,48 +736,111 @@ func TestBatchDMLEventsPartialFlush(t *testing.T) { dmlEvent3.CommitTs = 12 dmlEvent3.Length = 1 - mockSink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newDispatcherForTest(mockSink, tableSpan) - - // Create a callback that records when it's called - var callbackCalled bool - wakeCallback := func() { - callbackCalled = true - } + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) nodeID := node.NewID() - - // Create dispatcher events for all three DML events + var callbackCalled atomic.Bool dispatcherEvents := []DispatcherEvent{ NewDispatcherEvent(&nodeID, dmlEvent1), NewDispatcherEvent(&nodeID, dmlEvent2), NewDispatcherEvent(&nodeID, dmlEvent3), } - failpoint.Enable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents", `pause`) + require.NoError(t, failpoint.Enable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents", `pause`)) + failpointEnabled := true + defer func() { + if failpointEnabled { + require.NoError(t, failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents")) + } + }() + resultCh := make(chan bool, 1) go func() { - block := dispatcher.HandleEvents(dispatcherEvents, wakeCallback) - require.Equal(t, true, block) + resultCh <- dispatcher.HandleEvents(dispatcherEvents, func() { + callbackCalled.Store(true) + }) }() - time.Sleep(1 * time.Second) - require.Equal(t, 1, len(mockSink.GetDMLs())) - mockSink.FlushDMLs() - require.False(t, callbackCalled) + require.Eventually(t, func() bool { + return len(testSink.GetDMLs()) == 1 + }, 5*time.Second, 10*time.Millisecond) - 
failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents") + testSink.FlushDMLs() + require.False(t, callbackCalled.Load()) - time.Sleep(1 * time.Second) - require.Equal(t, 2, len(mockSink.GetDMLs())) - mockSink.FlushDMLs() - // Now the callback should be called after all events are flushed - require.True(t, callbackCalled) + require.NoError(t, failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents")) + failpointEnabled = false + + require.Eventually(t, func() bool { + return len(testSink.GetDMLs()) == 2 + }, 5*time.Second, 10*time.Millisecond) + + testSink.FlushDMLs() + require.Eventually(t, func() bool { + return callbackCalled.Load() + }, 5*time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { + return len(testSink.GetDMLs()) == 0 + }, 5*time.Second, 10*time.Millisecond) + require.True(t, <-resultCh) + require.True(t, dispatcher.tableProgress.Empty()) +} - // Verify that all events were actually flushed - require.Equal(t, 0, len(mockSink.GetDMLs())) +func TestDMLWakeCallbackStorageAfterBatchEnqueue(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + ddlJob := helper.DDL2Job("create table t(id int primary key, v int)") + require.NotNil(t, ddlJob) + + buildDMLEvent := func(commitTs uint64) *commonEvent.DMLEvent { + event := helper.DML2Event( + "test", + "t", + fmt.Sprintf("insert into t values(%d, %d)", commitTs, commitTs), + ) + require.NotNil(t, event) + event.CommitTs = commitTs + return event + } + + testSink := newDispatcherTestSink(t, common.CloudStorageSinkType) + tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) + require.NoError(t, err) + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) + + dmlEvent1 := buildDMLEvent(110) + dmlEvent2 := buildDMLEvent(111) + dmlEvent3 := buildDMLEvent(112) + nodeID := node.NewID() + dispatcherEvents := []DispatcherEvent{ + 
NewDispatcherEvent(&nodeID, dmlEvent1), + NewDispatcherEvent(&nodeID, dmlEvent2), + NewDispatcherEvent(&nodeID, dmlEvent3), + } + + var callbackCalled atomic.Bool + block := dispatcher.HandleEvents(dispatcherEvents, func() { + callbackCalled.Store(true) + }) + require.True(t, block) + require.False(t, callbackCalled.Load()) + + capturedDMLs := testSink.GetDMLs() + require.Len(t, capturedDMLs, 3) + + capturedDMLs[0].PostEnqueue() + require.False(t, callbackCalled.Load()) + + capturedDMLs[1].PostEnqueue() + require.False(t, callbackCalled.Load()) + + capturedDMLs[2].PostEnqueue() + require.True(t, callbackCalled.Load()) } // TestDispatcherSplittableCheck tests that a split table dispatcher with enableSplittableCheck=true @@ -800,7 +867,7 @@ func TestDispatcherSplittableCheck(t *testing.T) { require.False(t, commonEvent.IsSplitable(commonTableInfo)) // Create a mock sink - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) // Create an incomplete table span (split table) tableSpan := getUncompleteTableSpan() @@ -837,7 +904,7 @@ func TestDispatcherSplittableCheck(t *testing.T) { false, // skipSyncpointAtStartTs false, // skipDMLAsStartTs common.Ts(0), // pdTs - sink, + testSink.Sink(), sharedInfo, false, &redoTs, @@ -905,7 +972,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) { dmlEvent101.CommitTs = 101 dmlEvent101.Length = 1 - mockSink := sink.NewMockSink(common.MysqlSinkType) + mockSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) @@ -944,7 +1011,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) { false, // skipSyncpointAtStartTs true, // skipDMLAsStartTs = true (KEY: enable DML filtering) common.Ts(99), - mockSink, + mockSink.Sink(), sharedInfo, false, &redoTs, @@ -989,7 +1056,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { dmlEvent100.CommitTs = 100 
dmlEvent100.Length = 1 - mockSink := sink.NewMockSink(common.MysqlSinkType) + mockSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) @@ -1024,7 +1091,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { false, // skipSyncpointAtStartTs false, // skipDMLAsStartTs = false (KEY: DML filtering disabled) common.Ts(99), - mockSink, + mockSink.Sink(), sharedInfo, false, &redoTs, @@ -1041,8 +1108,8 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { keyspaceID := getTestingKeyspaceID() ddlTableSpan := common.KeyspaceDDLSpan(keyspaceID) - mockSink := sink.NewMockSink(common.MysqlSinkType) - dispatcher := newDispatcherForTest(mockSink, ddlTableSpan) + mockSink := newDispatcherTestSink(t, common.MysqlSinkType) + dispatcher := newDispatcherForTest(mockSink.Sink(), ddlTableSpan) nodeID := node.NewID() diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 29373edc14..177eab4ffa 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -346,9 +346,8 @@ func (d *DispatcherStatusWithID) GetDispatcherID() common.DispatcherID { // If so, we can cancel the resend task. // If we get a dispatcher action, we need to check whether the action is for the current pending ddl event. // If so, we can deal the ddl event based on the action. -// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream(async). -// 2. If the action is a pass, we just need to pass the event in tableProgress(for correct calculation) and -// wake the dispatcherEventsHandler to handle the event. +// 1. If the action is a write, flush prior DML and write the block event to sink(async). +// 2. If the action is a pass, flush prior DML and pass the event in tableProgress(async). 
type DispatcherStatusHandler struct{} func (h *DispatcherStatusHandler) Path(event DispatcherStatusWithID) common.DispatcherID { @@ -382,7 +381,8 @@ func (h *DispatcherStatusHandler) GetTimestamp(event DispatcherStatusWithID) dyn } func (h *DispatcherStatusHandler) GetType(event DispatcherStatusWithID) dynstream.EventType { - // DispatcherStatus may trigger downstream IO (e.g. executing DDL) when handling Action_Write. + // DispatcherStatus may trigger downstream IO when handling action write/pass + // because we flush prior DML before completing the block action. // Make it non-batchable to ensure we can safely return await=true for a single event. return dynstream.EventType{DataGroup: 0, Property: dynstream.NonBatchable} } diff --git a/downstreamadapter/dispatcher/mock_sink_helper_test.go b/downstreamadapter/dispatcher/mock_sink_helper_test.go new file mode 100644 index 0000000000..6ed6d2117e --- /dev/null +++ b/downstreamadapter/dispatcher/mock_sink_helper_test.go @@ -0,0 +1,93 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/golang/mock/gomock" + "github.com/pingcap/ticdc/downstreamadapter/sink" + "github.com/pingcap/ticdc/downstreamadapter/sink/mock" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" +) + +// dispatcherTestSink wraps gomock sink and keeps the few stateful helpers that +// old tests used (captured DML events and normal/abnormal switch). 
+type dispatcherTestSink struct { + sink *mock.MockSink + + mu sync.Mutex + dmls []*commonEvent.DMLEvent + isNormal atomic.Bool +} + +func newDispatcherTestSink(t *testing.T, sinkType common.SinkType) *dispatcherTestSink { + t.Helper() + + ctrl := gomock.NewController(t) + testSink := &dispatcherTestSink{ + sink: mock.NewMockSink(ctrl), + dmls: make([]*commonEvent.DMLEvent, 0), + } + testSink.isNormal.Store(true) + + testSink.sink.EXPECT().SinkType().Return(sinkType).AnyTimes() + testSink.sink.EXPECT().IsNormal().DoAndReturn(func() bool { + return testSink.isNormal.Load() + }).AnyTimes() + testSink.sink.EXPECT().AddDMLEvent(gomock.Any()).Do(func(event *commonEvent.DMLEvent) { + testSink.mu.Lock() + defer testSink.mu.Unlock() + testSink.dmls = append(testSink.dmls, event) + }).AnyTimes() + testSink.sink.EXPECT().WriteBlockEvent(gomock.Any()).DoAndReturn(func(event commonEvent.BlockEvent) error { + event.PostFlush() + return nil + }).AnyTimes() + testSink.sink.EXPECT().FlushDMLBeforeBlock(gomock.Any()).Return(nil).AnyTimes() + testSink.sink.EXPECT().AddCheckpointTs(gomock.Any()).AnyTimes() + testSink.sink.EXPECT().SetTableSchemaStore(gomock.Any()).AnyTimes() + testSink.sink.EXPECT().Close(gomock.Any()).AnyTimes() + testSink.sink.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() + return testSink +} + +func (s *dispatcherTestSink) Sink() sink.Sink { + return s.sink +} + +func (s *dispatcherTestSink) SetIsNormal(isNormal bool) { + s.isNormal.Store(isNormal) +} + +func (s *dispatcherTestSink) GetDMLs() []*commonEvent.DMLEvent { + s.mu.Lock() + defer s.mu.Unlock() + dmls := make([]*commonEvent.DMLEvent, len(s.dmls)) + copy(dmls, s.dmls) + return dmls +} + +func (s *dispatcherTestSink) FlushDMLs() { + s.mu.Lock() + defer s.mu.Unlock() + for _, dml := range s.dmls { + dml.PostFlush() + } + s.dmls = s.dmls[:0] +} diff --git a/downstreamadapter/dispatcher/redo_dispatcher_test.go b/downstreamadapter/dispatcher/redo_dispatcher_test.go index ba008d3a0e..68f97016b7 100644 --- 
a/downstreamadapter/dispatcher/redo_dispatcher_test.go +++ b/downstreamadapter/dispatcher/redo_dispatcher_test.go @@ -80,10 +80,10 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { tableInfo := dmlEvent.TableInfo - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newRedoDispatcherForTest(sink, tableSpan) + dispatcher := newRedoDispatcherForTest(testSink.Sink(), tableSpan) require.Equal(t, uint64(0), dispatcher.GetCheckpointTs()) require.Equal(t, uint64(0), dispatcher.GetResolvedTs()) tableProgress := dispatcher.tableProgress @@ -96,7 +96,7 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { nodeID := node.NewID() block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, dmlEvent)}, redoCallback) require.Equal(t, true, block) - require.Equal(t, 1, len(sink.GetDMLs())) + require.Equal(t, 1, len(testSink.GetDMLs())) checkpointTs, isEmpty = tableProgress.GetCheckpointTs() require.Equal(t, false, isEmpty) @@ -104,8 +104,8 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { require.Equal(t, int32(0), redoCount.Load()) // flush - sink.FlushDMLs() - require.Equal(t, 0, len(sink.GetDMLs())) + testSink.FlushDMLs() + require.Equal(t, 0, len(testSink.GetDMLs())) checkpointTs, isEmpty = tableProgress.GetCheckpointTs() require.Equal(t, true, isEmpty) require.Equal(t, uint64(1), checkpointTs) @@ -124,7 +124,7 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent)}, redoCallback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // no pending event blockPendingEvent, blockStage := dispatcher.blockEventStatus.getEventAndStage() @@ -151,7 +151,7 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { } block = 
dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent21)}, redoCallback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // no pending event blockPendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -193,7 +193,7 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent2)}, redoCallback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // no pending event blockPendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -243,7 +243,7 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent3)}, redoCallback) require.Equal(t, true, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) time.Sleep(5 * time.Second) // pending event blockPendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -304,7 +304,7 @@ func TestRedoDispatcherHandleEvents(t *testing.T) { } block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, resolvedEvent)}, redoCallback) require.Equal(t, false, block) - require.Equal(t, 0, len(sink.GetDMLs())) + require.Equal(t, 0, len(testSink.GetDMLs())) require.Equal(t, uint64(7), dispatcher.GetResolvedTs()) checkpointTs = dispatcher.GetCheckpointTs() require.Equal(t, uint64(7), checkpointTs) @@ -319,9 +319,9 @@ func TestRedoUncompeleteTableSpanDispatcherHandleEvents(t *testing.T) { ddlJob := helper.DDL2Job("create table t(id int primary key, v int)") require.NotNil(t, ddlJob) - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan := getUncompleteTableSpan() - dispatcher := 
newRedoDispatcherForTest(sink, tableSpan) + dispatcher := newRedoDispatcherForTest(testSink.Sink(), tableSpan) dmlEvent := helper.DML2Event("test", "t", "insert into t values(1, 1)") require.NotNil(t, dmlEvent) @@ -389,8 +389,8 @@ func TestTableTriggerRedoDispatcherInMysql(t *testing.T) { redoCount.Store(0) ddlTableSpan := common.KeyspaceDDLSpan(common.DefaultKeyspaceID) - sink := sink.NewMockSink(common.MysqlSinkType) - tableTriggerEventDispatcher := newRedoDispatcherForTest(sink, ddlTableSpan) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) + tableTriggerEventDispatcher := newRedoDispatcherForTest(testSink.Sink(), ddlTableSpan) helper := commonEvent.NewEventTestHelper(t) defer helper.Close() @@ -459,8 +459,8 @@ func TestTableTriggerRedoDispatcherInKafka(t *testing.T) { redoCount.Store(0) ddlTableSpan := common.KeyspaceDDLSpan(common.DefaultKeyspaceID) - sink := sink.NewMockSink(common.KafkaSinkType) - tableTriggerEventDispatcher := newRedoDispatcherForTest(sink, ddlTableSpan) + testSink := newDispatcherTestSink(t, common.KafkaSinkType) + tableTriggerEventDispatcher := newRedoDispatcherForTest(testSink.Sink(), ddlTableSpan) helper := commonEvent.NewEventTestHelper(t) defer helper.Close() @@ -539,10 +539,10 @@ func TestRedoDispatcherClose(t *testing.T) { dmlEvent.Length = 1 { - sink := sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newRedoDispatcherForTest(sink, tableSpan) + dispatcher := newRedoDispatcherForTest(testSink.Sink(), tableSpan) // ===== dml event ===== nodeID := node.NewID() @@ -552,7 +552,7 @@ func TestRedoDispatcherClose(t *testing.T) { require.Equal(t, false, ok) // flush - sink.FlushDMLs() + testSink.FlushDMLs() watermark, ok := dispatcher.TryClose() require.Equal(t, true, ok) @@ -562,10 +562,10 @@ func TestRedoDispatcherClose(t *testing.T) { // test sink is not normal { - sink := 
sink.NewMockSink(common.MysqlSinkType) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newRedoDispatcherForTest(sink, tableSpan) + dispatcher := newRedoDispatcherForTest(testSink.Sink(), tableSpan) // ===== dml event ===== nodeID := node.NewID() @@ -574,7 +574,7 @@ func TestRedoDispatcherClose(t *testing.T) { _, ok := dispatcher.TryClose() require.Equal(t, false, ok) - sink.SetIsNormal(false) + testSink.SetIsNormal(false) watermark, ok := dispatcher.TryClose() require.Equal(t, true, ok) @@ -607,15 +607,15 @@ func TestRedoBatchDMLEventsPartialFlush(t *testing.T) { dmlEvent3.CommitTs = 12 dmlEvent3.Length = 1 - mockSink := sink.NewMockSink(common.MysqlSinkType) + mockSink := newDispatcherTestSink(t, common.MysqlSinkType) tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) require.NoError(t, err) - dispatcher := newRedoDispatcherForTest(mockSink, tableSpan) + dispatcher := newRedoDispatcherForTest(mockSink.Sink(), tableSpan) // Create a redoCallback that records when it's called - var redoCallbackCalled bool + var redoCallbackCalled atomic.Bool wakeredoCallback := func() { - redoCallbackCalled = true + redoCallbackCalled.Store(true) } nodeID := node.NewID() @@ -627,26 +627,40 @@ func TestRedoBatchDMLEventsPartialFlush(t *testing.T) { NewDispatcherEvent(&nodeID, dmlEvent3), } - failpoint.Enable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents", `pause`) + require.NoError(t, failpoint.Enable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents", `pause`)) + failpointEnabled := true + defer func() { + if failpointEnabled { + require.NoError(t, failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents")) + } + }() + resultCh := make(chan bool, 1) go func() { - block := dispatcher.HandleEvents(dispatcherEvents, wakeredoCallback) - require.Equal(t, true, block) + resultCh 
<- dispatcher.HandleEvents(dispatcherEvents, wakeredoCallback) }() - time.Sleep(1 * time.Second) - require.Equal(t, 1, len(mockSink.GetDMLs())) + require.Eventually(t, func() bool { + return len(mockSink.GetDMLs()) == 1 + }, 5*time.Second, 10*time.Millisecond) mockSink.FlushDMLs() - require.False(t, redoCallbackCalled) + require.False(t, redoCallbackCalled.Load()) - failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents") + require.NoError(t, failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/dispatcher/BlockAddDMLEvents")) + failpointEnabled = false - time.Sleep(1 * time.Second) - require.Equal(t, 2, len(mockSink.GetDMLs())) + require.Eventually(t, func() bool { + return len(mockSink.GetDMLs()) == 2 + }, 5*time.Second, 10*time.Millisecond) mockSink.FlushDMLs() // Now the redoCallback should be called after all events are flushed - require.True(t, redoCallbackCalled) + require.Eventually(t, func() bool { + return redoCallbackCalled.Load() + }, 5*time.Second, 10*time.Millisecond) // Verify that all events were actually flushed - require.Equal(t, 0, len(mockSink.GetDMLs())) + require.Eventually(t, func() bool { + return len(mockSink.GetDMLs()) == 0 + }, 5*time.Second, 10*time.Millisecond) + require.True(t, <-resultCh) } diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go index 4f54b86da7..d26a6193d8 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go @@ -19,9 +19,11 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" "github.com/pingcap/ticdc/downstreamadapter/eventcollector" "github.com/pingcap/ticdc/downstreamadapter/sink" + "github.com/pingcap/ticdc/downstreamadapter/sink/mock" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" appcontext 
"github.com/pingcap/ticdc/pkg/common/context" @@ -36,7 +38,25 @@ import ( "github.com/stretchr/testify/require" ) -var mockSink = sink.NewMockSink(common.BlackHoleSinkType) +func newDispatcherManagerTestSink(t *testing.T, sinkType common.SinkType) sink.Sink { + t.Helper() + + ctrl := gomock.NewController(t) + mockSink := mock.NewMockSink(ctrl) + mockSink.EXPECT().SinkType().Return(sinkType).AnyTimes() + mockSink.EXPECT().IsNormal().Return(true).AnyTimes() + mockSink.EXPECT().AddDMLEvent(gomock.Any()).AnyTimes() + mockSink.EXPECT().FlushDMLBeforeBlock(gomock.Any()).Return(nil).AnyTimes() + mockSink.EXPECT().WriteBlockEvent(gomock.Any()).DoAndReturn(func(blockEvent event.BlockEvent) error { + blockEvent.PostFlush() + return nil + }).AnyTimes() + mockSink.EXPECT().AddCheckpointTs(gomock.Any()).AnyTimes() + mockSink.EXPECT().SetTableSchemaStore(gomock.Any()).AnyTimes() + mockSink.EXPECT().Close(gomock.Any()).AnyTimes() + mockSink.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() + return mockSink +} // createTestDispatcher creates a test dispatcher with given parameters func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.DispatcherID, tableID int64, startKey, endKey []byte) *dispatcher.EventDispatcher { @@ -72,7 +92,7 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di false, // skipSyncpointAtStartTs false, // skipDMLAsStartTs 0, // currentPDTs - mockSink, + manager.sink, sharedInfo, false, &redoTs, @@ -84,12 +104,13 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di // createTestManager creates a test DispatcherManager func createTestManager(t *testing.T) *DispatcherManager { changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + testSink := newDispatcherManagerTestSink(t, common.BlackHoleSinkType) manager := &DispatcherManager{ changefeedID: changefeedID, dispatcherMap: newDispatcherMap[*dispatcher.EventDispatcher](), heartbeatRequestQueue: 
NewHeartbeatRequestQueue(), blockStatusRequestQueue: NewBlockStatusRequestQueue(), - sink: mockSink, + sink: testSink, schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(), sinkQuota: util.GetOrZero(config.GetDefaultReplicaConfig().MemoryQuota), latestWatermark: NewWatermark(0), diff --git a/downstreamadapter/dispatchermanager/helper_test.go b/downstreamadapter/dispatchermanager/helper_test.go index bd3fcc1981..35bd9eddaf 100644 --- a/downstreamadapter/dispatchermanager/helper_test.go +++ b/downstreamadapter/dispatchermanager/helper_test.go @@ -15,117 +15,20 @@ package dispatchermanager import ( "context" - "sync" "testing" "time" - "github.com/pingcap/log" + "github.com/golang/mock/gomock" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" + "github.com/pingcap/ticdc/downstreamadapter/sink/mock" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) -// mockKafkaSink implements the sink.Sink interface with a blocking AddCheckpointTs method -type mockKafkaSink struct { - cancel context.CancelFunc - ctx context.Context - checkpointChan chan uint64 - isNormal *atomic.Bool - mu sync.Mutex -} - -func newMockKafkaSink(ctx context.Context, cancel context.CancelFunc) *mockKafkaSink { - return &mockKafkaSink{ - cancel: cancel, - ctx: ctx, - checkpointChan: make(chan uint64), - isNormal: atomic.NewBool(true), - } -} - -func (m *mockKafkaSink) SinkType() common.SinkType { - return common.KafkaSinkType -} - -func (m *mockKafkaSink) IsNormal() bool { - return true -} - -func (m *mockKafkaSink) AddDMLEvent(event *commonEvent.DMLEvent) { - // No-op for this test -} - -func (m *mockKafkaSink) WriteBlockEvent(event commonEvent.BlockEvent) error { - // No-op for this test - return nil -} - -// AddCheckpointTs simulates the blocking behavior when sink is closed -func (m *mockKafkaSink) AddCheckpointTs(ts uint64) { - select { - 
case m.checkpointChan <- ts: - case <-m.ctx.Done(): - return - } -} - -func (m *mockKafkaSink) SetTableSchemaStore(tableSchemaStore *commonEvent.TableSchemaStore) { - // No-op for this test -} - -func (m *mockKafkaSink) Close(removeChangefeed bool) { - m.mu.Lock() - defer m.mu.Unlock() - m.isNormal.Store(false) - // Don't close the channel to simulate the real bug where channel becomes orphaned -} - -func (m *mockKafkaSink) Run(ctx context.Context) error { - // Simulate consuming from checkpoint channel - for { - select { - case <-ctx.Done(): - log.Info("mockKafkaSink context done") - return ctx.Err() - case ts := <-m.checkpointChan: - // Simulate processing checkpoint - _ = ts - } - } -} - -// simulateCloseSink simulates closing the sink (e.g., when path is removed) -func (m *mockKafkaSink) CloseSinkAndCancelContext() { - m.Close(false) - m.cancel() -} - func TestCheckpointTsMessageHandlerDeadlock(t *testing.T) { t.Parallel() - // Create context for the test - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Create mock sink - mockSink := newMockKafkaSink(ctx, cancel) - - // Start the sink's Run method in background to consume checkpoint messages - sinkCtx, _ := context.WithCancel(ctx) - go func() { - _ = mockSink.Run(sinkCtx) - }() - - // Create a mock DispatcherManager - dispatcherManager := &DispatcherManager{ - sink: mockSink, - tableTriggerEventDispatcher: &dispatcher.EventDispatcher{}, // Non-nil to pass the check - } - - // Create CheckpointTsMessage changefeedID := &heartbeatpb.ChangefeedID{ Keyspace: "test-namespace", Name: "test-changefeed", @@ -136,12 +39,20 @@ func TestCheckpointTsMessageHandlerDeadlock(t *testing.T) { CheckpointTs: 12345, }) - // Create handler handler := &CheckpointTsMessageHandler{} - // Step 1: Normal operation should work t.Run("normal_operation", func(t *testing.T) { - // This should complete quickly + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockSink := mock.NewMockSink(ctrl) + 
mockSink.EXPECT().AddCheckpointTs(checkpointTsMessage.CheckpointTs).Times(1) + + dispatcherManager := &DispatcherManager{ + sink: mockSink, + tableTriggerEventDispatcher: &dispatcher.EventDispatcher{}, + } + done := make(chan bool, 1) go func() { blocking := handler.Handle(dispatcherManager, checkpointTsMessage) @@ -151,31 +62,30 @@ func TestCheckpointTsMessageHandlerDeadlock(t *testing.T) { select { case <-done: - // Success case <-time.After(1 * time.Second): t.Fatal("Normal operation took too long, unexpected") } }) - // Step 2: Simulate the deadlock scenario t.Run("deadlock_scenario", func(t *testing.T) { - // Create a new mock sink for this test to avoid interference - deadlockMockSink := newMockKafkaSink(ctx, cancel) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + blockCh := make(chan struct{}) + deadlockMockSink := mock.NewMockSink(ctrl) + deadlockMockSink.EXPECT().AddCheckpointTs(checkpointTsMessage.CheckpointTs).Do( + func(uint64) { + <-blockCh + }, + ).Times(1) - // Create a new dispatcher manager with the deadlock sink deadlockDispatcherManager := &DispatcherManager{ sink: deadlockMockSink, tableTriggerEventDispatcher: &dispatcher.EventDispatcher{}, } - // Close the sink but does not cancel the context, so the AddCheckpointTs will block forever - deadlockMockSink.Close(false) - - // Now try to send a checkpoint message - // This should block because there's no consumer for the channel done := make(chan bool, 1) go func() { - // This should block indefinitely handler.Handle(deadlockDispatcherManager, checkpointTsMessage) done <- true }() @@ -184,37 +94,49 @@ func TestCheckpointTsMessageHandlerDeadlock(t *testing.T) { case <-done: t.Fatal("Handler completed unexpectedly - deadlock was not reproduced") case <-time.After(1 * time.Second): - // Expected: the handler should be blocked t.Log("Successfully reproduced the deadlock: handler is blocked in AddCheckpointTs") } + + close(blockCh) + select { + case <-done: + case <-time.After(1 * 
time.Second): + t.Fatal("handler should resume once AddCheckpointTs unblocks") + } }) - // Step 3: close the sink and cancel the context, so the AddCheckpointTs will return t.Run("deadlock_resolve_scenario", func(t *testing.T) { - // Create a new mock sink for this test to avoid interference - deadlockMockSink := newMockKafkaSink(ctx, cancel) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + deadlockMockSink := mock.NewMockSink(ctrl) + deadlockMockSink.EXPECT().AddCheckpointTs(checkpointTsMessage.CheckpointTs).Do( + func(uint64) { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + t.Fatal("context cancellation should unblock AddCheckpointTs path") + } + }, + ).Times(1) - // Create a new dispatcher manager with the deadlock sink deadlockDispatcherManager := &DispatcherManager{ sink: deadlockMockSink, tableTriggerEventDispatcher: &dispatcher.EventDispatcher{}, } - // Close the sink but does not cancel the context, so the AddCheckpointTs will block forever - deadlockMockSink.CloseSinkAndCancelContext() - - // Now try to send a checkpoint message - // This should not block because the context is canceled done := make(chan bool, 1) go func() { - // This should not block handler.Handle(deadlockDispatcherManager, checkpointTsMessage) done <- true }() select { case <-done: - // Expected: the handler should complete without blocking. 
t.Log("Handler completed normally") case <-time.After(1 * time.Second): t.Fatal("deadlock: handler is blocked in AddCheckpointTs") diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index c07e886419..6ce095b46b 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -53,6 +53,10 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { s.eventCh <- event } +func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { + return nil +} + func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { switch event.GetType() { case commonEvent.TypeDDLEvent: diff --git a/downstreamadapter/sink/cloudstorage/defragmenter.go b/downstreamadapter/sink/cloudstorage/defragmenter.go index 6f14990e71..206cc6a7f6 100644 --- a/downstreamadapter/sink/cloudstorage/defragmenter.go +++ b/downstreamadapter/sink/cloudstorage/defragmenter.go @@ -16,18 +16,42 @@ package cloudstorage import ( "context" + commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/hash" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/pingcap/ticdc/utils/chann" ) +type eventFragmentKind uint8 + +const ( + eventFragmentKindDML eventFragmentKind = iota + eventFragmentKindDrain +) + +type drainMarker struct { + dispatcherID commonType.DispatcherID + commitTs uint64 + doneCh chan error +} + +func (m *drainMarker) done(err error) { + select { + case m.doneCh <- err: + default: + } +} + // eventFragment is used to attach a sequence number to TxnCallbackableEvent. type eventFragment struct { + kind eventFragmentKind + event *commonEvent.DMLEvent versionedTable cloudstorage.VersionedTableName + dispatcherID commonType.DispatcherID + marker *drainMarker // The sequence number is mainly useful for TxnCallbackableEvent defragmentation. 
// e.g. TxnCallbackableEvent 1~5 are dispatched to a group of encoding workers, but the @@ -39,14 +63,43 @@ type eventFragment struct { encodedMsgs []*common.Message } -func newEventFragment(seq uint64, version cloudstorage.VersionedTableName, event *commonEvent.DMLEvent) eventFragment { +func newEventFragment( + seq uint64, + version cloudstorage.VersionedTableName, + dispatcherID commonType.DispatcherID, + event *commonEvent.DMLEvent, +) eventFragment { return eventFragment{ + kind: eventFragmentKindDML, seqNumber: seq, versionedTable: version, + dispatcherID: dispatcherID, event: event, } } +func newDrainEventFragment( + seq uint64, + dispatcherID commonType.DispatcherID, + commitTs uint64, + doneCh chan error, +) eventFragment { + return eventFragment{ + kind: eventFragmentKindDrain, + seqNumber: seq, + dispatcherID: dispatcherID, + marker: &drainMarker{ + dispatcherID: dispatcherID, + commitTs: commitTs, + doneCh: doneCh, + }, + } +} + +func (e eventFragment) isDrain() bool { + return e.kind == eventFragmentKindDrain +} + // defragmenter is used to handle event fragments which can be registered // out of order. 
type defragmenter struct { @@ -54,7 +107,6 @@ type defragmenter struct { future map[uint64]eventFragment inputCh <-chan eventFragment outputChs []*chann.DrainableChann[eventFragment] - hasher *hash.PositionInertia } func newDefragmenter( @@ -65,7 +117,6 @@ func newDefragmenter( future: make(map[uint64]eventFragment), inputCh: inputCh, outputChs: outputChs, - hasher: hash.NewPositionInertia(), } } @@ -117,11 +168,11 @@ func (d *defragmenter) writeMsgsConsecutive( } func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) { - tableName := frag.versionedTable.TableNameWithPhysicTableID - d.hasher.Reset() - d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) - workerID := d.hasher.Sum32() % uint32(len(d.outputChs)) + workerID := commonType.GID(frag.dispatcherID).Hash(uint64(len(d.outputChs))) d.outputChs[workerID].In() <- frag + if !frag.isDrain() { + frag.event.PostEnqueue() + } d.lastDispatchedSeq = frag.seqNumber } diff --git a/downstreamadapter/sink/cloudstorage/dml_writers.go b/downstreamadapter/sink/cloudstorage/dml_writers.go index bdc442bda2..b899b8984c 100644 --- a/downstreamadapter/sink/cloudstorage/dml_writers.go +++ b/downstreamadapter/sink/cloudstorage/dml_writers.go @@ -15,20 +15,24 @@ package cloudstorage import ( "context" - "sync/atomic" + "time" + sinkmetrics "github.com/pingcap/ticdc/downstreamadapter/sink/metrics" commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/codec/common" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/tidb/br/pkg/storage" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" ) // dmlWriters denotes a worker responsible for writing messages to cloud storage. 
type dmlWriters struct { + ctx context.Context changefeedID commonType.ChangeFeedID statistics *metrics.Statistics @@ -45,10 +49,11 @@ type dmlWriters struct { writers []*writer // last sequence number - lastSeqNum uint64 + lastSeqNum atomic.Uint64 } func newDMLWriters( + ctx context.Context, changefeedID commonType.ChangeFeedID, storage storage.ExternalStorage, config *cloudstorage.Config, @@ -69,6 +74,7 @@ func newDMLWriters( } return &dmlWriters{ + ctx: ctx, changefeedID: changefeedID, statistics: statistics, msgCh: messageCh, @@ -82,6 +88,12 @@ func newDMLWriters( func (d *dmlWriters) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + <-ctx.Done() + d.msgCh.Close() + return nil + }) + eg.Go(func() error { return d.encodeGroup.Run(ctx) }) @@ -90,12 +102,10 @@ func (d *dmlWriters) Run(ctx context.Context) error { return d.defragmenter.Run(ctx) }) - for i := 0; i < len(d.writers); i++ { + for i := range d.writers { + writer := d.writers[i] eg.Go(func() error { - // UnlimitedChannel will block when there is no event, they cannot dirrectly find ctx.Done() - // Thus, we need to close the channel when the context is done - defer d.encodeGroup.inputCh.Close() - return d.writers[i].Run(ctx) + return writer.Run(ctx) }) } return eg.Wait() @@ -112,12 +122,35 @@ func (d *dmlWriters) AddDMLEvent(event *commonEvent.DMLEvent) { TableInfoVersion: event.TableInfoVersion, DispatcherID: event.GetDispatcherID(), } - seq := atomic.AddUint64(&d.lastSeqNum, 1) - _ = d.statistics.RecordBatchExecution(func() (int, int64, error) { - // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. - d.msgCh.Push(newEventFragment(seq, tbl, event)) - return int(event.Len()), event.GetSize(), nil - }) + seq := d.lastSeqNum.Inc() + // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. 
+ d.msgCh.Push(newEventFragment(seq, tbl, event.GetDispatcherID(), event)) +} + +func (d *dmlWriters) FlushDMLBeforeBlock(event commonEvent.BlockEvent) error { + if event == nil { + return nil + } + + start := time.Now() + defer sinkmetrics.CloudStorageDDLDrainDurationHistogram.WithLabelValues( + d.changefeedID.Keyspace(), + d.changefeedID.ID().String(), + ).Observe(time.Since(start).Seconds()) + + doneCh := make(chan error, 1) + seq := d.lastSeqNum.Inc() + // Drain marker shares the same global sequence as DML fragments. + // Defragmenter and writer will place it after all prior fragments, so once + // doneCh returns, previous DML of this dispatcher are already enqueued/drained. + d.msgCh.Push(newDrainEventFragment(seq, event.GetDispatcherID(), event.GetCommitTs(), doneCh)) + + select { + case err := <-doneCh: + return err + case <-d.ctx.Done(): + return errors.Trace(d.ctx.Err()) + } } func (d *dmlWriters) close() { diff --git a/downstreamadapter/sink/cloudstorage/dml_writers_test.go b/downstreamadapter/sink/cloudstorage/dml_writers_test.go index bd9a89a5ff..aafd385206 100644 --- a/downstreamadapter/sink/cloudstorage/dml_writers_test.go +++ b/downstreamadapter/sink/cloudstorage/dml_writers_test.go @@ -52,6 +52,47 @@ func getTableFiles(t *testing.T, tableDir string) []string { return fileNames } +func TestAddDMLEventDoesNotCallPostEnqueueBeforePipelineRun(t *testing.T) { + uri := fmt.Sprintf("file:///%s?protocol=csv", t.TempDir()) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + tableInfo := &common.TableInfo{ + TableName: common.TableName{ + 
Schema: "test", + Table: "t_enqueue", + TableID: 100, + }, + } + event := commonEvent.NewDMLEvent(common.NewDispatcherID(), tableInfo.TableName.TableID, 1, 1, tableInfo) + event.TableInfoVersion = 1 + event.Length = 1 + event.ApproximateSize = 1 + + var enqueueCalled int64 + event.AddPostEnqueueFunc(func() { + atomic.AddInt64(&enqueueCalled, 1) + }) + + // Without starting sink.Run, the event should only be accepted by AddDMLEvent + // and should not be considered enqueued into downstream write pipeline yet. + cloudStorageSink.AddDMLEvent(event) + require.Equal(t, int64(0), atomic.LoadInt64(&enqueueCalled)) +} + func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { parentDir := t.TempDir() diff --git a/downstreamadapter/sink/cloudstorage/encoding_group.go b/downstreamadapter/sink/cloudstorage/encoding_group.go index bcc37663e7..998f475786 100644 --- a/downstreamadapter/sink/cloudstorage/encoding_group.go +++ b/downstreamadapter/sink/cloudstorage/encoding_group.go @@ -85,11 +85,13 @@ func (eg *encodingGroup) runEncoder(ctx context.Context) error { if !ok || eg.closed.Load() { return nil } - err = encoder.AppendTxnEvent(frag.event) - if err != nil { - return err + if !frag.isDrain() { + err = encoder.AppendTxnEvent(frag.event) + if err != nil { + return err + } + frag.encodedMsgs = encoder.Build() } - frag.encodedMsgs = encoder.Build() select { case <-ctx.Done(): diff --git a/downstreamadapter/sink/cloudstorage/sink.go b/downstreamadapter/sink/cloudstorage/sink.go index cb59ce6953..ca15f38836 100644 --- a/downstreamadapter/sink/cloudstorage/sink.go +++ b/downstreamadapter/sink/cloudstorage/sink.go @@ -126,7 +126,7 @@ func New( cfg: cfg, cleanupJobs: cleanupJobs, storage: storage, - dmlWriters: newDMLWriters(changefeedID, storage, cfg, encoderConfig, ext, statistics), + dmlWriters: newDMLWriters(ctx, changefeedID, storage, cfg, encoderConfig, ext, statistics), checkpointChan: make(chan uint64, 16), lastSendCheckpointTsTime: time.Now(), 
outputRawChangeEvent: sinkConfig.CloudStorageConfig.GetOutputRawChangeEvent(), @@ -170,6 +170,13 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { s.dmlWriters.AddDMLEvent(event) } +func (s *sink) FlushDMLBeforeBlock(event commonEvent.BlockEvent) error { + if event == nil { + return nil + } + return s.dmlWriters.FlushDMLBeforeBlock(event) +} + func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { var err error switch e := event.(type) { diff --git a/downstreamadapter/sink/cloudstorage/sink_test.go b/downstreamadapter/sink/cloudstorage/sink_test.go index ecede56719..0c9fae45c2 100644 --- a/downstreamadapter/sink/cloudstorage/sink_test.go +++ b/downstreamadapter/sink/cloudstorage/sink_test.go @@ -207,6 +207,69 @@ func TestWriteDDLEvent(t *testing.T) { }`, string(tableSchema)) } +func TestPassBlockEventDrainsBeforeWriteDDLEvent(t *testing.T) { + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv&flush-interval=3600s", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + job := helper.DDL2Job("create table t_drain_before_ddl (id int primary key, v int)") + require.NotNil(t, job) + helper.ApplyJob(job) + + dispatcherID := common.NewDispatcherID() + dmlEvent := helper.DML2Event(job.SchemaName, job.TableName, "insert into t_drain_before_ddl values (1, 1)") + dmlEvent.TableInfoVersion = job.BinlogInfo.FinishedTS + dmlEvent.DispatcherID = dispatcherID + + var 
dmlFlushed atomic.Int64 + dmlEvent.AddPostFlushFunc(func() { + dmlFlushed.Add(1) + }) + + cloudStorageSink.AddDMLEvent(dmlEvent) + + ddlEvent := &commonEvent.DDLEvent{ + Query: "alter table t_drain_before_ddl add column c2 int", + Type: byte(timodel.ActionAddColumn), + SchemaName: job.SchemaName, + TableName: job.TableName, + FinishedTs: dmlEvent.CommitTs + 10, + TableInfo: helper.GetTableInfo(job), + DispatcherID: dispatcherID, + BlockedTables: &commonEvent.InfluencedTables{ + InfluenceType: commonEvent.InfluenceTypeNormal, + TableIDs: []int64{dmlEvent.PhysicalTableID}, + }, + } + + err = cloudStorageSink.FlushDMLBeforeBlock(ddlEvent) + require.NoError(t, err) + require.Equal(t, int64(1), dmlFlushed.Load()) + + err = cloudStorageSink.WriteBlockEvent(ddlEvent) + require.NoError(t, err) +} + func TestWriteCheckpointEvent(t *testing.T) { parentDir := t.TempDir() uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 35eee16791..72bdb99324 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -45,7 +45,7 @@ type writer struct { storage storage.ExternalStorage config *cloudstorage.Config // toBeFlushedCh contains a set of batchedTask waiting to be flushed to cloud storage. 
- toBeFlushedCh chan batchedTask + toBeFlushedCh chan writerTask inputCh *chann.DrainableChann[eventFragment] isClosed uint64 statistics *pmetrics.Statistics @@ -57,6 +57,11 @@ type writer struct { metricsWorkerBusyRatio prometheus.Counter } +type writerTask struct { + batch batchedTask + marker *drainMarker +} + func newWriter( id int, changefeedID commonType.ChangeFeedID, @@ -72,7 +77,7 @@ func newWriter( storage: storage, config: config, inputCh: inputCh, - toBeFlushedCh: make(chan batchedTask, 64), + toBeFlushedCh: make(chan writerTask, 64), statistics: statistics, filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension), metricWriteBytes: metrics.CloudStorageWriteBytesGauge. @@ -123,10 +128,18 @@ func (d *writer) flushMessages(ctx context.Context) error { case <-overseerTicker.C: d.metricsWorkerBusyRatio.Add(flushTimeSlice.Seconds()) flushTimeSlice = 0 - case batchedTask := <-d.toBeFlushedCh: + case task := <-d.toBeFlushedCh: if atomic.LoadUint64(&d.isClosed) == 1 { return nil } + if task.marker != nil { + task.marker.done(nil) + continue + } + batchedTask := task.batch + if len(batchedTask.batch) == 0 { + continue + } start := time.Now() for table, task := range batchedTask.batch { if len(task.msgs) == 0 { @@ -314,7 +327,7 @@ func (d *writer) genAndDispatchTask(ctx context.Context, select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case d.toBeFlushedCh <- batchedTask: + case d.toBeFlushedCh <- writerTask{batch: batchedTask}: log.Debug("flush task is emitted successfully when flush interval exceeds", zap.Int("tablesLength", len(batchedTask.batch))) batchedTask = newBatchedTask() @@ -322,7 +335,33 @@ func (d *writer) genAndDispatchTask(ctx context.Context, } case frag, ok := <-ch.Out(): if !ok || atomic.LoadUint64(&d.isClosed) == 1 { - return nil + if len(batchedTask.batch) == 0 { + return nil + } + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case d.toBeFlushedCh <- writerTask{batch: 
batchedTask}: + return nil + } + } + if frag.isDrain() { + // Drain marker must be placed behind pending batchedTask so the caller + // observes completion only after all prior DML flush tasks are handled. + if len(batchedTask.batch) > 0 { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case d.toBeFlushedCh <- writerTask{batch: batchedTask}: + batchedTask = newBatchedTask() + } + } + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case d.toBeFlushedCh <- writerTask{marker: frag.marker}: + } + continue } batchedTask.handleSingleTableEvent(frag) // if the file size exceeds the upper limit, emit the flush task containing the table @@ -333,7 +372,7 @@ func (d *writer) genAndDispatchTask(ctx context.Context, select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case d.toBeFlushedCh <- task: + case d.toBeFlushedCh <- writerTask{batch: task}: log.Debug("flush task is emitted successfully when file size exceeds", zap.Any("table", table), zap.Int("eventsLenth", len(task.batch[table].msgs))) diff --git a/downstreamadapter/sink/cloudstorage/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go index 0be8a78640..f6ab81a578 100644 --- a/downstreamadapter/sink/cloudstorage/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -19,6 +19,7 @@ import ( "net/url" "path" "sync" + "sync/atomic" "testing" "time" @@ -130,3 +131,75 @@ func TestWriterRun(t *testing.T) { d.close() wg.Wait() } + +func TestWriterDrainMarker(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + parentDir := t.TempDir() + d := testWriter(ctx, t, parentDir) + + tidbTableInfo := &timodel.TableInfo{ + ID: 100, + Name: ast.NewCIStr("table1"), + Columns: []*timodel.ColumnInfo{ + {ID: 1, Name: ast.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)}, + }, + } + tableInfo := commonType.WrapTableInfo("test", tidbTableInfo) + dispatcherID := commonType.NewDispatcherID() + + 
var callbackCnt int64 + msg := common.NewMsg(nil, []byte(`{"id":1}`)) + msg.SetRowsCount(1) + msg.Callback = func() { + atomic.AddInt64(&callbackCnt, 1) + } + + d.inputCh.In() <- eventFragment{ + kind: eventFragmentKindDML, + seqNumber: 1, + dispatcherID: dispatcherID, + versionedTable: cloudstorage.VersionedTableName{ + TableNameWithPhysicTableID: commonType.TableName{ + Schema: "test", + Table: "table1", + TableID: 100, + }, + TableInfoVersion: 99, + DispatcherID: dispatcherID, + }, + event: &commonEvent.DMLEvent{ + PhysicalTableID: 100, + TableInfo: tableInfo, + }, + encodedMsgs: []*common.Message{msg}, + } + + doneCh := make(chan error, 1) + d.inputCh.In() <- newDrainEventFragment(2, dispatcherID, 100, doneCh) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = d.Run(ctx) + }() + + select { + case err := <-doneCh: + require.NoError(t, err) + case <-time.After(10 * time.Second): + t.Fatal("wait drain marker timeout") + } + + require.Eventually(t, func() bool { + return atomic.LoadInt64(&callbackCnt) == 1 + }, 5*time.Second, 100*time.Millisecond) + + d.inputCh.CloseAndDrain() + d.close() + cancel() + wg.Wait() +} diff --git a/downstreamadapter/sink/helper/row_callback.go b/downstreamadapter/sink/helper/row_callback.go new file mode 100644 index 0000000000..e690ec16c0 --- /dev/null +++ b/downstreamadapter/sink/helper/row_callback.go @@ -0,0 +1,30 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package helper + +import ( + "github.com/pingcap/ticdc/pkg/common/event" + "go.uber.org/atomic" +) + +// NewTxnPostFlushRowCallback returns a row-level callback that triggers txn-level +// PostFlush exactly once when the callback has been invoked totalCount times. +func NewTxnPostFlushRowCallback(event *event.DMLEvent, totalCount uint64) func() { + var calledCount atomic.Uint64 + return func() { + if calledCount.Inc() == totalCount { + event.PostFlush() + } + } +} diff --git a/downstreamadapter/sink/helper/row_callback_test.go b/downstreamadapter/sink/helper/row_callback_test.go new file mode 100644 index 0000000000..b4996ba4a4 --- /dev/null +++ b/downstreamadapter/sink/helper/row_callback_test.go @@ -0,0 +1,42 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package helper + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/stretchr/testify/require" +) + +func TestTxnPostFlushRowCallback(t *testing.T) { + event := commonEvent.NewDMLEvent(common.NewDispatcherID(), 1, 1, 1, nil) + + flushCount := 0 + event.AddPostFlushFunc(func() { + flushCount++ + }) + + rowCallback := NewTxnPostFlushRowCallback(event, 3) + rowCallback() + rowCallback() + require.Equal(t, 0, flushCount) + + rowCallback() + require.Equal(t, 1, flushCount) + + rowCallback() + require.Equal(t, 1, flushCount) +} diff --git a/downstreamadapter/sink/kafka/sink.go b/downstreamadapter/sink/kafka/sink.go index cc2690423f..c5df6737a5 100644 --- a/downstreamadapter/sink/kafka/sink.go +++ b/downstreamadapter/sink/kafka/sink.go @@ -144,6 +144,10 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { s.eventChan.Push(event) } +func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { + return nil +} + func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { var err error switch v := event.(type) { @@ -219,20 +223,8 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error { partitionGenerator := s.comp.eventRouter.GetPartitionGenerator(schema, table) selector := s.comp.columnSelector.Get(schema, table) - toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { - var calledCount atomic.Uint64 - // The callback of the last row will trigger the callback of the txn. 
- return func() { - if calledCount.Inc() == totalCount { - for _, callback := range postTxnFlushed { - callback() - } - } - } - } - rowsCount := uint64(event.Len()) - rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) + rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount) for { row, ok := event.GetNextRow() diff --git a/downstreamadapter/sink/metrics/cloudstorage.go b/downstreamadapter/sink/metrics/cloudstorage.go index 7fc8dae975..13a5c1e2fc 100644 --- a/downstreamadapter/sink/metrics/cloudstorage.go +++ b/downstreamadapter/sink/metrics/cloudstorage.go @@ -55,6 +55,14 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), }, []string{"namespace", "changefeed"}) + CloudStorageDDLDrainDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "cloud_storage_ddl_drain_duration_seconds", + Help: "DDL drain duration for cloud storage sink", + Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), + }, []string{"namespace", "changefeed"}) + // CloudStorageWorkerBusyRatio records the busy ratio of CloudStorage bgUpdateLog worker. CloudStorageWorkerBusyRatio = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -71,5 +79,6 @@ func InitCloudStorageMetrics(registry *prometheus.Registry) { registry.MustRegister(CloudStorageFileCountGauge) registry.MustRegister(CloudStorageWriteDurationHistogram) registry.MustRegister(CloudStorageFlushDurationHistogram) + registry.MustRegister(CloudStorageDDLDrainDurationHistogram) registry.MustRegister(CloudStorageWorkerBusyRatio) } diff --git a/downstreamadapter/sink/mock/sink_mock.go b/downstreamadapter/sink/mock/sink_mock.go new file mode 100644 index 0000000000..8726735eb2 --- /dev/null +++ b/downstreamadapter/sink/mock/sink_mock.go @@ -0,0 +1,155 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: downstreamadapter/sink/sink.go + +// Package mock is a generated GoMock package. 
+package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + common "github.com/pingcap/ticdc/pkg/common" + event "github.com/pingcap/ticdc/pkg/common/event" +) + +// MockSink is a mock of Sink interface. +type MockSink struct { + ctrl *gomock.Controller + recorder *MockSinkMockRecorder +} + +// MockSinkMockRecorder is the mock recorder for MockSink. +type MockSinkMockRecorder struct { + mock *MockSink +} + +// NewMockSink creates a new mock instance. +func NewMockSink(ctrl *gomock.Controller) *MockSink { + mock := &MockSink{ctrl: ctrl} + mock.recorder = &MockSinkMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSink) EXPECT() *MockSinkMockRecorder { + return m.recorder +} + +// AddCheckpointTs mocks base method. +func (m *MockSink) AddCheckpointTs(ts uint64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddCheckpointTs", ts) +} + +// AddCheckpointTs indicates an expected call of AddCheckpointTs. +func (mr *MockSinkMockRecorder) AddCheckpointTs(ts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddCheckpointTs", reflect.TypeOf((*MockSink)(nil).AddCheckpointTs), ts) +} + +// AddDMLEvent mocks base method. +func (m *MockSink) AddDMLEvent(event *event.DMLEvent) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddDMLEvent", event) +} + +// AddDMLEvent indicates an expected call of AddDMLEvent. +func (mr *MockSinkMockRecorder) AddDMLEvent(event interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDMLEvent", reflect.TypeOf((*MockSink)(nil).AddDMLEvent), event) +} + +// Close mocks base method. +func (m *MockSink) Close(removeChangefeed bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close", removeChangefeed) +} + +// Close indicates an expected call of Close. 
+func (mr *MockSinkMockRecorder) Close(removeChangefeed interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSink)(nil).Close), removeChangefeed) +} + +// FlushDMLBeforeBlock mocks base method. +func (m *MockSink) FlushDMLBeforeBlock(event event.BlockEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FlushDMLBeforeBlock", event) + ret0, _ := ret[0].(error) + return ret0 +} + +// FlushDMLBeforeBlock indicates an expected call of FlushDMLBeforeBlock. +func (mr *MockSinkMockRecorder) FlushDMLBeforeBlock(event interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushDMLBeforeBlock", reflect.TypeOf((*MockSink)(nil).FlushDMLBeforeBlock), event) +} + +// IsNormal mocks base method. +func (m *MockSink) IsNormal() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsNormal") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsNormal indicates an expected call of IsNormal. +func (mr *MockSinkMockRecorder) IsNormal() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNormal", reflect.TypeOf((*MockSink)(nil).IsNormal)) +} + +// Run mocks base method. +func (m *MockSink) Run(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockSinkMockRecorder) Run(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSink)(nil).Run), ctx) +} + +// SetTableSchemaStore mocks base method. +func (m *MockSink) SetTableSchemaStore(tableSchemaStore *event.TableSchemaStore) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTableSchemaStore", tableSchemaStore) +} + +// SetTableSchemaStore indicates an expected call of SetTableSchemaStore. 
+func (mr *MockSinkMockRecorder) SetTableSchemaStore(tableSchemaStore interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTableSchemaStore", reflect.TypeOf((*MockSink)(nil).SetTableSchemaStore), tableSchemaStore) +} + +// SinkType mocks base method. +func (m *MockSink) SinkType() common.SinkType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SinkType") + ret0, _ := ret[0].(common.SinkType) + return ret0 +} + +// SinkType indicates an expected call of SinkType. +func (mr *MockSinkMockRecorder) SinkType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SinkType", reflect.TypeOf((*MockSink)(nil).SinkType)) +} + +// WriteBlockEvent mocks base method. +func (m *MockSink) WriteBlockEvent(event event.BlockEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteBlockEvent", event) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteBlockEvent indicates an expected call of WriteBlockEvent. +func (mr *MockSinkMockRecorder) WriteBlockEvent(event interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBlockEvent", reflect.TypeOf((*MockSink)(nil).WriteBlockEvent), event) +} diff --git a/downstreamadapter/sink/mock_sink.go b/downstreamadapter/sink/mock_sink.go deleted file mode 100644 index 37b1cfe986..0000000000 --- a/downstreamadapter/sink/mock_sink.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package sink - -import ( - "context" - "sync" - - "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" -) - -type mockSink struct { - mu sync.Mutex - dmls []*commonEvent.DMLEvent - isNormal bool - sinkType common.SinkType -} - -func (s *mockSink) AddDMLEvent(event *commonEvent.DMLEvent) { - s.mu.Lock() - defer s.mu.Unlock() - s.dmls = append(s.dmls, event) -} - -func (s *mockSink) WriteBlockEvent(event commonEvent.BlockEvent) error { - event.PostFlush() - return nil -} - -func (s *mockSink) AddCheckpointTs(_ uint64) { -} - -func (s *mockSink) SetTableSchemaStore(_ *commonEvent.TableSchemaStore) { -} - -func (s *mockSink) Close(bool) {} - -func (s *mockSink) Run(context.Context) error { - return nil -} - -func (s *mockSink) SinkType() common.SinkType { - return s.sinkType -} - -func (s *mockSink) IsNormal() bool { - return s.isNormal -} - -func (s *mockSink) SetIsNormal(isNormal bool) { - s.isNormal = isNormal -} - -func (s *mockSink) FlushDMLs() { - s.mu.Lock() - defer s.mu.Unlock() - for _, dml := range s.dmls { - dml.PostFlush() - } - s.dmls = make([]*commonEvent.DMLEvent, 0) -} - -func (s *mockSink) GetDMLs() []*commonEvent.DMLEvent { - s.mu.Lock() - defer s.mu.Unlock() - return s.dmls -} - -func NewMockSink(sinkType common.SinkType) *mockSink { - return &mockSink{ - dmls: make([]*commonEvent.DMLEvent, 0), - isNormal: true, - sinkType: sinkType, - } -} diff --git a/downstreamadapter/sink/mysql/sink.go b/downstreamadapter/sink/mysql/sink.go index b4a6d0f3d5..f1b222e3b6 100644 --- a/downstreamadapter/sink/mysql/sink.go +++ b/downstreamadapter/sink/mysql/sink.go @@ -270,6 +270,10 @@ func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) { s.conflictDetector.Add(event) } +func (s *Sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { + return nil +} + func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { var err error switch event.GetType() { diff --git a/downstreamadapter/sink/pulsar/sink.go 
b/downstreamadapter/sink/pulsar/sink.go index 73e1e92cb6..136f95ca1a 100644 --- a/downstreamadapter/sink/pulsar/sink.go +++ b/downstreamadapter/sink/pulsar/sink.go @@ -128,6 +128,10 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { s.eventChan.Push(event) } +func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { + return nil +} + func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { var err error switch v := event.(type) { @@ -329,20 +333,8 @@ func (s *sink) calculateKeyPartitions(ctx context.Context) error { partitionGenerator := s.comp.eventRouter.GetPartitionGenerator(schema, table) selector := s.comp.columnSelector.Get(schema, table) - toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { - var calledCount atomic.Uint64 - // The callback of the last row will trigger the callback of the txn. - return func() { - if calledCount.Inc() == totalCount { - for _, callback := range postTxnFlushed { - callback() - } - } - } - } - rowsCount := uint64(event.Len()) - rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) + rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount) for { row, ok := event.GetNextRow() diff --git a/downstreamadapter/sink/redo/sink.go b/downstreamadapter/sink/redo/sink.go index fe01ca46ab..646035799d 100644 --- a/downstreamadapter/sink/redo/sink.go +++ b/downstreamadapter/sink/redo/sink.go @@ -18,6 +18,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/ticdc/downstreamadapter/sink/helper" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" @@ -111,6 +112,10 @@ func (s *Sink) Run(ctx context.Context) error { return err } +func (s *Sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { + return nil +} + func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { switch e := event.(type) { case *commonEvent.DDLEvent: @@ -128,19 +133,8 @@ func (s *Sink) 
WriteBlockEvent(event commonEvent.BlockEvent) error { } func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) { - toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { - var calledCount atomic.Uint64 - // The callback of the last row will trigger the callback of the txn. - return func() { - if calledCount.Inc() == totalCount { - for _, callback := range postTxnFlushed { - callback() - } - } - } - } rowsCount := uint64(event.Len()) - rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) + rowCallback := helper.NewTxnPostFlushRowCallback(event, rowsCount) for { row, ok := event.GetNextRow() diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index 77a5b43fc4..4b249a6b76 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -33,6 +33,12 @@ type Sink interface { IsNormal() bool AddDMLEvent(event *commonEvent.DMLEvent) + // FlushDMLBeforeBlock is a pre-block hook before reporting or writing a block + // event (DDL/syncpoint). Sinks can use it as a barrier to drain/serialize + // prior DML events for ordering guarantees. Most non-storage sinks no-op. + FlushDMLBeforeBlock(event commonEvent.BlockEvent) error + // WriteBlockEvent writes the block event to downstream. On success, sink + // implementations are expected to call event.PostFlush(). 
WriteBlockEvent(event commonEvent.BlockEvent) error AddCheckpointTs(ts uint64) diff --git a/pkg/common/event/active_active.go b/pkg/common/event/active_active.go index eac1853cc5..2ee11c2ffb 100644 --- a/pkg/common/event/active_active.go +++ b/pkg/common/event/active_active.go @@ -443,7 +443,10 @@ func newFilteredDMLEvent( newEvent.Seq = source.Seq newEvent.Epoch = source.Epoch newEvent.ReplicatingTs = source.ReplicatingTs + newEvent.PostTxnEnqueued = source.PostTxnEnqueued newEvent.PostTxnFlushed = source.PostTxnFlushed + newEvent.postEnqueueCalled.Store(source.postEnqueueCalled.Load()) + source.PostTxnEnqueued = nil source.PostTxnFlushed = nil newEvent.SetRows(rows) diff --git a/pkg/common/event/active_active_test.go b/pkg/common/event/active_active_test.go index b3d1633fd3..70e47b17c4 100644 --- a/pkg/common/event/active_active_test.go +++ b/pkg/common/event/active_active_test.go @@ -25,6 +25,7 @@ import ( tidbTypes "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestFilterDMLEventNormalTablePassthrough(t *testing.T) { @@ -316,6 +317,36 @@ func TestFilterDMLEventSoftDeleteTableMissingColumnReportsError(t *testing.T) { require.Contains(t, handledErr.Error(), SoftDeleteTimeColumn) } +func TestFilterDMLEventKeepsPostEnqueueCallbacksOnFilteredEvent(t *testing.T) { + ti := newTestTableInfo(t, true, true) + ts := newTimestampValue(time.Date(2025, time.March, 10, 0, 0, 0, 0, time.UTC)) + event := newDMLEventForTest(t, ti, + []commonpkg.RowType{commonpkg.RowTypeUpdate}, + [][]interface{}{ + {int64(1), nil}, + {int64(1), ts}, + }) + + var enqueueCalled atomic.Int64 + var flushCalled atomic.Int64 + event.AddPostEnqueueFunc(func() { + enqueueCalled.Inc() + }) + event.AddPostFlushFunc(func() { + flushCalled.Inc() + }) + + filtered, skip := FilterDMLEvent(event, false, nil) + require.False(t, skip) + require.NotNil(t, filtered) + require.NotEqual(t, event, filtered) + + 
filtered.PostEnqueue() + filtered.PostFlush() + require.Equal(t, int64(1), enqueueCalled.Load()) + require.Equal(t, int64(1), flushCalled.Load()) +} + func newTestTableInfo(t *testing.T, activeActive, softDelete bool) *commonpkg.TableInfo { idCol := newTestColumn(1, "id", mysql.TypeLong, mysql.PriKeyFlag) cols := []*model.ColumnInfo{idCol} diff --git a/pkg/common/event/dml_event.go b/pkg/common/event/dml_event.go index 8f18234db8..2216fc4f2f 100644 --- a/pkg/common/event/dml_event.go +++ b/pkg/common/event/dml_event.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -392,9 +393,18 @@ type DMLEvent struct { // The following fields are set and used by dispatcher. ReplicatingTs uint64 `json:"replicating_ts"` - // PostTxnFlushed is the functions to be executed after the transaction is flushed. - // It is set and used by dispatcher. + // PostTxnEnqueued contains callbacks executed when the txn is accepted by + // sink's internal pipeline (enqueue stage). + // + // Note: enqueue means "handed over to sink workers", not "durably written + // to downstream". + PostTxnEnqueued []func() `json:"-"` + // PostTxnFlushed contains callbacks executed when the txn is fully flushed to + // downstream (flush stage), which is stronger than enqueue and is used by + // checkpoint related logic. PostTxnFlushed []func() `json:"-"` + // postEnqueueCalled ensures PostTxnEnqueued callbacks are triggered at most once. + postEnqueueCalled atomic.Bool `json:"-"` // eventSize is the size of the event in bytes. It is set when it's unmarshaled. eventSize int64 `json:"-"` @@ -630,10 +640,30 @@ func (t *DMLEvent) GetStartTs() common.Ts { return t.StartTs } +// PostFlush marks the transaction as flushed to downstream. +// +// It runs flush callbacks first, then triggers PostEnqueue. 
For sinks with an +// explicit enqueue hook, PostEnqueue may already have been called before flush. func (t *DMLEvent) PostFlush() { for _, f := range t.PostTxnFlushed { f() } + // Keep this fallback to preserve flush-then-wake semantics for sinks without + // an explicit enqueue stage; CAS in PostEnqueue guarantees double-calls are no-op. + t.PostEnqueue() +} + +// PostEnqueue marks the transaction as enqueued into sink internal pipeline. +// +// This stage does not mean data is already written to downstream. The method is +// idempotent and guarantees enqueue callbacks run at most once. +func (t *DMLEvent) PostEnqueue() { + if !t.postEnqueueCalled.CAS(false, true) { + return + } + for _, f := range t.PostTxnEnqueued { + f() + } } func (t *DMLEvent) GetSeq() uint64 { @@ -644,18 +674,36 @@ func (t *DMLEvent) GetEpoch() uint64 { return t.Epoch } +// PushFrontFlushFunc prepends a flush callback so it runs before existing ones. func (t *DMLEvent) PushFrontFlushFunc(f func()) { t.PostTxnFlushed = append([]func(){f}, t.PostTxnFlushed...) } +// ClearPostFlushFunc removes all registered flush callbacks. func (t *DMLEvent) ClearPostFlushFunc() { t.PostTxnFlushed = t.PostTxnFlushed[:0] } +// AddPostFlushFunc registers a callback that runs at flush stage. +// +// Use this when the callback depends on downstream persistence semantics. func (t *DMLEvent) AddPostFlushFunc(f func()) { t.PostTxnFlushed = append(t.PostTxnFlushed, f) } +// ClearPostEnqueueFunc removes all registered enqueue callbacks. +func (t *DMLEvent) ClearPostEnqueueFunc() { + t.PostTxnEnqueued = t.PostTxnEnqueued[:0] +} + +// AddPostEnqueueFunc registers a callback that runs at enqueue stage. +// +// Use this when only sink acceptance is required. For sinks with no explicit +// enqueue signal, this callback is triggered after flush callbacks in PostFlush. 
+func (t *DMLEvent) AddPostEnqueueFunc(f func()) { + t.PostTxnEnqueued = append(t.PostTxnEnqueued, f) +} + // Rewind reset the offset to 0, So that the next GetNextRow will return the first row func (t *DMLEvent) Rewind() { t.offset = 0 diff --git a/pkg/common/event/dml_event_test.go b/pkg/common/event/dml_event_test.go index 555f1587af..766c20767b 100644 --- a/pkg/common/event/dml_event_test.go +++ b/pkg/common/event/dml_event_test.go @@ -15,6 +15,7 @@ package event import ( "encoding/binary" + "sync" "testing" "github.com/pingcap/ticdc/pkg/common" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestDMLEventBasicEncodeAndDecode(t *testing.T) { @@ -342,3 +344,86 @@ func TestBatchDMLEventHeaderValidation(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "incomplete data") } + +func TestDMLEventPostEnqueueFuncs(t *testing.T) { + t.Parallel() + + event := &DMLEvent{} + var called atomic.Int64 + event.AddPostEnqueueFunc(func() { + called.Inc() + }) + event.AddPostEnqueueFunc(func() { + called.Inc() + }) + + event.PostEnqueue() + event.PostEnqueue() + + require.Equal(t, int64(2), called.Load()) +} + +func TestDMLEventPostFlushTriggersPostEnqueueOnce(t *testing.T) { + t.Parallel() + + event := &DMLEvent{} + var enqueueCalled atomic.Int64 + var flushCalled atomic.Int64 + event.AddPostEnqueueFunc(func() { + enqueueCalled.Inc() + }) + event.AddPostFlushFunc(func() { + flushCalled.Inc() + }) + + event.PostFlush() + event.PostFlush() + + require.Equal(t, int64(1), enqueueCalled.Load()) + require.Equal(t, int64(2), flushCalled.Load()) +} + +func TestDMLEventPostFlushRunsFlushBeforePostEnqueueFallback(t *testing.T) { + t.Parallel() + + event := &DMLEvent{} + order := make([]string, 0, 3) + event.AddPostFlushFunc(func() { + order = append(order, "flush") + }) + event.AddPostEnqueueFunc(func() { + order = append(order, "enqueue") + }) + 
+ event.PostFlush() + event.PostFlush() + + require.Equal(t, []string{"flush", "enqueue", "flush"}, order) +} + +func TestDMLEventPostEnqueueConcurrentWithPostFlush(t *testing.T) { + t.Parallel() + + event := &DMLEvent{} + var enqueueCalled atomic.Int64 + event.AddPostEnqueueFunc(func() { + enqueueCalled.Inc() + }) + + var wg sync.WaitGroup + const loops = 256 + for i := 0; i < loops; i++ { + wg.Add(2) + go func() { + defer wg.Done() + event.PostEnqueue() + }() + go func() { + defer wg.Done() + event.PostFlush() + }() + } + wg.Wait() + + require.Equal(t, int64(1), enqueueCalled.Load()) +} diff --git a/pkg/common/event/table_schema_store.go b/pkg/common/event/table_schema_store.go index 256721b362..588f33b7fa 100644 --- a/pkg/common/event/table_schema_store.go +++ b/pkg/common/event/table_schema_store.go @@ -92,7 +92,6 @@ func newTableSchemaStoreRequirements( needTableNames: true, } case commonType.CloudStorageSinkType, commonType.BlackHoleSinkType: - // These sinks currently do not consume TableSchemaStore metadata. 
return tableSchemaStoreRequirements{ needTableIDs: false, updateTableIDs: false, diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 4b817dd394..390c6d37c2 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -60,7 +60,7 @@ const ( // `0 0 2 * * ?` means 2:00:00 AM every day defaultFileCleanupCronSpec = "0 0 2 * * *" - defaultEnableTableAcrossNodes = true + defaultEnableTableAcrossNodes = false ) type urlConfig struct { diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 442d95a2eb..f02cfb446b 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -41,7 +41,7 @@ func TestConfigApply(t *testing.T) { err = replicaConfig.ValidateAndAdjust(sinkURI) require.NoError(t, err) cfg := NewConfig() - err = cfg.Apply(context.TODO(), sinkURI, replicaConfig.Sink, true) + err = cfg.Apply(context.TODO(), sinkURI, replicaConfig.Sink, false) require.Nil(t, err) require.Equal(t, expected, cfg) } diff --git a/scripts/generate-mock.sh b/scripts/generate-mock.sh index 3610b056ab..2b2cc79d68 100755 --- a/scripts/generate-mock.sh +++ b/scripts/generate-mock.sh @@ -36,3 +36,4 @@ fi "$MOCKGEN" -source pkg/sink/codec/simple/marshaller.go -destination pkg/sink/codec/simple/mock/marshaller.go "$MOCKGEN" -source pkg/keyspace/keyspace_manager.go -destination pkg/keyspace/keyspace_manager_mock.go -package keyspace "$MOCKGEN" -source pkg/txnutil/gc/gc_manager.go -destination pkg/txnutil/gc/gc_manager_mock.go -package gc +"$MOCKGEN" -source downstreamadapter/sink/sink.go -destination downstreamadapter/sink/mock/sink_mock.go -package mock