diff --git a/go/base/context.go b/go/base/context.go index ac077076f..e4008cdd6 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -247,7 +247,9 @@ type MigrationContext struct { recentBinlogCoordinates mysql.BinlogCoordinates - BinlogSyncerMaxReconnectAttempts int + BinlogSyncerMaxReconnectAttempts int + AllowSetupMetadataLockInstruments bool + IsOpenMetadataLockInstruments bool Log Logger } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index a1670cdd4..4a28ad1aa 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -137,6 +137,7 @@ func main() { flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook") flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999") + flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session hold the MDL of original table before unlock tables in cut-over phase") flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited") flag.BoolVar(&migrationContext.IncludeTriggers, "include-triggers", false, "When true, the triggers (if exist) will be created on the new table") diff --git a/go/logic/applier.go b/go/logic/applier.go index 98195b4bb..a7edaed24 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -416,6 +416,28 @@ func (this *Applier) dropTable(tableName string) error { return nil } +func (this *Applier) StateMetadataLockInstrument() error { + query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'` + var enabled, timed string + if err := this.db.QueryRow(query).Scan(&enabled, &timed); err != nil { + return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err) + } + if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") { + this.migrationContext.IsOpenMetadataLockInstruments = true + return nil + } + if !this.migrationContext.AllowSetupMetadataLockInstruments { + return nil + } + this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed) + if _, err := this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil { + return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err) + } + this.migrationContext.IsOpenMetadataLockInstruments = true + this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled") + return nil +} + // dropTriggers drop the triggers on the applied host func (this *Applier) DropTriggersFromGhost() error { if len(this.migrationContext.Triggers) > 0 { @@ -1095,7 +1117,7 @@ func (this *Applier) RevertAtomicCutOverWaitTimeout() { } // AtomicCutOverMagicLock -func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error { +func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, renameLockSessionId *int64) error { tx, err := this.db.Begin() if err != nil { tableLocked <- err @@ -1186,6 +1208,20 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke // We DO NOT return here because we must `UNLOCK TABLES`! } + this.migrationContext.Log.Infof("Session renameLockSessionId is %+v", *renameLockSessionId) + // Checking the lock is held by rename session + if *renameLockSessionId > 0 && this.migrationContext.IsOpenMetadataLockInstruments { + sleepDuration := time.Duration(10*this.migrationContext.CutOverLockTimeoutSeconds) * time.Millisecond + for i := 1; i <= 100; i++ { + err := this.ExpectMetadataLock(*renameLockSessionId) + if err == nil { + this.migrationContext.Log.Infof("Rename session is pending lock on the origin table !") + break + } else { + time.Sleep(sleepDuration) + } + } + } // Tables still locked this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), @@ -1405,3 +1441,27 @@ func (this *Applier) Teardown() { this.singletonDB.Close() atomic.StoreInt64(&this.finishedMigrating, 1) } + +func (this *Applier) ExpectMetadataLock(sessionId int64) error { + found := false + query := ` + select /* gh-ost */ m.owner_thread_id + from performance_schema.metadata_locks m join performance_schema.threads t + on m.owner_thread_id=t.thread_id + where m.object_type = 'TABLE' and m.object_schema = ? and m.object_name = ? + and m.lock_type = 'EXCLUSIVE' and m.lock_status = 'PENDING' + and t.processlist_id = ? + ` + err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { + found = true + return nil + }, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, sessionId) + if err != nil { + return err + } + if !found { + err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName) + return this.migrationContext.Log.Errore(err) + } + return nil +} diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a6fad7274..880b9b4c5 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -669,8 +669,9 @@ func (this *Migrator) atomicCutOver() (err error) { lockOriginalSessionIdChan := make(chan int64, 2) tableLocked := make(chan error, 2) tableUnlocked := make(chan error, 2) + var renameLockSessionId int64 go func() { - if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil { + if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil { this.migrationContext.Log.Errore(err) } }() @@ -735,6 +736,7 @@ func (this *Migrator) atomicCutOver() (err error) { // Now that we've found the RENAME blocking, AND the locking connection still alive, // we know it is safe to proceed to release the lock + renameLockSessionId = renameSessionId okToUnlockTable <- true // BAM! magic table dropped, original table lock is released // -> RENAME released -> queries on original are unblocked. @@ -1203,6 +1205,10 @@ func (this *Migrator) initiateApplier() error { } } this.applier.WriteChangelogState(string(GhostTableMigrated)) + if err := this.applier.StateMetadataLockInstrument(); err != nil { + this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out") + return err + } go this.applier.InitiateHeartbeat() return nil } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 65a26a59d..86c060acf 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -9,6 +9,7 @@ import ( "context" gosql "database/sql" "errors" + "fmt" "os" "path/filepath" "strings" @@ -22,7 +23,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go/modules/mysql" - "fmt" + "runtime" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" @@ -335,6 +336,7 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() { migrationContext.ApplierConnectionConfig = connectionConfig migrationContext.InspectorConnectionConfig = connectionConfig migrationContext.SetConnectionConfig("innodb") + migrationContext.InitiallyDropOldTable = true migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255), ENGINE=InnoDB" @@ -573,6 +575,86 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) { assert.Equal(t, tries, 100) } +func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(64))", getTestTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("insert into %s values(1,'a')", getTestTableName())) + suite.Require().NoError(err) + + done := make(chan error, 1) + go func() { + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + if err != nil { + done <- err + return + } + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.AllowSetupMetadataLockInstruments = true + migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255)" + migrationContext.HeartbeatIntervalMilliseconds = 100 + migrationContext.CutOverLockTimeoutSeconds = 4 + + _, filename, _, _ := runtime.Caller(0) + migrationContext.PostponeCutOverFlagFile = filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag") + + migrator := NewMigrator(migrationContext, "0.0.0") + + //nolint:contextcheck + done <- migrator.Migrate() + }() + + time.Sleep(2 * time.Second) + //nolint:dogsled + _, filename, _, _ := runtime.Caller(0) + err = os.Remove(filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag")) + if err != nil { + suite.Require().NoError(err) + } + time.Sleep(1 * time.Second) + go func() { + holdConn, err := suite.db.Conn(ctx) + suite.Require().NoError(err) + _, err = holdConn.ExecContext(ctx, "SELECT *, sleep(2) FROM test._testing_gho WHERE id = 1") + suite.Require().NoError(err) + }() + + dmlConn, err := suite.db.Conn(ctx) + suite.Require().NoError(err) + + _, err = dmlConn.ExecContext(ctx, fmt.Sprintf("insert into %s (id, name) values(2,'b')", getTestTableName())) + fmt.Println("insert into table original table") + suite.Require().NoError(err) + + migrateErr := <-done + suite.Require().NoError(migrateErr) + + // Verify the new column was added + var delValue, OriginalValue int64 + err = suite.db.QueryRow( + fmt.Sprintf("select count(*) from %s._%s_del", testMysqlDatabase, testMysqlTableName), + ).Scan(&delValue) + suite.Require().NoError(err) + + err = suite.db.QueryRow("select count(*) from " + getTestTableName()).Scan(&OriginalValue) + suite.Require().NoError(err) + + suite.Require().LessOrEqual(delValue, OriginalValue) + + var tableName, createTableSQL string + //nolint:execinquery + err = suite.db.QueryRow("SHOW CREATE TABLE "+getTestTableName()).Scan(&tableName, &createTableSQL) + suite.Require().NoError(err) + + suite.Require().Equal(testMysqlTableName, tableName) + suite.Require().Equal("CREATE TABLE `testing` (\n `id` int NOT NULL,\n `name` varchar(64) DEFAULT NULL,\n `foobar` varchar(255) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createTableSQL) +} + func TestMigrator(t *testing.T) { suite.Run(t, new(MigratorTestSuite)) }