metrics: add DML rows affected metric#3909
Conversation
Summary of ChangesHello @wk989898, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances TiCDC's monitoring capabilities by introducing a new metric that precisely tracks the number of rows affected by Data Manipulation Language (DML) operations as they are flushed to the downstream system. This provides critical insights into the volume and type of data changes being replicated, improving observability and enabling more accurate performance analysis of the data synchronization process. The new metric is also integrated into the existing Grafana dashboards for immediate visualization. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a new metric, ticdc_sink_dml_event_affected_row_count, to monitor the number of rows affected by DML operations in the sink. The changes are well-implemented, including the definition of the new Prometheus counter, the logic in the MySQL sink to track DML row types, and the recording of both expected and actual affected row counts. The Grafana dashboards have also been updated to visualize this new metric. I have a couple of suggestions for pkg/sink/mysql/mysql_writer_dml.go to improve code maintainability and cleanliness.
|
Please paste the screenshots of your newly added metrics in the pr description. |
pkg/eventservice/event_scanner.go
Outdated
| if err != nil { | ||
| return err | ||
| } | ||
| shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo) |
There was a problem hiding this comment.
Do we have a type like 'PkChanged'?
There was a problem hiding this comment.
No. If the PK changed, there should be two raw events.
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds Grafana panels for DML event types and affected rows; introduces EventService and sink Prometheus counters; threads a Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Source as EventSource
participant ES as EventService
participant Metrics as Metrics
participant Sink as Sink
participant MySQL as MySQLWriter
Source->>ES: send raw KV events (include mode)
ES->>ES: classify event via RawKVEntry.GetType()
ES->>Metrics: update EventServiceSendDMLTypeCount(mode,dml_type)
ES->>Sink: forward DML events (include mode, raw type)
Sink->>MySQL: generate batched SQLs and collect rowTypes
MySQL->>Metrics: update ExecDMLEventRowsAffectedCounter(keyspace,changefeed,type,row_type)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pkg/sink/mysql/sql_builder.go (1)
144-150:⚠️ Potential issue | 🔴 CriticalBug:
rowTypesis not cleared inreset(), causing stale data when the object is reused fromdmlsPool.Since
preparedDMLsinstances are pooled viadmlsPool(line 134), omittingrowTypesfromreset()means a recycled object will carry over the previous caller's row types, inflatingRowsAffected()and corrupting metrics.Proposed fix
func (d *preparedDMLs) reset() { d.sqls = d.sqls[:0] d.values = d.values[:0] d.tsPairs = d.tsPairs[:0] + d.rowTypes = d.rowTypes[:0] d.rowCount = 0 d.approximateSize = 0 }pkg/sink/mysql/mysql_writer_dml.go (1)
112-123:⚠️ Potential issue | 🔴 Critical
genActiveActiveSQLdoes not returnrowTypes— causes index-out-of-bounds insequenceExecute.When
EnableActiveActiveis true (line 112),genActiveActiveSQLreturns only([]string, [][]interface{})without row types. TherowTypesListremains empty, butdmls.sqlsis populated. Later insequenceExecuteat line 777,dmls.rowTypes[i]will panic.Either
genActiveActiveSQLneeds to be updated to return[]common.RowType, orsequenceExecuteneeds a bounds check ondmls.rowTypes.Proposed fix — add bounds check in sequenceExecute
rowsAffected, err := res.RowsAffected() if err != nil { log.Warn("get rows affected rows failed", zap.Error(err)) - } else { + } else if i < len(dmls.rowTypes) { w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i]) + } else { + w.statistics.RecordTotalRowsAffected(rowsAffected, 1) }
🤖 Fix all issues with AI agents
In `@pkg/eventservice/event_scanner.go`:
- Around line 760-778: The change causes event.IsUKChanged to be called for all
updates, changing semantics and cost when p.outputRawChangeEvent is true; revert
to the original behavior by short-circuiting when outputRawChangeEvent is true:
call updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) and return
p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter) without
invoking event.IsUKChanged; only call event.IsUKChanged and
updateMetricEventServiceSendDMLTypeCount with the real shouldSplit when
outputRawChangeEvent is false so UK-change decoding happens only when splitting
is actually allowed.
In `@pkg/metrics/event_service.go`:
- Around line 173-178: The Help string for the Prometheus metric created in
EventServiceSendDMLTypeCount (prometheus.NewCounterVec with CounterOpts) has a
grammar mistake; update the CounterOpts.Help value from "The number of different
dml events type sent by the event service" to "The number of different dml event
types sent by the event service" so the metric help text reads correctly.
In `@pkg/metrics/statistics.go`:
- Around line 111-124: The Close() method currently omits cleanup for
ExecDMLEventRowsAffectedCounter causing metric label leaks; update Close() to
call ExecDMLEventRowsAffectedCounter.Delete(...) for the same label sets you use
elsewhere by iterating/removing combinations of keyspace
(b.changefeedID.Keyspace()), changefeedID (b.changefeedID.Name()), the
"actual"/"expected" value dimension and the four row_type values ("total",
"insert", "update", "delete") so that any time-series created by
RecordTotalRowsAffected and RecordRowsAffected are deleted when Close() runs.
In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 571-589: There is a duplicate method generateNormalSQLs; remove
this redundant declaration from this file and rely on the single canonical
implementation in mysql_writer_dml_batch.go (or vice versa if you prefer to keep
this one) so the build no longer sees two definitions. If you remove this file's
generateNormalSQLs, ensure any callers still resolve (they can call the
remaining generateNormalSQLs) and, if needed, replace direct logic here with a
call to the kept function (generateNormalSQLs) or to the helper
generateNormalSQL to avoid duplicating behavior; also clean up any now-unused
imports/variables in this file.
- Around line 366-371: The current conflict-detection path in
mysql_writer_dml.go logs full SQL and values with log.Info immediately before
log.Panic (see variables/functions nextRowType, compareKeys, rowKey, rowLists,
and generateBatchSQLsPerEvent), which is noisy and may leak sensitive data;
change the log.Info call to log.Error or log.Warn and remove or redact sensitive
payloads (avoid logging full values array or replace with a safe summary like
value counts/types or a hashed/redacted representation) while keeping contextual
fields (e.g., writer id, i, j, Events metadata) so the panic still includes
useful non-PII diagnostics.
- Around line 870-891: genUpdateSQL may return multiple SQL strings but the TiDB
branch only appends a single common.RowTypeUpdate per call, causing len(sqls) >
len(rowTypes) and panics in sequenceExecute; fix by appending one RowTypeUpdate
per SQL returned: after s, v := w.genUpdateSQL(rows...) and appending s and v to
sqls/values, append len(s) copies of common.RowTypeUpdate to rowTypes (e.g.,
loop or use slice multiplication) so rowTypes, sqls, and values stay aligned.
- Around line 813-818: The current code calls res.RowsAffected() and passes that
to w.statistics.RecordTotalRowsAffected, but res.RowsAffected() on
multi-statement transactions returns only the final statement's count (often 0);
change the logic to use the aggregated count from dmls.RowsAffected() (or if you
can access the driver raw result, use mysql.Result.AllRowsAffected()) when
calling RecordTotalRowsAffected so total rows across all statements are
recorded; update the block around res.RowsAffected() (referencing
res.RowsAffected(), dmls.RowsAffected(), and
w.statistics.RecordTotalRowsAffected()) to prefer dmls.RowsAffected() and fall
back to per-statement totals only if available from the raw mysql result.
🧹 Nitpick comments (7)
pkg/sink/mysql/sql_builder.go (1)
134-142: Pool'sNewfunc doesn't pre-allocaterowTypes, unlike the other slices.For consistency and to avoid an initial nil-slice allocation on first append, consider pre-allocating
rowTypesthe same waysqls,values, andtsPairsare.Proposed fix
var dmlsPool = sync.Pool{ New: func() interface{} { return &preparedDMLs{ sqls: make([]string, 0, 128), values: make([][]interface{}, 0, 128), + rowTypes: make([]common.RowType, 0, 128), tsPairs: make([]tsPair, 0, 128), } }, }pkg/common/kv_entry.go (1)
78-93: Minor: redundant condition inelse if.On line 84, since we already know
len(v.OldValue) != 0(theifon line 82 was false), thelen(v.OldValue) > 0check is always true here. You can simplify to justlen(v.Value) > 0.Also note: an
OpTypePutwithOldValuepresent butValueempty will fall through to"unknown". This matches the existingIsInsert/IsUpdatehelpers, so it seems intentional, but worth a comment if that state is truly impossible.Simplification
func (v *RawKVEntry) GetType() string { switch v.OpType { case OpTypePut: if len(v.OldValue) == 0 { return "insert" - } else if len(v.OldValue) > 0 && len(v.Value) > 0 { + } else if len(v.Value) > 0 { return "update" } case OpTypeDelete:pkg/metrics/statistics.go (1)
111-116: Exported methods missing doc comments.
RecordTotalRowsAffectedandRecordRowsAffectedare exported but lack Go doc comments. The rest of the exported methods in this file (RecordBatchExecution,RecordDDLExecution,Close) all have doc comments.Add doc comments
+// RecordTotalRowsAffected records the total actual and expected rows affected by DML execution. func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) {+// RecordRowsAffected records the actual rows affected for a specific row type and updates the total counters. func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) {pkg/sink/mysql/mysql_writer_dml.go (1)
681-685:tx.Commit()error not wrapped witherrors.Trace.Per coding guidelines, errors from library calls should be wrapped immediately. The error from
tx.Commit()is returned bare.Proposed fix
if err = tx.Commit(); err != nil { - return 0, 0, err + return 0, 0, errors.Trace(err) }As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with
errors.Trace(err)orerrors.WrapError(...)to attach a stack trace".pkg/sink/mysql/mysql_writer_dml_test.go (2)
149-156:rowTypesvalues not asserted beyond length checks.Throughout the tests,
rowTypesis captured but only its length is sometimes asserted (e.g., line 152). The actual content ofrowTypes(e.g., whether it containsRowTypeInsert,RowTypeDelete,RowTypeUpdate) is never verified. Adding value assertions would strengthen the test coverage for the new metric propagation.For example, at line 152 you could additionally check:
require.Equal(t, common.RowTypeInsert, rowTypes[0]) require.Equal(t, common.RowTypeInsert, rowTypes[1])
160-163: Missinglen(rowTypes)assertion for safe-mode batch SQL test.Line 160 captures
rowTypesfromgenerateBatchSQLin safe mode, but unlike the unsafe-mode test at line 152, there is norequire.Equal(t, 1, len(rowTypes))assertion here. This should be consistent with other test cases.Proposed fix
sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) + require.Equal(t, 1, len(rowTypes)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0])pkg/eventservice/event_scanner.go (1)
143-143: Redundantmodeparameter —scanAndMergeEventsalready has access via receiver.
scanAndMergeEventsis a method on*eventScanner, which already storesmodeas a field (s.mode). Passing it as an explicit parameter is unnecessary.♻️ Proposed simplification
-func (s *eventScanner) scanAndMergeEvents( - session *session, - merger *eventMerger, - iter eventstore.EventIterator, - mode int64, -) (bool, error) { +func (s *eventScanner) scanAndMergeEvents( + session *session, + merger *eventMerger, + iter eventstore.EventIterator, +) (bool, error) { tableID := session.dataRange.Span.TableID dispatcher := session.dispatcherStat - processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), mode) + processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), s.mode)And update the call site:
- interrupted, err := s.scanAndMergeEvents(sess, merger, iter, s.mode) + interrupted, err := s.scanAndMergeEvents(sess, merger, iter)Also applies to: 186-194
| EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "ticdc", | ||
| Subsystem: "event_service", | ||
| Name: "send_dml_type_count", | ||
| Help: "The number of different dml events type sent by the event service", | ||
| }, []string{"mode", "dml_type"}) |
There was a problem hiding this comment.
Minor grammar nit in Help text.
"The number of different dml events type sent by the event service" should be "The number of different dml event types sent by the event service".
Proposed fix
EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "event_service",
Name: "send_dml_type_count",
- Help: "The number of different dml events type sent by the event service",
+ Help: "The number of different dml event types sent by the event service",
}, []string{"mode", "dml_type"})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ | |
| Namespace: "ticdc", | |
| Subsystem: "event_service", | |
| Name: "send_dml_type_count", | |
| Help: "The number of different dml events type sent by the event service", | |
| }, []string{"mode", "dml_type"}) | |
| EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ | |
| Namespace: "ticdc", | |
| Subsystem: "event_service", | |
| Name: "send_dml_type_count", | |
| Help: "The number of different dml event types sent by the event service", | |
| }, []string{"mode", "dml_type"}) |
🤖 Prompt for AI Agents
In `@pkg/metrics/event_service.go` around lines 173 - 178, The Help string for the
Prometheus metric created in EventServiceSendDMLTypeCount
(prometheus.NewCounterVec with CounterOpts) has a grammar mistake; update the
CounterOpts.Help value from "The number of different dml events type sent by the
event service" to "The number of different dml event types sent by the event
service" so the metric help text reads correctly.
pkg/sink/mysql/mysql_writer_dml.go
Outdated
| if nextRowType == common.RowTypeInsert { | ||
| if compareKeys(rowKey, rowLists[j].RowKeys) { | ||
| sql, values, rowTypes := w.generateBatchSQLsPerEvent(events) | ||
| log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id)) | ||
| log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id)) | ||
| } |
There was a problem hiding this comment.
log.Info with SQL values on conflict detection paths — noisy and potentially sensitive.
Lines 369 and 408 use log.Info to dump SQL queries and their values right before a log.Panic. Since log.Panic will already terminate the process, this log.Info line should use log.Error or log.Warn at minimum, and consider whether logging full SQL values is appropriate in production (may contain PII).
🤖 Prompt for AI Agents
In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 366 - 371, The current
conflict-detection path in mysql_writer_dml.go logs full SQL and values with
log.Info immediately before log.Panic (see variables/functions nextRowType,
compareKeys, rowKey, rowLists, and generateBatchSQLsPerEvent), which is noisy
and may leak sensitive data; change the log.Info call to log.Error or log.Warn
and remove or redact sensitive payloads (avoid logging full values array or
replace with a safe summary like value counts/types or a hashed/redacted
representation) while keeping contextual fields (e.g., writer id, i, j, Events
metadata) so the panic still includes useful non-PII diagnostics.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/mysql/mysql_writer_dml.go (1)
92-107:⚠️ Potential issue | 🟠 MajorActive-active path omits
rowTypestracking — metrics will silently fail.The active-active SQL generation path (
genActiveActiveSQLand its callees) returns only([]string, [][]interface{})while all other paths return([]string, [][]interface{}, []common.RowType). At line 96,genActiveActiveSQLis called but only two values are captured, leavingrowTypesListuninitialized. Line 106 then appendsniltodmls.rowTypes, creating a length mismatch:dmls.sqlswill be populated whiledmls.rowTypesremains empty. TheRowsAffected()method (sql_builder.go:120) will iterate an empty slice and return 0, causing metrics collection to silently fail for active-active replication.Update
genActiveActiveSQL,generateActiveActiveNormalSQLs, andgenerateActiveActiveBatchSQLto return[]common.RowTypealongside the current return values, and capture all three values at line 96.
Signed-off-by: wk989898 <nhsmwk@gmail.com>
This reverts commit ecbe2b4.
|
/test all |
|
/test all |
What problem does this PR solve?
Issue Number: close #3889
What is changed and how it works?
update go-sql-driver/mysql, it influences the decoder if querying the db by the snapshot read. see https://github.com/go-sql-driver/mysql/releases/tag/v1.8.0
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Improvements