Skip to content
Draft

[WIP] #4270

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 4 additions & 23 deletions logservice/schemastore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"go.uber.org/zap"
)

Expand All @@ -39,37 +38,19 @@ func transformDDLJobQuery(job *model.Job) (string, error) {
return "", errors.Trace(err)
}
var result string
buildQuery := func(stmt ast.StmtNode) (string, error) {
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
// remove placement rule
restoreFlags |= format.SkipPlacementRuleForRestore
// force disable ttl
restoreFlags |= format.RestoreWithTTLEnableOff
if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}

if len(stmts) > 1 {
results := make([]string, 0, len(stmts))
for _, stmt := range stmts {
query, err := buildQuery(stmt)
query, err := commonEvent.Restore(stmt)
if err != nil {
return "", errors.Trace(err)
}
results = append(results, query)
}
result = strings.Join(results, ";")
} else {
result, err = buildQuery(stmts[0])
result, err = commonEvent.Restore(stmts[0])
if err != nil {
return "", errors.Trace(err)
}
Expand Down
42 changes: 23 additions & 19 deletions pkg/common/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,27 @@ func toTableInfosKey(schema, table string) string {
return schema + "." + table
}

func Restore(stmt ast.StmtNode) (string, error) {
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
// remove placement rule
restoreFlags |= format.SkipPlacementRuleForRestore
// force disable ttl
restoreFlags |= format.RestoreWithTTLEnableOff
err := stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb))
if err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}

// SplitQueries takes a string containing multiple SQL statements and splits them into individual SQL statements.
// This function is designed for scenarios like batch creation of tables, where multiple `CREATE TABLE` statements
// might be combined into a single query string.
Expand All @@ -453,31 +474,14 @@ func SplitQueries(queries string) ([]string, error) {

var res []string
for _, stmt := range stmts {
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
// remove placement rule
restoreFlags |= format.SkipPlacementRuleForRestore
// force disable ttl
restoreFlags |= format.RestoreWithTTLEnableOff
err := stmt.Restore(&format.RestoreCtx{
Flags: restoreFlags,
In: &sb,
})
query, err := Restore(stmt)
if err != nil {
return nil, errors.WrapError(errors.ErrTiDBUnexpectedJobMeta, err)
}
// The (ast.Node).Restore function generates a SQL string representation of the AST (Abstract Syntax Tree) node.
// By default, the resulting SQL string does not include a trailing semicolon ";".
// Therefore, we explicitly append a semicolon here to ensure the SQL statement is complete.
sb.WriteByte(';')
res = append(res, sb.String())
res = append(res, fmt.Sprintf("%s;", query))
}

return res, nil
Expand Down
181 changes: 181 additions & 0 deletions pkg/sink/mysql/ddl_index_rewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
)

type indexKeyPart struct {
nameL string
length int
}

func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo) (string, bool, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a brief comment explaining the purpose of this function, which is to restore anonymous index names to named indexes in DDL queries.

if query == "" || tableInfo == nil {
return query, false, nil
}

p := parser.New()
stmt, err := p.ParseOneStmt(query, "", "")
if err != nil {
return query, false, errors.Trace(err)
}

alterStmt, ok := stmt.(*ast.AlterTableStmt)
if !ok {
return query, false, nil
}

changed := false
for _, spec := range alterStmt.Specs {
if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil {
continue
}
constraint := spec.Constraint
if constraint.Name != "" {
continue
}
if !isIndexConstraint(constraint) {
continue
}

indexName, ok := findIndexNameForConstraint(tableInfo, constraint)
if !ok {
continue
}
constraint.Name = indexName
changed = true
}

if !changed {
return query, false, nil
}

restoredQuery, err := commonEvent.Restore(stmt)
if err != nil {
return query, false, err
}
return restoredQuery, true, nil
}

func isIndexConstraint(constraint *ast.Constraint) bool {
if constraint == nil {
return false
}
switch constraint.Tp {
case ast.ConstraintKey,
ast.ConstraintIndex,
ast.ConstraintUniq,
ast.ConstraintUniqKey,
ast.ConstraintUniqIndex,
ast.ConstraintFulltext,
ast.ConstraintVector,
ast.ConstraintColumnar:
return true
default:
return false
}
}

