Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 80 additions & 41 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/tidb/pkg/parser/mysql"
tidbTypes "github.com/pingcap/tidb/pkg/types"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -168,7 +168,7 @@ type BasicDispatcher struct {
sink sink.Sink

// the max resolvedTs received by the dispatcher
resolvedTs uint64
resolvedTs atomic.Uint64

// blockEventStatus is used to store the current pending ddl/sync point event and its block status.
blockEventStatus BlockEventStatus
Expand Down Expand Up @@ -246,7 +246,6 @@ func NewBasicDispatcher(
sharedInfo: sharedInfo,
sink: sink,
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing),
resolvedTs: startTs,
isRemoving: atomic.Bool{},
duringHandleEvents: atomic.Bool{},
blockEventStatus: BlockEventStatus{blockPendingEvent: nil},
Expand All @@ -258,13 +257,15 @@ func NewBasicDispatcher(
mode: mode,
BootstrapState: BootstrapFinished,
}
dispatcher.resolvedTs.Store(startTs)

return dispatcher
}

// AddDMLEventsToSink filters events for special tables and only returns true when
// at least one event remains to be written to the downstream sink.
func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent) bool {
// AddDMLEventsToSink filters events for special tables, registers batch wake
// callbacks, and returns true when at least one event remains to be written to
// the downstream sink.
func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent, wakeCallback func()) bool {
// Normal DML dispatch: most tables just pass through this function unchanged.
// Active-active or soft-delete tables are processed by FilterDMLEvent before
// being handed over to the sink (delete rows dropped; soft-delete transitions may
Expand All @@ -288,9 +289,18 @@ func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent) boo
return false
}

var remaining atomic.Int64
remaining.Store(int64(len(filteredEvents)))
for _, event := range filteredEvents {
event.AddPostEnqueueFunc(func() {
if remaining.Dec() == 0 {
wakeCallback()
}
})
}

// For one batch of events, we need to add them all to table progress first, then add them to the sink,
// because we need to ensure the wakeCallback will only be called when
// all these events are flushed to downstream successfully,
// to avoid checkpoint jumping while events are being enqueued/flushed.
for _, event := range filteredEvents {
d.tableProgress.Add(event)
}
Expand Down Expand Up @@ -329,20 +339,38 @@ func (d *BasicDispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) erro
if event.GetType() == commonEvent.TypeDDLEvent {
ddl := event.(*commonEvent.DDLEvent)
// If NotSync is true, it means the DDL should not be sent to downstream.
// So we just call PassBlockEventToSink to update the table progress and call the postFlush func.
// So we just call PassBlockEventToSink to finish local bookkeeping:
// mark it passed in tableProgress and trigger flush callbacks to unblock
// dispatcher progress, without sending this DDL to sink.
if ddl.NotSync {
log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.Any("ddl", ddl))
d.PassBlockEventToSink(event)
return nil
return d.PassBlockEventToSink(event)
}
}
// Keep block-event write order with prior DML. For storage sink this may wait
// until prior DML are enqueued/flushed to the sink pipeline; for non-storage
// sinks it is usually a no-op.
if err := d.sink.FlushDMLBeforeBlock(event); err != nil {
return err
}

d.tableProgress.Add(event)
return d.sink.WriteBlockEvent(event)
}

