
wip-drain-capture-5.2#4190

Open
hongyunyan wants to merge 17 commits into pingcap:master from hongyunyan:0210-drain-capture-5.2

Conversation

@hongyunyan
Collaborator

@hongyunyan hongyunyan commented Feb 11, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Node draining workflow: requestable drains, coordinated progression, and remaining-work reporting via a new DrainNode API.
    • Node liveness states and heartbeat propagation (Alive, Draining, Stopping, Unknown) with destination filtering.
    • Schedulers and maintainer integrate liveness to avoid scheduling to draining/stopping nodes; maintainer emits node heartbeats and responds to liveness requests.
  • Tests

    • Added unit tests for drain controller, nodeliveness view, drain scheduler, maintainer liveness handling, and Liveness state machine.

@ti-chi-bot bot added the do-not-merge/needs-linked-issue, do-not-merge/work-in-progress (indicates that a PR should not merge because it is a work in progress), and release-note (denotes a PR that will be considered when it comes time to generate release notes) labels Feb 11, 2026
@ti-chi-bot

ti-chi-bot bot commented Feb 11, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign kennytm for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Contributor

coderabbitai bot commented Feb 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds a NodeLiveness state machine (Alive → Draining → Stopping), a Drain Controller to drive SetNodeLiveness requests, heartbeat-based liveness observation, liveness-aware schedulers (including a drain scheduler), Coordinator.DrainNode API, and maintainer-side heartbeat/SetNodeLiveness handling.

Changes

  • Protocol & Messaging (heartbeatpb/heartbeat.proto, pkg/messaging/message.go): Add the NodeLiveness enum, NodeHeartbeat and SetNodeLivenessRequest/Response protobufs, and new IOType mappings for heartbeat/drain messages.
  • API liveness model (pkg/api/util.go, pkg/api/util_test.go): Introduce LivenessCaptureDraining and shift Stopping to preserve monotonic Alive → Draining → Stopping transitions; add enforcement and tests.
  • Maintainer, node agent (maintainer/maintainer_manager.go, maintainer/maintainer_manager_test.go, maintainer/node_liveness_test.go): The maintainer tracks liveness and nodeEpoch, sends periodic NodeHeartbeat, and handles SetNodeLivenessRequest/Response with epoch checks; constructor usage and tests updated.
  • Coordinator, drain orchestration (coordinator/drain/controller.go, coordinator/drain/controller_test.go, coordinator/coordinator.go, coordinator/controller_drain_test.go): New in-memory drain Controller managing per-node drain state, throttled DRAINING → STOPPING command sends, and observation handling, plus coordinator/Controller.DrainNode plumbing and tests.
  • Coordinator, nodeliveness view & integration (coordinator/nodeliveness/view.go, coordinator/nodeliveness/view_test.go, coordinator/controller.go): New TTL-based nodeliveness View; the controller now holds nodeLivenessView and drainController, heartbeats feed both, and drain progression is invoked during periodic tasks.
  • Schedulers, filtering and drain scheduler (coordinator/scheduler/basic.go, coordinator/scheduler/balance.go, coordinator/scheduler/drain.go, coordinator/scheduler/drain_test.go, pkg/scheduler/scheduler.go): Basic/Balance schedulers filter destinations by liveness; a new drainScheduler migrates workloads off draining/stopping nodes, with optional startup wiring; tests added.
  • Operator helpers (coordinator/operator/operator_controller.go): Add CountOperatorsInvolvingNode to count in-flight operators touching a node (used for STOP safety checks).
  • Server & API surface (api/v1/api.go, pkg/server/coordinator.go, server/server.go, server/module_election.go): The v1 drain endpoint now delegates to coordinator.DrainNode; the Coordinator interface adds DrainNode; the server passes liveness into MaintainerManager; election logic consults liveness.
  • Tests & coverage (multiple *_test.go across coordinator, drain, nodeliveness, maintainer, api): Extensive unit tests for the drain controller, nodeliveness view, drain scheduler, maintainer liveness handling, and the Liveness state machine.
  • New public types / methods (various coordinator/maintainer/pkg files): Expose Controller.DrainNode, coordinator.DrainNode, the drain.Controller type, nodeliveness.View, and messaging IOType constants for liveness messages.
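The monotonic transition enforcement described for pkg/api/util.go (Alive → Draining → Stopping, with no skipping and no regression) can be sketched as a CAS loop. This is an illustrative sketch under those stated semantics, not the PR's actual code; the type and constant names are assumptions:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Liveness is backed by int32 so it can be updated atomically.
type Liveness int32

const (
	LivenessAlive Liveness = iota
	LivenessDraining
	LivenessStopping
)

// Store applies v only if it is the next state in the
// Alive -> Draining -> Stopping chain; it rejects backward
// moves, skipped states, and out-of-range values.
func (l *Liveness) Store(v Liveness) bool {
	if v < LivenessAlive || v > LivenessStopping {
		return false // defensive range check
	}
	for {
		old := Liveness(atomic.LoadInt32((*int32)(l)))
		if v != old+1 {
			return false // only single-step forward transitions
		}
		if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
			return true
		}
		// Lost a race; re-read and re-check against the new state.
	}
}

func main() {
	var l Liveness // starts at LivenessAlive
	fmt.Println(l.Store(LivenessStopping)) // false: cannot skip Draining
	fmt.Println(l.Store(LivenessDraining)) // true
	fmt.Println(l.Store(LivenessAlive))    // false: no backward transition
	fmt.Println(l.Store(LivenessStopping)) // true
}
```

The CAS retry loop matters because several goroutines (heartbeat sender, drain handler, shutdown path) may race on the same Liveness value.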

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as API/v1
    participant Coord as Coordinator
    participant DC as DrainController
    participant LV as LivenessView
    participant MM as MaintainerManager
    participant DS as DrainScheduler

    Client->>API: DrainNode(target)
    API->>Coord: DrainNode(ctx, target)
    Coord->>DC: RequestDrain(target)
    DC-->>Coord: ack
    Coord->>DC: GetStatus(target)
    Coord-->>API: remainingCount

    loop periodic heartbeat/advance
        MM->>Coord: NodeHeartbeat(liveness, epoch)
        Coord->>LV: ObserveHeartbeat(...)
        Coord->>DC: ObserveHeartbeat(...)
        DC-->>Coord: mark observed
        alt readyToStop(target)
            DC->>MM: SetNodeLivenessRequest(STOPPING, epoch)
            MM->>Coord: SetNodeLivenessResponse(applied, epoch)
            Coord->>DC: ObserveSetNodeLivenessResponse(...)
        else
            DC->>MM: SetNodeLivenessRequest(DRAINING, epoch)
            MM->>Coord: SetNodeLivenessResponse(...)
        end
    end

    DS->>LV: GetDrainingOrStoppingNodes()
    DS->>Coord: request move operators
    Coord->>DS: create/approve MoveMaintainerOperators
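The readyToStop branch in the diagram above can be sketched as a pure decision function over the drain controller's per-node bookkeeping. All names here (nodeDrain, nextCommand, the field set) are hypothetical stand-ins, not the PR's actual types:

```go
package main

import "fmt"

type State int

const (
	Alive State = iota
	Draining
	Stopping
)

// nodeDrain is an illustrative snapshot of one node's drain progress.
type nodeDrain struct {
	requested     bool  // drain requested via DrainNode API
	observed      State // last liveness confirmed by heartbeat/response
	remainingWork int   // maintainers still assigned to the node
	inflightOps   int   // operators still involving the node
}

// nextCommand decides which SetNodeLiveness command to (re)send,
// mirroring the alt readyToStop branch in the sequence diagram.
func nextCommand(n nodeDrain) (State, bool) {
	if !n.requested {
		return Alive, false // no drain in progress, nothing to send
	}
	if n.observed >= Draining && n.remainingWork == 0 && n.inflightOps == 0 {
		return Stopping, true // drained and quiescent: safe to stop
	}
	return Draining, true // keep resending DRAINING until observed/drained
}

func main() {
	cmd, send := nextCommand(nodeDrain{requested: true, remainingWork: 3})
	fmt.Println(cmd == Draining, send) // still draining while work remains
	cmd, send = nextCommand(nodeDrain{requested: true, observed: Draining})
	fmt.Println(cmd == Stopping, send) // safe to stop once drained
}
```

Because the function is resend-driven rather than event-driven, lost SetNodeLiveness messages are retried on the next tick until the response is observed.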

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

lgtm

Suggested reviewers

  • wk989898
  • lidezhu
  • asddongmen
  • flowbehappy

Poem

🐰 I nudged nodes from Alive to Drain,
Heartbeats hopped across the plain,
Controllers whispered stop and go,
Schedulers moved the load just so,
Carrots for code — now servers rest again.

🚥 Pre-merge checks | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

  • Description check — ⚠️ Warning. The PR description is entirely the template, with no issue reference, problem statement, implementation details, test results, or release note. Resolution: complete the description with the issue number, the problem and solution, applicable tests, checklist answers, and a meaningful release note.
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 32.56%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
  • Title check — ❓ Inconclusive. The title 'wip-drain-capture-5.2' is vague and uses a work-in-progress prefix without conveying the main change. Resolution: replace it with a clear, descriptive title, e.g., 'Implement node drain coordination and liveness management'.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@ti-chi-bot bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) Feb 11, 2026
@gemini-code-assist

Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust node draining mechanism for TiCDC, enabling graceful shutdown and maintenance operations. It establishes a clear lifecycle for nodes (Alive, Draining, Stopping) and integrates this liveness awareness across critical components, including the API, coordinator, schedulers, and maintainers. The changes ensure that tasks are appropriately managed during node transitions, preventing new assignments to unhealthy nodes and facilitating the migration of existing workloads. Additionally, it refines table schema comparison by incorporating index column length for greater accuracy.

Highlights

  • Node Draining Mechanism: Introduced a comprehensive node draining mechanism, defining new liveness states (Alive, Draining, Stopping) to enable graceful shutdown and maintenance of TiCDC nodes.
  • Coordinator Drain Controller: Implemented a new drain.Controller within the coordinator to manage the node draining process, including sending liveness update requests to individual nodes.
  • API Integration for Drain Capture: Updated the v1 API endpoint for draining captures to utilize the new coordinator-level drain functionality, replacing previous dummy logic.
  • Scheduler Liveness Awareness: Enhanced existing schedulers (balanceScheduler, basicScheduler) to consider node liveness when selecting destination nodes for task scheduling, preventing new tasks from being assigned to draining or stopping nodes. A new drainScheduler was added to actively migrate tasks from draining nodes.
  • Maintainer Node Liveness Management: Modified the maintainer.Manager to periodically report its liveness state to the coordinator and to respond to liveness change requests, ensuring nodes accurately reflect their operational status. It also prevents new tasks from being added to stopping nodes.
  • Election Process Refinement: Adjusted the coordinator and log coordinator election logic to check node liveness before campaigning and to actively resign leadership if the node transitions to a stopping state.
  • Improved Table Schema Hashing: Updated the table information hashing and comparison logic to include index column length, which is crucial for accurate schema matching and handle decoding.
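The scheduler-side liveness awareness described in the highlights reduces to a predicate over candidate destinations. A minimal sketch, where livenessView is a hypothetical stand-in for the PR's nodeliveness.View (the real view is TTL-based and fed by heartbeats):

```go
package main

import "fmt"

type Liveness int

const (
	Alive Liveness = iota
	Draining
	Stopping
)

// livenessView maps node IDs to their last observed liveness.
type livenessView map[string]Liveness

// IsSchedulableDest reports whether new work may be placed on id.
// In this sketch, nodes with no record default to schedulable;
// the real view may treat unknown nodes differently.
func (v livenessView) IsSchedulableDest(id string) bool {
	l, ok := v[id]
	return !ok || l == Alive
}

// filterSchedulable mirrors how the basic/balance schedulers drop
// draining or stopping nodes before picking destinations.
func filterSchedulable(nodes []string, v livenessView) []string {
	out := make([]string, 0, len(nodes))
	for _, id := range nodes {
		if v.IsSchedulableDest(id) {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	v := livenessView{"n2": Draining, "n3": Stopping}
	fmt.Println(filterSchedulable([]string{"n1", "n2", "n3"}, v)) // [n1]
}
```

Note that filtering can leave the candidate set empty (all nodes draining), which is exactly the edge case the balance-scheduler review comment below flags: callers must handle zero schedulable destinations rather than dividing by the node count.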


Changelog
  • api/v1/api.go
    • Removed sync/atomic import and added pkg/node import.
    • Replaced dummy drainCaptureCounter logic with actual calls to the coordinator's DrainNode method.
    • Removed the global drainCaptureCounter variable.
  • coordinator/controller.go
    • Imported coordinator/drain and coordinator/nodeliveness packages.
    • Added nodeLivenessView and drainController fields to the Controller struct.
    • Initialized nodeLivenessView and drainController in the NewController function.
    • Integrated drainController.Tick into the onPeriodTask method.
    • Added message handlers for TypeNodeHeartbeatRequest and TypeSetNodeLivenessResponse.
    • Implemented the DrainNode method to handle node drain requests from the API.
  • coordinator/controller_drain_test.go
    • Added a new test file to verify the DrainNode functionality in the coordinator.
  • coordinator/coordinator.go
    • Added the DrainNode method to the coordinator interface implementation.
  • coordinator/drain/controller.go
    • Added a new file defining the drain.Controller responsible for managing node drain progression.
  • coordinator/drain/controller_test.go
    • Added a new test file for the drain.Controller.
  • coordinator/nodeliveness/view.go
    • Added a new file defining nodeliveness.View to maintain an in-memory view of node liveness.
  • coordinator/nodeliveness/view_test.go
    • Added a new test file for nodeliveness.View.
  • coordinator/operator/operator_controller.go
    • Added CountOperatorsInvolvingNode method to count in-flight operators affecting a specific node.
  • coordinator/scheduler/balance.go
    • Imported coordinator/nodeliveness and added livenessView field.
    • Updated NewBalanceScheduler to accept livenessView.
    • Modified Execute method to filter active nodes based on livenessView.IsSchedulableDest.
  • coordinator/scheduler/basic.go
    • Imported coordinator/nodeliveness and added livenessView field.
    • Updated NewBasicScheduler to accept livenessView.
    • Modified doBasicSchedule method to filter node IDs based on livenessView.IsSchedulableDest.
  • coordinator/scheduler/drain.go
    • Added a new file defining drainScheduler to generate move operators for draining nodes.
  • coordinator/scheduler/drain_test.go
    • Added a new test file for drainScheduler.
  • heartbeatpb/heartbeat.pb.go
    • Added NodeLiveness enum with ALIVE, DRAINING, and STOPPING states.
    • Introduced new protobuf messages: NodeHeartbeat, SetNodeLivenessRequest, and SetNodeLivenessResponse.
    • Updated enum and message descriptor indices, and generated marshal/unmarshal/size methods for new messages.
  • heartbeatpb/heartbeat.proto
    • Defined the NodeLiveness enum and new protobuf messages for node heartbeats and liveness control.
  • maintainer/maintainer_manager.go
    • Imported pkg/api and added nodeHeartbeatInterval constant.
    • Added liveness and nodeEpoch fields to the Manager struct.
    • Updated NewMaintainerManager to initialize node liveness.
    • Added handling for TypeSetNodeLivenessRequest messages.
    • Implemented sendNodeHeartbeat for periodic liveness reporting and sendSetNodeLivenessResponse.
    • Added logic to reject AddMaintainerRequest if the node is in a stopping state.
    • Included helper functions fromNodeLivenessPB and toNodeLivenessPB for liveness state conversion.
  • maintainer/maintainer_manager_test.go
    • Updated NewMaintainerManager calls in tests to pass the liveness argument.
  • maintainer/node_liveness_test.go
    • Added a new test file to verify node liveness transitions and epoch mismatch handling in the maintainer.
  • pkg/api/util.go
    • Expanded the Liveness enum to include LivenessCaptureDraining.
    • Modified the Store method for Liveness to enforce monotonic state transitions (Alive -> Draining -> Stopping) and prevent skipping states.
    • Updated the String() method for Liveness to reflect the new states.
  • pkg/api/util_test.go
    • Added a new test file to validate the monotonic behavior of the Liveness.Store method.
  • pkg/common/table_info_helper.go
    • Included col.Length in the hashTableInfo digest calculation for index columns.
    • Included col.Length in the sameColumnsAndIndices comparison for index columns.
  • pkg/common/table_info_helper_test.go
    • Added a test case TestGetOrSetColumnSchema_DifferentIndexColumnLength to ensure digest changes when index column length differs.
  • pkg/messaging/message.go
    • Added new IOType constants for node drain related messages.
    • Updated String() method, decodeIOType, and NewSingleTargetMessage to support the new message types.
  • pkg/scheduler/scheduler.go
    • Added DrainScheduler constant.
    • Modified the Start method to include the drainScheduler in the task pool and exclude it from general checker schedulers.
  • pkg/server/coordinator.go
    • Imported pkg/node and added the DrainNode method to the Coordinator interface.
  • server/module_election.go
    • Modified campaignCoordinator and campaignLogCoordinator to check for LivenessCaptureAlive before campaigning.
    • Added logic to actively resign coordinator/log coordinator roles if the node's liveness transitions to LivenessCaptureStopping.
  • server/server.go
    • Updated the maintainer.NewMaintainerManager initialization to pass the server's liveness state.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster operability. The changes are extensive, touching upon the API, coordinator, schedulers, maintainers, and core server logic.

Key changes include:

  • A new drain package with a Controller to manage the node draining state machine.
  • A new nodeliveness package with a View to track node liveness with TTL.
  • A new drainScheduler to move work off draining nodes.
  • Updates to basicScheduler and balanceScheduler to be aware of node liveness.
  • A new implementation for the v1 drainCapture API.
  • Enhancements to the server election and shutdown logic to handle draining/stopping states gracefully.
  • Improved correctness in Liveness.Store to enforce monotonic state transitions.

The implementation appears robust and well-tested, with new unit and integration tests covering the new functionality. I have one suggestion for code simplification in the DrainNode function to improve readability.

Comment on lines +867 to +882
// v1 drain API must not return 0 until drain completion is proven.
if nodeState == nodeliveness.StateUnknown || !drainingObserved {
	if remaining == 0 {
		remaining = 1
	}
	return remaining, nil
}

// Return 0 only after STOPPING is observed and all work is done.
if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
	return 0, nil
}
if remaining == 0 {
	remaining = 1
}
return remaining, nil


medium

The logic to ensure remaining is at least 1 unless drain is complete is a bit complex and can be simplified. The current implementation has a duplicated check if remaining == 0 { remaining = 1 } and an early return that makes the flow harder to follow. A refactoring can make the logic more straightforward.

Suggested change

Before:

	// v1 drain API must not return 0 until drain completion is proven.
	if nodeState == nodeliveness.StateUnknown || !drainingObserved {
		if remaining == 0 {
			remaining = 1
		}
		return remaining, nil
	}
	// Return 0 only after STOPPING is observed and all work is done.
	if stoppingObserved && maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
		return 0, nil
	}
	if remaining == 0 {
		remaining = 1
	}
	return remaining, nil

After:

	// v1 drain API must not return 0 until drain completion is proven.
	// Return 0 only after STOPPING is observed and all work is done, and the
	// node liveness is not unknown.
	if nodeState != nodeliveness.StateUnknown && stoppingObserved &&
		maintainersOnTarget == 0 && inflightOpsInvolvingTarget == 0 {
		return 0, nil
	}
	if remaining == 0 {
		return 1, nil
	}
	return remaining, nil

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
api/v1/api.go (1)

195-195: ⚠️ Potential issue | 🟡 Minor

Stale TODO comment — the drain API is now implemented.

The comment says "currently it is a no-op" but the method now delegates to co.DrainNode. Remove or update the TODO.

Suggested fix
 // drainCapture drains all tables from a capture.
 // Usage:
 // curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
-// TODO: Implement this API in the future, currently it is a no-op.
 func (o *OpenAPIV1) drainCapture(c *gin.Context) {
server/module_election.go (1)

322-341: ⚠️ Potential issue | 🟡 Minor

Log messages for "resign failed" and "resign timeout" are swapped.

When DeadlineExceeded is the cause (line 330), the code logs "resign failed" (line 331) and returns the error. For other errors, it logs "resign timeout" (line 336). This is the inverse of the coordinator resign flow (lines 173-184) where DeadlineExceeded logs "timeout" and other errors log "failed".

Proposed fix
 func (e *elector) resignLogCoordinator() error {
 	if e.logElection == nil {
 		return nil
 	}
 	nodeID := string(e.svr.info.ID)
 	// use a new context to prevent the context from being cancelled.
 	resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
-		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+		if errors.Cause(resignErr) != context.DeadlineExceeded {
+			log.Info("log coordinator resign failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
 		log.Warn("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
 	cancel()
 	return nil
 }
🤖 Fix all issues with AI agents
In `@coordinator/controller_drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header used across the
repository to the top of this file (above the "package coordinator" declaration)
so CI stops failing; copy the exact header text used in other files (for example
the header at the top of coordinator.go) and place it verbatim at the top of
coordinator/controller_drain_test.go.

In `@coordinator/drain/controller_test.go`:
- Line 1: Add the standard PingCAP copyright header to the top of
controller_test.go (above the line "package drain")—mirror the exact header used
in view_test.go so CI recognizes it; ensure the header appears before the
package declaration in controller_test.go.

In `@coordinator/drain/controller.go`:
- Line 1: Missing Apache 2.0 copyright header: add the project's standard Apache
2.0 header comment block to the top of controller.go above the existing package
drain declaration, ensuring it includes the correct copyright year and copyright
holder name used across the repo and matches the exact header format used in
other files.

In `@coordinator/nodeliveness/view_test.go`:
- Line 1: Add the standard PingCAP copyright/header block to the top of this
file (view_test.go) above the package nodeliveness declaration so the header
matches the project's required copyright format; ensure the exact standard
header text used across the repo is pasted verbatim and that there are no extra
characters or blank lines before the package statement.

In `@coordinator/nodeliveness/view.go`:
- Line 1: This file is missing the standard Apache 2.0 copyright header; add the
project's canonical Apache-2.0 header comment block immediately above the
"package nodeliveness" declaration in view.go (matching the same header style
used across the repo, e.g., full copyright line(s) and license notice or SPDX
identifier) so CI/pipeline recognizes the file as licensed.

In `@coordinator/scheduler/balance.go`:
- Around line 83-93: The code can call pkgScheduler.CheckBalanceStatus with an
empty activeNodes map which leads to division by zero inside CheckBalanceStatus;
modify the block around activeNodes (where s.nodeManager.GetAliveNodes(), the
liveness filter using s.livenessView.IsSchedulableDest, and the call to
pkgScheduler.CheckBalanceStatus with s.changefeedDB.GetTaskSizePerNode()) to
check if activeNodes is empty after filtering and return early (or set moveSize
to zero / skip calling CheckBalanceStatus) so CheckBalanceStatus never receives
an empty nodeSize map.

In `@coordinator/scheduler/drain_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of
drain_test.go (the file that begins with "package scheduler") so the CI
recognizes proper copyright information; insert the full multi-line header
comment block immediately above the package declaration and ensure it matches
the project's standard header format exactly.

In `@coordinator/scheduler/drain.go`:
- Line 1: The file starting with "package scheduler" is missing the standard
Apache-2.0 copyright header which causes CI failure; add the same Apache 2.0
header block used in drain_test.go at the top of this file above the package
declaration so the header text and year/owner match the repository's standard.

In `@maintainer/node_liveness_test.go`:
- Line 1: Add the project's standard copyright/header to the top of the new test
file (node_liveness_test.go) so CI stops failing; open the file that declares
"package maintainer" and prepend the same license/author header used across
other test files in the repo (matching formatting and year) before the package
declaration.

In `@pkg/api/util_test.go`:
- Line 1: Add the standard Apache 2.0 copyright header to the top of this file
(pkg/api/util_test.go) so CI stops failing; open the file beginning with the
standard multi-line Apache 2.0 header comment block above the package api
declaration and ensure the header text exactly matches the project’s canonical
Apache 2.0 template.

In `@pkg/api/util.go`:
- Around line 170-185: The loop allows invalid Liveness values >
LivenessCaptureStopping to be CAS'd when old == LivenessCaptureStopping; add a
defensive range check at the start of the setter so any incoming v outside the
valid enum range (v < LivenessCaptureAlive || v > LivenessCaptureStopping)
returns false immediately. Locate the loop using l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.

In `@server/module_election.go`:
- Around line 255-258: The resign failure is being swallowed because the handler
returns errors.Trace(err) (where err is the prior campaign result and likely
nil) instead of the resign error; update the error return in the block that
calls e.resignLogCoordinator() (the resignErr variable) to return
errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.
🧹 Nitpick comments (10)
pkg/api/util.go (1)

193-203: Pre-existing: String() reads *l non-atomically.

Line 194 dereferences *l directly instead of using l.Load(). If String() is called concurrently with Store(), this is a data race. Not introduced by this PR, but worth noting since you're touching this area.

Suggested fix
 func (l *Liveness) String() string {
-	switch *l {
+	switch l.Load() {
 	case LivenessCaptureAlive:
api/v1/api.go (1)

202-219: Implementation looks good overall.

The flow is clean: validate input → get coordinator → call DrainNode → return remaining count with HTTP 202. One consideration: if req.CaptureID is empty, it will pass through to DrainNode with an empty node.ID. You may want to validate that CaptureID is non-empty up front and return a 400 error, consistent with how other endpoints handle missing parameters.

Optional: validate non-empty CaptureID
+	if req.CaptureID == "" {
+		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("capture_id is required"))
+		return
+	}
+
 	co, err := o.server.GetCoordinator()
coordinator/controller.go (1)

845-883: DrainNode does not acquire apiLock unlike other public API methods.

Other API methods (CreateChangefeed, RemoveChangefeed, PauseChangefeed, etc.) acquire c.apiLock before accessing changefeedDB and operatorController. DrainNode skips this. If this is intentional (because DrainNode is read-only and the underlying stores are internally synchronized), consider adding a brief comment to document the rationale. Otherwise, consider acquiring c.apiLock.RLock() for consistency.

coordinator/scheduler/drain.go (1)

49-112: Round-robin cursor advancement may skip draining nodes unfairly.

s.rrCursor is incremented on every outer-loop iteration (line 98), regardless of how many inner-loop iterations completed. If the outer loop runs multiple times in a single Execute() call, the cursor advances by the number of outer iterations, potentially skipping nodes. For example, with 3 draining nodes and availableSize = 6, the cursor could advance by up to 6 in one call, wrapping around multiple times.

This is a minor fairness concern rather than a correctness bug, since all draining nodes are still visited in each inner-loop pass. If you want strict round-robin across Execute() calls, consider advancing the cursor only once per Execute() invocation.
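The strict-fairness variant suggested here advances the cursor exactly once per Execute() call. A small sketch of that idea; orderFor and the field names are hypothetical, not the PR's code:

```go
package main

import "fmt"

// drainScheduler holds a round-robin cursor across Execute() calls.
type drainScheduler struct {
	rrCursor int
}

// orderFor returns the draining nodes rotated so that each call
// starts from the next node, advancing the cursor once per call
// rather than once per inner-loop iteration.
func (s *drainScheduler) orderFor(nodes []string) []string {
	if len(nodes) == 0 {
		return nil
	}
	start := s.rrCursor % len(nodes)
	s.rrCursor++ // advance once per Execute invocation
	out := make([]string, 0, len(nodes))
	for i := 0; i < len(nodes); i++ {
		out = append(out, nodes[(start+i)%len(nodes)])
	}
	return out
}

func main() {
	s := &drainScheduler{}
	fmt.Println(s.orderFor([]string{"a", "b", "c"})) // [a b c]
	fmt.Println(s.orderFor([]string{"a", "b", "c"})) // [b c a]
}
```

Within one call every draining node is still visited, so this only changes which node gets first pick of the available move budget across successive calls.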

server/module_election.go (1)

266-284: Resign error silently discarded when liveness transitions to STOPPING.

Line 278 discards the resignLogCoordinator() error with _ =. If the resign fails, the log coordinator's etcd key remains held, but leaderCancel() on line 279 stops the coordinator anyway. This could cause a brief period where another node cannot win the election. Consider logging the error at minimum.

Proposed fix
 				case <-ticker.C:
 					if e.svr.liveness.Load() == api.LivenessCaptureStopping {
 						log.Info("node is stopping, resign log coordinator actively",
 							zap.String("nodeID", nodeID))
-						_ = e.resignLogCoordinator()
+						if resignErr := e.resignLogCoordinator(); resignErr != nil {
+							log.Warn("resign log coordinator failed during stop",
+								zap.String("nodeID", nodeID), zap.Error(resignErr))
+						}
 						leaderCancel()
 						return
 					}
coordinator/nodeliveness/view.go (1)

60-86: Observe methods unconditionally overwrite records; no epoch or monotonicity guard.

Both ObserveHeartbeat and ObserveSetNodeLivenessResponse replace the record without checking nodeEpoch or ensuring monotonic liveness transitions. If a delayed/reordered heartbeat with ALIVE arrives after a SetNodeLivenessResponse with DRAINING, the view briefly regresses to Alive, potentially causing the drain scheduler to skip the node.

Consider adding an epoch check or a monotonicity guard (e.g., only accept transitions forward: Alive → Draining → Stopping).
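
A minimal sketch of such a guard, assuming liveness values are ordered ALIVE < DRAINING < STOPPING (acceptUpdate and the constants below are illustrative, not the actual nodeliveness API):

```go
package main

import "fmt"

// Illustrative liveness ordering: ALIVE(0) < DRAINING(1) < STOPPING(2).
const (
	alive    = 0
	draining = 1
	stopping = 2
)

// acceptUpdate reports whether an incoming (epoch, liveness) observation
// should replace the stored one. A newer epoch always wins, since a
// restarted node legitimately resets to ALIVE; within the same epoch only
// same-or-forward transitions are accepted; older epochs are rejected.
func acceptUpdate(storedEpoch uint64, storedLiveness int, newEpoch uint64, newLiveness int) bool {
	switch {
	case newEpoch > storedEpoch:
		return true
	case newEpoch < storedEpoch:
		return false
	default:
		return newLiveness >= storedLiveness
	}
}

func main() {
	// A delayed ALIVE heartbeat in the same epoch is rejected after DRAINING.
	fmt.Println(acceptUpdate(7, draining, 7, alive)) // false
	// A restarted node (new epoch) may reset to ALIVE.
	fmt.Println(acceptUpdate(7, stopping, 8, alive)) // true
}
```

Both Observe methods would call this check while holding v.mu before overwriting the record.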

maintainer/maintainer_manager.go (1)

442-466: Conversion helpers are stateless — could be package-level functions.

fromNodeLivenessPB and toNodeLivenessPB don't use any Manager state. Making them package-level functions (or moving them to a shared conversion package) would clarify their nature and improve reusability.

coordinator/drain/controller.go (3)

29-37: Consider using sync.RWMutex for better read concurrency.

GetStatus (line 106) is a read-only method but acquires a full Mutex.Lock(). If GetStatus is called on a hot path (e.g., from DrainNode API), switching to sync.RWMutex and using RLock in GetStatus would avoid contention with concurrent reads.


200-203: Info-level log on every successful liveness command send may be noisy.

sendSetNodeLiveness logs at Info level on every successful send (line 200). Since commands are resent every resendInterval (1 second, as wired in controller.go line 168) until observed, this could produce substantial log volume during drains. Consider Debug level for resends, or logging Info only on the first send.

As per coding guidelines, "log message strings should not include function names and should avoid hyphens (use spaces instead)" — the current message format is compliant.


181-204: Epoch 0 sent when liveness view has no record for the node.

When livenessView.GetNodeEpoch returns false (no record), epoch defaults to 0. The maintainer rejects requests where req.NodeEpoch != m.nodeEpoch (see maintainer_manager.go line 402), so the command will be silently rejected if the maintainer's epoch is non-zero. The command will be resent on the next tick, which is fine, but consider logging a debug message when the epoch is not found to aid troubleshooting.

Comment on lines +170 to +185
for {
old := l.Load()
if v <= old {
return false
}
if old == LivenessCaptureAlive && v != LivenessCaptureDraining {
return false
}
if old == LivenessCaptureDraining && v != LivenessCaptureStopping {
return false
}

if atomic.CompareAndSwapInt32((*int32)(l), int32(old), int32(v)) {
return true
}
}

⚠️ Potential issue | 🟡 Minor

Missing guard for out-of-range values when old == LivenessCaptureStopping.

If old is Stopping(2), the v <= old check on line 172 rejects values ≤ 2, but any value > 2 (e.g., an invalid Liveness(3)) falls through all guards and gets CAS'd in. While callers should only use defined constants, adding a range check would make the state machine defensive.

Proposed fix
 func (l *Liveness) Store(v Liveness) bool {
+	if v < LivenessCaptureDraining || v > LivenessCaptureStopping {
+		return false
+	}
 	for {
 		old := l.Load()
🤖 Prompt for AI Agents
In `@pkg/api/util.go` around lines 170 - 185, The loop allows invalid Liveness
values > LivenessCaptureStopping to be CAS'd when old ==
LivenessCaptureStopping; add a defensive range check at the start of the setter
so any incoming v outside the valid enum range (v < LivenessCaptureAlive || v >
LivenessCaptureStopping) returns false immediately. Locate the loop using
l.Load(), the constants
LivenessCaptureAlive/LivenessCaptureDraining/LivenessCaptureStopping, and the
atomic.CompareAndSwapInt32 call and enforce the range check before the existing
guards and CAS attempt.

Comment on lines 255 to 258
if resignErr := e.resignLogCoordinator(); resignErr != nil {
log.Warn("resign log coordinator actively failed",
zap.String("nodeID", nodeID), zap.Error(resignErr))
return errors.Trace(err)

⚠️ Potential issue | 🔴 Critical

Bug: returns err (which is nil after a successful campaign) instead of resignErr.

When resignLogCoordinator() fails, the error is logged but line 258 returns errors.Trace(err) where err is the campaign result (nil at this point), effectively swallowing the resign failure.

Proposed fix
 		if resignErr := e.resignLogCoordinator(); resignErr != nil {
 			log.Warn("resign log coordinator actively failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 		if resignErr := e.resignLogCoordinator(); resignErr != nil {
 			log.Warn("resign log coordinator actively failed",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}
🤖 Prompt for AI Agents
In `@server/module_election.go` around lines 255 - 258, The resign failure is
being swallowed because the handler returns errors.Trace(err) (where err is the
prior campaign result and likely nil) instead of the resign error; update the
error return in the block that calls e.resignLogCoordinator() (the resignErr
variable) to return errors.Trace(resignErr) so the actual resign failure from
e.resignLogCoordinator() is propagated and not masked by the earlier err
variable.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
server/module_election.go (2)

119-122: ⚠️ Potential issue | 🔴 Critical

Bug: returns err (which is nil after a successful campaign) instead of resignErr.

Same class of bug as flagged at line 239 for the log coordinator path. Here err is the campaign result (nil on success), so the resign failure is silently swallowed.

Proposed fix
 		if resignErr := e.resign(ctx); resignErr != nil {
 			log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}

289-298: ⚠️ Potential issue | 🟡 Minor

Bug: "resign failed" and "resign timeout" log messages are swapped.

When context.DeadlineExceeded is matched (line 290), the log says "resign failed" (line 291). The else branch (non-timeout error) logs "resign timeout" (line 296). These labels are reversed.

Proposed fix
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-			log.Info("log coordinator resign failed",
+			log.Warn("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
-		log.Warn("log coordinator resign timeout",
+		log.Warn("log coordinator resign failed",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 	}
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 210-233: sendSetNodeLiveness currently sends epoch=0 when
livenessView.GetNodeEpoch(nodeID) returns false, causing the maintainer
(maintainer_manager.go) to reject the request as an epoch mismatch; update
Controller.sendSetNodeLiveness to detect the missing epoch (ok == false) and log
a clear warning (include nodeID and that node epoch is unknown) before sending
the command so operators can diagnose why the drain may be delayed; keep the
message creation and send flow unchanged (still use epoch variable) so behavior
is unchanged but observable.

In `@coordinator/nodeliveness/view.go`:
- Around line 61-87: ObserveHeartbeat and ObserveSetNodeLivenessResponse
unconditionally overwrite the node record, allowing out-of-order messages to
regress liveness; fix by making updates monotonic per node epoch: in both
functions load existing record from v.nodes[nodeID], if resp.NodeEpoch or
hb.NodeEpoch is greater than the stored nodeEpoch accept and replace (new epoch
resets to ALIVE), if equal only accept the update when the new liveness is the
same or a forward transition (ALIVE→DRAINING→STOPPING) and update lastSeen
appropriately, and if the new epoch is older or the liveness would move backward
reject the update; use the record struct fields nodeEpoch and liveness to
perform these checks inside ObserveHeartbeat and ObserveSetNodeLivenessResponse
while holding v.mu.
🧹 Nitpick comments (7)
api/v1/api.go (1)

201-229: Wrap errors from internal calls before passing to gin context.

Per coding guidelines, errors from internal/library calls should be wrapped with errors.Trace(err) to attach stack traces. The errors from SelfInfo(), GetCoordinator(), and DrainNode() are passed directly to c.Error() without wrapping.

Proposed fix
 	self, err := o.server.SelfInfo()
 	if err != nil {
-		_ = c.Error(err)
+		_ = c.Error(errors.Trace(err))
 		return
 	}
 	// For compatibility with old arch TiCDC, draining the current owner is not allowed.
 	if target == self.ID {
 		_ = c.Error(errors.ErrSchedulerRequestFailed.GenWithStackByArgs("cannot drain the owner"))
 		return
 	}

 	co, err := o.server.GetCoordinator()
 	if err != nil {
-		_ = c.Error(err)
+		_ = c.Error(errors.Trace(err))
 		return
 	}

 	remaining, err := co.DrainNode(c.Request.Context(), target)
 	if err != nil {
-		_ = c.Error(err)
+		_ = c.Error(errors.Trace(err))
 		return
 	}

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".

coordinator/nodeliveness/view.go (1)

99-122: GetState uses time.Now() internally, unlike other methods that accept now.

GetDrainingOrStoppingNodes takes a now parameter for TTL checks, but GetState (and by extension IsSchedulableDest) calls time.Now() internally. This inconsistency makes the TTL behavior harder to test and could lead to subtle differences in TTL evaluation within the same tick.

Consider accepting now time.Time as a parameter for consistency, or documenting why the divergence is intentional.

maintainer/maintainer_manager.go (2)

448-472: fromNodeLivenessPB and toNodeLivenessPB don't use the receiver — make them standalone functions.

These methods are bound to *Manager but reference no fields from m. Converting them to package-level functions (or even placing them in a shared conversion utility) would clarify that they're pure mappings and allow reuse without a Manager instance.

♻️ Proposed refactor
-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
 	switch l {
 	case heartbeatpb.NodeLiveness_ALIVE:
 		return api.LivenessCaptureAlive
@@ ...
-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
 	switch l {
 	case api.LivenessCaptureAlive:
 		return heartbeatpb.NodeLiveness_ALIVE

Then update call sites (Lines 211, 417, 436) to drop the m. prefix.


279-284: Good: rejecting AddMaintainer when stopping prevents scheduling work onto a draining node.

This is a solid safety check. Consider also logging at Warn level instead of Info since this represents an unexpected request from the coordinator's perspective (it shouldn't be sending work to a stopping node).

coordinator/drain/controller.go (3)

132-150: AdvanceLiveness re-acquires the lock per node via GetStatus — consider a locked snapshot.

listDrainRequestedNodeIDs() takes a lock to snapshot node IDs, then for each node, GetStatus() re-acquires the lock, and trySendDrainCommand/trySendStopCommand each acquire it again via checkAndMarkCommandSend. This is 3 lock round-trips per node per tick.

Since state only moves forward (monotonic), this isn't a correctness issue, but it's worth noting for performance if the number of draining nodes grows. A single locked pass that snapshots both IDs and states would reduce contention.
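
A sketch of such a single-pass snapshot, under the assumption that the controller's per-node state is a small value type (nodeState, snapshotStates, and the field names are illustrative, not the actual drain controller internals):

```go
package main

import (
	"fmt"
	"sync"
)

// nodeState is an illustrative stand-in for the drain controller's
// per-node state.
type nodeState struct {
	drainRequested   bool
	drainingObserved bool
	stoppingObserved bool
}

type controller struct {
	mu    sync.Mutex
	nodes map[string]nodeState
}

// snapshotStates copies IDs and states in one critical section, so the
// per-node advance loop can then run lock free on the copy.
func (c *controller) snapshotStates() map[string]nodeState {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]nodeState, len(c.nodes))
	for id, st := range c.nodes {
		out[id] = st
	}
	return out
}

func main() {
	c := &controller{nodes: map[string]nodeState{
		"n1": {drainRequested: true},
		"n2": {},
	}}
	snap := c.snapshotStates()
	fmt.Println(len(snap), snap["n1"].drainRequested) // 2 true
}
```

Since the drain state machine only moves forward, acting on a slightly stale snapshot is safe: at worst a command is resent once.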


36-55: No mechanism to remove nodes from the nodes map.

Once a node enters the drain workflow, its nodeState persists in the map indefinitely. If the coordinator is long-lived and nodes are added/removed over time, this is a slow memory leak. Consider adding a RemoveNode(nodeID) method or clearing entries once stoppingObserved is true and the node has been fully removed from the cluster.
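
A minimal sketch of the suggested cleanup hook, assuming the coordinator can call it once node removal is observed (drainController and RemoveNode here are illustrative, not the actual controller API):

```go
package main

import (
	"fmt"
	"sync"
)

// drainController is an illustrative stand-in tracking per-node drain state.
type drainController struct {
	mu    sync.Mutex
	nodes map[string]struct{ stoppingObserved bool }
}

// RemoveNode drops the tracked state for a node; the coordinator would
// call this after observing the node leave the cluster, bounding the
// size of the nodes map.
func (c *drainController) RemoveNode(nodeID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.nodes, nodeID)
}

func main() {
	c := &drainController{nodes: map[string]struct{ stoppingObserved bool }{
		"n1": {stoppingObserved: true},
	}}
	c.RemoveNode("n1")
	fmt.Println(len(c.nodes)) // 0
}
```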


15-15: resendInterval is a hardcoded constant — consider making it configurable.

The AI summary mentioned a resendInterval parameter in the constructor. The current implementation uses a package-level constant of 1 second. For testing and tuning in production, injecting this via the constructor (or a config struct) would be more flexible.

Comment on lines +210 to +233
// sendSetNodeLiveness sends a liveness command to the target maintainer manager.
func (c *Controller) sendSetNodeLiveness(nodeID node.ID, target heartbeatpb.NodeLiveness) {
var epoch uint64
if c.livenessView != nil {
if e, ok := c.livenessView.GetNodeEpoch(nodeID); ok {
epoch = e
}
}

msg := messaging.NewSingleTargetMessage(nodeID, messaging.MaintainerManagerTopic, &heartbeatpb.SetNodeLivenessRequest{
Target: target,
NodeEpoch: epoch,
})
if err := c.mc.SendCommand(msg); err != nil {
log.Warn("send set node liveness command failed",
zap.Stringer("nodeID", nodeID),
zap.String("target", target.String()),
zap.Error(err))
return
}
log.Info("send set node liveness command",
zap.Stringer("nodeID", nodeID),
zap.String("target", target.String()),
zap.Uint64("epoch", epoch))

⚠️ Potential issue | 🟡 Minor

Drain commands will be silently rejected when the node epoch is unknown.

When livenessView has no epoch for the target node (Line 214, ok is false), epoch defaults to 0. On the receiving side (maintainer_manager.go Line 408), the node compares req.NodeEpoch != m.nodeEpoch — and since newNodeEpoch() guarantees a non-zero value, the request is always rejected with an epoch mismatch log.

This means drain commands sent before the coordinator observes any heartbeat from the target node will be silently ineffective. The resend logic will eventually retry after a heartbeat arrives and populates the epoch, so it's self-healing, but it could delay drain initiation unexpectedly.

Consider either:

  1. Logging a warning when epoch is not found so operators can diagnose delays, or
  2. Treating epoch=0 as "match any" on the receiver side for the initial transition.
#!/bin/bash
# Verify how livenessView.GetNodeEpoch is populated — check if heartbeat observation stores the epoch
ast-grep --pattern $'func ($_ *View) GetNodeEpoch($_, $_) ($_, $_) {
  $$$
}'

# Also check where ObserveHeartbeat feeds into the view
rg -n "ObserveHeartbeat|UpdateNodeEpoch|SetNodeEpoch|nodeEpoch" --type go -g '**/nodeliveness/**'
🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 210 - 233, sendSetNodeLiveness
currently sends epoch=0 when livenessView.GetNodeEpoch(nodeID) returns false,
causing the maintainer (maintainer_manager.go) to reject the request as an epoch
mismatch; update Controller.sendSetNodeLiveness to detect the missing epoch (ok
== false) and log a clear warning (include nodeID and that node epoch is
unknown) before sending the command so operators can diagnose why the drain may
be delayed; keep the message creation and send flow unchanged (still use epoch
variable) so behavior is unchanged but observable.

Comment on lines +61 to +87
func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
if hb == nil {
return
}

v.mu.Lock()
v.nodes[nodeID] = record{
lastSeen: now,
nodeEpoch: hb.NodeEpoch,
liveness: hb.Liveness,
}
v.mu.Unlock()
}

func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
if resp == nil {
return
}

v.mu.Lock()
v.nodes[nodeID] = record{
lastSeen: now,
nodeEpoch: resp.NodeEpoch,
liveness: resp.Applied,
}
v.mu.Unlock()
}

⚠️ Potential issue | 🟠 Major

Unconditional record overwrite may regress liveness state on out-of-order messages.

Both ObserveHeartbeat and ObserveSetNodeLivenessResponse unconditionally overwrite the record. If a delayed heartbeat (reporting ALIVE) arrives after a SetNodeLivenessResponse (applying DRAINING), the view would briefly revert to ALIVE, which could cause the scheduler to route work to a draining node.

Since liveness transitions are monotonic (ALIVE→DRAINING→STOPPING), consider rejecting updates that would move backward:

Sketch
 func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
 	if hb == nil {
 		return
 	}
 	v.mu.Lock()
+	if existing, ok := v.nodes[nodeID]; ok {
+		if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
+			v.mu.Unlock()
+			return
+		}
+	}
 	v.nodes[nodeID] = record{
 		lastSeen:  now,
 		nodeEpoch: hb.NodeEpoch,
 		liveness:  hb.Liveness,
 	}
 	v.mu.Unlock()
 }

This would also need analogous handling in ObserveSetNodeLivenessResponse. A new epoch (node restart) should still be allowed to reset to ALIVE.

📝 Committable suggestion


Suggested change
 func (v *View) ObserveHeartbeat(nodeID node.ID, hb *heartbeatpb.NodeHeartbeat, now time.Time) {
 	if hb == nil {
 		return
 	}
 	v.mu.Lock()
+	if existing, ok := v.nodes[nodeID]; ok {
+		if hb.NodeEpoch == existing.nodeEpoch && hb.Liveness < existing.liveness {
+			v.mu.Unlock()
+			return
+		}
+	}
 	v.nodes[nodeID] = record{
 		lastSeen:  now,
 		nodeEpoch: hb.NodeEpoch,
 		liveness:  hb.Liveness,
 	}
 	v.mu.Unlock()
 }
 
 func (v *View) ObserveSetNodeLivenessResponse(nodeID node.ID, resp *heartbeatpb.SetNodeLivenessResponse, now time.Time) {
 	if resp == nil {
 		return
 	}
 	v.mu.Lock()
 	v.nodes[nodeID] = record{
 		lastSeen:  now,
 		nodeEpoch: resp.NodeEpoch,
 		liveness:  resp.Applied,
 	}
 	v.mu.Unlock()
 }
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 61 - 87, ObserveHeartbeat and
ObserveSetNodeLivenessResponse unconditionally overwrite the node record,
allowing out-of-order messages to regress liveness; fix by making updates
monotonic per node epoch: in both functions load existing record from
v.nodes[nodeID], if resp.NodeEpoch or hb.NodeEpoch is greater than the stored
nodeEpoch accept and replace (new epoch resets to ALIVE), if equal only accept
the update when the new liveness is the same or a forward transition
(ALIVE→DRAINING→STOPPING) and update lastSeen appropriately, and if the new
epoch is older or the liveness would move backward reject the update; use the
record struct fields nodeEpoch and liveness to perform these checks inside
ObserveHeartbeat and ObserveSetNodeLivenessResponse while holding v.mu.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@coordinator/nodeliveness/view.go`:
- Around line 111-134: GetState currently calls time.Now() internally which
prevents deterministic tests and is inconsistent with
GetDrainingOrStoppingNodes; change View.GetState to accept a now time.Time
parameter (func (v *View) GetState(nodeID node.ID, now time.Time) State), update
its internal uses of now (replace the time.Now() call), and then update all
callers (notably IsSchedulableDest) to pass through a now value (e.g., take now
as a parameter or forward the now they already have) so TTL comparisons use the
same time source as GetDrainingOrStoppingNodes.
🧹 Nitpick comments (6)
coordinator/scheduler/drain.go (2)

105-118: Round-robin cursor advances on every outer-loop iteration, not per Execute call.

s.rrCursor is incremented at line 114 inside the for scheduled < availableSize loop, meaning it advances multiple times per Execute call (once per full sweep of draining nodes). This provides fairness across multiple batches within a single call, but is worth confirming this is the intended behavior — the comment on line 41 says "rotates the starting draining node" which could imply once-per-call advancement.


159-180: chooseLeastLoadedDest relies on the node.ID zero value ("") to signal "no destination found".

If every candidate in destCandidates matches origin, chosen remains the zero-value empty string and the guard at line 176 (chosen.IsEmpty()) catches it. This works, but only because node.ID("") is never a legitimate node ID. The logic is correct but brittle — consider returning false explicitly when minSize == math.MaxInt for clarity.
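
A sketch of the explicit-flag variant, assuming a simple node-ID-to-load mapping (chooseLeastLoaded and the sizes map are illustrative, not the actual scheduler code):

```go
package main

import (
	"fmt"
	"math"
)

// chooseLeastLoaded returns the least loaded candidate other than origin,
// and false explicitly when no valid destination exists, rather than
// relying on the zero-value node ID "".
func chooseLeastLoaded(candidates []string, origin string, sizes map[string]int) (string, bool) {
	minSize := math.MaxInt
	chosen := ""
	for _, id := range candidates {
		if id == origin {
			continue
		}
		if sz := sizes[id]; sz < minSize {
			minSize = sz
			chosen = id
		}
	}
	if minSize == math.MaxInt {
		// Every candidate was the origin (or the list was empty).
		return "", false
	}
	return chosen, true
}

func main() {
	sizes := map[string]int{"a": 3, "b": 1}
	dest, ok := chooseLeastLoaded([]string{"a", "b"}, "a", sizes)
	fmt.Println(dest, ok) // b true
	_, ok = chooseLeastLoaded([]string{"a"}, "a", sizes)
	fmt.Println(ok) // false
}
```

The minSize == math.MaxInt check makes "no destination" independent of any assumption about what a zero-value ID looks like.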

coordinator/drain/controller.go (2)

130-139: GetStatus acquires an exclusive lock for a read-only operation.

Use RLock/RUnlock here to allow concurrent readers.

Suggested fix
 func (c *Controller) GetStatus(nodeID node.ID) (drainRequested, drainingObserved, stoppingObserved bool) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 
 	st, ok := c.nodes[nodeID]

144-162: TOCTOU between GetStatus and trySendDrainCommand/trySendStopCommand — benign but worth noting.

AdvanceLiveness reads state via GetStatus (acquires and releases lock), then conditionally calls send methods (which re-acquire the lock). Between these calls another goroutine could change the state. In practice this is harmless — the worst case is an extra throttled command or a skipped iteration that self-heals on the next cycle. No action needed now, but if the controller becomes multi-goroutine-hot, consolidating the check-and-send under a single lock hold would eliminate the gap.

coordinator/drain/controller_test.go (2)

38-45: Blocking channel reads will hang the test forever if no message is sent.

Lines 39, 60, and 71 use <-mc.GetMessageChannel() which blocks indefinitely. If a code change causes the message not to be sent, the test hangs rather than failing with a useful message.

Consider using a helper with a timeout:

Suggested approach
func receiveOrFail(t *testing.T, ch <-chan *messaging.TargetMessage) *messaging.TargetMessage {
	t.Helper()
	select {
	case msg := <-ch:
		return msg
	case <-time.After(3 * time.Second):
		require.FailNow(t, "timed out waiting for message")
		return nil
	}
}

Also applies to: 59-62, 70-73


26-74: Test coverage is thin — consider adding cases for key scenarios.

The single test covers the happy path well. Missing scenarios that would strengthen confidence:

  • Unknown epoch: RequestDrain before any heartbeat observed (epoch=0 path).
  • Multiple draining nodes: verifying fair round-robin or independent progression.
  • readyToStop=false: AdvanceLiveness should not send STOPPING when the callback returns false.
  • Idempotent RequestDrain: calling it twice should not double-send.

Comment on lines +111 to +134
func (v *View) GetState(nodeID node.ID) State {
v.mu.RLock()
r, ok := v.nodes[nodeID]
v.mu.RUnlock()

now := time.Now()
if !ok {
// Never observed: keep compatibility during rollout.
return StateAlive
}
if now.Sub(r.lastSeen) > v.ttl {
return StateUnknown
}
switch r.liveness {
case heartbeatpb.NodeLiveness_ALIVE:
return StateAlive
case heartbeatpb.NodeLiveness_DRAINING:
return StateDraining
case heartbeatpb.NodeLiveness_STOPPING:
return StateStopping
default:
return StateAlive
}
}

🛠️ Refactor suggestion | 🟠 Major

GetState uses time.Now() internally, making it untestable and inconsistent with GetDrainingOrStoppingNodes.

GetDrainingOrStoppingNodes accepts a now time.Time parameter, but GetState (and therefore IsSchedulableDest) hardcodes time.Now(). This makes TTL-dependent behavior impossible to test deterministically and creates an inconsistency in the API surface.

Suggested fix
-func (v *View) GetState(nodeID node.ID) State {
+func (v *View) GetState(nodeID node.ID, now time.Time) State {
 	v.mu.RLock()
 	r, ok := v.nodes[nodeID]
 	v.mu.RUnlock()
 
-	now := time.Now()
 	if !ok {
 		return StateAlive
 	}

Then update IsSchedulableDest similarly:

-func (v *View) IsSchedulableDest(nodeID node.ID) bool {
-	return v.GetState(nodeID) == StateAlive
+func (v *View) IsSchedulableDest(nodeID node.ID, now time.Time) bool {
+	return v.GetState(nodeID, now) == StateAlive
 }
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 111 - 134, GetState currently
calls time.Now() internally which prevents deterministic tests and is
inconsistent with GetDrainingOrStoppingNodes; change View.GetState to accept a
now time.Time parameter (func (v *View) GetState(nodeID node.ID, now time.Time)
State), update its internal uses of now (replace the time.Now() call), and then
update all callers (notably IsSchedulableDest) to pass through a now value
(e.g., take now as a parameter or forward the now they already have) so TTL
comparisons use the same time source as GetDrainingOrStoppingNodes.

@hongyunyan
Collaborator Author

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive node draining mechanism, a significant feature for improving cluster manageability. However, a critical security concern has been identified: the new drainCapture API endpoint in api/v1/api.go is exposed without authentication, which could allow unauthenticated users to disrupt the cluster. Additionally, a potential issue regarding error handling during log coordinator resignation should be addressed.

// Usage:
// curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
// TODO: Implement this API in the future, currently it is a no-op.
func (o *OpenAPIV1) drainCapture(c *gin.Context) {


security-high high

The drainCapture function implements a sensitive cluster management operation (draining a capture), but the pull request does not ensure that the corresponding route registration is protected by authentication middleware. In RegisterOpenAPIV1Routes, the PUT /drain route is registered without authenticateMiddleware, unlike other sensitive operations in the same file (e.g., createChangefeed, updateChangefeed). This could allow unauthenticated users to disrupt the cluster by triggering node drains. It is recommended to add authenticateMiddleware to the route registration for drainCapture in RegisterOpenAPIV1Routes.

Comment on lines 289 to 299
 if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 	if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
-		log.Info("log coordinator resign failed",
+		log.Info("log coordinator resign timeout",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
 		cancel()
 		return errors.Trace(resignErr)
 	}
-	log.Warn("log coordinator resign timeout",
+	log.Warn("log coordinator resign failed",
 		zap.String("nodeID", nodeID), zap.Error(resignErr))
 }
 cancel()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This function currently swallows non-timeout errors from e.logElection.Resign. If Resign fails for a reason other than a timeout, the error is logged as a warning, but the function returns nil. This could mislead the caller into believing the resignation was successful when it wasn't, potentially leading to an inconsistent state where the node is still the log coordinator leader in etcd.

The error should be returned to the caller to handle it properly. Using defer cancel() would also simplify the context cancellation logic.

Suggested change
+	defer cancel()
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
 			log.Info("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
-			cancel()
-			return errors.Trace(resignErr)
+		} else {
+			log.Warn("log coordinator resign failed",
+				zap.String("nodeID", nodeID), zap.Error(resignErr))
 		}
-		log.Warn("log coordinator resign failed",
-			zap.String("nodeID", nodeID), zap.Error(resignErr))
+		return errors.Trace(resignErr)
 	}
-	cancel()

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
server/module_election.go (2)

116-123: ⚠️ Potential issue | 🔴 Critical

Bug: errors.Trace(err) returns nil instead of the resign error.

At line 121, err is the result of Campaign (line 95), which is nil at this point (the campaign succeeded for execution to reach this branch). This swallows the resign failure. Same pattern as the known bug on line 239.

Proposed fix
 		if resignErr := e.resign(ctx); resignErr != nil {
 			log.Warn("resign coordinator actively failed", zap.String("nodeID", nodeID), zap.Error(resignErr))
-			return errors.Trace(err)
+			return errors.Trace(resignErr)
 		}

282-301: ⚠️ Potential issue | 🟠 Major

resignLogCoordinator silently swallows non-timeout resign errors.

When Resign fails with an error other than context.DeadlineExceeded, line 296 logs a warning but line 300 returns nil. This means a genuine resign failure (e.g., network error) is silently ignored and the caller assumes success.

Compare with the coordinator's resign path (line 155-161) which propagates the error. Consider returning the error here as well.

Proposed fix
 	if resignErr := e.logElection.Resign(resignCtx); resignErr != nil {
 		if errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) {
 			log.Info("log coordinator resign timeout",
 				zap.String("nodeID", nodeID), zap.Error(resignErr))
 			cancel()
 			return errors.Trace(resignErr)
 		}
 		log.Warn("log coordinator resign failed",
 			zap.String("nodeID", nodeID), zap.Error(resignErr))
+		cancel()
+		return errors.Trace(resignErr)
 	}
 	cancel()
 	return nil
🤖 Fix all issues with AI agents
In `@coordinator/drain/controller.go`:
- Around line 12-13: Add a single blank line between the file license header and
the package declaration to match project formatting; update the top of
controller.go so the license block is followed by one empty line before the
"package drain" statement (the package declaration symbol is package drain).

In `@coordinator/nodeliveness/view_test.go`:
- Around line 12-13: Add a blank line between the license header and the package
declaration in this file so it matches the project's formatting (same change as
in view.go); specifically, insert an empty line above the "package nodeliveness"
declaration to separate the header from the package statement.

In `@coordinator/nodeliveness/view.go`:
- Around line 12-13: Add a single blank line between the file's license header
and the package declaration to match the repository style and avoid
gofmt/gofumpt or copyright-check lint failures; specifically insert one empty
line immediately above the existing "package nodeliveness" declaration so the
license block is separated from the package line.
🧹 Nitpick comments (4)
coordinator/nodeliveness/view_test.go (1)

24-43: Test relies on wall-clock time for TTL assertions, making it timing-sensitive.

GetState calls time.Now() internally. The test works because the observation timestamps are set far enough in the past, but on a very slow CI runner or with clock skew, the 5-second-from-now observation at line 35 could flicker. If GetState is refactored to accept now time.Time (as suggested in view.go), these tests become fully deterministic.

Current approach is acceptable for now given the generous margins (5s vs 30s TTL, 35s vs 30s TTL).

maintainer/maintainer_manager.go (2)

404-437: Potential TOCTOU between m.liveness.Load() and m.liveness.Store(target).

Lines 423-425 load current, compare target > current, then call Store(target). Between the load and the store, another goroutine (e.g., the server's signal handler calling Store for stopping) could advance the liveness. The Store CAS loop handles this correctly (it would fail and retry or return false), so the state is safe. However, line 436 sends a response with the current local variable which may be stale if Store returned false due to a concurrent advance.

This is a minor concern since Store returning false means current on line 431 is not updated, and the response on line 436 would report the pre-race value rather than the actual current state. This could cause the coordinator to have a momentarily stale view, which is self-correcting via the next heartbeat.


453-477: fromNodeLivenessPB and toNodeLivenessPB don't use the receiver — consider making them package-level functions.

These methods don't access any Manager state. Making them free functions (or at minimum unexported package-level functions) would better convey their stateless nature and allow reuse without a Manager instance.

Proposed change
-func (m *Manager) fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
+func fromNodeLivenessPB(l heartbeatpb.NodeLiveness) api.Liveness {
-func (m *Manager) toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {
+func toNodeLivenessPB(l api.Liveness) heartbeatpb.NodeLiveness {

Then update callers (e.g., lines 211, 422, 441) to drop the m. prefix.

coordinator/drain/controller.go (1)

141-162: AdvanceLiveness re-acquires the lock per-node via GetStatus after snapshotting the list — consider whether a single-pass under one lock would be simpler.

listDrainRequestedNodeIDs acquires and releases the lock, then the loop calls GetStatus (lock again) for each node, then trySendDrainCommand/trySendStopCommand (lock again in checkAndMarkCommandSend). This is correct and avoids holding the lock during I/O, but the triple lock acquisition per node adds overhead. Given the expected low number of draining nodes, this is fine.
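If the triple lock acquisition ever becomes a measurable cost, the single-pass alternative could look roughly like this. The types and names are hypothetical, not the real drain controller; the sketch only shows the shape: snapshot everything needed under one lock, then do the I/O on the copy:

```go
package main

import (
	"fmt"
	"sync"
)

// status is a hypothetical per-node drain record.
type status struct {
	nodeID  string
	stopped bool
}

// controller is a stand-in for the drain controller's locked state.
type controller struct {
	mu    sync.Mutex
	nodes map[string]*status
}

// advanceLiveness takes the lock exactly once, copies the statuses it
// needs, and only then performs the (potentially slow) command sends.
func (c *controller) advanceLiveness(send func(status)) {
	c.mu.Lock()
	snapshot := make([]status, 0, len(c.nodes))
	for _, s := range c.nodes {
		snapshot = append(snapshot, *s)
	}
	c.mu.Unlock()

	for _, s := range snapshot { // I/O happens lock-free on the copy
		send(s)
	}
}

func main() {
	c := &controller{nodes: map[string]*status{
		"n1": {nodeID: "n1"},
		"n2": {nodeID: "n2"},
	}}
	sent := 0
	c.advanceLiveness(func(status) { sent++ })
	fmt.Println(sent)
}
```

The trade-off is that the snapshot can be stale by the time a command is sent, which the current per-node `GetStatus` approach avoids; given the low node counts either choice is defensible.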

Comment on lines +12 to +13
// limitations under the License.
package drain
Contributor

⚠️ Potential issue | 🟡 Minor

Missing blank line between license header and package declaration.

Same formatting issue as the other new files.

🤖 Prompt for AI Agents
In `@coordinator/drain/controller.go` around lines 12 - 13, Add a single blank
line between the file license header and the package declaration to match
project formatting; update the top of controller.go so the license block is
followed by one empty line before the "package drain" statement (the package
declaration symbol is package drain).

Comment on lines +12 to +13
// limitations under the License.
package nodeliveness
Contributor

⚠️ Potential issue | 🟡 Minor

Missing blank line between license header and package declaration.

Same as view.go — add a blank line for consistency with the rest of the codebase.

🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view_test.go` around lines 12 - 13, Add a blank line
between the license header and the package declaration in this file so it
matches the project's formatting (same change as in view.go); specifically,
insert an empty line above the "package nodeliveness" declaration to separate
the header from the package statement.

Comment on lines +12 to +13
// limitations under the License.
package nodeliveness
Contributor

⚠️ Potential issue | 🟡 Minor

Missing blank line between license header and package declaration.

Most Go files in this repo separate the license block from the package line with a blank line. This may also trigger gofmt/gofumpt or copyright-check lint failures.

Proposed fix
 // limitations under the License.
+
 package nodeliveness
🤖 Prompt for AI Agents
In `@coordinator/nodeliveness/view.go` around lines 12 - 13, Add a single blank
line between the file's license header and the package declaration to match the
repository style and avoid gofmt/gofumpt or copyright-check lint failures;
specifically insert one empty line immediately above the existing "package
nodeliveness" declaration so the license block is separated from the package
line.

@hongyunyan
Collaborator Author

/test mysql

@ti-chi-bot

ti-chi-bot bot commented Feb 13, 2026

@hongyunyan: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-cdc-mysql-integration-light 9582e10 link true /test pull-cdc-mysql-integration-light


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
maintainer/node_liveness_test.go (1)

1-13: Copyright header is present.

The previously flagged missing copyright header has been added.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/node_liveness_test.go` around lines 1 - 13, The review notes that
the copyright header was added to the maintainer package but the reviewer left a
duplicate comment; resolve this by removing or marking as outdated the redundant
review comment (the duplicate on maintainer/node_liveness_test.go) so only the
single confirmation of the added header remains, and then update the PR status
or reply to the reviewer indicating the duplicate has been cleared.
coordinator/controller_drain_test.go (1)

1-13: Copyright header present — past issue resolved.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/controller_drain_test.go` around lines 1 - 13, The file contains
a duplicated copyright header comment at the top of controller_drain_test.go;
remove the extra header block so only one standard Apache 2.0 license header
remains immediately above the package declaration ("package coordinator"),
ensuring there are no repeated comment blocks before the package line.
🧹 Nitpick comments (3)
maintainer/node_liveness_test.go (2)

45-45: Blocking channel reads with no timeout can cause the test suite to hang.

Both tests use bare <-mc.GetMessageChannel() reads. If onSetNodeLivenessRequest ever sends via a goroutine (or a code path changes to be async), these reads block indefinitely — no test-level timeout guard is in place. A common Go pattern is to wrap the read in a select with a time.After (or require.Eventually) so the test fails fast with a useful message.

♻️ Proposed helper — fail fast instead of hanging
// recvMsg reads one message from mc with a short deadline.
func recvMsg(t *testing.T, mc *messaging.MockMessageCenter) *messaging.TargetMessage {
	t.Helper()
	select {
	case msg := <-mc.GetMessageChannel():
		return msg
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for message")
		return nil
	}
}

Then replace every <-mc.GetMessageChannel() call with recvMsg(t, mc).

Also applies to: 72-73

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/node_liveness_test.go` at line 45, The tests block on bare reads
from mc.GetMessageChannel() which can hang; add a short-deadline helper (e.g.
recvMsg(t *testing.T, mc *messaging.MockMessageCenter)) that selects on
mc.GetMessageChannel() and time.After(2s) and calls t.Fatal on timeout, then
replace each direct `<-mc.GetMessageChannel()` use (tests that exercise
onSetNodeLivenessRequest and similar paths) with recvMsg(t, mc) so tests fail
fast instead of hanging.

27-34: Duplicate test setup — extract a helper.

Both tests repeat the same six-line setup block (mock center, SetService, NewMaintainerManager, coordinator fields). A shared newTestMaintainerManager helper would reduce noise and make future changes (e.g., adding a new required field) a single-line fix.

♻️ Proposed helper
func newTestMaintainerManager(t *testing.T, mc *messaging.MockMessageCenter, liveness *api.Liveness) *Manager {
	t.Helper()
	appcontext.SetService(appcontext.MessageCenter, mc)
	m := NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{}, liveness)
	m.coordinatorID = node.ID("coordinator")
	m.coordinatorVersion = 1
	return m
}

Also applies to: 53-60

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/node_liveness_test.go` around lines 27 - 34, Extract the repeated
test setup into a helper to reduce duplication: create a helper function (e.g.,
newTestMaintainerManager) that takes testing.T, the mock message center from
messaging.NewMockMessageCenter and a *api.Liveness, calls t.Helper(), sets
appcontext.SetService(appcontext.MessageCenter, mc), constructs the manager with
NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{},
liveness), sets m.coordinatorID = node.ID("coordinator") and
m.coordinatorVersion = 1, and returns *Manager; then replace the six-line setup
in TestSetNodeLivenessRejectEpochMismatch and the other test (lines 53–60) with
calls to this helper.
coordinator/controller_drain_test.go (1)

32-59: Consider isolating global appcontext side-effects.

appcontext.SetService writes to a package-level singleton. While these tests don't call t.Parallel(), any existing or future parallel test in the coordinator package that also touches appcontext.MessageCenter or watcher.NodeManagerName will race against this setup.

♻️ Suggested mitigation

Add a t.Cleanup to restore or reset the relevant appcontext entries after each test, or use a per-test context scope if the framework supports it:

 appcontext.SetService(appcontext.MessageCenter, mc)
+t.Cleanup(func() { appcontext.SetService(appcontext.MessageCenter, nil) })

 appcontext.SetService(watcher.NodeManagerName, nodeManager)
+t.Cleanup(func() { appcontext.SetService(watcher.NodeManagerName, nil) })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/controller_drain_test.go` around lines 32 - 59, The helper
newDrainTestController mutates global appcontext singleton via
appcontext.SetService for appcontext.MessageCenter and watcher.NodeManagerName
causing potential race in parallel tests; modify newDrainTestController to
capture the previous values (or presence) for these keys, call
appcontext.SetService as before, and register a t.Cleanup that restores the
originals (or clears the entries) after the test completes so the global
appcontext is reset; reference the newDrainTestController function and the
SetService calls for appcontext.MessageCenter and watcher.NodeManagerName and
use t.Cleanup to perform the restoration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@coordinator/controller_drain_test.go`:
- Line 49: The test mutates the map returned by GetAliveNodes()
(nodeManager.GetAliveNodes()[target] = &node.Info{ID: target}), which violates
the API contract; replace this direct map mutation by using NodeManager's
provided setup/helper to register the node (e.g., call the NodeManager test
helper or API that adds/registers an alive node instead of writing into
GetAliveNodes()), creating a node.Info with ID == target and registering it via
the NodeManager instance rather than assigning into the returned map.

In `@maintainer/node_liveness_test.go`:
- Around line 71-77: The test verifies message types but omits validating the
SetNodeLivenessResponse payload; after reading the two messages from
mc.GetMessageChannel() (variables first/second) locate the message whose Type ==
messaging.TypeSetNodeLivenessResponse, decode/cast its Payload to
api.SetNodeLivenessResponse, and assert response.Applied ==
api.LivenessCaptureDraining (or the expected enum value) and response.NodeEpoch
equals the expected epoch value used in the test (matching the epoch sent); use
require.Equal (or require.True) to fail the test on mismatch so a wrong Applied
or stale NodeEpoch is caught.

---

Duplicate comments:
In `@coordinator/controller_drain_test.go`:
- Around line 1-13: The file contains a duplicated copyright header comment at
the top of controller_drain_test.go; remove the extra header block so only one
standard Apache 2.0 license header remains immediately above the package
declaration ("package coordinator"), ensuring there are no repeated comment
blocks before the package line.

In `@maintainer/node_liveness_test.go`:
- Around line 1-13: The review notes that the copyright header was added to the
maintainer package but the reviewer left a duplicate comment; resolve this by
removing or marking as outdated the redundant review comment (the duplicate on
maintainer/node_liveness_test.go) so only the single confirmation of the added
header remains, and then update the PR status or reply to the reviewer
indicating the duplicate has been cleared.

---

Nitpick comments:
In `@coordinator/controller_drain_test.go`:
- Around line 32-59: The helper newDrainTestController mutates global appcontext
singleton via appcontext.SetService for appcontext.MessageCenter and
watcher.NodeManagerName causing potential race in parallel tests; modify
newDrainTestController to capture the previous values (or presence) for these
keys, call appcontext.SetService as before, and register a t.Cleanup that
restores the originals (or clears the entries) after the test completes so the
global appcontext is reset; reference the newDrainTestController function and
the SetService calls for appcontext.MessageCenter and watcher.NodeManagerName
and use t.Cleanup to perform the restoration.

In `@maintainer/node_liveness_test.go`:
- Line 45: The tests block on bare reads from mc.GetMessageChannel() which can
hang; add a short-deadline helper (e.g. recvMsg(t *testing.T, mc
*messaging.MockMessageCenter)) that selects on mc.GetMessageChannel() and
time.After(2s) and calls t.Fatal on timeout, then replace each direct
`<-mc.GetMessageChannel()` use (tests that exercise onSetNodeLivenessRequest and
similar paths) with recvMsg(t, mc) so tests fail fast instead of hanging.
- Around line 27-34: Extract the repeated test setup into a helper to reduce
duplication: create a helper function (e.g., newTestMaintainerManager) that
takes testing.T, the mock message center from messaging.NewMockMessageCenter and
a *api.Liveness, calls t.Helper(), sets
appcontext.SetService(appcontext.MessageCenter, mc), constructs the manager with
NewMaintainerManager(&node.Info{ID: node.ID("n1")}, &config.SchedulerConfig{},
liveness), sets m.coordinatorID = node.ID("coordinator") and
m.coordinatorVersion = 1, and returns *Manager; then replace the six-line setup
in TestSetNodeLivenessRejectEpochMismatch and the other test (lines 53–60) with
calls to this helper.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f6fcbe3 and 6e90fde.

📒 Files selected for processing (3)
  • coordinator/controller_drain_test.go
  • maintainer/node_liveness_test.go
  • server/module_election.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/module_election.go

oc := operator.NewOperatorController(selfNode, db, nil, 10)

target := node.ID("target")
nodeManager.GetAliveNodes()[target] = &node.Info{ID: target}
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find GetAliveNodes implementation
rg "func.*GetAliveNodes" --type=go -B 2 -A 10

Repository: pingcap/ticdc

Length of output: 908


🏁 Script executed:

#!/bin/bash
# Also search for the NodeManager type definition to understand the data structure
rg "type NodeManager" --type=go -A 20

Repository: pingcap/ticdc

Length of output: 1301


🏁 Script executed:

#!/bin/bash
# Check the full context of the test file around line 49
sed -n '40,60p' coordinator/controller_drain_test.go

Repository: pingcap/ticdc

Length of output: 619


Do not mutate the map returned by GetAliveNodes(); it violates the API contract.

The API documentation explicitly states "the caller mustn't modify the returned map". Instead of mutating the returned map at line 49, use a proper test helper method or setup approach provided by NodeManager to register the target node.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/controller_drain_test.go` at line 49, The test mutates the map
returned by GetAliveNodes() (nodeManager.GetAliveNodes()[target] =
&node.Info{ID: target}), which violates the API contract; replace this direct
map mutation by using NodeManager's provided setup/helper to register the node
(e.g., call the NodeManager test helper or API that adds/registers an alive node
instead of writing into GetAliveNodes()), creating a node.Info with ID == target
and registering it via the NodeManager instance rather than assigning into the
returned map.

Comment on lines +71 to +77
// Successful transition sends both a node heartbeat and a response.
first := <-mc.GetMessageChannel()
second := <-mc.GetMessageChannel()

require.ElementsMatch(t, []messaging.IOType{messaging.TypeNodeHeartbeatRequest, messaging.TypeSetNodeLivenessResponse},
[]messaging.IOType{first.Type, second.Type})
require.Equal(t, api.LivenessCaptureDraining, liveness.Load())
Contributor

⚠️ Potential issue | 🟡 Minor

TestSetNodeLivenessApplyTransition is missing response-content assertions.

The test verifies the two message types via ElementsMatch but never extracts or inspects the SetNodeLivenessResponse payload — specifically Applied and NodeEpoch. This is inconsistent with TestSetNodeLivenessRejectEpochMismatch, which validates both fields. A successful transition that sends the wrong Applied value (e.g., still ALIVE) or a stale epoch would go undetected.

🛠️ Proposed fix — assert response payload after the type check
 require.ElementsMatch(t, []messaging.IOType{messaging.TypeNodeHeartbeatRequest, messaging.TypeSetNodeLivenessResponse},
 	[]messaging.IOType{first.Type, second.Type})
+
+// Extract the response (order not guaranteed, so find it explicitly).
+var liveResp *heartbeatpb.SetNodeLivenessResponse
+for _, m := range []*messaging.TargetMessage{first, second} {
+	if m.Type == messaging.TypeSetNodeLivenessResponse {
+		liveResp = m.Message[0].(*heartbeatpb.SetNodeLivenessResponse)
+	}
+}
+require.NotNil(t, liveResp)
+require.Equal(t, heartbeatpb.NodeLiveness_DRAINING, liveResp.Applied)
+require.Equal(t, m.nodeEpoch, liveResp.NodeEpoch)
+
 require.Equal(t, api.LivenessCaptureDraining, liveness.Load())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/node_liveness_test.go` around lines 71 - 77, The test verifies
message types but omits validating the SetNodeLivenessResponse payload; after
reading the two messages from mc.GetMessageChannel() (variables first/second)
locate the message whose Type == messaging.TypeSetNodeLivenessResponse,
decode/cast its Payload to api.SetNodeLivenessResponse, and assert
response.Applied == api.LivenessCaptureDraining (or the expected enum value) and
response.NodeEpoch equals the expected epoch value used in the test (matching
the epoch sent); use require.Equal (or require.True) to fail the test on
mismatch so a wrong Applied or stale NodeEpoch is caught.

@ti-chi-bot

ti-chi-bot bot commented Mar 3, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.


Labels

do-not-merge/needs-linked-issue do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
