diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 7296c2d550..a0abdd0b27 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -173,7 +173,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, model.ActionAddIndex: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + buildPersistedDDLEventFunc: buildPersistedDDLEventForAddIndex, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL, updateSchemaMetadataFunc: updateSchemaMetadataIgnore, @@ -673,6 +673,15 @@ func buildPersistedDDLEventForNormalDDLOnSingleTable(args buildPersistedDDLEvent return event } +func buildPersistedDDLEventForAddIndex(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { + event := buildPersistedDDLEventCommon(args) + event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) + event.TableName = getTableName(args.tableMap, event.TableID) + indexIDs := getIndexIDs(args.job) + event.IndexIDs = indexIDs + return event +} + func buildPersistedDDLEventForTruncateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { event := buildPersistedDDLEventCommon(args) // only table id change after truncate @@ -1743,7 +1752,8 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, TiDBOnly: tiDBOnly, BDRMode: rawEvent.BDRRole, - NotSync: notSync, + NotSync: notSync, + IndexIDs: rawEvent.IndexIDs, }, !filtered, nil } diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 5b16ed4eb3..6b776eb28e 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -88,6 +88,10 @@ type PersistedDDLEvent struct { // TODO: do we need the following two fields? 
BDRRole string `msg:"bdr_role"` CDCWriteSource uint64 `msg:"cdc_write_source"` + + // IndexIDs store the index ids that are related to the ddl job, only used for add index. + // use these id to recover the index name for the anonymous add index + IndexIDs []int64 `msg:"index_ids"` } // TODO: use msgp.Raw to do version management diff --git a/logservice/schemastore/utils.go b/logservice/schemastore/utils.go index 06d27ab785..de9bfe72f1 100644 --- a/logservice/schemastore/utils.go +++ b/logservice/schemastore/utils.go @@ -18,10 +18,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "go.uber.org/zap" ) @@ -39,29 +38,11 @@ func transformDDLJobQuery(job *model.Job) (string, error) { return "", errors.Trace(err) } var result string - buildQuery := func(stmt ast.StmtNode) (string, error) { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { - return "", errors.Trace(err) - } - return sb.String(), nil - } + if len(stmts) > 1 { results := make([]string, 0, len(stmts)) for _, stmt := range stmts { - query, err := buildQuery(stmt) + query, err := commonEvent.Restore(stmt) if err != nil { return "", errors.Trace(err) } @@ -69,7 +50,7 @@ func transformDDLJobQuery(job *model.Job) (string, error) { } result = 
strings.Join(results, ";") } else { - result, err = buildQuery(stmts[0]) + result, err = commonEvent.Restore(stmts[0]) if err != nil { return "", errors.Trace(err) } @@ -98,3 +79,30 @@ func isSplitable(tableInfo *model.TableInfo) bool { } return true } + +// getIndexIDs returns the IDs of the indexes created by the given job. It first reads the index arguments from the job itself; for multi-schema-change jobs the arguments live in the sub-jobs, so it falls back to scanning them. +func getIndexIDs(job *model.Job) []int64 { + res := make([]int64, 0) + idxArgs, err := model.GetModifyIndexArgs(job) + if idxArgs != nil && err == nil { + for _, indexArg := range idxArgs.IndexArgs { + res = append(res, indexArg.IndexID) + } + return res + } + // Plain (non-multi-schema-change) jobs keep MultiSchemaInfo nil; without this guard the range below would panic on a nil pointer dereference. + if job.MultiSchemaInfo == nil { + return res + } + for idx, subJob := range job.MultiSchemaInfo.SubJobs { + proxyJob := subJob.ToProxyJob(job, idx) + subArgs, subErr := model.GetModifyIndexArgs(&proxyJob) + if subArgs == nil || subErr != nil { + continue + } + for _, indexArg := range subArgs.IndexArgs { + res = append(res, indexArg.IndexID) + } + } + return res +} diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index ccd96bc621..dec0b1dd47 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -105,6 +105,10 @@ type DDLEvent struct { // If the DDL involves multiple tables, this field is not effective. // The multiple table DDL event will be handled by filtering querys and table infos. NotSync bool `msg:"not_sync"` + + // IndexIDs stores the IDs of the indexes that are related to the DDL job; only used for add index. + // These IDs are used to recover the index names for anonymous ADD INDEX statements. + IndexIDs []int64 `msg:"index_ids"` } func (d *DDLEvent) String() string { diff --git a/pkg/common/event/util.go b/pkg/common/event/util.go index 5e2237cfa6..ee61099b60 100644 --- a/pkg/common/event/util.go +++ b/pkg/common/event/util.go @@ -436,6 +436,27 @@ func toTableInfosKey(schema, table string) string { return schema + "." 
+ table } +func Restore(stmt ast.StmtNode) (string, error) { + var sb strings.Builder + // translate TiDB feature to special comment + restoreFlags := format.RestoreTiDBSpecialComment + // escape the keyword + restoreFlags |= format.RestoreNameBackQuotes + // upper case keyword + restoreFlags |= format.RestoreKeyWordUppercase + // wrap string with single quote + restoreFlags |= format.RestoreStringSingleQuotes + // remove placement rule + restoreFlags |= format.SkipPlacementRuleForRestore + // force disable ttl + restoreFlags |= format.RestoreWithTTLEnableOff + err := stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)) + if err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil +} + // SplitQueries takes a string containing multiple SQL statements and splits them into individual SQL statements. // This function is designed for scenarios like batch creation of tables, where multiple `CREATE TABLE` statements // might be combined into a single query string. @@ -453,31 +474,14 @@ func SplitQueries(queries string) ([]string, error) { var res []string for _, stmt := range stmts { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - err := stmt.Restore(&format.RestoreCtx{ - Flags: restoreFlags, - In: &sb, - }) + query, err := Restore(stmt) if err != nil { return nil, errors.WrapError(errors.ErrTiDBUnexpectedJobMeta, err) } // The (ast.Node).Restore function generates a SQL string representation of the AST (Abstract Syntax Tree) node. 
// By default, the resulting SQL string does not include a trailing semicolon ";". // Therefore, we explicitly append a semicolon here to ensure the SQL statement is complete. - sb.WriteByte(';') - res = append(res, sb.String()) + res = append(res, query+";") } return res, nil diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go new file mode 100644 index 0000000000..24937012a6 --- /dev/null +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -0,0 +1,115 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package mysql + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" +) + +func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo, indexIDs []int64) (string, bool, error) { + if query == "" || tableInfo == nil || len(indexIDs) == 0 { + return query, false, nil + } + + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + if err != nil { + return query, false, errors.Trace(err) + } + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + if !ok { + return query, false, nil + } + + indexNameByID := getIndexNameByIDMap(tableInfo) + if len(indexNameByID) == 0 { + return query, false, nil + } + + changed := false + indexIDPos := 0 + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + constraint := spec.Constraint + if constraint.Name != "" { + continue + } + if !isIndexConstraint(constraint) { + continue + } + if indexIDPos >= len(indexIDs) { + continue + } + + indexName, ok := indexNameByID[indexIDs[indexIDPos]] + indexIDPos++ + if !ok { + continue + } + constraint.Name = indexName + changed = true + } + + if !changed { + return query, false, nil + } + + restoredQuery, err := commonEvent.Restore(stmt) + if err != nil { + return query, false, err + } + return restoredQuery, true, nil +} + +func getIndexNameByIDMap(tableInfo *common.TableInfo) map[int64]string { + indices := tableInfo.GetIndices() + if len(indices) == 0 { + return nil + } + indexNameByID := make(map[int64]string, len(indices)) + for _, index := range indices { + if index == nil { + continue + } + indexNameByID[index.ID] = index.Name.O + } + return indexNameByID +} + +func isIndexConstraint(constraint *ast.Constraint) bool { + if constraint == nil { + return false + } + switch constraint.Tp { + case ast.ConstraintKey, + 
ast.ConstraintIndex, + ast.ConstraintUniq, + ast.ConstraintUniqKey, + ast.ConstraintUniqIndex, + ast.ConstraintFulltext, + ast.ConstraintVector, + ast.ConstraintColumnar: + return true + default: + return false + } +} diff --git a/pkg/sink/mysql/format_ddl.go b/pkg/sink/mysql/format_ddl.go index 8332d18ff2..8f039cb05a 100644 --- a/pkg/sink/mysql/format_ddl.go +++ b/pkg/sink/mysql/format_ddl.go @@ -14,12 +14,10 @@ package mysql import ( - "bytes" - "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "github.com/pingcap/tidb/pkg/parser/mysql" "go.uber.org/zap" ) @@ -55,10 +53,11 @@ func formatQuery(sql string) string { } stmt.Accept(&visiter{}) - buf := new(bytes.Buffer) - restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf) - if err = stmt.Restore(restoreCtx); err != nil { + query, err := commonEvent.Restore(stmt) + if err != nil { log.Error("format query restore failed", zap.Error(err)) + // Restore returns an empty string on error; fall back to the original query rather than returning "". + return sql } - return buf.String() + return query } diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 67a45a4aeb..e168039e98 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -56,6 +56,21 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { ctx := w.ctx shouldSwitchDB := needSwitchDB(event) + if event.GetDDLType() == timodel.ActionAddIndex { + newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo, event.IndexIDs) + if err != nil { + log.Warn("failed to restore anonymous index name", + zap.String("changefeed", w.ChangefeedID.String()), + zap.String("query", event.Query), + zap.Error(err)) + } else if changed { + log.Info("restore anonymous index to named index", + zap.String("changefeed", w.ChangefeedID.String()), + zap.String("query", event.Query), + zap.String("newQuery", newQuery)) + event.Query = 
newQuery + } + } // Convert vector type to string type for unsupport database if w.cfg.HasVectorType { if newQuery := formatQuery(event.Query); newQuery != event.Query { diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go new file mode 100644 index 0000000000..af54fd037d --- /dev/null +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -0,0 +1,195 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/stretchr/testify/require" +) + +func getIndexIDsFromJob(t *testing.T, job *timodel.Job) []int64 { + idxArgs, err := timodel.GetModifyIndexArgs(job) + if idxArgs != nil && err == nil { + indexIDs := make([]int64, 0, len(idxArgs.IndexArgs)) + for _, indexArg := range idxArgs.IndexArgs { + indexIDs = append(indexIDs, indexArg.IndexID) + } + return indexIDs + } + + indexIDs := make([]int64, 0) + require.NotNil(t, job.MultiSchemaInfo) + for idx, subJob := range job.MultiSchemaInfo.SubJobs { + proxyJob := subJob.ToProxyJob(job, idx) + subIdxArgs, subErr := timodel.GetModifyIndexArgs(&proxyJob) + if subIdxArgs == nil || subErr != nil { + continue + } + for _, indexArg := range subIdxArgs.IndexArgs { + indexIDs = 
append(indexIDs, indexArg.IndexID) + } + } + return indexIDs +} + +func getIndexNameByID(t *testing.T, tableInfo *common.TableInfo, indexID int64) string { + for _, index := range tableInfo.GetIndices() { + if index != nil && index.ID == indexID { + return index.Name.O + } + } + require.FailNow(t, "index id not found", "index id: %d", indexID) + return "" +} + +func parseAddIndexConstraintNames(t *testing.T, query string) []string { + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + require.NoError(t, err) + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + require.True(t, ok) + + names := make([]string, 0) + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + if !isIndexConstraint(spec.Constraint) { + continue + } + names = append(names, spec.Constraint.Name) + } + return names +} + +func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), index name(id))") + + job := helper.DDL2Job("alter table t add index (name)") + require.Equal(t, timodel.ActionAddIndex, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 1) + expectedIndexName := getIndexNameByID(t, tableInfo, indexIDs[0]) + + anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`)" + + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{expectedIndexName}, parseAddIndexConstraintNames(t, restoredQuery)) + + ddlEvent := &commonEvent.DDLEvent{ + Type: byte(job.Type), + Query: anonymousQuery, + SchemaName: job.SchemaName, + 
TableName: job.TableName, + TableInfo: tableInfo, + IndexIDs: indexIDs, + } + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(restoredQuery).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err = writer.execDDL(ddlEvent) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRestoreAnonymousIndexToNamedIndexMultipleAnonymousIndexes(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") + + job := helper.DDL2Job("alter table t add index (name), add unique (age)") + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 2) + + expectedNames := []string{ + getIndexNameByID(t, tableInfo, indexIDs[0]), + getIndexNameByID(t, tableInfo, indexIDs[1]), + } + + anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`), ADD UNIQUE (`age`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, expectedNames, parseAddIndexConstraintNames(t, restoredQuery)) +} + +func TestRestoreAnonymousIndexToNamedIndexOnlyConsumesAnonymousIDs(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") + + job := helper.DDL2Job("alter table t add index idx_name(name), add index (age)") + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + anonymousIndexID := int64(0) + expectedAnonymousName := "" + for _, index := range tableInfo.GetIndices() { + if index == nil || 
len(index.Columns) != 1 { + continue + } + if index.Columns[0].Name.L == "age" { + anonymousIndexID = index.ID + expectedAnonymousName = index.Name.O + break + } + } + require.NotZero(t, anonymousIndexID) + require.NotEmpty(t, expectedAnonymousName) + + mixedQuery := "ALTER TABLE `t` ADD INDEX `idx_name` (`name`), ADD INDEX (`age`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, []int64{anonymousIndexID}) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{"idx_name", expectedAnonymousName}, parseAddIndexConstraintNames(t, restoredQuery)) + + unchangedQuery, unchanged, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, nil) + require.NoError(t, err) + require.False(t, unchanged) + require.Equal(t, mixedQuery, unchangedQuery) +} diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index e566b97c27..977d7c26d3 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -62,17 +62,16 @@ function run() { # indexes should be the same when CDC retries happened # ref: https://github.com/pingcap/tiflow/issues/12128 - # FIXME: use named index to avoid duplicate index - # run_sql "update test.t set col = 55 where id = 5;" - # run_sql "alter table test.t add index (col);" - # run_sql "update test.t set col = 66 where id = 6;" - # run_sql "alter table test.t add index (col);" - # run_sql "update test.t set col = 77 where id = 7;" - # sleep 10 - # cleanup_process $CDC_BINARY - # run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - # # make sure all tables are equal in upstream and downstream - # check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 + run_sql "update test.t set col = 55 where id = 5;" + run_sql "alter table test.t add index (col);" + run_sql "update test.t set col = 66 where id = 6;" + run_sql "alter table test.t add index (col);" + run_sql "update test.t set col = 77 where id = 7;" + 
sleep 10 + cleanup_process $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + # make sure all tables are equal in upstream and downstream + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 cleanup_process $CDC_BINARY }