func (d *BasicDispatcher) PassBlockEventToSink(event commonEvent.BlockEvent) {
// PassBlockEventToSink handles a block event whose resolution is "pass"
// (for example maintainer action=Pass, or a DDL marked NotSync).
//
// It deliberately never calls sink.WriteBlockEvent. After flushing prior DML
// to preserve write ordering, it marks the event as passed in the local
// tableProgress and fires the event's PostFlush callbacks, so wake/checkpoint
// logic continues with consistent ordering.
func (d *BasicDispatcher) PassBlockEventToSink(event commonEvent.BlockEvent) error {
	err := d.sink.FlushDMLBeforeBlock(event)
	if err != nil {
		return err
	}
	d.tableProgress.Pass(event)
	event.PostFlush()
	return nil
}

// ensureActiveActiveTableInfo validates the table schema requirements for active-active mode.
Expand Down Expand Up @@ -473,7 +501,7 @@ func (d *BasicDispatcher) GetHeartBeatInfo(h *HeartBeatInfo) {
}

func (d *BasicDispatcher) GetResolvedTs() uint64 {
return atomic.LoadUint64(&d.resolvedTs)
return d.resolvedTs.Load()
}

func (d *BasicDispatcher) GetLastSyncedTs() uint64 {
Expand Down Expand Up @@ -537,15 +565,16 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC
return false
}

// handleEvents can batch handle events about resolvedTs Event and DML Event.
// While for DDLEvent and SyncPointEvent, they should be handled separately,
// because they are block events.
// We ensure we only will receive one event when it's ddl event or sync point event
// by setting them with different event types in DispatcherEventsHandler.GetType
// When we handle events, we don't have any previous events still in sink.
// handleEvents processes one batch for one dispatcher.
The next batch of events can only be handled after the current batch has been enqueued or flushed.
// A batch may mix DML and resolved-ts events; Block events are expected to be handled one by one.
- Block events, such as DDL / Syncpoint, are sent to the sink synchronously;
the dispatcher is blocked until the block event is flushed.
- DML events are sent to the sink asynchronously.
- Storage sink: the dispatcher wakes up after all DML events are guaranteed enqueued.
- Non-storage sink: the dispatcher wakes up after all DML events are guaranteed flushed.
//
wakeCallback is used to wake the dynamic stream to handle the next batch of events.
// It will be called when all the events are flushed to downstream successfully.
// Return true if should block the dispatcher.
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
if d.GetRemovingStatus() {
log.Warn("dispatcher is removing", zap.Any("id", d.id))
Expand All @@ -557,7 +586,6 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC

// Only return false when all events are resolvedTs Event.
block := false
dmlWakeOnce := &sync.Once{}
dmlEvents := make([]*commonEvent.DMLEvent, 0, len(dispatcherEvents))
latestResolvedTs := uint64(0)
// Dispatcher is ready, handle the events
Expand Down Expand Up @@ -622,14 +650,6 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC

block = true
dml.ReplicatingTs = d.creationPDTs
dml.AddPostFlushFunc(func() {
// Considering dml event in sink may be written to downstream not in order,
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
// and wake dynamic stream to handle the next events.
if d.tableProgress.Empty() {
dmlWakeOnce.Do(wakeCallback)
}
})
dmlEvents = append(dmlEvents, dml)
case commonEvent.TypeDDLEvent:
if len(dispatcherEvents) != 1 {
Expand Down Expand Up @@ -713,14 +733,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
// the checkpointTs may be incorrect set as the new resolvedTs,
// due to the tableProgress is empty before dml events add into sink.
if len(dmlEvents) > 0 {
hasDMLToFlush := d.AddDMLEventsToSink(dmlEvents)
hasDMLToFlush := d.AddDMLEventsToSink(dmlEvents, wakeCallback)
if !hasDMLToFlush {
// All DML events were filtered out, so they no longer block dispatcher progress.
block = false
}
}
if latestResolvedTs > 0 {
atomic.StoreUint64(&d.resolvedTs, latestResolvedTs)
d.resolvedTs.Store(latestResolvedTs)
}
return block
}
Expand All @@ -732,9 +752,10 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream.
// 2. If the action is a pass, we just need to pass the event
//
// For Action_Write, writing the block event may involve IO (e.g. executing DDL). To avoid blocking the
// dispatcher status dynamic stream handler, we execute the write asynchronously and return await=true.
// The status path will be waked up after the write finishes.
// For block actions (write/pass), execution may involve downstream IO because we flush prior DML first.
// To avoid blocking the dispatcher status dynamic stream handler, we execute the action asynchronously
// and return await=true.
The status path will be woken up after the action finishes.
func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) (await bool) {
log.Debug("dispatcher handle dispatcher status",
zap.Any("dispatcherStatus", dispatcherStatus),
Expand Down Expand Up @@ -781,17 +802,18 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
// 3. clear blockEventStatus(should be the old pending event, but clear the new one)
d.blockEventStatus.clear()
})
actionCommitTs := action.CommitTs
actionIsSyncPoint := action.IsSyncPoint
if action.Action == heartbeatpb.Action_Write {
actionCommitTs := action.CommitTs
actionIsSyncPoint := action.IsSyncPoint
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
d.ExecuteBlockEventDDL(pendingEvent, actionCommitTs, actionIsSyncPoint)
})
return true
} else {
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
d.PassBlockEvent(pendingEvent, actionCommitTs, actionIsSyncPoint)
})
return true
}
} else {
ts, ok := d.blockEventStatus.getEventCommitTs()
Expand Down Expand Up @@ -832,7 +854,25 @@ func (d *BasicDispatcher) ExecuteBlockEventDDL(pendingEvent commonEvent.BlockEve
return
}
failpoint.Inject("BlockOrWaitReportAfterWrite", nil)
d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint)
}

// PassBlockEvent executes the maintainer's Action_Pass for a pending block
// event. It delegates to PassBlockEventToSink to preserve ordering and mark
// the event as passed, then reports DONE status back to the maintainer.
func (d *BasicDispatcher) PassBlockEvent(pendingEvent commonEvent.BlockEvent, actionCommitTs uint64, actionIsSyncPoint bool) {
	failpoint.Inject("BlockOrWaitBeforePass", nil)
	if err := d.PassBlockEventToSink(pendingEvent); err != nil {
		d.HandleError(err)
		return
	}
	failpoint.Inject("BlockAfterPass", nil)
	d.reportBlockedEventDone(actionCommitTs, actionIsSyncPoint)
}

// reportBlockedEventDone sends DONE status and wakes dispatcher-status stream path
// so the next status for this dispatcher can be handled.
func (d *BasicDispatcher) reportBlockedEventDone(actionCommitTs uint64, actionIsSyncPoint bool) {
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
Expand Down Expand Up @@ -961,7 +1001,6 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
d.holdBlockEvent(event)
return
}

d.reportBlockedEventToMaintainer(event)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package dispatcher
import (
"testing"

"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
Expand Down Expand Up @@ -147,7 +146,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
blockStatuses,
errCh,
)
dispatcherSink := sink.NewMockSink(sinkType)
dispatcherSink := newDispatcherTestSink(t, sinkType)
tableSpan := &heartbeatpb.TableSpan{TableID: 1, StartKey: []byte{0}, EndKey: []byte{1}}
dispatcher := NewBasicDispatcher(
common.NewDispatcherID(),
Expand All @@ -159,7 +158,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
false,
200,
common.DefaultMode,
dispatcherSink,
dispatcherSink.Sink(),
sharedInfo,
)
return dispatcher
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (d *BasicDispatcher) IsTableTriggerDispatcher() bool {
// SetStartTs only be called after the dispatcher is created
func (d *BasicDispatcher) SetStartTs(startTs uint64) {
atomic.StoreUint64(&d.startTs, startTs)
atomic.StoreUint64(&d.resolvedTs, startTs)
d.resolvedTs.Store(startTs)
}

func (d *BasicDispatcher) SetCurrentPDTs(currentPDTs uint64) {
Expand Down
Loading