From 5dffca88fb3eeca05241a70b62b6c3834d5b41ce Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Wed, 9 Apr 2025 15:45:10 -0400 Subject: [PATCH 1/6] Remove error return value since we don't use it. --- go/logic/coordinator.go | 6 +++--- go/logic/coordinator_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index 1921daffc..11fff2cb6 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -365,14 +365,14 @@ func (c *Coordinator) ProcessEventsUntilNextChangelogEvent() (*binlog.BinlogDMLE // ProcessEventsUntilDrained reads binlog events and sends them to the workers to process. // It exits when the event queue is empty and all the workers are returned to the workerQueue. -func (c *Coordinator) ProcessEventsUntilDrained() error { +func (c *Coordinator) ProcessEventsUntilDrained() { for { select { // Read events from the binlog and submit them to the next worker case ev := <-c.events: { if c.finishedMigrating.Load() { - return nil + return } switch binlogEvent := ev.Event.(type) { @@ -428,7 +428,7 @@ func (c *Coordinator) ProcessEventsUntilDrained() error { default: { if c.busyWorkers.Load() == 0 { - return nil + return } } } diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index 6a5f6d8df..b90f192ee 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -194,7 +194,7 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { break } - err = coord.ProcessEventsUntilDrained() + coord.ProcessEventsUntilDrained() suite.Require().NoError(err) } From 0f37735e76e041a1eeb6040251f54bd3035808cb Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Wed, 9 Apr 2025 15:47:09 -0400 Subject: [PATCH 2/6] Lock the mutex whenever we plan to update the low watermark to avoid a race condition. --- go/logic/coordinator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index 11fff2cb6..32fee128b 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -377,9 +377,11 @@ func (c *Coordinator) ProcessEventsUntilDrained() { switch binlogEvent := ev.Event.(type) { case *replication.GTIDEvent: + c.mu.Lock() if c.lowWaterMark == 0 && binlogEvent.SequenceNumber > 0 { c.lowWaterMark = binlogEvent.SequenceNumber - 1 } + c.mu.Unlock() case *replication.RotateEvent: c.currentCoordinatesMutex.Lock() c.currentCoordinates.LogFile = string(binlogEvent.NextLogName) From 517a532eb0c6d2b2a738559221f2b5f771bbd527 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Wed, 9 Apr 2025 15:47:45 -0400 Subject: [PATCH 3/6] Check for data races in our unit tests. --- script/test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/script/test b/script/test index bec4f67f5..3f66288d1 100755 --- a/script/test +++ b/script/test @@ -14,4 +14,4 @@ script/build cd .gopath/src/github.com/github/gh-ost echo "Running unit tests" -go test -v -p 1 -covermode=atomic ./go/... +go test -v -p 1 -covermode=atomic -race ./go/... From 8a4aa6f89e965f7cf8023fdfbacd244def3f5758 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Wed, 9 Apr 2025 15:51:36 -0400 Subject: [PATCH 4/6] Still return an error from ProcessEventsUntilDrained but actually check it in our code. --- go/logic/coordinator.go | 7 ++++--- go/logic/migrator.go | 4 +++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index 32fee128b..ce38ac832 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -365,14 +365,14 @@ func (c *Coordinator) ProcessEventsUntilNextChangelogEvent() (*binlog.BinlogDMLE // ProcessEventsUntilDrained reads binlog events and sends them to the workers to process. // It exits when the event queue is empty and all the workers are returned to the workerQueue. -func (c *Coordinator) ProcessEventsUntilDrained() { +func (c *Coordinator) ProcessEventsUntilDrained() error { for { select { // Read events from the binlog and submit them to the next worker case ev := <-c.events: { if c.finishedMigrating.Load() { - return + return nil } switch binlogEvent := ev.Event.(type) { @@ -430,11 +430,12 @@ func (c *Coordinator) ProcessEventsUntilDrained() { default: { if c.busyWorkers.Load() == 0 { - return + return nil } } } } + return nil } func (c *Coordinator) InitializeWorkers(count int) { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 39c248009..d90598ddc 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1319,7 +1319,9 @@ func (this *Migrator) executeWriteFuncs() error { // We give higher priority to event processing. // ProcessEventsUntilDrained will process all events in the queue, and then return once no more events are available. - this.trxCoordinator.ProcessEventsUntilDrained() + if err := this.trxCoordinator.ProcessEventsUntilDrained(); err != nil { + return this.migrationContext.Log.Errore(err) + } this.throttler.throttle(nil) From d1289b3df7fe0231937aed387845f05437a02f83 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Wed, 9 Apr 2025 15:52:22 -0400 Subject: [PATCH 5/6] Make coordinator_test.go to check the err from ProcessEventsUntilDrained again --- go/logic/coordinator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/coordinator_test.go b/go/logic/coordinator_test.go index b90f192ee..6a5f6d8df 100644 --- a/go/logic/coordinator_test.go +++ b/go/logic/coordinator_test.go @@ -194,7 +194,7 @@ func (suite *CoordinatorTestSuite) TestApplyDML() { break } - coord.ProcessEventsUntilDrained() + err = coord.ProcessEventsUntilDrained() suite.Require().NoError(err) } From bf75daf730fb25c59aba4ba3a6af134ceb98cae9 Mon Sep 17 00:00:00 2001 From: "Miguel D. Salcedo" Date: Wed, 9 Apr 2025 15:55:16 -0400 Subject: [PATCH 6/6] Remove unreachable return in ProcessEventsUntilDrained --- go/logic/coordinator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/logic/coordinator.go b/go/logic/coordinator.go index ce38ac832..f3ee4942f 100644 --- a/go/logic/coordinator.go +++ b/go/logic/coordinator.go @@ -435,7 +435,6 @@ func (c *Coordinator) ProcessEventsUntilDrained() error { } } } - return nil } func (c *Coordinator) InitializeWorkers(count int) {