diff --git a/go.mod b/go.mod index f2a364eaa2..f3240dcfab 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/go-mysql-org/go-mysql v1.13.0 github.com/go-oauth2/oauth2/v4 v4.5.4 - github.com/go-sql-driver/mysql v1.7.1 + github.com/go-sql-driver/mysql v1.9.3 github.com/goccy/go-json v0.10.4 github.com/gogo/protobuf v1.3.2 github.com/golang-jwt/jwt/v5 v5.3.0 diff --git a/go.sum b/go.sum index a082508a59..18a15c5753 100644 --- a/go.sum +++ b/go.sum @@ -1681,8 +1681,8 @@ github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QX github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8= github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= -github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index 8151caf5d6..526dd04420 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -13312,6 +13312,110 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The total number of different dml events type that EventService outputs", + "fieldConfig": { + 
"defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 11, + "w": 12, + "x": 12, + "y": 74 + }, + "hiddenSeries": false, + "id": 22476, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_event_service_send_dml_type_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance,dml_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{instance}}-{{dml_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EventService Output Different DML Event Types / s", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Event Service", @@ -16386,6 +16490,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of affected rows", 
+ "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 636, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(increase(ticdc_sink_dml_event_affected_row_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}[1m])) by (namespace, changefeed, count_type, row_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{namespace}}-{{changefeed}}-{{count_type}}-{{row_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Row Affected Count / m", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "none", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } } ], "title": "Sink - Transaction Sink", diff --git a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json index 35e37df7f5..746a5327dc 100644 --- 
a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json +++ b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json @@ -13312,6 +13312,110 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The total number of different dml events type that EventService outputs", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 11, + "w": 12, + "x": 12, + "y": 74 + }, + "hiddenSeries": false, + "id": 22476, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_event_service_send_dml_type_count{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance,dml_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{instance}}-{{dml_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EventService Output Different DML Event Types / s", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": 
"short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Event Service", @@ -16386,6 +16490,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of affected rows", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 636, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(increase(ticdc_sink_dml_event_affected_row_count{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\",keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}[1m])) by (keyspace_name, changefeed, count_type, row_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{keyspace_name}}-{{changefeed}}-{{count_type}}-{{row_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Row Affected Count / m", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + 
"yaxes": [ + { + "decimals": 0, + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "none", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } } ], "title": "Sink - Transaction Sink", diff --git a/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json b/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json index 8b812cac71..1e17088bbf 100644 --- a/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json +++ b/metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json @@ -5449,6 +5449,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of affected rows", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 636, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(increase(ticdc_sink_dml_event_affected_row_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",keyspace_name=~\"$keyspace_name\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}[1m])) by (keyspace_name, changefeed, count_type, row_type)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": 
"{{keyspace_name}}-{{changefeed}}-{{count_type}}-{{row_type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Row Affected Count / m", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "none", + "logBase": 1, + "show": false + } + ], + "yaxis": { + "align": false + } } ], "title": "Sink - Transaction Sink", diff --git a/pkg/common/kv_entry.go b/pkg/common/kv_entry.go index 448cb99304..3296801ec5 100644 --- a/pkg/common/kv_entry.go +++ b/pkg/common/kv_entry.go @@ -75,6 +75,23 @@ func (v *RawKVEntry) IsInsert() bool { return v.OpType == OpTypePut && len(v.OldValue) == 0 } +// GetType returns the type of the RawKVEntry as a string. +func (v *RawKVEntry) GetType() string { + switch v.OpType { + case OpTypePut: + if len(v.OldValue) == 0 { + return "insert" + } else if len(v.OldValue) > 0 && len(v.Value) > 0 { + return "update" + } + case OpTypeDelete: + return "delete" + case OpTypeResolved: + return "resolved" + } + return "unknown" +} + func (v *RawKVEntry) SplitUpdate() (deleteRow, insertRow *RawKVEntry, err error) { if !v.IsUpdate() { return nil, nil, nil diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index 00b29c825b..137aa4a888 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -190,7 +190,7 @@ func (s *eventScanner) scanAndMergeEvents( ) (bool, error) { tableID := session.dataRange.Span.TableID dispatcher := session.dispatcherStat - processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent()) + processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, 
dispatcher.info.IsOutputRawChangeEvent(), s.mode) for { shouldStop, err := s.checkScanConditions(session) @@ -675,12 +675,13 @@ type dmlProcessor struct { batchDML *event.BatchDMLEvent outputRawChangeEvent bool + mode int64 } // newDMLProcessor creates a new DML processor func newDMLProcessor( mounter event.Mounter, schemaGetter schemaGetter, - filter filter.Filter, outputRawChangeEvent bool, + filter filter.Filter, outputRawChangeEvent bool, mode int64, ) *dmlProcessor { return &dmlProcessor{ mounter: mounter, @@ -689,6 +690,7 @@ func newDMLProcessor( batchDML: event.NewBatchDMLEvent(), insertRowCache: make([]*common.RawKVEntry, 0), outputRawChangeEvent: outputRawChangeEvent, + mode: mode, } } @@ -754,7 +756,9 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { rawEvent.Key = event.RemoveKeyspacePrefix(rawEvent.Key) + rawType := rawEvent.GetType() if !rawEvent.IsUpdate() { + updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter) } @@ -769,6 +773,8 @@ func (p *dmlProcessor) appendRow(rawEvent *common.RawKVEntry) error { } } + updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit) + if !shouldSplit { return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter) } diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 46723a726f..7e86bacda6 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -696,7 +696,7 @@ func TestDMLProcessor(t *testing.T) { // Test case 0: create a new DML processor t.Run("CreateNewDMLProcessor", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) require.NotNil(t, processor) require.NotNil(t, processor.batchDML) require.Nil(t, processor.currentTxn) @@ -705,7 +705,7 @@ func 
TestDMLProcessor(t *testing.T) { // Test case 1: commitTxn with no current DML, happens when the iter is nil. t.Run("CommitTxnWithNoCurrentDML", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) err := processor.commitTxn() require.NoError(t, err) require.Nil(t, processor.currentTxn) @@ -715,7 +715,7 @@ func TestDMLProcessor(t *testing.T) { // Test case 1: start the first transaction t.Run("FirstTransactionWithoutCache", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) rawEvent := kvEvents[0] processor.startTxn(dispatcherID, tableID, tableInfo, rawEvent.StartTs, rawEvent.CRTs, false) @@ -737,7 +737,7 @@ func TestDMLProcessor(t *testing.T) { // Test case 2: Process new transaction when there are cached insert rows t.Run("NewTransactionWithInsertCache", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) // Setup first transaction firstEvent := kvEvents[0] @@ -790,7 +790,7 @@ func TestDMLProcessor(t *testing.T) { // Test case 3: Multiple consecutive transactions t.Run("ConsecutiveTransactions", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) // Process multiple transactions for i, item := range kvEvents { @@ -823,7 +823,7 @@ func TestDMLProcessor(t *testing.T) { // Test case 5: Process transaction with empty insert cache followed by one with cache t.Run("EmptyThenNonEmptyCache", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, 
mockSchemaGetter, nil, false, common.DefaultMode) // First transaction - no cache firstEvent := kvEvents[0] @@ -865,7 +865,7 @@ func TestDMLProcessor(t *testing.T) { // Test 6: First event is update that changes UK t.Run("UpdateThatChangesUK", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) helper.Tk().MustExec("use test") ddlEvent := helper.DDL2Event("create table t2 (id int primary key, a int(50), b char(50), unique key uk_a(a))") @@ -920,7 +920,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 1: appendRow before txn started, illegal usage. t.Run("NoCurrentDMLEvent", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) rawEvent := kvEvents[0] require.Panics(t, func() { @@ -930,7 +930,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 2: appendRow for insert operation (non-update) t.Run("AppendInsertRow", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) firstEvent := kvEvents[0] processor.startTxn(dispatcherID, tableID, tableInfo, firstEvent.StartTs, firstEvent.CRTs, false) @@ -948,7 +948,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 3: appendRow for delete operation (non-update) t.Run("AppendDeleteRow", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) rawEvent := kvEvents[0] deleteRow := insertToDeleteRow(rawEvent) @@ -967,7 +967,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 4: appendRow for update operation without unique key change 
t.Run("AppendUpdateRowWithoutUKChange", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) // Create a current DML event first rawEvent := kvEvents[0] @@ -991,7 +991,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 5: appendRow for update operation with unique key change (split update) t.Run("AppendUpdateRowWithUKChange", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) // Create a current DML event first rawEvent := kvEvents[0] @@ -1019,7 +1019,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 6: Test multiple appendRow calls t.Run("MultipleAppendRows", func(t *testing.T) { - processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mockMounter, mockSchemaGetter, nil, false, common.DefaultMode) // Create a current DML event first rawEvent := kvEvents[0] @@ -1051,7 +1051,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { // Test case 7: appendRow for update operation with unique key change and outputRawChangeEvent is true (do not split update) t.Run("AppendUpdateRowWithUKChangeAndOutputRawChangeEvent", func(t *testing.T) { - processor := newDMLProcessor(event.NewMounter(time.UTC, &integrity.Config{}), mockSchemaGetter, nil, true) + processor := newDMLProcessor(event.NewMounter(time.UTC, &integrity.Config{}), mockSchemaGetter, nil, true, common.DefaultMode) // Generate a real update event that changes unique key using helper // This updates the unique key column 'a' from 'a1' to 'a1_new' insertSQL, updateSQL := "insert into test.t(id,a,b) values (7, 'a7', 'b7')", "update test.t set a = 'a7_updated' where id = 7" @@ -1236,7 +1236,7 @@ func TestEventMerger(t *testing.T) { mockSchemaGetter := NewMockSchemaStore() 
mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) - processor := newDMLProcessor(&mockMounter{}, mockSchemaGetter, nil, false) + processor := newDMLProcessor(&mockMounter{}, mockSchemaGetter, nil, false, common.DefaultMode) processor.startTxn(dispatcherID, tableID, ddlEvent.TableInfo, kvEvents[0].StartTs, kvEvents[0].CRTs, false) err := processor.appendRow(kvEvents[0]) @@ -1268,7 +1268,7 @@ func TestEventMerger(t *testing.T) { tableID := ddlEvent.GetTableID() mockSchemaGetter := NewMockSchemaStore() mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) - processor := newDMLProcessor(&mockMounter{}, mockSchemaGetter, nil, false) + processor := newDMLProcessor(&mockMounter{}, mockSchemaGetter, nil, false, common.DefaultMode) processor.startTxn(dispatcherID, tableID, ddlEvent.TableInfo, kvEvents[0].StartTs, kvEvents[0].CRTs, false) @@ -1320,7 +1320,7 @@ func TestEventMerger(t *testing.T) { tableID := ddlEvent1.GetTableID() tableInfo := ddlEvent1.TableInfo - processor := newDMLProcessor(mounter, mockSchemaGetter, nil, false) + processor := newDMLProcessor(mounter, mockSchemaGetter, nil, false, common.DefaultMode) processor.startTxn(dispatcherID, tableID, tableInfo, kvEvents1[0].StartTs, kvEvents1[0].CRTs, false) diff --git a/pkg/eventservice/metrics_collector.go b/pkg/eventservice/metrics_collector.go index 8d82370617..0952ed9260 100644 --- a/pkg/eventservice/metrics_collector.go +++ b/pkg/eventservice/metrics_collector.go @@ -97,6 +97,15 @@ func updateMetricEventServiceSkipResolvedTsCount(mode int64) { updateCounter(mode, metricEventServiceSkipResolvedTsCount, metricRedoEventServiceSkipResolvedTsCount) } +// updateMetricEventServiceSendDMLTypeCount records the DML event types being sent. 
+// insert/delete/update/updateUK +func updateMetricEventServiceSendDMLTypeCount(mode int64, rawType string, updateUK bool) { + if rawType == "update" && updateUK { + rawType = "updateUK" + } + metrics.EventServiceSendDMLTypeCount.WithLabelValues(common.StringMode(mode), rawType).Inc() +} + // dispatcherHeapItem wraps dispatcherStat to implement heap.Item interface. // The heap maintains the slowest dispatchers by checkpointTs. // The heap top is the fastest (largest checkpointTs) among the slowest ones. diff --git a/pkg/metrics/event_service.go b/pkg/metrics/event_service.go index 763aa90f10..4c08ada207 100644 --- a/pkg/metrics/event_service.go +++ b/pkg/metrics/event_service.go @@ -169,6 +169,13 @@ var ( Name: "reset_dispatcher_count", Help: "The number of event dispatcher reset operations performed", }) + + EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "send_dml_type_count", + Help: "The number of different dml events type sent by the event service, it is potentially inaccurate if some dml events are filtered", + }, []string{"mode", "dml_type"}) ) // initEventServiceMetrics registers all metrics in this file. 
@@ -193,4 +200,5 @@ func initEventServiceMetrics(registry *prometheus.Registry) { registry.MustRegister(EventServiceInterruptScanCount) registry.MustRegister(EventServiceGetDDLEventDuration) registry.MustRegister(EventServiceResetDispatcherCount) + registry.MustRegister(EventServiceSendDMLTypeCount) } diff --git a/pkg/metrics/sink.go b/pkg/metrics/sink.go index 0fbb63fa9e..ee7cde8f72 100644 --- a/pkg/metrics/sink.go +++ b/pkg/metrics/sink.go @@ -66,6 +66,14 @@ var ( Help: "Total count of DML events.", }, []string{getKeyspaceLabel(), "changefeed"}) + ExecDMLEventRowsAffectedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "dml_event_affected_row_count", + Help: "Total count of affected rows.", + }, []string{getKeyspaceLabel(), "changefeed", "count_type", "row_type"}) + ActiveActiveConflictSkipRowsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "ticdc", @@ -231,6 +239,7 @@ func initSinkMetrics(registry *prometheus.Registry) { registry.MustRegister(TotalWriteBytesCounter) registry.MustRegister(EventSizeHistogram) registry.MustRegister(ExecDMLEventCounter) + registry.MustRegister(ExecDMLEventRowsAffectedCounter) registry.MustRegister(ActiveActiveConflictSkipRowsCounter) registry.MustRegister(ExecutionErrorCounter) diff --git a/pkg/metrics/statistics.go b/pkg/metrics/statistics.go index 6550018ddc..b4fa38ef79 100644 --- a/pkg/metrics/statistics.go +++ b/pkg/metrics/statistics.go @@ -14,6 +14,8 @@ package metrics import ( + "fmt" + "strings" "sync" "time" @@ -27,9 +29,10 @@ func NewStatistics( sinkType string, ) *Statistics { statistics := &Statistics{ - sinkType: sinkType, - changefeedID: changefeed, - ddlTypes: sync.Map{}, + sinkType: sinkType, + changefeedID: changefeed, + ddlTypes: sync.Map{}, + rowsAffectedMap: sync.Map{}, } keyspace := changefeed.Keyspace() @@ -42,15 +45,17 @@ func NewStatistics( statistics.metricExecErrCntForDDL = ExecutionErrorCounter.WithLabelValues(keyspace, 
changefeedID, "ddl") statistics.metricExecErrCntForDML = ExecutionErrorCounter.WithLabelValues(keyspace, changefeedID, "dml") statistics.metricExecDMLCnt = ExecDMLEventCounter.WithLabelValues(keyspace, changefeedID) + return statistics } // Statistics maintains some status and metrics of the Sink // Note: All methods of Statistics should be thread-safe. type Statistics struct { - sinkType string - changefeedID common.ChangeFeedID - ddlTypes sync.Map + sinkType string + changefeedID common.ChangeFeedID + ddlTypes sync.Map + rowsAffectedMap sync.Map // metricExecDDLHis records each DDL execution time duration. metricExecDDLHis prometheus.Observer @@ -108,6 +113,30 @@ func (b *Statistics) RecordDDLExecution(executor func() (string, error)) error { return nil } +func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) { + b.getRowsAffected("actual", "total").Add(float64(actualRowsAffected)) + b.getRowsAffected("expected", "total").Add(float64(expectedRowsAffected)) +} + +func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) { + b.getRowsAffected("actual", rowType.String()).Add(float64(rowsAffected)) + b.getRowsAffected("expected", rowType.String()).Add(1) + b.RecordTotalRowsAffected(rowsAffected, 1) +} + +func (b *Statistics) getRowsAffected(countType, rowType string) prometheus.Counter { + key := fmt.Sprintf("%s-%s", countType, rowType) + counter, loaded := b.rowsAffectedMap.Load(key) + if !loaded { + keyspace := b.changefeedID.Keyspace() + changefeedID := b.changefeedID.Name() + counter := ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, countType, rowType) + b.rowsAffectedMap.Store(key, counter) + return counter + } + return counter.(prometheus.Counter) +} + // Close release some internal resources. 
func (b *Statistics) Close() { keyspace := b.changefeedID.Keyspace() @@ -123,6 +152,13 @@ func (b *Statistics) Close() { ExecDDLCounter.DeleteLabelValues(keyspace, changefeedID, ddlType) return true }) + b.rowsAffectedMap.Range(func(key, value any) bool { + countTypeAndRowType := key.(string) + splitTypes := strings.Split(countTypeAndRowType, "-") + countType, rowType := splitTypes[0], splitTypes[1] + ExecDMLEventRowsAffectedCounter.DeleteLabelValues(keyspace, changefeedID, countType, rowType) + return true + }) TotalWriteBytesCounter.DeleteLabelValues(keyspace, changefeedID) ExecDMLEventCounter.DeleteLabelValues(keyspace, changefeedID) } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index c6002a4b70..cb4e47d1f9 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -18,7 +18,9 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "path/filepath" + "reflect" "slices" "strconv" "strings" @@ -203,22 +205,38 @@ func buildData(holder *common.ColumnsHolder) (map[string]interface{}, map[string for i := 0; i < columnsCount; i++ { t := holder.Types[i] - name := holder.Types[i].Name() + name := t.Name() mysqlType := strings.ToLower(t.DatabaseTypeName()) + // Snapshot query returns enum/set as their string representations, while canal-json format + // uses integer/bitset values for these types. Downgrade enum/set to varchar to keep the + // assembled handle-key-only events decodable. 
+		if strings.HasPrefix(mysqlType, "enum") || strings.HasPrefix(mysqlType, "set") {
+			mysqlType = "varchar"
+		}
 
-		var value string
-		rawValue := holder.Values[i].([]uint8)
-		if common.IsBinaryMySQLType(mysqlType) {
-			rawValue, err := bytesDecoder.Bytes(rawValue)
-			if err != nil {
-				log.Panic("decode binary value failed", zap.String("value", util.RedactAny(rawValue)), zap.Error(err))
+		var value any
+		switch rawValue := holder.Values[i].(type) {
+		case nil:
+			value = nil
+		case []byte:
+			if common.IsBinaryMySQLType(mysqlType) {
+				rawValue, err := bytesDecoder.Bytes(rawValue)
+				if err != nil {
+					log.Panic("decode binary value failed", zap.String("value", util.RedactAny(rawValue)), zap.Error(err))
+				}
+				value = string(rawValue)
+			} else if strings.Contains(mysqlType, "bit") {
+				bitValue := common.MustBinaryLiteralToInt(rawValue)
+				value = strconv.FormatUint(bitValue, 10)
+			} else {
+				value = string(rawValue)
 			}
-			value = string(rawValue)
-		} else if strings.Contains(mysqlType, "bit") || strings.Contains(mysqlType, "set") {
-			bitValue := common.MustBinaryLiteralToInt(rawValue)
-			value = strconv.FormatUint(bitValue, 10)
-		} else {
-			value = string(rawValue)
+		case string:
+			value = rawValue
+		case int64, uint64, float32, float64:
+			value = fmt.Sprintf("%v", rawValue)
+		default:
+			log.Panic("unexpected column value type", zap.Any("type", reflect.TypeOf(rawValue)), zap.Any("rawValue", util.RedactAny(rawValue)), zap.Any("mysqlType", mysqlType))
 		}
 		mysqlTypeMap[name] = mysqlType
 		data[name] = value
@@ -360,12 +378,8 @@ func (d *decoder) NextDDLEvent() *commonEvent.DDLEvent {
 	tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, tableIDAllocator.Allocate(result.SchemaName, result.TableName))
 	result.BlockedTables = common.GetBlockedTables(tableIDAllocator, result)
-	cacheKey := tableKey{
-		schema: result.SchemaName,
-		table:  result.TableName,
-	}
 	// if receive a table level DDL, just remove the table info to trigger create a new one.
-	delete(d.tableInfoCache, cacheKey)
+	delete(d.tableInfoCache, tableKey{schema: result.SchemaName, table: result.TableName})
 	return result
 }
diff --git a/pkg/sink/codec/common/helper.go b/pkg/sink/codec/common/helper.go
index 2cbf1c201f..e92fc9a102 100644
--- a/pkg/sink/codec/common/helper.go
+++ b/pkg/sink/codec/common/helper.go
@@ -365,6 +365,8 @@ func MustSnapshotQuery(
 			zap.String("schema", schema), zap.String("table", table),
 			zap.Uint64("commitTs", commitTs), zap.Error(err))
 	}
+	// go-mysql-driver 1.8 converts integer/float values into int64/double even in text protocol.
+	// This doesn't increase allocation compared to []byte and conversion cost is negligible.
 	for rows.Next() {
 		err = rows.Scan(holder.ValuePointers...)
 		if err != nil {
diff --git a/pkg/sink/codec/open/decoder.go b/pkg/sink/codec/open/decoder.go
index b6eeae3ee6..3cd9848036 100644
--- a/pkg/sink/codec/open/decoder.go
+++ b/pkg/sink/codec/open/decoder.go
@@ -19,6 +19,7 @@ import (
 	"encoding/base64"
 	"encoding/binary"
 	"encoding/json"
+	"fmt"
 	"path/filepath"
 	"slices"
 	"sort"
@@ -237,6 +238,12 @@ func buildColumns(
 	var flag uint64
 	// todo: we can extract more detailed type information here.
 	dataType := strings.ToLower(columnType.DatabaseTypeName())
+	// Snapshot query returns enum/set as their string representations, while open protocol uses
+	// integer/bitset values for these types. Downgrade enum/set to varchar to keep the
+	// assembled handle-key-only events decodable.
+ if strings.HasPrefix(dataType, "enum") || strings.HasPrefix(dataType, "set") { + dataType = "varchar" + } if common.IsUnsignedMySQLType(dataType) { flag |= unsignedFlag } @@ -588,6 +595,10 @@ func formatColumn(c column, ft types.FieldType) column { data, err = strconv.ParseFloat(string(v), 64) case json.Number: data, err = v.Float64() + case float64: + data = v + case float32: + data = float64(v) default: log.Panic("invalid column value, please report a bug", zap.String("col", util.RedactAny(c)), zap.Any("type", v)) } @@ -605,6 +616,8 @@ func formatColumn(c column, ft types.FieldType) column { data = string(v) case []uint8: data = string(v) + case int64, uint64: + data = fmt.Sprintf("%v", v) default: log.Panic("invalid column value, please report a bug", zap.String("col", util.RedactAny(c)), zap.Any("type", v)) } @@ -621,8 +634,10 @@ func formatColumn(c column, ft types.FieldType) column { switch v := c.Value.(type) { case json.Number: value, err = v.Int64() - case []uint8: - value, err = strconv.ParseInt(string(v), 10, 64) + case int64: + value = v + case uint64: + value = int64(v) default: log.Panic("invalid column value for year", zap.String("value", util.RedactAny(c.Value)), zap.Any("type", v)) } diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index 89c0c2cb02..5ce956132b 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -531,76 +531,52 @@ func parseValue( if value == nil { return nil } + var val string + switch v := value.(type) { + case []byte: + val = string(v) + default: + val = fmt.Sprintf("%v", value) + } var err error switch ft.GetType() { case mysql.TypeBit: - switch v := value.(type) { - case []uint8: - value = common.MustBinaryLiteralToInt(v) - default: - } + v := common.MustBinaryLiteralToInt([]byte(val)) + return strconv.FormatUint(v, 10) case mysql.TypeTimestamp: - var ts string - switch v := value.(type) { - case string: - ts = v - // the timestamp value maybe []uint8 if it's 
queried from upstream TiDB. - case []uint8: - ts = string(v) - } return map[string]interface{}{ "location": location, - "value": ts, + "value": val, } case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeDuration, mysql.TypeTiDBVectorFloat32, mysql.TypeJSON: - return string(value.([]uint8)) + return val case mysql.TypeEnum: - switch v := value.(type) { - case []uint8: - data := string(v) - var enum types.Enum - enum, err = types.ParseEnumName(ft.GetElems(), data, ft.GetCollate()) - value = enum.Value + var enum types.Enum + enum, err = types.ParseEnumName(ft.GetElems(), val, ft.GetCollate()) + if err != nil { + log.Panic("parse enum name failed", + zap.Any("elems", ft.GetElems()), zap.Any("name", value), zap.Error(err)) } + return strconv.FormatUint(enum.Value, 10) case mysql.TypeSet: - switch v := value.(type) { - case []uint8: - data := string(v) - var set types.Set - set, err = types.ParseSetName(ft.GetElems(), data, ft.GetCollate()) - value = set.Value + var set types.Set + set, err = types.ParseSetName(ft.GetElems(), val, ft.GetCollate()) + if err != nil { + log.Panic("parse set name failed", + zap.Any("elems", ft.GetElems()), zap.Any("name", value), zap.Error(err)) } + return strconv.FormatUint(set.Value, 10) default: } if err != nil { log.Panic("parse enum / set name failed", zap.Any("elems", ft.GetElems()), zap.Any("name", value), zap.Error(err)) } - var result string - switch v := value.(type) { - case int64: - result = strconv.FormatInt(v, 10) - case uint64: - result = strconv.FormatUint(v, 10) - case float32: - result = strconv.FormatFloat(float64(v), 'f', -1, 32) - case float64: - result = strconv.FormatFloat(v, 'f', -1, 64) - case string: - result = v - case []byte: - if mysql.HasBinaryFlag(ft.GetFlag()) { - result = base64.StdEncoding.EncodeToString(v) - } else { - result = string(v) - } - case types.VectorFloat32: - result = v.String() - default: - result = fmt.Sprintf("%v", v) + if mysql.HasBinaryFlag(ft.GetFlag()) { + val = 
base64.StdEncoding.EncodeToString([]byte(val)) } - return result + return val } func buildDMLEvent(msg *message, tableInfo *commonType.TableInfo, enableRowChecksum bool, db *sql.DB) *commonEvent.DMLEvent { diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index a3100d6dc5..1d719780bc 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -85,23 +85,25 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err // Step 2: prepare the dmls for each group var ( - queryList []string - argsList [][]interface{} + queryList []string + argsList [][]interface{} + rowTypesList []common.RowType ) for _, sortedEventGroups := range eventsGroupSortedByUpdateTs { for _, eventsInGroup := range sortedEventGroups { tableInfo := eventsInGroup[0].TableInfo if w.cfg.EnableActiveActive { - queryList, argsList = w.genActiveActiveSQL(tableInfo, eventsInGroup) + queryList, argsList, rowTypesList = w.genActiveActiveSQL(tableInfo, eventsInGroup) } else { if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) { - queryList, argsList = w.generateNormalSQLs(eventsInGroup) + queryList, argsList, rowTypesList = w.generateNormalSQLs(eventsInGroup) } else { - queryList, argsList = w.generateBatchSQL(eventsInGroup) + queryList, argsList, rowTypesList = w.generateBatchSQL(eventsInGroup) } } dmls.sqls = append(dmls.sqls, queryList...) dmls.values = append(dmls.values, argsList...) + dmls.rowTypes = append(dmls.rowTypes, rowTypesList...) 
} } // Pre-check log level to avoid dmls.String() being called unnecessarily @@ -112,7 +114,7 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err return dmls, nil } -func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) genActiveActiveSQL(tableInfo *common.TableInfo, eventsInGroup []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) { return w.generateActiveActiveNormalSQLs(eventsInGroup) } diff --git a/pkg/sink/mysql/mysql_writer_dml_active_active.go b/pkg/sink/mysql/mysql_writer_dml_active_active.go index 9e3ba2d432..3b20026586 100644 --- a/pkg/sink/mysql/mysql_writer_dml_active_active.go +++ b/pkg/sink/mysql/mysql_writer_dml_active_active.go @@ -30,9 +30,10 @@ import ( // ===== Normal SQL layer ===== // generateActiveActiveNormalSQLs emits one UPSERT per row without any cross-event batching. 
-func (w *Writer) generateActiveActiveNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateActiveActiveNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { queries := make([]string, 0) argsList := make([][]interface{}, 0) + rowTypes := make([]common.RowType, 0) for _, event := range events { if event.Len() == 0 { continue @@ -50,53 +51,59 @@ func (w *Writer) generateActiveActiveNormalSQLs(events []*commonEvent.DMLEvent) if originTsChecker.shouldDropRow(&row, event.CommitTs) { continue } - sql, args := buildActiveActiveUpsertSQL( + sql, args, rowType := buildActiveActiveUpsertSQL( event.TableInfo, []*commonEvent.RowChange{&row}, []uint64{event.CommitTs}, ) queries = append(queries, sql) argsList = append(argsList, args) + rowTypes = append(rowTypes, rowType) } } - return queries, argsList + return queries, argsList, rowTypes } // ===== Per-event batch layer ===== // generateActiveActiveBatchSQLForPerEvent falls back to per-event batching when merging fails. -func (w *Writer) generateActiveActiveBatchSQLForPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateActiveActiveBatchSQLForPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { var ( - queries []string - args [][]interface{} + queriesList []string + argsList [][]interface{} + rowTypesList []common.RowType ) for _, event := range events { if event.Len() == 0 { continue } - sqls, vals := w.generateActiveActiveSQLForSingleEvent(event) - queries = append(queries, sqls...) - args = append(args, vals...) + sqls, vals, rowTypes := w.generateActiveActiveSQLForSingleEvent(event) + queriesList = append(queriesList, sqls...) + argsList = append(argsList, vals...) + rowTypesList = append(rowTypesList, rowTypes...) 
} - return queries, args + return queriesList, argsList, rowTypesList } // generateActiveActiveSQLForSingleEvent merges rows from a single event into one active-active UPSERT. -func (w *Writer) generateActiveActiveSQLForSingleEvent(event *commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateActiveActiveSQLForSingleEvent(event *commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { rows, commitTs := w.collectActiveActiveRows(event) if len(rows) == 0 { - return nil, nil + return nil, nil, nil } - sql, args := buildActiveActiveUpsertSQL(event.TableInfo, rows, commitTs) - return []string{sql}, [][]interface{}{args} + sql, args, rowType := buildActiveActiveUpsertSQL(event.TableInfo, rows, commitTs) + if sql == "" { + return nil, nil, nil + } + return []string{sql}, [][]interface{}{args}, []common.RowType{rowType} } // ===== Cross-event batch layer ===== // generateActiveActiveBatchSQL reuses the unsafe batching logic to build a single LWW UPSERT. -func (w *Writer) generateActiveActiveBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateActiveActiveBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { if len(events) == 0 { - return []string{}, [][]interface{}{} + return []string{}, [][]interface{}{}, []common.RowType{} } if len(events) == 1 { @@ -152,7 +159,7 @@ func (w *Writer) batchSingleTxnActiveRows( commitTs []uint64, tableInfo *common.TableInfo, tableID int64, -) ([]string, [][]interface{}) { +) ([]string, [][]interface{}, []common.RowType) { if len(rows) != len(commitTs) { log.Panic("mismatched rows and commitTs for active active batch", zap.Int("rows", len(rows)), zap.Int("commitTs", len(commitTs))) @@ -171,10 +178,10 @@ func (w *Writer) batchSingleTxnActiveRows( filteredCommitTs = append(filteredCommitTs, commitTs[i]) } if len(filteredRows) == 0 { - return nil, nil + return nil, nil, nil } - sql, args := buildActiveActiveUpsertSQL(tableInfo, 
filteredRows, filteredCommitTs) - return []string{sql}, [][]interface{}{args} + sql, args, rowType := buildActiveActiveUpsertSQL(tableInfo, filteredRows, filteredCommitTs) + return []string{sql}, [][]interface{}{args}, []common.RowType{rowType} } // originTsChecker filters out rows whose upstream payload already contains a non-NULL _tidb_origin_ts. diff --git a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go index 5b31f51bdc..f3cea842a1 100644 --- a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go +++ b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go @@ -15,6 +15,7 @@ package mysql import ( "testing" + "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/stretchr/testify/require" ) @@ -35,7 +36,7 @@ func TestBuildActiveActiveUpsertSQLMultiRows(t *testing.T) { "insert into t values (2, 'bob', 11, NULL)", ) rows, commitTs := writer.collectActiveActiveRows(event) - sql, args := buildActiveActiveUpsertSQL(event.TableInfo, rows, commitTs) + sql, args, rowTypes := buildActiveActiveUpsertSQL(event.TableInfo, rows, commitTs) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`,`_tidb_origin_ts`,`_tidb_softdelete_time`) VALUES (?,?,?,?),(?,?,?,?) 
ON DUPLICATE KEY UPDATE `id` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`id`), `id`),`name` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`name`), `name`),`_tidb_origin_ts` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_origin_ts`), `_tidb_origin_ts`),`_tidb_softdelete_time` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_softdelete_time`), `_tidb_softdelete_time`)", sql) @@ -44,6 +45,7 @@ func TestBuildActiveActiveUpsertSQLMultiRows(t *testing.T) { int64(2), "bob", event.CommitTs, nil, } require.Equal(t, expectedArgs, args) + require.Equal(t, common.RowTypeInsert, rowTypes) } func TestActiveActiveNormalSQLs(t *testing.T) { @@ -65,9 +67,10 @@ func TestActiveActiveNormalSQLs(t *testing.T) { "insert into t values (3, 'c', 12, NULL)", ) - sqls, args := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) + sqls, args, rowTypes := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 3) require.Len(t, args, 3) + require.Len(t, rowTypes, 3) expectedSQL := "INSERT INTO `test`.`t` (`id`,`name`,`_tidb_origin_ts`,`_tidb_softdelete_time`) VALUES (?,?,?,?) 
ON DUPLICATE KEY UPDATE `id` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`id`), `id`),`name` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`name`), `name`),`_tidb_origin_ts` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_origin_ts`), `_tidb_origin_ts`),`_tidb_softdelete_time` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_softdelete_time`), `_tidb_softdelete_time`)" require.Equal(t, expectedSQL, sqls[0]) require.Equal(t, expectedSQL, sqls[1]) @@ -95,9 +98,10 @@ func TestActiveActivePerEventBatch(t *testing.T) { "insert into t values (2, 'b', 11, NULL)", ) - sqls, args := writer.generateActiveActiveBatchSQLForPerEvent([]*commonEvent.DMLEvent{event}) + sqls, args, rowTypes := writer.generateActiveActiveBatchSQLForPerEvent([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 1) require.Len(t, args, 1) + require.Len(t, rowTypes, 1) expectedSQL := "INSERT INTO `test`.`t` (`id`,`name`,`_tidb_origin_ts`,`_tidb_softdelete_time`) VALUES (?,?,?,?),(?,?,?,?) 
ON DUPLICATE KEY UPDATE `id` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`id`), `id`),`name` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`name`), `name`),`_tidb_origin_ts` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_origin_ts`), `_tidb_origin_ts`),`_tidb_softdelete_time` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_softdelete_time`), `_tidb_softdelete_time`)" require.Equal(t, expectedSQL, sqls[0]) require.Equal(t, []interface{}{ @@ -126,9 +130,10 @@ func TestActiveActiveCrossEventBatch(t *testing.T) { "insert into t values (2, 'b', 11, NULL)", ) - sqls, args := writer.generateActiveActiveBatchSQL([]*commonEvent.DMLEvent{eventA, eventB}) + sqls, args, rowTypes := writer.generateActiveActiveBatchSQL([]*commonEvent.DMLEvent{eventA, eventB}) require.Len(t, sqls, 1) require.Len(t, args, 1) + require.Len(t, rowTypes, 1) expectedSQL := "INSERT INTO `test`.`t` (`id`,`name`,`_tidb_origin_ts`,`_tidb_softdelete_time`) VALUES (?,?,?,?),(?,?,?,?) 
ON DUPLICATE KEY UPDATE `id` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`id`), `id`),`name` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`name`), `name`),`_tidb_origin_ts` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_origin_ts`), `_tidb_origin_ts`),`_tidb_softdelete_time` = IF((IFNULL(`_tidb_origin_ts`, `_tidb_commit_ts`) <= VALUES(`_tidb_origin_ts`)), VALUES(`_tidb_softdelete_time`), `_tidb_softdelete_time`)" require.Equal(t, expectedSQL, sqls[0]) require.Equal(t, []interface{}{ @@ -154,7 +159,7 @@ func TestActiveActiveDropRowsWithNonNullOriginTsForTiDBDownstream(t *testing.T) event := helper.DML2Event("test", "t", "insert into t values (1, 'a', 10, NULL)", ) - sqls, args := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) + sqls, args, rowTypes := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 0) require.Len(t, args, 0) @@ -162,9 +167,10 @@ func TestActiveActiveDropRowsWithNonNullOriginTsForTiDBDownstream(t *testing.T) "insert into t values (2, 'b', 11, NULL)", "insert into t values (3, 'c', 12, NULL)", ) - sqls, args = writer.generateActiveActiveBatchSQLForPerEvent([]*commonEvent.DMLEvent{event}) + sqls, args, rowTypes = writer.generateActiveActiveBatchSQLForPerEvent([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 0) require.Len(t, args, 0) + require.Len(t, rowTypes, 0) eventA := helper.DML2Event("test", "t", "insert into t values (4, 'd', 13, NULL)", @@ -172,9 +178,10 @@ func TestActiveActiveDropRowsWithNonNullOriginTsForTiDBDownstream(t *testing.T) eventB := helper.DML2Event("test", "t", "insert into t values (5, 'e', 14, NULL)", ) - sqls, args = writer.generateActiveActiveBatchSQL([]*commonEvent.DMLEvent{eventA, eventB}) + sqls, args, rowTypes = writer.generateActiveActiveBatchSQL([]*commonEvent.DMLEvent{eventA, eventB}) require.Len(t, sqls, 0) 
require.Len(t, args, 0) + require.Len(t, rowTypes, 0) } func TestActiveActiveKeepRowsWithNullOriginTsForTiDBDownstream(t *testing.T) { @@ -195,8 +202,9 @@ func TestActiveActiveKeepRowsWithNullOriginTsForTiDBDownstream(t *testing.T) { "insert into t values (1, 'a', NULL, NULL)", ) - sqls, args := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) + sqls, args, rowTypes := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 1) require.Len(t, args, 1) + require.Len(t, rowTypes, 1) require.Equal(t, []interface{}{int64(1), "a", event.CommitTs, nil}, args[0]) } diff --git a/pkg/sink/mysql/mysql_writer_dml_batch.go b/pkg/sink/mysql/mysql_writer_dml_batch.go index cb6d211d19..f494e5a6ff 100644 --- a/pkg/sink/mysql/mysql_writer_dml_batch.go +++ b/pkg/sink/mysql/mysql_writer_dml_batch.go @@ -43,10 +43,11 @@ type rowChangeWithKeys struct { // ===== Normal SQL (one row -> one statement) ===== // generateNormalSQLs simply iterates each event and produces SQLs without batching. -func (w *Writer) generateNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { var ( - queries []string - args [][]interface{} + queries []string + args [][]interface{} + rowTypes []common.RowType ) for _, event := range events { @@ -54,15 +55,16 @@ func (w *Writer) generateNormalSQLs(events []*commonEvent.DMLEvent) ([]string, [ continue } - queryList, argsList := w.generateNormalSQL(event) + queryList, argsList, rowTypesList := w.generateNormalSQL(event) queries = append(queries, queryList...) args = append(args, argsList...) + rowTypes = append(rowTypes, rowTypesList...) } - return queries, args + return queries, args, rowTypes } // generateNormalSQL converts a single DMLEvent into SQL statements, respecting safe mode. 
-func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || event.CommitTs < event.ReplicatingTs log.Debug("inSafeMode", @@ -75,8 +77,9 @@ func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]i ) var ( - queries []string - argsList [][]interface{} + queries []string + argsList [][]interface{} + rowTypesList []common.RowType ) for { row, ok := event.GetNextRow() @@ -85,8 +88,9 @@ func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]i break } var ( - query string - args []interface{} + query string + args []interface{} + rowType common.RowType ) switch row.RowType { case common.RowTypeUpdate: @@ -95,29 +99,35 @@ func (w *Writer) generateNormalSQL(event *commonEvent.DMLEvent) ([]string, [][]i if query != "" { queries = append(queries, query) argsList = append(argsList, args) + rowTypesList = append(rowTypesList, common.RowTypeDelete) } query, args = buildInsert(event.TableInfo, row, inSafeMode) + rowType = common.RowTypeInsert } else { query, args = buildUpdate(event.TableInfo, row) + rowType = common.RowTypeUpdate } case common.RowTypeDelete: query, args = buildDelete(event.TableInfo, row) + rowType = common.RowTypeDelete case common.RowTypeInsert: query, args = buildInsert(event.TableInfo, row, inSafeMode) + rowType = common.RowTypeInsert } if query != "" { queries = append(queries, query) argsList = append(argsList, args) + rowTypesList = append(rowTypesList, rowType) } } - return queries, argsList + return queries, argsList, rowTypesList } // ===== Per-event batch (single DMLEvent) ===== // generateSQLForSingleEvent batches all row changes of one DMLEvent using the given safe-mode flag. 
-func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSafeMode bool) ([]string, [][]interface{}) { +func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSafeMode bool) ([]string, [][]interface{}, []common.RowType) { tableInfo := event.TableInfo rowLists := make([]*commonEvent.RowChange, 0, event.Len()) for { @@ -132,21 +142,23 @@ func (w *Writer) generateSQLForSingleEvent(event *commonEvent.DMLEvent, inDataSa } // generateBatchSQLsPerEvent falls back to per-event batching when cross-event merging is not possible. -func (w *Writer) generateBatchSQLsPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQLsPerEvent(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { var ( - queries []string - args [][]interface{} + queries []string + args [][]interface{} + rowTypesList []common.RowType ) for _, event := range events { if event.Len() == 0 { continue } inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || event.CommitTs < event.ReplicatingTs - sqls, vals := w.generateSQLForSingleEvent(event, inSafeMode) + sqls, vals, rowTypes := w.generateSQLForSingleEvent(event, inSafeMode) queries = append(queries, sqls...) args = append(args, vals...) + rowTypesList = append(rowTypesList, rowTypes...) } - return queries, args + return queries, args, rowTypesList } // ========= Cross-event batch: multiple DMLEvents ======== @@ -178,9 +190,9 @@ func (w *Writer) generateBatchSQLsPerEvent(events []*commonEvent.DMLEvent) ([]st // returns the SQLs/args pairs accordingly. // Considering the batch algorithm in safe mode is O(n^3), which n is the number of rows. // So we need to limit the number of rows in one batch to avoid performance issues. 
-func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { if len(events) == 0 { - return []string{}, [][]interface{}{} + return []string{}, [][]interface{}{}, []common.RowType{} } inSafeMode := w.cfg.SafeMode || w.isInErrorCausedSafeMode || events[0].CommitTs < events[0].ReplicatingTs @@ -363,12 +375,12 @@ func (w *Writer) buildRowChangesForUnSafeBatch( // generateBatchSQLInUnSafeMode merges rows with the same keys and produces batch SQLs // following the non-safe path (INSERT/UPDATE/DELETE without REPLACE semantics). -func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { tableInfo := events[0].TableInfo finalRowLists, _, err := w.buildRowChangesForUnSafeBatch(events, tableInfo) if err != nil { - sql, values := w.generateBatchSQLsPerEvent(events) - log.Info("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Int("writerID", w.id)) + sql, values, rowTypes := w.generateBatchSQLsPerEvent(events) + log.Info("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id)) log.Panic("invalid rows when generating batch SQL in unsafe mode", zap.Error(err), zap.Any("events", events), zap.Int("writerID", w.id)) } @@ -376,14 +388,14 @@ func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([ } // generateBatchSQLInSafeMode rewrites rows into delete/insert pairs to ensure REPLACE semantics. 
-func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) { +func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) { tableInfo := events[0].TableInfo // step 1. divide update row to delete row and insert row, and set into map based on the key hash rowsMap := make(map[uint64][]*commonEvent.RowChange) hashToKeyMap := make(map[uint64][]byte) - addRowToMap := func(row *commonEvent.RowChange, rowData *chunk.Row, event *commonEvent.DMLEvent) ([]string, [][]interface{}, bool) { + addRowToMap := func(row *commonEvent.RowChange, rowData *chunk.Row, event *commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType, bool) { hashValue, keyValue := genKeyAndHash(rowData, tableInfo) if _, ok := hashToKeyMap[hashValue]; !ok { hashToKeyMap[hashValue] = keyValue @@ -392,12 +404,12 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s log.Warn("the key hash is equal, but the keys is not the same; so we don't use batch generate sql, but use the normal generated sql instead") event.Rewind() // reset event // fallback to per-event batch sql - sql, args := w.generateBatchSQLsPerEvent(events) - return sql, args, false + sql, args, rowTypes := w.generateBatchSQLsPerEvent(events) + return sql, args, rowTypes, false } } rowsMap[hashValue] = append(rowsMap[hashValue], row) - return nil, nil, true + return nil, nil, nil, true } for _, event := range events { @@ -411,28 +423,28 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s case common.RowTypeUpdate: { deleteRow := commonEvent.RowChange{RowType: common.RowTypeDelete, PreRow: row.PreRow} - sql, args, ok := addRowToMap(&deleteRow, &row.PreRow, event) + sql, args, rowTypes, ok := addRowToMap(&deleteRow, &row.PreRow, event) if !ok { - return sql, args + return sql, args, rowTypes } } { insertRow := commonEvent.RowChange{RowType: common.RowTypeInsert, Row: 
row.Row} - sql, args, ok := addRowToMap(&insertRow, &row.Row, event) + sql, args, rowTypes, ok := addRowToMap(&insertRow, &row.Row, event) if !ok { - return sql, args + return sql, args, rowTypes } } case common.RowTypeDelete: - sql, args, ok := addRowToMap(&row, &row.PreRow, event) + sql, args, rowTypes, ok := addRowToMap(&row, &row.PreRow, event) if !ok { - return sql, args + return sql, args, rowTypes } case common.RowTypeInsert: - sql, args, ok := addRowToMap(&row, &row.Row, event) + sql, args, rowTypes, ok := addRowToMap(&row, &row.Row, event) if !ok { - return sql, args + return sql, args, rowTypes } } } @@ -455,9 +467,9 @@ func (w *Writer) generateBatchSQLInSafeMode(events []*commonEvent.DMLEvent) ([]s for i := 1; i < len(rowChanges); i++ { rowType := rowChanges[i].RowType if rowType == prevType { - sql, values := w.generateBatchSQLsPerEvent(events) - log.Info("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Int("writerID", w.id)) - log.Panic("invalid row changes", zap.String("schemaName", tableInfo.GetSchemaName()), + sql, values, rowTypes := w.generateBatchSQLsPerEvent(events) + log.Info("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id)) + log.Panic("invalid row changes", zap.String("schemaName", tableInfo.GetSchemaName()), zap.Any("PKIndex", tableInfo.GetPKIndex()), zap.String("tableName", tableInfo.GetTableName()), zap.Any("rowChanges", rowChanges), zap.Any("prevType", prevType), zap.Any("currentType", rowType), zap.Int("writerID", w.id)) } @@ -476,7 +488,7 @@ func (w *Writer) batchSingleTxnDmls( rows []*commonEvent.RowChange, tableInfo *common.TableInfo, inSafeMode bool, -) (sqls []string, values [][]interface{}) { +) (sqls []string, values [][]interface{}, rowTypes []common.RowType) { insertRows, updateRows, deleteRows := w.groupRowsByType(rows, tableInfo) // handle delete @@ -485,6 +497,7 @@ func (w *Writer) 
batchSingleTxnDmls( sql, value := sqlmodel.GenDeleteSQL(rows...) sqls = append(sqls, sql) values = append(values, value) + rowTypes = append(rowTypes, common.RowTypeDelete) } } @@ -492,9 +505,10 @@ func (w *Writer) batchSingleTxnDmls( if len(updateRows) > 0 { if w.cfg.IsTiDB { for _, rows := range updateRows { - s, v := w.genUpdateSQL(rows...) + s, v, rowType := w.genUpdateSQL(rows...) sqls = append(sqls, s...) values = append(values, v...) + rowTypes = append(rowTypes, rowType...) } // The behavior of update statement differs between TiDB and MySQL. // So we don't use batch update statement when downstream is MySQL. @@ -505,6 +519,7 @@ func (w *Writer) batchSingleTxnDmls( sql, value := row.GenSQL(sqlmodel.DMLUpdate) sqls = append(sqls, sql) values = append(values, value) + rowTypes = append(rowTypes, common.RowTypeUpdate) } } } @@ -521,8 +536,8 @@ func (w *Writer) batchSingleTxnDmls( sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...) sqls = append(sqls, sql) values = append(values, value) - } + rowTypes = append(rowTypes, common.RowTypeInsert) } } @@ -606,7 +621,7 @@ func (w *Writer) groupRowsByType( } // genUpdateSQL creates batched UPDATE statements when the payload size permits. -func (w *Writer) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interface{}) { +func (w *Writer) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interface{}, []common.RowType) { size := 0 for _, r := range rows { size += int(r.GetApproximateDataSize()) @@ -614,15 +629,17 @@ func (w *Writer) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interf if size < w.cfg.MaxMultiUpdateRowSize*len(rows) { // use multi update in one SQL sql, value := sqlmodel.GenUpdateSQL(rows...) - return []string{sql}, [][]interface{}{value} + return []string{sql}, [][]interface{}{value}, []common.RowType{common.RowTypeUpdate} } // each row has one independent update SQL. 
sqls := make([]string, 0, len(rows)) values := make([][]interface{}, 0, len(rows)) + rowTypes := make([]common.RowType, 0, len(rows)) for _, row := range rows { sql, value := row.GenSQL(sqlmodel.DMLUpdate) sqls = append(sqls, sql) values = append(values, value) + rowTypes = append(rowTypes, common.RowTypeUpdate) } - return sqls, values + return sqls, values, rowTypes } diff --git a/pkg/sink/mysql/mysql_writer_dml_exec.go b/pkg/sink/mysql/mysql_writer_dml_exec.go index 5a237472a6..8a25ceb236 100644 --- a/pkg/sink/mysql/mysql_writer_dml_exec.go +++ b/pkg/sink/mysql/mysql_writer_dml_exec.go @@ -34,8 +34,8 @@ import ( // execDMLWithMaxRetries executes prepared DMLs with retry/backoff handling. func (w *Writer) execDMLWithMaxRetries(dmls *preparedDMLs) error { - if len(dmls.sqls) != len(dmls.values) { - return cerror.ErrUnexpected.FastGenByArgs(fmt.Sprintf("unexpected number of sqls and values, sqls is %s, values is %s", dmls.sqls, util.RedactAny(dmls.values))) + if len(dmls.sqls) != len(dmls.values) || len(dmls.sqls) != len(dmls.rowTypes) { + return cerror.ErrUnexpected.FastGenByArgs(fmt.Sprintf("unexpected number of sqls and values or rowTypes, sqls is %s, values is %s, row types is %s", dmls.sqls, util.RedactAny(dmls.values), dmls.rowTypes)) } // approximateSize is multiplied by 2 because in extreme circustumas, every @@ -144,12 +144,15 @@ func (w *Writer) sequenceExecute( } } - var execError error + var ( + res sql.Result + execError error + ) if prepStmt == nil { - _, execError = tx.ExecContext(ctx, query, args...) + res, execError = tx.ExecContext(ctx, query, args...) } else { //nolint:sqlclosecheck - _, execError = tx.Stmt(prepStmt).ExecContext(ctx, args...) + res, execError = tx.Stmt(prepStmt).ExecContext(ctx, args...) 
	}

	if execError != nil { @@ -162,6 +165,11 @@
 			cancelFunc()
 			return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(execError, fmt.Sprintf("Failed to execute DMLs, query info:%s, args:%v; ", query, util.RedactArgs(args))))
 		}
+		if rowsAffected, err := res.RowsAffected(); err != nil {
+			log.Warn("get rows affected failed", zap.Error(err))
+		} else {
+			w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i])
+		}
 		cancelFunc()
 	}
 	return nil
@@ -187,7 +195,7 @@
 	// conn.ExecContext only use one RTT, while db.Begin + tx.ExecContext + db.Commit need three RTTs.
 	// When an error happens before COMMIT, the server session can be left with an open transaction.
 	// Best-effort rollback is required to ensure the connection can be safely reused by the pool.
-	_, err := conn.ExecContext(ctx, multiStmtSQLWithTxn, multiStmtArgs...)
+	res, err := conn.ExecContext(ctx, multiStmtSQLWithTxn, multiStmtArgs...)
 	if err != nil {
 		rbCtx, rbCancel := context.WithTimeout(w.ctx, writeTimeout)
 		_, rbErr := conn.ExecContext(rbCtx, "ROLLBACK")
@@ -199,6 +207,11 @@
 		}
 		return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Failed to execute DMLs, query info:%s, args:%v; ", multiStmtSQLWithTxn, util.RedactArgs(multiStmtArgs))))
 	}
+	if rowsAffected, err := res.RowsAffected(); err != nil {
+		log.Warn("get rows affected failed", zap.Error(err))
+	} else {
+		w.statistics.RecordTotalRowsAffected(rowsAffected, int64(len(dmls.sqls)))
+	}
 	return nil
 }
 
diff --git a/pkg/sink/mysql/mysql_writer_dml_test.go b/pkg/sink/mysql/mysql_writer_dml_test.go
index a0560970fc..587d3c9e68 100644
--- a/pkg/sink/mysql/mysql_writer_dml_test.go
+++ b/pkg/sink/mysql/mysql_writer_dml_test.go
@@ -267,9 +267,10 @@ func TestGenerateBatchSQL(t *testing.T) {
 	dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (17, 'test')")
 	helper.ExecuteDeleteDml("test", "t", 
"delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args := writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes := writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?)", sql[0]) require.Equal(t, []interface{}{int64(16), "test", int64(17), "test"}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -277,7 +278,7 @@ func TestGenerateBatchSQL(t *testing.T) { writer.cfg.SafeMode = true writer.cfg.MaxTxnRow = 3 - sql, args = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) @@ -312,7 +313,7 @@ func TestGenerateBatchSQL(t *testing.T) { // Measure execution time start := time.Now() - sql, args = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlEvent}) + sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlEvent}) duration := time.Since(start) // Verify performance requirement @@ -348,9 +349,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Delete A + insert A dmlDeleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (1, 'test')", "delete from t where id = 1") dmlInsertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')") - sql, args := writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) + sql, args, rowTypes := 
writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(1)}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -359,9 +361,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Delete A + Update A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (2, 'test')", "delete from t where id = 2") dmlUpdateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (2, 'test')", "update t set name = 'test2' where id = 2") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(2)}, args[0]) require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? 
LIMIT 1", sql[1]) @@ -369,7 +372,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Insert A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test')", "delete from t where id = 3") dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (3, 'test')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -379,7 +382,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (4, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 4") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (4, 'test')", "update t set name = 'test4' where id = 4") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -388,7 +391,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Update A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (5, 'test5')", "delete from t where id = 5") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (5, 'test')", "update t set name = 'test5' where id = 5") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) 
require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -398,7 +401,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test')", "update t set name = 'test6' where id = 6") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 6") dmlUpdateEvent2, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test6')", "update t set name = 'test7' where id = 6") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", sql[0]) @@ -409,9 +412,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (7, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 7") dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (7, 'test2')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) 
OR (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(7), int64(7)}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -424,7 +428,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (8, 'test')", "update t set name = 'test8' where id = 8") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 8") dmlDeleteEvent2 := helper.DML2DeleteEvent("test", "t", "insert into t values (8, 'test8')", "delete from t where id = 8") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0]) @@ -439,7 +443,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (9, 'test9')", "update t set name = 'test10' where id = 9") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 9") dmlDeleteEvent2 = helper.DML2DeleteEvent("test", "t", "insert into t values (9, 'test10')", "delete from t where id = 9") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) 
OR (`id` = ?)", sql[0]) @@ -450,9 +454,10 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 10") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (10, 'test')", "delete from t where id = 10") dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (10, 'test2')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(10)}, args[0]) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1]) @@ -464,7 +469,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (11, 'test')", "update t set name = 'test11' where id = 11") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 11") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (11, 'test11')", "delete from t where id = 11") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -476,7 +481,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test')", "update t set name = 'test12' where id = 12") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 12") 
dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test12')", "update t set name = 'test13' where id = 12") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -487,7 +492,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 13") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (14, 'test')", "delete from t where id = 14") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (15, 'test')", "update t set name = 'test15' where id = 15") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 3, len(sql)) require.Equal(t, 3, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -503,7 +508,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO 
`test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) @@ -524,7 +529,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Delete A + insert A dmlDeleteEvent := helper.DML2DeleteEvent("test", "t", "insert into t values (1, 'test')", "delete from t where id = 1") dmlInsertEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')") - sql, args := writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) + sql, args, rowTypes := writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -533,7 +538,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Insert A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test')", "delete from t where id = 3") dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (3, 'test')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -543,7 +548,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (4, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 4") dmlUpdateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (4, 'test')", "update t set name = 'test4' where id = 4") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, 
len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -552,7 +557,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Update A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (5, 'test5')", "delete from t where id = 5") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (5, 'test')", "update t set name = 'test5' where id = 5") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -562,7 +567,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test')", "update t set name = 'test6' where id = 6") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 6") dmlUpdateEvent2, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test6')", "update t set name = 'test7' where id = 6") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -573,7 +578,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (7, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 7") dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (7, 'test2')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, 
dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -586,7 +591,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (8, 'test')", "update t set name = 'test8' where id = 8") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 8") dmlDeleteEvent2 := helper.DML2DeleteEvent("test", "t", "insert into t values (8, 'test8')", "delete from t where id = 8") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -601,7 +606,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (9, 'test9')", "update t set name = 'test10' where id = 9") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 9") dmlDeleteEvent2 = helper.DML2DeleteEvent("test", "t", "insert into t values (9, 'test10')", "delete from t where id = 9") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -612,7 +617,7 @@ func 
TestGenerateBatchSQLInSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 10") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (10, 'test')", "delete from t where id = 10") dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (10, 'test2')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -624,7 +629,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (11, 'test')", "update t set name = 'test11' where id = 11") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 11") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (11, 'test11')", "delete from t where id = 11") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) @@ -636,7 +641,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test')", "update t set name = 'test12' where id = 12") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 12") dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test12')", "update t set name = 'test13' where id = 12") - sql, args = 
writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -647,9 +652,10 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 13") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (14, 'test')", "delete from t where id = 14") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (15, 'test')", "update t set name = 'test15' where id = 15") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) + require.Equal(t, 2, len(rowTypes)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0]) require.Equal(t, []interface{}{int64(14)}, args[0]) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?)", sql[1]) @@ -667,7 +673,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO 
`test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) diff --git a/pkg/sink/mysql/sql_builder.go b/pkg/sink/mysql/sql_builder.go index cab827699d..54e0ec75da 100644 --- a/pkg/sink/mysql/sql_builder.go +++ b/pkg/sink/mysql/sql_builder.go @@ -35,6 +35,7 @@ type tsPair struct { type preparedDMLs struct { sqls []string values [][]interface{} + rowTypes []common.RowType rowCount int approximateSize int64 tsPairs []tsPair @@ -116,6 +117,20 @@ func (d *preparedDMLs) fmtSqls() string { return builder.String() } +func (d *preparedDMLs) RowsAffected() int64 { + var count int64 + for _, rowType := range d.rowTypes { + switch rowType { + case common.RowTypeInsert, common.RowTypeDelete: + count += 1 + case common.RowTypeUpdate: + count += 2 + default: + } + } + return count +} + var dmlsPool = sync.Pool{ New: func() interface{} { return &preparedDMLs{ @@ -129,6 +144,7 @@ var dmlsPool = sync.Pool{ func (d *preparedDMLs) reset() { d.sqls = d.sqls[:0] d.values = d.values[:0] + d.rowTypes = d.rowTypes[:0] d.tsPairs = d.tsPairs[:0] d.rowCount = 0 d.approximateSize = 0 @@ -243,9 +259,9 @@ func buildActiveActiveUpsertSQL( tableInfo *common.TableInfo, rows []*commonEvent.RowChange, commitTs []uint64, -) (string, []interface{}) { +) (string, []interface{}, common.RowType) { if tableInfo == nil || len(rows) == 0 { - return "", nil + return "", nil, common.RowTypeInsert } if len(commitTs) != len(rows) { log.Panic("mismatched commitTs and rows length", @@ -265,7 +281,7 @@ func buildActiveActiveUpsertSQL( insertColumns = append(insertColumns, col.Name.O) } if len(insertColumns) == 0 { - return "", nil + return "", nil, common.RowTypeInsert } valueOffsets := make([]int, len(insertColumns)) @@ -342,7 +358,7 @@ func buildActiveActiveUpsertSQL( builder.WriteString(fmt.Sprintf("%s = IF((%s), VALUES(%s), %s)", quoted, cond, quoted, quoted)) } - return builder.String(), args + return builder.String(), args, common.RowTypeInsert } func getArgs(row *chunk.Row, tableInfo *common.TableInfo) 
[]interface{} {