42 changes: 26 additions & 16 deletions api/v1/api.go
@@ -19,7 +19,6 @@ import (
"io"
"net/http"
"strconv"
"sync/atomic"

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
@@ -189,28 +188,41 @@ func (o *OpenAPIV1) rebalanceTables(c *gin.Context) {
c.Status(http.StatusAccepted)
}

// drainCapture drains all tables from a capture.
// drainCapture triggers node drain for the given capture ID.
// Usage:
// curl -X PUT http://127.0.0.1:8300/api/v1/captures/drain
// TODO: Implement this API in the future, currently it is a no-op.
// The response field name keeps API compatibility with the old architecture and
// represents remaining drain work rather than a literal table count.
func (o *OpenAPIV1) drainCapture(c *gin.Context) {
var req drainCaptureRequest
if err := c.ShouldBindJSON(&req); err != nil {
_ = c.Error(errors.ErrAPIInvalidParam.Wrap(err))
return
}
drainCaptureCounter.Add(1)
if drainCaptureCounter.Load()%10 == 0 {
log.Info("api v1 drainCapture", zap.Any("captureID", req.CaptureID), zap.Int64("currentTableCount", drainCaptureCounter.Load()))
c.JSON(http.StatusAccepted, &drainCaptureResp{
CurrentTableCount: 10,
})
} else {
log.Info("api v1 drainCapture done", zap.Any("captureID", req.CaptureID), zap.Int64("currentTableCount", drainCaptureCounter.Load()))
c.JSON(http.StatusAccepted, &drainCaptureResp{
CurrentTableCount: 0,
})
if req.CaptureID == "" {
_ = c.Error(errors.ErrAPIInvalidParam.GenWithStackByArgs("capture_id is required"))
return
}

co, err := o.server.GetCoordinator()
if err != nil {
_ = c.Error(err)
return
}
if co == nil || !co.Initialized() {
_ = c.Error(errors.New("coordinator is not fully initialized, wait a moment"))
return
}

remaining, err := co.DrainNode(c.Request.Context(), req.CaptureID)
if err != nil {
_ = c.Error(err)
return
}
log.Info("api v1 drainCapture",
zap.String("captureID", req.CaptureID),
zap.Int("currentTableCount", remaining))
c.JSON(http.StatusAccepted, &drainCaptureResp{CurrentTableCount: remaining})
}

func getV2ChangefeedConfig(changefeedConfig changefeedConfig) *v2.ChangefeedConfig {
@@ -258,8 +270,6 @@ type drainCaptureRequest struct {
CaptureID string `json:"capture_id"`
}

var drainCaptureCounter atomic.Int64

// drainCaptureResp is the response for a manual `DrainCapture`
type drainCaptureResp struct {
CurrentTableCount int `json:"current_table_count"`
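Note: with the stub replaced by a real coordinator call, the drain endpoint can now be exercised end to end. Below is a minimal client sketch, not part of this PR; the address comes from the doc comment above and the capture ID is a placeholder. It polls until current_table_count reaches zero, matching the "remaining drain work" semantics described in the handler comment.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// drainResp mirrors drainCaptureResp from api/v1/api.go above.
type drainResp struct {
    CurrentTableCount int `json:"current_table_count"`
}

func main() {
    body, _ := json.Marshal(map[string]string{"capture_id": "example-capture-id"}) // placeholder ID
    for {
        req, err := http.NewRequest(http.MethodPut,
            "http://127.0.0.1:8300/api/v1/captures/drain", bytes.NewReader(body))
        if err != nil {
            panic(err)
        }
        req.Header.Set("Content-Type", "application/json")
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        var r drainResp
        _ = json.NewDecoder(resp.Body).Decode(&r)
        resp.Body.Close()
        fmt.Println("remaining:", r.CurrentTableCount)
        if r.CurrentTableCount == 0 {
            return
        }
        time.Sleep(time.Second) // re-issue the drain request until the node is empty
    }
}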
55 changes: 53 additions & 2 deletions coordinator/controller.go
@@ -64,6 +64,7 @@ type Controller struct {
pdClock pdutil.Clock
scheduler *scheduler.Controller
operatorController *operator.Controller
drainController *drainController
changefeedDB *changefeed.ChangefeedDB
backend changefeed.Backend
eventCh *chann.DrainableChann[*Event]
@@ -119,30 +120,47 @@ func NewController(
changefeedDB := changefeed.NewChangefeedDB(version)

oc := operator.NewOperatorController(selfNode, changefeedDB, backend, batchSize)
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
nodeLivenessView := newNodeLivenessView(nodeHeartbeatTTL)
messageCenter := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
drainCtl := newDrainController(selfNode.ID, nodeLivenessView, nodeManager, messageCenter, oc, changefeedDB)
c := &Controller{
version: version,
selfNode: selfNode,
initialized: atomic.NewBool(false),
scheduler: scheduler.NewController(map[string]scheduler.Scheduler{
scheduler.DrainScheduler: coscheduler.NewDrainScheduler(
batchSize,
oc,
changefeedDB,
newControllerDrainView(drainCtl),
),
scheduler.BasicScheduler: coscheduler.NewBasicScheduler(
selfNode.ID.String(),
batchSize,
oc,
changefeedDB,
func() []node.ID {
return drainCtl.getSchedulableDestNodeIDs(nodeManager.GetAliveNodes(), time.Now())
},
),
scheduler.BalanceScheduler: coscheduler.NewBalanceScheduler(
selfNode.ID.String(),
batchSize,
oc,
changefeedDB,
balanceInterval,
func() map[node.ID]*node.Info {
return drainCtl.getSchedulableDestNodes(nodeManager.GetAliveNodes(), time.Now())
},
),
}),
eventCh: eventCh,
operatorController: oc,
messageCenter: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter),
drainController: drainCtl,
messageCenter: messageCenter,
changefeedDB: changefeedDB,
nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
nodeManager: nodeManager,
taskScheduler: threadpool.NewThreadPoolDefault(),
backend: backend,
changefeedChangeCh: changefeedChangeCh,
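Note: the two closures passed to the basic and balance schedulers above are the drain integration point for scheduling: the schedulers only ever see destination nodes that the drain controller still considers schedulable. drain_controller.go is not part of this diff, so the following is a hypothetical sketch of the filter's shape, with stand-in types:

package drainsketch // illustrative only; the real code lives in coordinator/drain_controller.go

import "time"

type nodeID string
type nodeInfo struct{ ID nodeID }

type drainController struct {
    draining map[nodeID]time.Time // nodes with an active drain request
}

// getSchedulableDestNodes filters the alive-node set down to valid
// destinations: nodes being drained are excluded so no new maintainers
// land on a node that is being emptied.
func (d *drainController) getSchedulableDestNodes(
    alive map[nodeID]*nodeInfo, _ time.Time,
) map[nodeID]*nodeInfo {
    out := make(map[nodeID]*nodeInfo, len(alive))
    for id, info := range alive {
        if _, isDraining := d.draining[id]; !isDraining {
            out[id] = info
        }
    }
    return out
}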
@@ -254,6 +272,7 @@ func (c *Controller) checkOnNodeChanged(ctx context.Context) {
c.onNodeChanged(ctx)
c.nodeChanged.changed = false
}
c.handleDrainOnPeriodTask()
}

func (c *Controller) onPeriodTask() {
@@ -264,6 +283,20 @@ func (c *Controller) onPeriodTask() {
}
}

func (c *Controller) handleDrainOnPeriodTask() {
now := time.Now()
drainNodes := c.drainController.listDrainNodes()
for _, targetNode := range drainNodes {
if c.drainController.shouldSendDrainRequest(targetNode, now) {
c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING)
}
if c.drainController.canPromoteToStopping(targetNode, now) &&
c.drainController.shouldSendStopRequest(targetNode, now) {
c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING)
}
Comment on lines +290 to +296

medium: To ensure time consistency and avoid redundant time.Now() calls, it's better to pass the now variable to sendSetNodeLivenessRequest. This will require changing the signature of sendSetNodeLivenessRequest in drain_controller.go as well, which I've commented on separately.

Suggested change, from:

    if c.drainController.shouldSendDrainRequest(targetNode, now) {
        c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING)
    }
    if c.drainController.canPromoteToStopping(targetNode, now) &&
        c.drainController.shouldSendStopRequest(targetNode, now) {
        c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING)
    }

to:

    if c.drainController.shouldSendDrainRequest(targetNode, now) {
        c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_DRAINING, now)
    }
    if c.drainController.canPromoteToStopping(targetNode, now) &&
        c.drainController.shouldSendStopRequest(targetNode, now) {
        c.drainController.sendSetNodeLivenessRequest(targetNode, heartbeatpb.NodeLiveness_STOPPING, now)
    }
}
}

func (c *Controller) onMessage(ctx context.Context, msg *messaging.TargetMessage) {
switch msg.Type {
case messaging.TypeCoordinatorBootstrapResponse:
@@ -273,6 +306,16 @@ func (c *Controller) onMessage(ctx context.Context, msg *messaging.TargetMessage
req := msg.Message[0].(*heartbeatpb.MaintainerHeartbeat)
c.handleMaintainerStatus(msg.From, req.Statuses)
}
case messaging.TypeNodeHeartbeatRequest:
if c.bootstrapper.AllNodesReady() {
req := msg.Message[0].(*heartbeatpb.NodeHeartbeat)
c.drainController.handleNodeHeartbeat(msg.From, req, time.Now())
}
case messaging.TypeSetNodeLivenessResponse:
if c.bootstrapper.AllNodesReady() {
resp := msg.Message[0].(*heartbeatpb.SetNodeLivenessResponse)
c.drainController.handleSetNodeLivenessResponse(msg.From, resp)
}
case messaging.TypeLogCoordinatorResolvedTsResponse:
c.onLogCoordinatorReportResolvedTs(msg)
default:
@@ -871,6 +914,14 @@ func (c *Controller) calculateKeyspaceGCBarrier() map[common.KeyspaceMeta]uint64
return c.changefeedDB.CalculateKeyspaceGCBarrier()
}

func (c *Controller) RequestDrain(targetNode node.ID) {
c.drainController.requestDrain(targetNode)
}

func (c *Controller) DrainSummary(targetNode node.ID) drainSummary {
return c.drainController.summarizeDrain(targetNode, time.Now())
}

func shouldRunChangefeed(state config.FeedState) bool {
switch state {
case config.StateStopped, config.StateFailed, config.StateFinished:
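Note: handleDrainOnPeriodTask above encodes a two-phase escalation: the controller keeps re-announcing DRAINING while the target node still hosts work, and promotes it to STOPPING only once canPromoteToStopping holds, with the shouldSend* checks rate-limiting retransmits. A condensed sketch of that pattern follows; the promotion condition (remaining == 0) and the resend interval are assumptions, since drain_controller.go is not in this diff:

package drainsketch

import "time"

type liveness int

const (
    livenessDraining liveness = iota
    livenessStopping
)

const resendInterval = 5 * time.Second // assumed rate limit

type drainProgress struct {
    remaining     int // maintainers still on the target node
    lastDrainSent time.Time
    lastStopSent  time.Time
}

// tick mirrors the per-node body of handleDrainOnPeriodTask: keep the
// target in DRAINING while work remains, promote it to STOPPING once
// empty, and resend each liveness request at most once per interval.
func (p *drainProgress) tick(now time.Time, send func(liveness)) {
    if now.Sub(p.lastDrainSent) >= resendInterval {
        send(livenessDraining)
        p.lastDrainSent = now
    }
    if p.remaining == 0 && now.Sub(p.lastStopSent) >= resendInterval {
        send(livenessStopping)
        p.lastStopSent = now
    }
}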
22 changes: 22 additions & 0 deletions coordinator/coordinator.go
@@ -416,6 +416,28 @@ func (c *coordinator) RequestResolvedTsFromLogCoordinator(ctx context.Context, c
c.controller.RequestResolvedTsFromLogCoordinator(ctx, changefeedDisplayName)
}

func (c *coordinator) DrainNode(_ context.Context, targetNodeID string) (int, error) {
target := node.ID(targetNodeID)
if target.IsEmpty() {
return 0, errors.ErrAPIInvalidParam.GenWithStackByArgs("empty capture_id")
}
if c.nodeInfo != nil && c.nodeInfo.ID == target {
return 0, errors.ErrSchedulerRequestFailed.GenWithStackByArgs("can not drain coordinator node")
}

c.controller.RequestDrain(target)
summary := c.controller.DrainSummary(target)
log.Info("drain node requested",
zap.Stringer("targetNode", target),
zap.Int("remaining", summary.remaining),
zap.Int("maintainersOnTarget", summary.maintainersOnTarget),
zap.Int("inflightOperatorCount", summary.inflightOperatorCount),
zap.Bool("drainingObserved", summary.drainingObserved),
zap.Bool("stoppingObserved", summary.stoppingObserved),
zap.String("nodeLiveness", summary.nodeLiveness.String()))
return summary.remaining, nil
}

func (c *coordinator) sendMessages(msgs []*messaging.TargetMessage) {
for _, msg := range msgs {
err := c.mc.SendCommand(msg)
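Note: drainSummary is defined outside this diff, but its shape can be read off the zap fields logged in DrainNode above. A plausible reconstruction, with field types and comments as assumptions:

// Reconstructed from the log fields above; not part of this diff.
type drainSummary struct {
    remaining             int  // drain work left; returned to the API caller
    maintainersOnTarget   int  // maintainers still running on the target node
    inflightOperatorCount int  // scheduling operators not yet finished
    drainingObserved      bool // target has acknowledged DRAINING
    stoppingObserved      bool // target has acknowledged STOPPING
    nodeLiveness          heartbeatpb.NodeLiveness // last liveness reported by the node
}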