func isUniqueIndexConstraint(constraint *ast.Constraint) bool {
if constraint == nil {
return false
}
switch constraint.Tp {
case ast.ConstraintUniq, ast.ConstraintUniqKey, ast.ConstraintUniqIndex:
return true
default:
return false
}
}

func findIndexNameForConstraint(tableInfo *common.TableInfo, constraint *ast.Constraint) (string, bool) {
keyParts, ok := getConstraintKeyParts(constraint)
if !ok {
return "", false
}

wantUnique := isUniqueIndexConstraint(constraint)
indices := tableInfo.GetIndices()
if len(indices) == 0 {
return "", false
}

matches := make([]*timodel.IndexInfo, 0, 1)
for _, index := range indices {
if index == nil || index.Primary {
continue
}
if index.Unique != wantUnique {
continue
}
// Only use non-public indices.
if index.State == timodel.StatePublic {
continue
}
if !indexColumnsMatchKeyParts(index.Columns, keyParts) {
continue
}
matches = append(matches, index)
}
if len(matches) != 1 {
return "", false
}
return matches[0].Name.O, true
}

func getConstraintKeyParts(constraint *ast.Constraint) ([]indexKeyPart, bool) {
if constraint == nil || len(constraint.Keys) == 0 {
return nil, false
}

parts := make([]indexKeyPart, 0, len(constraint.Keys))
for _, key := range constraint.Keys {
if key == nil || key.Expr != nil || key.Column == nil {
return nil, false
}
parts = append(parts, indexKeyPart{
nameL: key.Column.Name.L,
length: key.Length,
})
}
return parts, true
}

func indexColumnsMatchKeyParts(indexColumns []*timodel.IndexColumn, keyParts []indexKeyPart) bool {
if len(indexColumns) != len(keyParts) {
return false
}
for i, part := range keyParts {
col := indexColumns[i]
if col == nil {
return false
}
if col.Name.L != part.nameL {
return false
}
if part.length > 0 {
if col.Length != part.length {
return false
}
}
}
return true
}
11 changes: 4 additions & 7 deletions pkg/sink/mysql/format_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
package mysql

import (
"bytes"

"github.com/pingcap/log"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -55,10 +53,9 @@ func formatQuery(sql string) string {
}
stmt.Accept(&visiter{})

buf := new(bytes.Buffer)
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
if err = stmt.Restore(restoreCtx); err != nil {
query, err := commonEvent.Restore(stmt)
if err != nil {
log.Error("format query restore failed", zap.Error(err))
}
return buf.String()
return query
}
15 changes: 15 additions & 0 deletions pkg/sink/mysql/mysql_writer_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error {
ctx := w.ctx
shouldSwitchDB := needSwitchDB(event)

if event.GetDDLType() == timodel.ActionAddIndex {
newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo)
if err != nil {
log.Warn("failed to restore anonymous index name",
zap.String("changefeed", w.ChangefeedID.String()),
zap.String("query", event.Query),
zap.Error(err))
Comment on lines +60 to +65

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log message indicates a failure to restore the anonymous index name. It would be helpful to include the ChangefeedID in the log message to facilitate debugging in environments with multiple changefeeds. This will help correlate the log with the specific changefeed experiencing the issue.

log.Warn("failed to restore anonymous index name",
				zap.String("changefeed", w.ChangefeedID.String()),
				zap.String("query", event.Query),
				zap.Error(err))

} else if changed {
log.Info("restore anonymous index to named index",
zap.String("changefeed", w.ChangefeedID.String()),
zap.String("query", event.Query),
zap.String("newQuery", newQuery))
Comment on lines +69 to +70

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log message indicates a successful restoration of the anonymous index name. It would be helpful to include the ChangefeedID in the log message to facilitate debugging in environments with multiple changefeeds. This will help correlate the log with the specific changefeed where the restoration occurred.

log.Info("restore anonymous index to named index",
				zap.String("changefeed", w.ChangefeedID.String()),
				zap.String("query", event.Query),
				zap.String("newQuery", newQuery))

event.Query = newQuery
}
}
// Convert vector type to string type for unsupport database
if w.cfg.HasVectorType {
if newQuery := formatQuery(event.Query); newQuery != event.Query {
Expand Down
Loading
Loading