Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. Approval is still needed from an approver in each of the affected files.
Note: reviews paused. This branch is under active development, so to avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

Adds a NodeLiveness state machine (Alive → Draining → Stopping), a Drain Controller to drive SetNodeLiveness requests, heartbeat-based liveness observation, liveness-aware schedulers (including a drain scheduler), a Coordinator.DrainNode API, and maintainer-side heartbeat/SetNodeLiveness handling.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API as API/v1
    participant Coord as Coordinator
    participant DC as DrainController
    participant LV as LivenessView
    participant MM as MaintainerManager
    participant DS as DrainScheduler
    Client->>API: DrainNode(target)
    API->>Coord: DrainNode(ctx, target)
    Coord->>DC: RequestDrain(target)
    DC-->>Coord: ack
    Coord->>DC: GetStatus(target)
    Coord-->>API: remainingCount
    loop periodic heartbeat/advance
        MM->>Coord: NodeHeartbeat(liveness, epoch)
        Coord->>LV: ObserveHeartbeat(...)
        Coord->>DC: ObserveHeartbeat(...)
        DC-->>Coord: mark observed
        alt readyToStop(target)
            DC->>MM: SetNodeLivenessRequest(STOPPING, epoch)
            MM->>Coord: SetNodeLivenessResponse(applied, epoch)
            Coord->>DC: ObserveSetNodeLivenessResponse(...)
        else
            DC->>MM: SetNodeLivenessRequest(DRAINING, epoch)
            MM->>Coord: SetNodeLivenessResponse(...)
        end
    end
    DS->>LV: GetDrainingOrStoppingNodes()
    DS->>Coord: request move operators
    Coord->>DS: create/approve MoveMaintainerOperators
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 3 failed (2 warnings, 1 inconclusive)
Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a robust node draining mechanism for TiCDC, enabling graceful shutdown and maintenance operations. It establishes a clear lifecycle for nodes (Alive, Draining, Stopping) and integrates this liveness awareness across critical components, including the API, coordinator, schedulers, and maintainers. The changes ensure that tasks are appropriately managed during node transitions, preventing new assignments to unhealthy nodes and facilitating the migration of existing workloads. Additionally, it refines table schema comparison by incorporating index column length for greater accuracy.
Code Review
This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster operability. The changes are extensive, touching upon the API, coordinator, schedulers, maintainers, and core server logic.
Key changes include:
- A new `drain` package with a `Controller` to manage the node draining state machine.
- A new `nodeliveness` package with a `View` to track node liveness with TTL.
- A new `drainScheduler` to move work off draining nodes.
- Updates to `basicScheduler` and `balanceScheduler` to be aware of node liveness.
- A new implementation for the v1 `drainCapture` API.
- Enhancements to the server election and shutdown logic to handle draining/stopping states gracefully.
- Improved correctness in `Liveness.Store` to enforce monotonic state transitions.
The implementation appears robust and well-tested, with new unit and integration tests covering the new functionality. I have one suggestion for code simplification in the DrainNode function to improve readability.
coordinator/controller.go (Outdated)
```go
// v1 drain API must not return 0 until drain completion is proven.
if nodeState == nodeliveness.StateUnknown || !drainingObserved {
    if remaining == 0 {
        remaining = 1
    }
    return remaining, nil
}

// Return 0 only after STOPPING is observed and all work is done.
if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
    return 0, nil
}
if remaining == 0 {
    remaining = 1
}
return remaining, nil
```
The logic to ensure `remaining` is at least 1 unless drain is complete is a bit complex and can be simplified. The current implementation has a duplicated check `if remaining == 0 { remaining = 1 }` and an early return that makes the flow harder to follow. A refactoring can make the logic more straightforward.
Suggested change (current, then proposed):

```go
// v1 drain API must not return 0 until drain completion is proven.
if nodeState == nodeliveness.StateUnknown || !drainingObserved {
    if remaining == 0 {
        remaining = 1
    }
    return remaining, nil
}
// Return 0 only after STOPPING is observed and all work is done.
if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
    return 0, nil
}
if remaining == 0 {
    remaining = 1
}
return remaining, nil
```

```go
// v1 drain API must not return 0 until drain completion is proven.
// Return 0 only after STOPPING is observed and all work is done, and the
// node liveness is not unknown.
if nodeState != nodeliveness.StateUnknown && stoppingObserved &&
    maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
    return 0, nil
}
if remaining == 0 {
    return 1, nil
}
return remaining, nil
```
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
api/v1/api.go (1)
195-195: ⚠️ Potential issue | 🟡 Minor
Stale TODO comment — the drain API is now implemented.
The comment says "currently it is a no-op" but the method now delegates to `co.DrainNode`. Remove or update the TODO.

Suggested fix:

```diff
 // drainCapture drains all tables from a capture.
 // Usage:
 // curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
-// TODO: Implement this API in the future, currently it is a no-op.
 func (o *OpenAPIV1) drainCapture(c *gin.Context) {
```

server/module_election.go (1)
322-341: ⚠️ Potential issue | 🟡 Minor
Log messages for "resign failed" and "resign timeout" are swapped.
When `DeadlineExceeded` is the cause (line 330), the code logs "resign failed" (line 331) and returns the error. For other errors, it logs "resign timeout" (line 336). This is the inverse of the coordinator resign flow (lines 173-184) where `DeadlineExceeded` logs "timeout" and other errors log "failed".

Proposed fix:

```diff
 func (e *elector) resignLogCoordinator() error {
 	if e.logElection == nil {
 		return nil
 	}
 	nodeID := string(e.svr.info.ID)
 	// use a new context to prevent the context from being cancelled.
 	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
-		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+		if errors.Cause(resignErr) != context.DeadlineExceeded {
+			log.Info("log coordinator resign failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
 		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
 	cancel()
 	return nil
 }
```
🤖 Fix all issues with AI agents
In `@coordinator/controller_drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header used across the
repository to the top of this file (above the "package coordinator" declaration)
so CI stops failing; copy the exact header text used in other files (for example
the header at the top of coordinator.go) and place it verbatim at the top of
coordinator/controller_drain_test.go.
In `@coordinator/drain/controller_test.go`:
- Line 1: Add the standard PingCAP copyright header to the top of
controller_test.go (above the line "package drain")—mirror the exact header used
in view_test.go so CI recognizes it; ensure the header appears before the
package declaration in controller_test.go.
In `@coordinator/drain/controller.go`:
- Line 1: Missing Apache 2.0 copyright header: add the project's standard Apache
2.0 header comment block to the top of controller.go above the existing package
drain declaration, ensuring it includes the correct copyright year and copyright
holder name used across the repo and matches the exact header format used in
other files.
In `@coordinator/nodeliveness/view_test.go`:
- Line 1: Add the standard PingCAP copyright/header block to the top of this
file (view_test.go) above the package nodeliveness declaration so the header
matches the project's required copyright format; ensure the exact standard
header text used across the repo is pasted verbatim and that there are no extra
characters or blank lines before the package statement.
In `@coordinator/nodeliveness/view.go`:
- Line 1: This file is missing the standard Apache 2.0 copyright header; add the
project's canonical Apache-2.0 header comment block immediately above the
"package nodeliveness" declaration in view.go (matching the same header style
used across the repo, e.g., full copyright line(s) and license notice or SPDX
identifier) so CI/pipeline recognizes the file as licensed.
In `@coordinator/scheduler/balance.go`:
- Around line 83-93: The code can call pkgScheduler.CheckBalanceStatus with an
empty activeNodes map which leads to division by zero inside CheckBalanceStatus;
modify the block around activeNodes (where s.nodeManager.GetAliveNodes(), the
liveness filter using s.livenessView.IsSchedulableDest, and the call to
pkgScheduler.CheckBalanceStatus with s.changefeedDB.GetTaskSizePerNode()) to
check if activeNodes is empty after filtering and return early (or set moveSize
to zero / skip calling CheckBalanceStatus) so CheckBalanceStatus never receives
an empty nodeSize map.
In `@coordinator/scheduler/drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of
drain_test.go (the file that begins with "package scheduler") so the CI
recognizes proper copyright information; insert the full multi-line header
comment block immediately above the package declaration and ensure it matches
the project's standard header format exactly.
In `@coordinator/scheduler/drain.go`:
- Line 1: The file starting with "package scheduler" is missing the standard
Apache-2.0 copyright header which causes CI failure; add the same Apache 2.0
header block used in drain_test.go at the top of this file above the package
declaration so the header text and year/owner match the repository's standard.
In `@maintainer/node_liveness_test.go`:
- Line 1: Add the project's standard copyright/header to the top of the new test
file (node_liveness_test.go) so CI stops failing; open the file that declares
"package maintainer" and prepend the same license/author header used across
other test files in the repo (matching formatting and year) before the package
declaration.
In `@pkg/api/util_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of this file
(pkg/api/util_test.go) so CI stops failing; open the file beginning with the
standard multi-line Apache 2.0 header comment block above the package api
declaration and ensure the header text exactly matches the project’s canonical
Apache 2.0 template.
In `@pkg/api/util.go`:
- Around line 170-185: The loop allows invalid Liveness values >
LivenessCaptureStopping to be CAS'd when old == LivenessCaptureStopping; add a
defensive range check at the start of the setter so any incoming v outside the
valid enum range (v < LivenessCaptureAlive || v > LivenessCaptureStopping)
returns false immediately. Locate the loop using l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.
In `@server/module_election.go`:
- Around line 255-258: The resign failure is being swallowed because the handler
returns errors.Trace(err) (where err is the prior campaign result and likely
nil) instead of the resign error; update the error return in the block that
calls e.resignLogCoordinator() (the resignErr variable) to return
errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.
🧹 Nitpick comments (10)
pkg/api/util.go (1)
193-203: Pre-existing: `String()` reads `*l` non-atomically.

Line 194 dereferences `*l` directly instead of using `l.Load()`. If `String()` is called concurrently with `Store()`, this is a data race. Not introduced by this PR, but worth noting since you're touching this area.

Suggested fix:

```diff
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
```

api/v1/api.go (1)
202-219: Implementation looks good overall.

The flow is clean: validate input → get coordinator → call `DrainNode` → return remaining count with HTTP 202. One consideration: if `req.CaptureID` is empty, it will pass through to `DrainNode` with an empty `node.ID`. You may want to validate that `CaptureID` is non-empty up front and return a 400 error, consistent with how other endpoints handle missing parameters.

Optional: validate non-empty CaptureID:

```diff
+	if req.CaptureID == "" {
+		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("capture_id is required"))
+		return
+	}
+
 	co, err := o.server.GetCoordinator()
```

coordinator/controller.go (1)
845-883: `DrainNode` does not acquire `apiLock` unlike other public API methods.

Other API methods (`CreateChangefeed`, `RemoveChangefeed`, `PauseChangefeed`, etc.) acquire `c.apiLock` before accessing `changefeedDB` and `operatorController`. `DrainNode` skips this. If this is intentional (because `DrainNode` is read-only and the underlying stores are internally synchronized), consider adding a brief comment to document the rationale. Otherwise, consider acquiring `c.apiLock.RLock()` for consistency.

coordinator/scheduler/drain.go (1)
49-112: Round-robin cursor advancement may skip draining nodes unfairly.

`s.rrCursor` is incremented on every outer-loop iteration (line 98), regardless of how many inner-loop iterations completed. If the outer loop runs multiple times in a single `Execute()` call, the cursor advances by the number of outer iterations, potentially skipping nodes. For example, with 3 draining nodes and `availableSize = 6`, the cursor could advance by up to 6 in one call, wrapping around multiple times.

This is a minor fairness concern rather than a correctness bug, since all draining nodes are still visited in each inner-loop pass. If you want strict round-robin across `Execute()` calls, consider advancing the cursor only once per `Execute()` invocation.

server/module_election.go (1)
266-284: Resign error silently discarded when liveness transitions to STOPPING.

Line 278 discards the `resignLogCoordinator()` error with `_ =`. If the resign fails, the log coordinator's etcd key remains held, but `leaderCancel()` on line 279 stops the coordinator anyway. This could cause a brief period where another node cannot win the election. Consider logging the error at minimum.

Proposed fix:

```diff
 case <-ticker.C:
 	if e.svr.liveness.Load() == api.LivenessCaptureStopping {
 		log.Info("node is stopping, resign log coordinator actively",
 			zap.String("nodeID", nodeID))
-		_ = e.resignLogCoordinator()
+		if resignErr := e.resignLogCoordinator(); resignErr != nil {
+			log.Warn("resign log coordinator failed during stop",
+				zap.String("nodeID", nodeID), zap.Error(resignErr))
+		}
 		leaderCancel()
 		return
 	}
```

coordinator/nodeliveness/view.go (1)
60-86: Observe methods unconditionally overwrite records; no epoch or monotonicity guard.

Both `ObserveHeartbeat` and `ObserveSetNodeLivenessResponse` replace the record without checking `nodeEpoch` or ensuring monotonic liveness transitions. If a delayed/reordered heartbeat with `ALIVE` arrives after a `SetNodeLivenessResponse` with `DRAINING`, the view briefly regresses to `Alive`, potentially causing the drain scheduler to skip the node. Consider adding an epoch check or a monotonicity guard (e.g., only accept transitions forward: Alive → Draining → Stopping).
maintainer/maintainer_manager.go (1)
442-466: Conversion helpers are stateless — could be package-level functions.

`fromNodeLivenessPB` and `toNodeLivenessPB` don't use any `Manager` state. Making them package-level functions (or moving them to a shared conversion package) would clarify their nature and improve reusability.

coordinator/drain/controller.go (3)
29-37: Consider using `sync.RWMutex` for better read concurrency.

`GetStatus` (line 106) is a read-only method but acquires a full `Mutex.Lock()`. If `GetStatus` is called on a hot path (e.g., from the `DrainNode` API), switching to `sync.RWMutex` and using `RLock` in `GetStatus` would avoid contention with concurrent reads.
200-203: Info-level log on every successful liveness command send may be noisy.

`sendSetNodeLiveness` logs at `Info` level on every successful send (line 200). Since commands are resent every `resendInterval` (1 second, as wired in controller.go line 168) until observed, this could produce substantial log volume during drains. Consider `Debug` level for resends, or logging `Info` only on the first send.

As per coding guidelines, "log message strings should not include function names and should avoid hyphens (use spaces instead)" — the current message format is compliant.
181-204: Epoch 0 sent when liveness view has no record for the node.

When `livenessView.GetNodeEpoch` returns `false` (no record), `epoch` defaults to `0`. The maintainer rejects requests where `req.NodeEpoch != m.nodeEpoch` (see maintainer_manager.go line 402), so the command will be silently rejected if the maintainer's epoch is non-zero. The command will be resent on the next tick, which is fine, but consider logging a debug message when the epoch is not found to aid troubleshooting.
```go
for {
    old := l.Load()
    if v <= old {
        return false
    }
    if old == LivenessCaptureAlive && v != LivenessCaptureDraining {
        return false
    }
    if old == LivenessCaptureDraining && v != LivenessCaptureStopping {
        return false
    }

    if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
        return true
    }
}
```
Missing guard for out-of-range values when old == LivenessCaptureStopping.
If old is Stopping(2), the v <= old check on line 172 rejects values ≤ 2, but any value > 2 (e.g., an invalid Liveness(3)) falls through all guards and gets CAS'd in. While callers should only use defined constants, adding a range check would make the state machine defensive.
Proposed fix
```diff
 func (l *Liveness) Store(v Liveness) bool {
+	if v < LivenessCaptureDraining || v > LivenessCaptureStopping {
+		return false
+	}
 	for {
 		old := l.Load()
```

🤖 Prompt for AI Agents
In `@pkg/api/util.go` around lines 170 - 185, The loop allows invalid Liveness
values > LivenessCaptureStopping to be CAS'd when old ==
LivenessCaptureStopping; add a defensive range check at the start of the setter
so any incoming v outside the valid enum range (v < LivenessCaptureAlive || v >
LivenessCaptureStopping) returns false immediately. Locate the loop using
l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.
server/module_election.go (Outdated)
```go
if resignErr := e.resignLogCoordinator(); resignErr != nil {
    log.Warn("resign log coordinator actively failed",
        zap.String("nodeID", nodeID), zap.Error(resignErr))
    return errors.Trace(err)
```
Bug: returns err (which is nil after a successful campaign) instead of resignErr.
When resignLogCoordinator() fails, the error is logged but line 258 returns errors.Trace(err) where err is the campaign result (nil at this point), effectively swallowing the resign failure.
Proposed fix
```diff
 if resignErr := e.resignLogCoordinator(); resignErr != nil {
 	log.Warn("resign log coordinator actively failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
-	return errors.Trace(err)
+	return errors.Trace(resignErr)
 }
```
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 255 - 258, The resign failure is
being swallowed because the handler returns errors.Trace(err) (where err is the
prior campaign result and likely nil) instead of the resign error; update the
error return in the block that calls e.resignLogCoordinator() (the resignErr
variable) to return errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
server/module_election.go (2)
119-122: ⚠️ Potential issue | 🔴 Critical
Bug: returns `err` (which is `nil` after a successful campaign) instead of `resignErr`.

Same class of bug as flagged at line 239 for the log coordinator path. Here `err` is the campaign result (`nil` on success), so the resign failure is silently swallowed.

Proposed fix:

```diff
 if resignErr := e.resign(ctx); resignErr != nil {
 	log.Warn("resign coordinator actively failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
-	return errors.Trace(err)
+	return errors.Trace(resignErr)
 }
```
289-298: ⚠️ Potential issue | 🟡 Minor
Bug: "resign failed" and "resign timeout" log messages are swapped.

When `context.DeadlineExceeded` is matched (line 290), the log says "resign failed" (line 291). The else branch (non-timeout error) logs "resign timeout" (line 296). These labels are reversed.

Proposed fix:

```diff
 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-		log.Info("log coordinator resign failed",
+		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
-	log.Warn("log coordinator resign timeout",
+	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
 }
```
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 210-233: sendSetNodeLiveness currently sends epoch=0 when
livenessView.GetNodeEpoch(nodeID) returns false, causing the maintainer
(maintainer_manager.go) to reject the request as an epoch mismatch; update
Controller.sendSetNodeLiveness to detect the missing epoch (ok == false) and log
a clear warning (include nodeID and that node epoch is unknown) before sending
the command so operators can diagnose why the drain may be delayed; keep the
message creation and send flow unchanged (still use epoch variable) so behavior
is unchanged but observable.
In `@coordinator/nodeliveness/view.go`:
- Around line 61-87: ObserveHeartbeat and ObserveSetNodeLivenessResponse
unconditionally overwrite the node record, allowing out-of-order messages to
regress liveness; fix by making updates monotonic per node epoch: in both
functions load existing record from v.nodes[nodeID], if resp.NodeEpoch or
hb.NodeEpoch is greater than the stored nodeEpoch accept and replace (new epoch
resets to ALIVE), if equal only accept the update when the new liveness is the
same or a forward transition (ALIVE→DRAINING→STOPPING) and update lastSeen
appropriately, and if the new epoch is older or the liveness would move backward
reject the update; use the record struct fields nodeEpoch and liveness to
perform these checks inside ObserveHeartbeat and ObserveSetNodeLivenessResponse
while holding v.mu.
🧹 Nitpick comments (7)
api/v1/api.go (1)
201-229: Wrap errors from internal calls before passing to gin context.

Per coding guidelines, errors from internal/library calls should be wrapped with `errors.Trace(err)` to attach stack traces. The errors from `SelfInfo()`, `GetCoordinator()`, and `DrainNode()` are passed directly to `c.Error()` without wrapping.

Proposed fix:

```diff
 self, err := o.server.SelfInfo()
 if err != nil {
-	_ = c.Error(err)
+	_ = c.Error(errors.Trace(err))
 	return
 }
 // For compatibility with old arch TiCDC, draining the current owner is not allowed.
 if target == self.ID {
 	_ = c.Error(errors.ErrSchedulerRequestFailed.GenWithStackByArgs("cannot drain the owner"))
 	return
 }
 co, err := o.server.GetCoordinator()
 if err != nil {
-	_ = c.Error(err)
+	_ = c.Error(errors.Trace(err))
 	return
 }
 remaining, err := co.DrainNode(c.Request.Context(), target)
 if err != nil {
-	_ = c.Error(err)
+	_ = c.Error(errors.Trace(err))
 	return
 }
```

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` to attach a stack trace".

coordinator/nodeliveness/view.go (1)
99-122: `GetState` uses `time.Now()` internally, unlike other methods that accept `now`.

`GetDrainingOrStoppingNodes` takes a `now` parameter for TTL checks, but `GetState` (and by extension `IsSchedulableDest`) calls `time.Now()` internally. This inconsistency makes the TTL behavior harder to test and could lead to subtle differences in TTL evaluation within the same tick.

Consider accepting `now time.Time` as a parameter for consistency, or documenting why the divergence is intentional.

maintainer/maintainer_manager.go (2)
448-472: `fromNodeLivenessPB` and `toNodeLivenessPB` don't use the receiver — make them standalone functions.

These methods are bound to `*Manager` but reference no fields from `m`. Converting them to package-level functions (or even placing them in a shared conversion utility) would clarify that they're pure mappings and allow reuse without a `Manager` instance.

♻️ Proposed refactor:

```diff
-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
 	switch l {
 	case heartbeatpb.NodeLiveness_ALIVE:
 		return api.LivenessCaptureAlive
@@ ...
-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
 	switch l {
 	case api.LivenessCaptureAlive:
 		return heartbeatpb.NodeLiveness_ALIVE
```

Then update call sites (Lines 211, 417, 436) to drop the `m.` prefix.
279-284: Good: rejecting `AddMaintainer` when stopping prevents scheduling work onto a draining node.

This is a solid safety check. Consider also logging at `Warn` level instead of `Info` since this represents an unexpected request from the coordinator's perspective (it shouldn't be sending work to a stopping node).

coordinator/drain/controller.go (3)
132-150: `AdvanceLiveness` re-acquires the lock per node via `GetStatus` — consider a locked snapshot.

`listDrainRequestedNodeIDs()` takes a lock to snapshot node IDs, then for each node, `GetStatus()` re-acquires the lock, and `trySendDrainCommand`/`trySendStopCommand` each acquire it again via `checkAndMarkCommandSend`. This is 3 lock round-trips per node per tick.

Since state only moves forward (monotonic), this isn't a correctness issue, but it's worth noting for performance if the number of draining nodes grows. A single locked pass that snapshots both IDs and states would reduce contention.
36-55: No mechanism to remove nodes from the `nodes` map.

Once a node enters the drain workflow, its `nodeState` persists in the map indefinitely. If the coordinator is long-lived and nodes are added/removed over time, this is a slow memory leak. Consider adding a `RemoveNode(nodeID)` method or clearing entries once `stoppingObserved` is true and the node has been fully removed from the cluster.
15-15: `resendInterval` is a hardcoded constant — consider making it configurable.

The AI summary mentioned a `resendInterval` parameter in the constructor. The current implementation uses a package-level constant of 1 second. For testing and tuning in production, injecting this via the constructor (or a config struct) would be more flexible.
```go
// sendSetNodeLiveness sends a liveness command to the target maintainer manager.
func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness) {
    var epoch uint64
    if c.livenessView != nil {
        if e, ok := c.livenessView.GetNodeEpoch(nodeID); ok {
            epoch = e
        }
    }

    msg := messaging.NewSingleTargetMessage(nodeID, messaging.MaintainerManagerTopic, &heartbeatpb.SetNodeLivenessRequest{
        Target:    target,
        NodeEpoch: epoch,
    })
    if err := c.mc.SendCommand(msg); err != nil {
        log.Warn("send set node liveness command failed",
            zap.Stringer("nodeID", nodeID),
            zap.String("target", target.String()),
            zap.Error(err))
        return
    }
    log.Info("send set node liveness command",
        zap.Stringer("nodeID", nodeID),
        zap.String("target", target.String()),
        zap.Uint64("epoch", epoch))
}
```
Drain commands will be silently rejected when the node epoch is unknown.
When livenessView has no epoch for the target node (Line 214, ok is false), epoch defaults to 0. On the receiving side (maintainer_manager.go Line 408), the node compares req.NodeEpoch != m.nodeEpoch — and since newNodeEpoch() guarantees a non-zero value, the request is always rejected with an epoch mismatch log.
This means drain commands sent before the coordinator observes any heartbeat from the target node will be silently ineffective. The resend logic will eventually retry after a heartbeat arrives and populates the epoch, so it's self-healing, but it could delay drain initiation unexpectedly.
Consider either:
- Logging a warning when epoch is not found so operators can diagnose delays, or
- Treating epoch=0 as "match any" on the receiver side for the initial transition.
#!/bin/bash
# Verify how livenessView.GetNodeEpoch is populated — check if heartbeat observation stores the epoch
ast-grep --pattern $'func ($_ *View) GetNodeEpoch($_, $_) ($_, $_) {
$$$
}'
# Also check where ObserveHeartbeat feeds into the view
rg -n "ObserveHeartbeat\|UpdateNodeEpoch\|SetNodeEpoch\|nodeEpoch" --type go -g '*/nodeliveness/*'🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 210 - 233, sendSetNodeLiveness
currently sends epoch=0 when livenessView.GetNodeEpoch(nodeID) returns false,
causing the maintainer (maintainer_manager.go) to reject the request as an epoch
mismatch; update Controller.sendSetNodeLiveness to detect the missing epoch (ok
== false) and log a clear warning (include nodeID and that node epoch is
unknown) before sending the command so operators can diagnose why the drain may
be delayed; keep the message creation and send flow unchanged (still use epoch
variable) so behavior is unchanged but observable.
coordinator/nodeliveness/view.go
Outdated
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
	if hb == nil {
		return
	}

	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: hb.NodeEpoch,
		liveness:  hb.Liveness,
	}
	v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}

	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}
Unconditional record overwrite may regress liveness state on out-of-order messages.
Both ObserveHeartbeat and ObserveSetNodeLivenessResponse unconditionally overwrite the record. If a delayed heartbeat (reporting ALIVE) arrives after a SetNodeLivenessResponse (applying DRAINING), the view would briefly revert to ALIVE, which could cause the scheduler to route work to a draining node.
Since liveness transitions are monotonic (ALIVE→DRAINING→STOPPING), consider rejecting updates that would move backward:
Sketch
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
if hb == nil {
return
}
v.mu.Lock()
+ if existing, ok := v.nodes[nodeID]; ok {
+ if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
+ v.mu.Unlock()
+ return
+ }
+ }
v.nodes[nodeID] = record{
lastSeen: now,
nodeEpoch: hb.NodeEpoch,
liveness: hb.Liveness,
}
v.mu.Unlock()
}This would also need analogous handling in ObserveSetNodeLivenessResponse. A new epoch (node restart) should still be allowed to reset to ALIVE.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
	if hb == nil {
		return
	}
	v.mu.Lock()
	if existing, ok := v.nodes[nodeID]; ok {
		if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
			v.mu.Unlock()
			return
		}
	}
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: hb.NodeEpoch,
		liveness:  hb.Liveness,
	}
	v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
	if resp == nil {
		return
	}
	v.mu.Lock()
	v.nodes[nodeID] = record{
		lastSeen:  now,
		nodeEpoch: resp.NodeEpoch,
		liveness:  resp.Applied,
	}
	v.mu.Unlock()
}
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 61 - 87, ObserveHeartbeat and
ObserveSetNodeLivenessResponse unconditionally overwrite the node record,
allowing out-of-order messages to regress liveness; fix by making updates
monotonic per node epoch: in both functions load existing record from
v.nodes[nodeID], if resp.NodeEpoch or hb.NodeEpoch is greater than the stored
nodeEpoch accept and replace (new epoch resets to ALIVE), if equal only accept
the update when the new liveness is the same or a forward transition
(ALIVE→DRAINING→STOPPING) and update lastSeen appropriately, and if the new
epoch is older or the liveness would move backward reject the update; use the
record struct fields nodeEpoch and liveness to perform these checks inside
ObserveHeartbeat and ObserveSetNodeLivenessResponse while holding v.mu.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@coordinator/nodeliveness/view.go`:
- Around line 111-134: GetState currently calls time.Now() internally which
prevents deterministic tests and is inconsistent with
GetDrainingOrStoppingNodes; change View.GetState to accept a now time.Time
parameter (func (v *View) GetState(nodeID node.ID, now time.Time) State), update
its internal uses of now (replace the time.Now() call), and then update all
callers (notably IsSchedulableDest) to pass through a now value (e.g., take now
as a parameter or forward the now they already have) so TTL comparisons use the
same time source as GetDrainingOrStoppingNodes.
🧹 Nitpick comments (6)
coordinator/scheduler/drain.go (2)
105-118: Round-robin cursor advances on every outer-loop iteration, not per `Execute` call.

`s.rrCursor` is incremented at line 114 inside the `for scheduled < availableSize` loop, meaning it advances multiple times per `Execute` call (once per full sweep of draining nodes). This provides fairness across multiple batches within a single call, but it is worth confirming this is the intended behavior — the comment on line 41 says "rotates the starting draining node", which could imply once-per-call advancement.
159-180: `chooseLeastLoadedDest` returns a false positive when all candidates equal origin and the `node.ID` zero value is `""`.

If every candidate in `destCandidates` matches `origin`, `chosen` remains the zero-value empty string and the guard at line 176 (`chosen.IsEmpty()`) catches it. This works, but only because `node.ID("")` is never a legitimate node ID. The logic is correct but brittle — consider returning `false` explicitly when `minSize == math.MaxInt` for clarity.

coordinator/drain/controller.go (2)
130-139: `GetStatus` acquires an exclusive lock for a read-only operation.

Use `RLock`/`RUnlock` here to allow concurrent readers.

Suggested fix

 func (c *Controller) GetStatus(nodeID node.ID) (drainRequested, drainingObserved, stoppingObserved bool) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	st, ok := c.nodes[nodeID]
144-162: TOCTOU between `GetStatus` and `trySendDrainCommand`/`trySendStopCommand` — benign but worth noting.

`AdvanceLiveness` reads state via `GetStatus` (acquires and releases the lock), then conditionally calls the send methods (which re-acquire the lock). Between these calls another goroutine could change the state. In practice this is harmless — the worst case is an extra throttled command or a skipped iteration that self-heals on the next cycle. No action needed now, but if the controller becomes multi-goroutine-hot, consolidating the check-and-send under a single lock hold would eliminate the gap.

coordinator/drain/controller_test.go (2)
38-45: Blocking channel reads will hang the test forever if no message is sent.

Lines 39, 60, and 71 use `<-mc.GetMessageChannel()`, which blocks indefinitely. If a code change causes the message not to be sent, the test hangs rather than failing with a useful message.

Consider using a helper with a timeout:
Suggested approach
func receiveOrFail(t *testing.T, ch <-chan *messaging.TargetMessage) *messaging.TargetMessage {
	t.Helper()
	select {
	case msg := <-ch:
		return msg
	case <-time.After(3 * time.Second):
		require.FailNow(t, "timed out waiting for message")
		return nil
	}
}

Also applies to: 59-62, 70-73
26-74: Test coverage is thin — consider adding cases for key scenarios.

The single test covers the happy path well. Missing scenarios that would strengthen confidence:

- Unknown epoch: `RequestDrain` before any heartbeat is observed (epoch=0 path).
- Multiple draining nodes: verifying fair round-robin or independent progression.
- readyToStop=false: `AdvanceLiveness` should not send STOPPING when the callback returns false.
- Idempotent `RequestDrain`: calling it twice should not double-send.
coordinator/nodeliveness/view.go
Outdated
func (v *View) GetState(nodeID node.ID) State {
	v.mu.RLock()
	r, ok := v.nodes[nodeID]
	v.mu.RUnlock()

	now := time.Now()
	if !ok {
		// Never observed: keep compatibility during rollout.
		return StateAlive
	}
	if now.Sub(r.lastSeen) > v.ttl {
		return StateUnknown
	}
	switch r.liveness {
	case heartbeatpb.NodeLiveness_ALIVE:
		return StateAlive
	case heartbeatpb.NodeLiveness_DRAINING:
		return StateDraining
	case heartbeatpb.NodeLiveness_STOPPING:
		return StateStopping
	default:
		return StateAlive
	}
}
🛠️ Refactor suggestion | 🟠 Major
GetState uses time.Now() internally, making it untestable and inconsistent with GetDrainingOrStoppingNodes.
GetDrainingOrStoppingNodes accepts a now time.Time parameter, but GetState (and therefore IsSchedulableDest) hardcodes time.Now(). This makes TTL-dependent behavior impossible to test deterministically and creates an inconsistency in the API surface.
Suggested fix
-func (v *View) GetState(nodeID node.ID) State {
+func (v *View) GetState(nodeID node.ID, now time.Time) State {
v.mu.RLock()
r, ok := v.nodes[nodeID]
v.mu.RUnlock()
- now := time.Now()
if !ok {
return StateAlive
}Then update IsSchedulableDest similarly:
-func (v *View) IsSchedulableDest(nodeID node.ID) bool {
- return v.GetState(nodeID) == StateAlive
+func (v *View) IsSchedulableDest(nodeID node.ID, now time.Time) bool {
+ return v.GetState(nodeID, now) == StateAlive
 }

🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 111 - 134, GetState currently
calls time.Now() internally which prevents deterministic tests and is
inconsistent with GetDrainingOrStoppingNodes; change View.GetState to accept a
now time.Time parameter (func (v *View) GetState(nodeID node.ID, now time.Time)
State), update its internal uses of now (replace the time.Now() call), and then
update all callers (notably IsSchedulableDest) to pass through a now value
(e.g., take now as a parameter or forward the now they already have) so TTL
comparisons use the same time source as GetDrainingOrStoppingNodes.
/gemini review
Code Review
This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster manageability. However, a critical security concern has been identified: the new drainCapture API endpoint in api/v1/api.go is exposed without authentication, which could allow unauthenticated users to disrupt the cluster. Additionally, a potential issue regarding error handling during log coordinator resignation should be addressed.
// Usage:
// curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
// TODO: Implement this API in the future, currently it is a no-op.
func (o *OpenAPIV1) drainCapture(c *gin.Context) {
The drainCapture function implements a sensitive cluster management operation (draining a capture), but the pull request does not ensure that the corresponding route registration is protected by authentication middleware. In RegisterOpenAPIV1Routes, the PUT /drain route is registered without authenticateMiddleware, unlike other sensitive operations in the same file (e.g., createChangefeed, updateChangefeed). This could allow unauthenticated users to disrupt the cluster by triggering node drains. It is recommended to add authenticateMiddleware to the route registration for drainCapture in RegisterOpenAPIV1Routes.
server/module_election.go
Outdated
 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-		log.Info("log coordinator resign failed",
+		log.Info("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
-	log.Warn("log coordinator resign timeout",
+	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
 }
 cancel()
This function currently swallows non-timeout errors from e.logElection.Resign. If Resign fails for a reason other than a timeout, the error is logged as a warning, but the function returns nil. This could mislead the caller into believing the resignation was successful when it wasn't, potentially leading to an inconsistent state where the node is still the log coordinator leader in etcd.
The error should be returned to the caller to handle it properly. Using defer cancel() would also simplify the context cancellation logic.
defer cancel()
if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
		log.Info("log coordinator resign timeout",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
	} else {
		log.Warn("log coordinator resign failed",
			zap.String("nodeID", nodeID), zap.Error(resignErr))
	}
	return errors.Trace(resignErr)
}
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
server/module_election.go (2)
116-123: ⚠️ Potential issue | 🔴 Critical

Bug: `errors.Trace(err)` returns `nil` instead of the resign error.

At line 121, `err` is the result of `Campaign` (line 95), which is `nil` at this point (the campaign succeeded for execution to reach this branch). This swallows the resign failure. Same pattern as the known bug on line 239.

Proposed fix

 if resignErr := e.resign(ctx); resignErr != nil {
 	log.Warn("resign coordinator actively failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
-	return errors.Trace(err)
+	return errors.Trace(resignErr)
 }
282-301: ⚠️ Potential issue | 🟠 Major

`resignLogCoordinator` silently swallows non-timeout resign errors.

When `Resign` fails with an error other than `context.DeadlineExceeded`, line 296 logs a warning but line 300 returns `nil`. This means a genuine resign failure (e.g., a network error) is silently ignored and the caller assumes success.

Compare with the coordinator's resign path (lines 155-161), which propagates the error. Consider returning the error here as well.

Proposed fix

 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
 		log.Info("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
 	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
+	cancel()
+	return errors.Trace(resignErr)
 }
 cancel()
 return nil
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 12-13: Add a single blank line between the file license header and
the package declaration to match project formatting; update the top of
controller.go so the license block is followed by one empty line before the
"package drain" statement (the package declaration symbol is package drain).
In `@coordinator/nodeliveness/view_test.go`:
- Around line 12-13: Add a blank line between the license header and the package
declaration in this file so it matches the project's formatting (same change as
in view.go); specifically, insert an empty line above the "package nodeliveness"
declaration to separate the header from the package statement.
In `@coordinator/nodeliveness/view.go`:
- Around line 12-13: Add a single blank line between the file's license header
and the package declaration to match the repository style and avoid
gofmt/gofumpt or copyright-check lint failures; specifically insert one empty
line immediately above the existing "package nodeliveness" declaration so the
license block is separated from the package line.
🧹 Nitpick comments (4)
coordinator/nodeliveness/view_test.go (1)
24-43: Test relies on wall-clock time for TTL assertions, making it timing-sensitive.

`GetState` calls `time.Now()` internally. The test works because the observation timestamps are set far enough in the past, but on a very slow CI runner or with clock skew, the 5-second-from-now observation at line 35 could flicker. If `GetState` is refactored to accept `now time.Time` (as suggested in view.go), these tests become fully deterministic.

The current approach is acceptable for now given the generous margins (5s vs 30s TTL, 35s vs 30s TTL).
maintainer/maintainer_manager.go (2)
404-437: Potential TOCTOU between `m.liveness.Load()` and `m.liveness.Store(target)`.

Lines 423-425 load `current`, compare `target > current`, then call `Store(target)`. Between the load and the store, another goroutine (e.g., the server's signal handler calling `Store` for stopping) could advance the liveness. The `Store` CAS loop handles this correctly (it would fail and retry or return false), so the state is safe. However, line 436 sends a response with the `current` local variable, which may be stale if `Store` returned false due to a concurrent advance.

This is a minor concern: since `Store` returning false means `current` on line 431 is not updated, the response on line 436 would report the pre-race value rather than the actual current state. This could give the coordinator a momentarily stale view, which is self-correcting via the next heartbeat.
453-477: `fromNodeLivenessPB` and `toNodeLivenessPB` don't use the receiver — consider making them package-level functions.

These methods don't access any `Manager` state. Making them free functions (or at minimum unexported package-level functions) would better convey their stateless nature and allow reuse without a `Manager` instance.

Proposed change

-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {

-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {

Then update callers (e.g., lines 211, 422, 441) to drop the `m.` prefix.

coordinator/drain/controller.go (1)
141-162: `AdvanceLiveness` re-acquires the lock per-node via `GetStatus` after snapshotting the list — consider whether a single pass under one lock would be simpler.

`listDrainRequestedNodeIDs` acquires and releases the lock, then the loop calls `GetStatus` (lock again) for each node, then `trySendDrainCommand`/`trySendStopCommand` (lock again in `checkAndMarkCommandSend`). This is correct and avoids holding the lock during I/O, but the triple lock acquisition per node adds overhead. Given the expected low number of draining nodes, this is fine.
// limitations under the License.
package drain
Missing blank line between license header and package declaration.
Same formatting issue as the other new files.
🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 12 - 13, Add a single blank
line between the file license header and the package declaration to match
project formatting; update the top of controller.go so the license block is
followed by one empty line before the "package drain" statement (the package
declaration symbol is package drain).
// limitations under the License.
package nodeliveness
Missing blank line between license header and package declaration.
Same as view.go — add a blank line for consistency with the rest of the codebase.
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view_test.go` around lines 12 - 13, Add a blank line
between the license header and the package declaration in this file so it
matches the project's formatting (same change as in view.go); specifically,
insert an empty line above the "package nodeliveness" declaration to separate
the header from the package statement.
coordinator/nodeliveness/view.go
Outdated
// limitations under the License.
package nodeliveness
Missing blank line between license header and package declaration.
Most Go files in this repo separate the license block from the package line with a blank line. This may also trigger gofmt/gofumpt or copyright-check lint failures.
Proposed fix
// limitations under the License.
+
 package nodeliveness

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// limitations under the License.

package nodeliveness
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 12 - 13, Add a single blank
line between the file's license header and the package declaration to match the
repository style and avoid gofmt/gofumpt or copyright-check lint failures;
specifically insert one empty line immediately above the existing "package
nodeliveness" declaration so the license block is separated from the package
line.
/test mysql
@hongyunyan: The following test failed, say
Full PR test history. Your PR dashboard.

Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
…rain-capture-5.2
Actionable comments posted: 2
♻️ Duplicate comments (2)
maintainer/node_liveness_test.go (1)
1-13: Copyright header is present.

The previously flagged missing copyright header has been added.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/node_liveness_test.go` around lines 1 - 13, The review notes that the copyright header was added to the maintainer package but the reviewer left a duplicate comment; resolve this by removing or marking as outdated the redundant review comment (the duplicate on maintainer/node_liveness_test.go) so only the single confirmation of the added header remains, and then update the PR status or reply to the reviewer indicating the duplicate has been cleared.

coordinator/controller_drain_test.go (1)
1-13: Copyright header present — past issue resolved.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/controller_drain_test.go` around lines 1 - 13, The file contains a duplicated copyright header comment at the top of controller_drain_test.go; remove the extra header block so only one standard Apache 2.0 license header remains immediately above the package declaration ("package coordinator"), ensuring there are no repeated comment blocks before the package line.
🧹 Nitpick comments (3)
maintainer/node_liveness_test.go (2)
45-45: Blocking channel reads with no timeout can cause the test suite to hang.

Both tests use bare `<-mc.GetMessageChannel()` reads. If `onSetNodeLivenessRequest` ever sends via a goroutine (or a code path changes to be async), these reads block indefinitely — no test-level timeout guard is in place. A common Go pattern is to wrap the read in a `select` with a `time.After` (or `require.Eventually`) so the test fails fast with a useful message.

♻️ Proposed helper — fail fast instead of hanging
// recvMsg reads one message from mc with a short deadline.
func recvMsg(t *testing.T, mc *messaging.MockMessageCenter) *messaging.TargetMessage {
	t.Helper()
	select {
	case msg := <-mc.GetMessageChannel():
		return msg
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for message")
		return nil
	}
}

Then replace every `<-mc.GetMessageChannel()` call with `recvMsg(t, mc)`.

Also applies to: 72-73
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/node_liveness_test.go` at line 45, The tests block on bare reads from mc.GetMessageChannel() which can hang; add a short-deadline helper (e.g. recvMsg(t *testing.T, mc *messaging.MockMessageCenter)) that selects on mc.GetMessageChannel() and time.After(2s) and calls t.Fatal on timeout, then replace each direct `<-mc.GetMessageChannel()` use (tests that exercise onSetNodeLivenessRequest and similar paths) with recvMsg(t, mc) so tests fail fast instead of hanging.
27-34: Duplicate test setup — extract a helper.

Both tests repeat the same six-line setup block (mock center, `SetService`, `NewMaintainerManager`, coordinator fields). A shared `newTestMaintainerManager` helper would reduce noise and make future changes (e.g., adding a new required field) a single-line fix.

♻️ Proposed helper

func newTestMaintainerManager(t *testing.T, mc *messaging.MockMessageCenter, liveness *api.Liveness) *Manager {
	t.Helper()
	appcontext.SetService(appcontext.MessageCenter, mc)
	m := NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{}, liveness)
	m.coordinatorID = node.ID("coordinator")
	m.coordinatorVersion = 1
	return m
}

Also applies to: 53-60
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/node_liveness_test.go` around lines 27 - 34, Extract the repeated test setup into a helper to reduce duplication: create a helper function (e.g., newTestMaintainerManager) that takes testing.T, the mock message center from messaging.NewMockMessageCenter and a *api.Liveness, calls t.Helper(), sets appcontext.SetService(appcontext.MessageCenter, mc), constructs the manager with NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{}, liveness), sets m.coordinatorID = node.ID("coordinator") and m.coordinatorVersion = 1, and returns *Manager; then replace the six-line setup in TestSetNodeLivenessRejectEpochMismatch and the other test (lines 53–60) with calls to this helper.

coordinator/controller_drain_test.go (1)
32-59: Consider isolating global `appcontext` side-effects.

`appcontext.SetService` writes to a package-level singleton. While these tests don't call `t.Parallel()`, any existing or future parallel test in the `coordinator` package that also touches `appcontext.MessageCenter` or `watcher.NodeManagerName` will race against this setup.

♻️ Suggested mitigation
Add a `t.Cleanup` to restore or reset the relevant appcontext entries after each test, or use a per-test context scope if the framework supports it:

 appcontext.SetService(appcontext.MessageCenter, mc)
+t.Cleanup(func() { appcontext.SetService(appcontext.MessageCenter, nil) })
 appcontext.SetService(watcher.NodeManagerName, nodeManager)
+t.Cleanup(func() { appcontext.SetService(watcher.NodeManagerName, nil) })

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/controller_drain_test.go` around lines 32 - 59, The helper newDrainTestController mutates global appcontext singleton via appcontext.SetService for appcontext.MessageCenter and watcher.NodeManagerName causing potential race in parallel tests; modify newDrainTestController to capture the previous values (or presence) for these keys, call appcontext.SetService as before, and register a t.Cleanup that restores the originals (or clears the entries) after the test completes so the global appcontext is reset; reference the newDrainTestController function and the SetService calls for appcontext.MessageCenter and watcher.NodeManagerName and use t.Cleanup to perform the restoration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@coordinator/controller_drain_test.go`:
- Line 49: The test mutates the map returned by GetAliveNodes()
(nodeManager.GetAliveNodes()[target] = &node.Info{ID: target}), which violates
the API contract; replace this direct map mutation by using NodeManager's
provided setup/helper to register the node (e.g., call the NodeManager test
helper or API that adds/registers an alive node instead of writing into
GetAliveNodes()), creating a node.Info with ID == target and registering it via
the NodeManager instance rather than assigning into the returned map.
In `@maintainer/node_liveness_test.go`:
- Around line 71-77: The test verifies message types but omits validating the
SetNodeLivenessResponse payload; after reading the two messages from
mc.GetMessageChannel() (variables first/second) locate the message whose Type ==
messaging.TypeSetNodeLivenessResponse, decode/cast its Payload to
api.SetNodeLivenessResponse, and assert response.Applied ==
api.LivenessCaptureDraining (or the expected enum value) and response.NodeEpoch
equals the expected epoch value used in the test (matching the epoch sent); use
require.Equal (or require.True) to fail the test on mismatch so a wrong Applied
or stale NodeEpoch is caught.
---
Duplicate comments:
In `@coordinator/controller_drain_test.go`:
- Around line 1-13: The file contains a duplicated copyright header comment at
the top of controller_drain_test.go; remove the extra header block so only one
standard Apache 2.0 license header remains immediately above the package
declaration ("package coordinator"), ensuring there are no repeated comment
blocks before the package line.
In `@maintainer/node_liveness_test.go`:
- Around line 1-13: The review notes that the copyright header was added to the
maintainer package but the reviewer left a duplicate comment; resolve this by
removing or marking as outdated the redundant review comment (the duplicate on
maintainer/node_liveness_test.go) so only the single confirmation of the added
header remains, and then update the PR status or reply to the reviewer
indicating the duplicate has been cleared.
---
Nitpick comments:
In `@coordinator/controller_drain_test.go`:
- Around line 32-59: The helper newDrainTestController mutates global appcontext
singleton via appcontext.SetService for appcontext.MessageCenter and
watcher.NodeManagerName causing potential race in parallel tests; modify
newDrainTestController to capture the previous values (or presence) for these
keys, call appcontext.SetService as before, and register a t.Cleanup that
restores the originals (or clears the entries) after the test completes so the
global appcontext is reset; reference the newDrainTestController function and
the SetService calls for appcontext.MessageCenter and watcher.NodeManagerName
and use t.Cleanup to perform the restoration.
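The save/restore pattern the nitpick asks for can be sketched generically. This is a minimal illustration, not the real ticdc `appcontext` API — the registry map, `setServiceForTest`, and the service names here are stand-ins; in the actual test the restore closure would be registered with `t.Cleanup`.

```go
package main

import "fmt"

// services stands in for a global registry like appcontext
// (hypothetical; the real ticdc appcontext API differs).
var services = map[string]any{}

// setServiceForTest swaps in a test value and returns a restore func.
// Registering that func with t.Cleanup resets the global after the test,
// so parallel or subsequent tests see the original state.
func setServiceForTest(name string, v any) (restore func()) {
	prev, had := services[name]
	services[name] = v
	return func() {
		if had {
			services[name] = prev
		} else {
			delete(services, name)
		}
	}
}

func main() {
	services["MessageCenter"] = "real"
	restore := setServiceForTest("MessageCenter", "mock")
	fmt.Println(services["MessageCenter"]) // mock
	restore()
	fmt.Println(services["MessageCenter"]) // real
}
```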
In `@maintainer/node_liveness_test.go`:
- Line 45: The tests block on bare reads from mc.GetMessageChannel() which can
hang; add a short-deadline helper (e.g. recvMsg(t *testing.T, mc
*messaging.MockMessageCenter)) that selects on mc.GetMessageChannel() and
time.After(2s) and calls t.Fatal on timeout, then replace each direct
`<-mc.GetMessageChannel()` use (tests that exercise onSetNodeLivenessRequest and
similar paths) with recvMsg(t, mc) so tests fail fast instead of hanging.
- Around line 27-34: Extract the repeated test setup into a helper to reduce
duplication: create a helper function (e.g., newTestMaintainerManager) that
takes testing.T, the mock message center from messaging.NewMockMessageCenter and
a *api.Liveness, calls t.Helper(), sets
appcontext.SetService(appcontext.MessageCenter, mc), constructs the manager with
NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{},
liveness), sets m.coordinatorID = node.ID("coordinator") and
m.coordinatorVersion = 1, and returns *Manager; then replace the six-line setup
in TestSetNodeLivenessRejectEpochMismatch and the other test (lines 53–60) with
calls to this helper.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- coordinator/controller_drain_test.go
- maintainer/node_liveness_test.go
- server/module_election.go
🚧 Files skipped from review as they are similar to previous changes (1)
- server/module_election.go
oc := operator.NewOperatorController(selfNode, db, nil, 10)

target := node.ID("target")
nodeManager.GetAliveNodes()[target] = &node.Info{ID: target}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find GetAliveNodes implementation
rg "func.*GetAliveNodes" --type=go -B 2 -A 10

Repository: pingcap/ticdc
Length of output: 908
🏁 Script executed:
#!/bin/bash
# Also search for the NodeManager type definition to understand the data structure
rg "type NodeManager" --type=go -A 20

Repository: pingcap/ticdc
Length of output: 1301
🏁 Script executed:
#!/bin/bash
# Check the full context of the test file around line 49
sed -n '40,60p' coordinator/controller_drain_test.go

Repository: pingcap/ticdc
Length of output: 619
Do not mutate the map returned by GetAliveNodes(); it violates the API contract.
The API documentation explicitly states "the caller mustn't modify the returned map". Instead of mutating the returned map at line 49, use a proper test helper method or setup approach provided by NodeManager to register the target node.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/controller_drain_test.go` at line 49, The test mutates the map
returned by GetAliveNodes() (nodeManager.GetAliveNodes()[target] =
&node.Info{ID: target}), which violates the API contract; replace this direct
map mutation by using NodeManager's provided setup/helper to register the node
(e.g., call the NodeManager test helper or API that adds/registers an alive node
instead of writing into GetAliveNodes()), creating a node.Info with ID == target
and registering it via the NodeManager instance rather than assigning into the
returned map.
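The safe pattern the fix describes — register through the manager and hand callers a snapshot — can be sketched as follows. This is an illustrative stand-in, not the actual ticdc `NodeManager` (whose registration API and internal representation differ):

```go
package main

import "fmt"

// NodeInfo and NodeManager are hypothetical stand-ins for the real types.
type NodeInfo struct{ ID string }

type NodeManager struct {
	alive map[string]*NodeInfo
}

func NewNodeManager() *NodeManager {
	return &NodeManager{alive: map[string]*NodeInfo{}}
}

// RegisterAliveNode is the kind of helper tests should call,
// instead of writing into the map returned by GetAliveNodes().
func (m *NodeManager) RegisterAliveNode(info *NodeInfo) {
	m.alive[info.ID] = info
}

// GetAliveNodes returns a snapshot copy, so caller mutations
// can never corrupt the manager's internal state.
func (m *NodeManager) GetAliveNodes() map[string]*NodeInfo {
	out := make(map[string]*NodeInfo, len(m.alive))
	for k, v := range m.alive {
		out[k] = v
	}
	return out
}

func main() {
	nm := NewNodeManager()
	nm.RegisterAliveNode(&NodeInfo{ID: "target"})
	snapshot := nm.GetAliveNodes()
	snapshot["rogue"] = &NodeInfo{ID: "rogue"} // does not leak back
	fmt.Println(len(nm.GetAliveNodes()))       // 1
}
```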
// Successful transition sends both a node heartbeat and a response.
first := <-mc.GetMessageChannel()
second := <-mc.GetMessageChannel()

require.ElementsMatch(t, []messaging.IOType{messaging.TypeNodeHeartbeatRequest, messaging.TypeSetNodeLivenessResponse},
	[]messaging.IOType{first.Type, second.Type})
require.Equal(t, api.LivenessCaptureDraining, liveness.Load())
TestSetNodeLivenessApplyTransition is missing response-content assertions.
The test verifies the two message types via ElementsMatch but never extracts or inspects the SetNodeLivenessResponse payload — specifically Applied and NodeEpoch. This is inconsistent with TestSetNodeLivenessRejectEpochMismatch, which validates both fields. A successful transition that sends the wrong Applied value (e.g., still ALIVE) or a stale epoch would go undetected.
🛠️ Proposed fix — assert response payload after the type check
require.ElementsMatch(t, []messaging.IOType{messaging.TypeNodeHeartbeatRequest, messaging.TypeSetNodeLivenessResponse},
[]messaging.IOType{first.Type, second.Type})
+
+// Extract the response (order not guaranteed, so find it explicitly).
+var liveResp *heartbeatpb.SetNodeLivenessResponse
+for _, m := range []*messaging.TargetMessage{first, second} {
+ if m.Type == messaging.TypeSetNodeLivenessResponse {
+ liveResp = m.Message[0].(*heartbeatpb.SetNodeLivenessResponse)
+ }
+}
+require.NotNil(t, liveResp)
+require.Equal(t, heartbeatpb.NodeLiveness_DRAINING, liveResp.Applied)
+require.Equal(t, m.nodeEpoch, liveResp.NodeEpoch)
+
 require.Equal(t, api.LivenessCaptureDraining, liveness.Load())

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/node_liveness_test.go` around lines 71 - 77, The test verifies
message types but omits validating the SetNodeLivenessResponse payload; after
reading the two messages from mc.GetMessageChannel() (variables first/second)
locate the message whose Type == messaging.TypeSetNodeLivenessResponse,
decode/cast its Payload to api.SetNodeLivenessResponse, and assert
response.Applied == api.LivenessCaptureDraining (or the expected enum value) and
response.NodeEpoch equals the expected epoch value used in the test (matching
the epoch sent); use require.Equal (or require.True) to fail the test on
mismatch so a wrong Applied or stale NodeEpoch is caught.
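Because the two outgoing messages arrive in unspecified order, the response has to be located by type before its payload can be asserted. A generic sketch of that lookup (the `msg` struct here is a placeholder for `messaging.TargetMessage`; the real payload would be a `SetNodeLivenessResponse`):

```go
package main

import "fmt"

// msg is a hypothetical stand-in for messaging.TargetMessage.
type msg struct {
	Type    string
	Payload any
}

// findByType returns the first message with the given type, or nil.
// Useful when tests must inspect one of several unordered messages.
func findByType(msgs []*msg, typ string) *msg {
	for _, m := range msgs {
		if m.Type == typ {
			return m
		}
	}
	return nil
}

func main() {
	first := &msg{Type: "NodeHeartbeatRequest"}
	second := &msg{Type: "SetNodeLivenessResponse", Payload: "DRAINING"}

	resp := findByType([]*msg{first, second}, "SetNodeLivenessResponse")
	fmt.Println(resp != nil, resp.Payload) // true DRAINING
}
```

In the test itself, the located message's payload would then be checked with `require.Equal` against the expected `Applied` state and the epoch that was sent.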
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Tests