diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go
index 2a01d78026..07dfd49d11 100644
--- a/logservice/logpuller/region_req_cache.go
+++ b/logservice/logpuller/region_req_cache.go
@@ -60,7 +60,11 @@ type requestCache struct {
 		regionReqs map[SubscriptionID]map[uint64]regionReq
 	}
 
-	// counter for sent but not initialized requests
+	// pendingCount is a flow control slot counter.
+	// A slot is acquired when a request is successfully enqueued into pendingQueue (see add),
+	// and is released when the request is finished/removed (resolve/markStopped/markDone/clear).
+	// pop and markSent don't change it. If markSent overwrites an existing request for the same region,
+	// it will release a slot for the replaced request to avoid leaking pendingCount.
 	pendingCount atomic.Int64
 	// maximum number of pending requests allowed
 	maxPendingCount int64
@@ -104,12 +108,10 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
 		case <-ctx.Done():
 			return false, ctx.Err()
 		case c.pendingQueue <- req:
-			c.incPendingCount()
+			c.pendingCount.Inc()
 			cost := time.Since(start)
 			metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
 			return true, nil
-		case <-c.spaceAvailable:
-			continue
 		case <-ticker.C:
 			addReqRetryLimit--
 			if addReqRetryLimit <= 0 {
@@ -135,7 +137,9 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
 	}
 }
 
-// pop gets the next pending request, returns nil if queue is empty
+// pop gets the next pending request.
+// Note: it doesn't change pendingCount. The slot acquired in add() should be released later
+// (e.g. resolve/markStopped/markDone).
 func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
 	select {
 	case req := <-c.pendingQueue:
@@ -145,7 +149,8 @@ func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
 	}
 }
 
-// markSent marks a request as sent and adds it to sent requests
+// markSent marks a request as sent and adds it to sent requests.
+// It doesn't change pendingCount: the slot is released when the request is finished/removed.
 func (c *requestCache) markSent(req regionReq) {
 	c.sentRequests.Lock()
 	defer c.sentRequests.Unlock()
@@ -157,10 +162,20 @@ func (c *requestCache) markSent(req regionReq) {
 		c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] = m
 	}
 
+	if oldReq, exists := m[req.regionInfo.verID.GetID()]; exists {
+		log.Warn("region request overwritten",
+			zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)),
+			zap.Uint64("regionID", req.regionInfo.verID.GetID()),
+			zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()),
+			zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()),
+			zap.Int("pendingCount", int(c.pendingCount.Load())),
+			zap.Int("pendingQueueLen", len(c.pendingQueue)))
+		c.markDone()
+	}
 	m[req.regionInfo.verID.GetID()] = req
 }
 
-// markStopped removes a sent request without changing pending count (for stopped regions)
+// markStopped removes a sent request and releases a slot.
 func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
 	c.sentRequests.Lock()
 	defer c.sentRequests.Unlock()
@@ -176,12 +191,7 @@ func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
 	}
 	delete(regionReqs, regionID)
-	c.decPendingCount()
-	// Notify waiting add operations that there's space available
-	select {
-	case c.spaceAvailable <- struct{}{}:
-	default: // If channel is full, skip notification
-	}
+	c.markDone()
 }
 
 // resolve marks a region as initialized and removes it from sent requests
@@ -201,7 +211,7 @@ func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) b
 	// Check if the subscription ID matches
 	if req.regionInfo.subscribedSpan.subID == subscriptionID {
 		delete(regionReqs, regionID)
-		c.decPendingCount()
+		c.markDone()
 		cost := time.Since(req.createTime).Seconds()
 		if cost > 0 && cost < abnormalRequestDurationInSec {
 			log.Debug("cdc resolve region request", zap.Uint64("subID", uint64(subscriptionID)), zap.Uint64("regionID", regionID), zap.Float64("cost", cost), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue)))
@@ -209,11 +219,6 @@ func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) b
 		} else {
 			log.Info("region request duration abnormal, skip metric", zap.Float64("cost", cost), zap.Uint64("regionID", regionID))
 		}
-		// Notify waiting add operations that there's space available
-		select {
-		case c.spaceAvailable <- struct{}{}:
-		default: // If channel is full, skip notification
-		}
 		return true
 	}
@@ -235,8 +240,8 @@ func (c *requestCache) clearStaleRequest() {
 				regionReq.regionInfo.subscribedSpan.stopped.Load() ||
 				regionReq.regionInfo.lockedRangeState.Initialized.Load() ||
 				regionReq.isStale() {
-				c.decPendingCount()
-				log.Info("region worker delete stale region request",
+				c.markDone()
+				log.Warn("region worker delete stale region request",
 					zap.Uint64("subID", uint64(subID)),
 					zap.Uint64("regionID", regionID),
 					zap.Int("pendingCount", int(c.pendingCount.Load())),
@@ -247,7 +252,7 @@ func (c *requestCache) clearStaleRequest() {
 					zap.Time("createTime", regionReq.createTime))
 				delete(regionReqs, regionID)
 			} else {
-				reqCount += 1
+				reqCount++
 			}
 		}
 		if len(regionReqs) == 0 {
@@ -255,9 +260,19 @@ func (c *requestCache) clearStaleRequest() {
 		}
 	}
 
-	if reqCount == 0 && c.pendingCount.Load() != 0 {
-		log.Info("region worker pending request count is not equal to actual region request count, correct it", zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("actualReqCount", reqCount))
+	// If there are no in-cache region requests but pendingCount isn't 0, it means pendingCount is stale.
+	// Reset it to avoid blocking add() forever.
+	if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 {
+		log.Info("region worker pending request count is not equal to actual region request count, correct it",
+			zap.Int("pendingCount", int(c.pendingCount.Load())),
+			zap.Int("actualReqCount", reqCount),
+			zap.Int("pendingQueueLen", len(c.pendingQueue)))
 		c.pendingCount.Store(0)
+		// Notify waiting add operations that there's space available.
+		select {
+		case c.spaceAvailable <- struct{}{}:
+		default:
+		}
 	}
 
 	c.lastCheckStaleRequestTime.Store(time.Now())
@@ -273,7 +288,7 @@ LOOP:
 		select {
 		case req := <-c.pendingQueue:
 			regions = append(regions, req.regionInfo)
-			c.decPendingCount()
+			c.markDone()
 		default:
 			break LOOP
 		}
@@ -286,7 +301,7 @@ LOOP:
 		for regionID := range regionReqs {
 			regions = append(regions, regionReqs[regionID].regionInfo)
 			delete(regionReqs, regionID)
-			c.decPendingCount()
+			c.markDone()
 		}
 		delete(c.sentRequests.regionReqs, subID)
 	}
@@ -298,17 +313,26 @@ func (c *requestCache) getPendingCount() int {
 	return int(c.pendingCount.Load())
 }
 
-func (c *requestCache) incPendingCount() {
-	c.pendingCount.Inc()
-}
-
-func (c *requestCache) decPendingCount() {
-	// Ensure pendingCount doesn't go below 0
-	current := c.pendingCount.Load()
-	newCount := current - int64(1)
-	if newCount < 0 {
-		c.pendingCount.Store(0)
-		return
+func (c *requestCache) markDone() {
+	// Decrement pendingCount by 1, but never let it go below 0.
+	// Do it with CAS to avoid clobbering concurrent Inc() calls.
+	for {
+		old := c.pendingCount.Load()
+		if old == 0 {
+			break
+		} else if old < 0 {
+			if c.pendingCount.CompareAndSwap(old, 0) {
+				break
+			}
+		} else {
+			if c.pendingCount.CompareAndSwap(old, old-1) {
+				break
+			}
+		}
+	}
+	// Notify waiting add operations that there's space available.
+	select {
+	case c.spaceAvailable <- struct{}{}:
+	default: // If channel is full, skip notification
 	}
-	c.pendingCount.Dec()
 }
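Note on the scheme above: add() is the only place that increments the counter, markDone() is the only primitive that releases a slot, and spaceAvailable is a capacity-1 channel used purely as a wakeup hint. A minimal standalone sketch of this pattern, using only the standard library (the patch goes through an atomic wrapper with Inc(); slotCounter/acquire/release below are illustrative names, not from the patch):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// slotCounter mirrors the requestCache accounting: acquire on enqueue,
// release on completion, clamped at zero so a spurious extra release
// can never drive the counter negative.
type slotCounter struct {
	pending        atomic.Int64
	spaceAvailable chan struct{} // capacity 1: a wakeup hint, not a queue
}

func newSlotCounter() *slotCounter {
	return &slotCounter{spaceAvailable: make(chan struct{}, 1)}
}

func (s *slotCounter) acquire() { s.pending.Add(1) }

func (s *slotCounter) release() {
	// CAS loop instead of Load-then-Store: a plain Store(0) could erase
	// an acquire() that lands between the Load and the Store.
	for {
		old := s.pending.Load()
		if old == 0 {
			break
		}
		target := old - 1
		if old < 0 {
			target = 0 // repair a negative value instead of decrementing further
		}
		if s.pending.CompareAndSwap(old, target) {
			break
		}
	}
	// Non-blocking notify, like markDone: if a wakeup is already queued, skip.
	select {
	case s.spaceAvailable <- struct{}{}:
	default:
	}
}

func main() {
	c := newSlotCounter()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.acquire()
			c.release()
			c.release() // double release: clamped, never goes negative
		}()
	}
	wg.Wait()
	fmt.Println(c.pending.Load()) // 0
}
```

The CAS loop is the substance of the fix: the old decPendingCount could lose a concurrent Inc() between its Load and Store(0), which is exactly the clobbering the patch comment calls out.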
diff --git a/logservice/logpuller/region_req_cache_test.go b/logservice/logpuller/region_req_cache_test.go
index 76e725fa8e..e745f54e45 100644
--- a/logservice/logpuller/region_req_cache_test.go
+++ b/logservice/logpuller/region_req_cache_test.go
@@ -279,3 +279,35 @@ func TestRequestCacheAdd_WithStoppedRegion(t *testing.T) {
 	// The stopped region should be cleaned up
 	require.Equal(t, 0, cache.getPendingCount())
 }
+
+func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) {
+	cache := newRequestCache(10)
+	ctx := context.Background()
+
+	region := createTestRegionInfo(1, 1)
+
+	ok, err := cache.add(ctx, region, false)
+	require.True(t, ok)
+	require.NoError(t, err)
+
+	// Add a duplicate request for the same region. It should not leak pendingCount even if
+	// markSent overwrites the existing entry.
+	ok, err = cache.add(ctx, region, false)
+	require.True(t, ok)
+	require.NoError(t, err)
+	require.Equal(t, 2, cache.getPendingCount())
+
+	req1, err := cache.pop(ctx)
+	require.NoError(t, err)
+	cache.markSent(req1)
+	require.Equal(t, 2, cache.getPendingCount())
+
+	req2, err := cache.pop(ctx)
+	require.NoError(t, err)
+	cache.markSent(req2)
+	require.Equal(t, 1, cache.getPendingCount())
+
+	// Finish the remaining tracked request.
+	require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID()))
+	require.Equal(t, 0, cache.getPendingCount())
+}
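The test above checks the ledger at quiescent points (after each markSent/resolve). If more tests of this shape accumulate, the invariant they all assert could be factored out. A hypothetical helper, not part of the patch: it assumes the sentRequests layout shown in region_req_cache.go, and it only holds when nothing is in flight between pop and markSent (such a request owns a slot while appearing in neither structure):

```go
// checkSlotInvariant is a hypothetical test-side helper (package logpuller):
// pendingCount must equal queued-but-unsent plus sent-but-unfinished requests.
func checkSlotInvariant(t *testing.T, cache *requestCache) {
	t.Helper()
	sent := 0
	cache.sentRequests.Lock()
	for _, regionReqs := range cache.sentRequests.regionReqs {
		sent += len(regionReqs)
	}
	cache.sentRequests.Unlock()
	require.Equal(t, len(cache.pendingQueue)+sent, cache.getPendingCount())
}
```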
diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go
index 35d5037038..fafe7fcade 100644
--- a/logservice/logpuller/region_request_worker.go
+++ b/logservice/logpuller/region_request_worker.go
@@ -84,17 +84,17 @@ func newRegionRequestWorker(
 			zap.String("addr", store.storeAddr))
 	}
 	for {
-		region, err := worker.requestCache.pop(ctx)
+		req, err := worker.requestCache.pop(ctx)
 		if err != nil {
 			return err
 		}
-		if !region.regionInfo.isStopped() {
-			worker.preFetchForConnecting = new(regionInfo)
-			*worker.preFetchForConnecting = region.regionInfo
-			return nil
-		} else {
+		if req.regionInfo.isStopped() {
+			worker.requestCache.markDone()
 			continue
 		}
+		worker.preFetchForConnecting = new(regionInfo)
+		*worker.preFetchForConnecting = req.regionInfo
+		return nil
 	}
 }
@@ -298,10 +298,27 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
 			zap.Any("regionIDs", resolvedTsEvent.Regions))
 		return
 	}
-	resolvedStates := make([]*regionFeedState, 0, len(resolvedTsEvent.Regions))
+	// Avoid allocating a huge states slice when resolvedTsEvent.Regions is large.
+	// Push resolved-ts events in batches to reduce peak memory usage and improve GC behavior.
+	const resolvedTsStateBatchSize = 1024
+	resolvedStates := make([]*regionFeedState, 0, resolvedTsStateBatchSize)
+	flush := func() {
+		if len(resolvedStates) == 0 {
+			return
+		}
+		states := resolvedStates
+		s.client.pushRegionEventToDS(subscriptionID, regionEvent{
+			resolvedTs: resolvedTsEvent.Ts,
+			states:     states,
+		})
+		resolvedStates = make([]*regionFeedState, 0, resolvedTsStateBatchSize)
+	}
 	for _, regionID := range resolvedTsEvent.Regions {
 		if state := s.getRegionState(subscriptionID, regionID); state != nil {
 			resolvedStates = append(resolvedStates, state)
+			if len(resolvedStates) >= resolvedTsStateBatchSize {
+				flush()
+			}
 			continue
 		}
 		log.Warn("region request worker receives a resolved ts event for an untracked region",
@@ -310,13 +327,7 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
 			zap.Uint64("regionID", regionID),
 			zap.Uint64("resolvedTs", resolvedTsEvent.Ts))
 	}
-	if len(resolvedStates) == 0 {
-		return
-	}
-	s.client.pushRegionEventToDS(subscriptionID, regionEvent{
-		resolvedTs: resolvedTsEvent.Ts,
-		states:     resolvedStates,
-	})
+	flush()
 }
 
 // processRegionSendTask receives region requests from the channel and sends them to the remote store.
@@ -374,6 +385,7 @@ func (s *regionRequestWorker) processRegionSendTask(
 				},
 				FilterLoop: region.filterLoop,
 			}
+			s.requestCache.markDone()
 			if err := doSend(req); err != nil {
 				return err
 			}
@@ -384,17 +396,18 @@ func (s *regionRequestWorker) processRegionSendTask(
 				}
 				s.client.pushRegionEventToDS(subID, regionEvent)
 			}
-
 		} else if region.subscribedSpan.stopped.Load() {
 			// It can be skipped directly because there must be no pending states from
 			// the stopped subscribedTable, or the special singleRegionInfo for stopping
 			// the table will be handled later.
 			s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
+			s.requestCache.markDone()
 		} else {
 			state := newRegionFeedState(region, uint64(subID), s)
 			state.start()
 			s.addRegionState(subID, region.verID.GetID(), state)
 			if err := doSend(s.createRegionRequest(region)); err != nil {
+				s.requestCache.markDone()
 				return err
 			}
 			s.requestCache.markSent(regionReq)
@@ -485,6 +498,9 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
 		region := *s.preFetchForConnecting
 		s.preFetchForConnecting = nil
 		regions = append(regions, region)
+		// The pre-fetched region was popped from pendingQueue but hasn't been marked as sent or done yet.
+		// Release its pendingCount slot to avoid leaking flow control credits on worker failures.
+		s.requestCache.markDone()
 	}
 
 	// Clear all regions from cache
diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go
index dcaff299ac..a855e45299 100644
--- a/logservice/logpuller/region_request_worker_test.go
+++ b/logservice/logpuller/region_request_worker_test.go
@@ -14,6 +14,7 @@
 package logpuller
 
 import (
+	"context"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -38,3 +39,28 @@ func TestRegionStatesOperation(t *testing.T) {
 	require.Nil(t, worker.getRegionState(1, 2))
 	require.Equal(t, 0, len(worker.requestedRegions.subscriptions))
 }
+
+func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) {
+	worker := &regionRequestWorker{
+		requestCache: newRequestCache(10),
+	}
+
+	ctx := context.Background()
+	region := createTestRegionInfo(1, 1)
+
+	ok, err := worker.requestCache.add(ctx, region, false)
+	require.NoError(t, err)
+	require.True(t, ok)
+
+	req, err := worker.requestCache.pop(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 1, worker.requestCache.getPendingCount())
+
+	worker.preFetchForConnecting = new(regionInfo)
+	*worker.preFetchForConnecting = req.regionInfo
+
+	regions := worker.clearPendingRegions()
+	require.Len(t, regions, 1)
+	require.Nil(t, worker.preFetchForConnecting)
+	require.Equal(t, 0, worker.requestCache.getPendingCount())
+}
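The dispatchResolvedTsEvent change above is a fill-and-flush batching pattern. Reduced to plain types, the shape looks like the sketch below; the names are illustrative, and the re-allocation after each emit mirrors the patch's choice to hand ownership of the states slice to pushRegionEventToDS rather than reuse the backing array:

```go
package main

import "fmt"

// flushInBatches collects items into a fixed-size buffer and emits whenever it
// fills, plus one final emit for the remainder.
func flushInBatches(items []int, batchSize int, emit func(batch []int)) {
	batch := make([]int, 0, batchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		emit(batch)
		// Re-allocate instead of batch = batch[:0]: the consumer may retain
		// the emitted slice, as pushRegionEventToDS does with states.
		batch = make([]int, 0, batchSize)
	}
	for _, it := range items {
		batch = append(batch, it)
		if len(batch) >= batchSize {
			flush()
		}
	}
	flush()
}

func main() {
	items := make([]int, 10)
	flushInBatches(items, 4, func(b []int) { fmt.Println("batch of", len(b)) })
	// Output: batch of 4, batch of 4, batch of 2
}
```

The design point is that peak memory now scales with the batch size rather than with len(resolvedTsEvent.Regions), at the cost of emitting several smaller events per resolved-ts message.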
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 8b25609a6a..47f6bce22d 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -590,15 +590,15 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
 		region := regionTask.GetRegionInfo()
 		if region.isStopped() {
-			s.stores.Range(func(key, value any) bool {
-				rs := value.(*requestedStore)
-				rs.requestWorkers.RLock()
-				for _, worker := range rs.requestWorkers.s {
-					worker.add(ctx, region, true)
-				}
-				rs.requestWorkers.RUnlock()
-				return true
-			})
+			enqueued, err := s.enqueueRegionToAllStores(ctx, region)
+			if err != nil {
+				return err
+			}
+			if !enqueued {
+				log.Debug("enqueue stop request failed, retry later",
+					zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+				s.regionTaskQueue.Push(regionTask)
+			}
 			continue
 		}
@@ -634,6 +634,32 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
 	}
 }
 
+func (s *subscriptionClient) enqueueRegionToAllStores(ctx context.Context, region regionInfo) (bool, error) {
+	enqueued := true
+	var firstErr error
+	s.stores.Range(func(_ any, value any) bool {
+		rs := value.(*requestedStore)
+		rs.requestWorkers.RLock()
+		workers := rs.requestWorkers.s
+		rs.requestWorkers.RUnlock()
+		for _, worker := range workers {
+			ok, err := worker.add(ctx, region, true)
+			if err != nil {
+				firstErr = err
+				enqueued = false
+				return false
+			}
+			if !ok {
+				enqueued = false
+				// It is likely the store is busy, no need to try other workers in this store now.
+				break
+			}
+		}
+		return true
+	})
+	return enqueued, firstErr
+}
+
 func (s *subscriptionClient) attachRPCContextForRegion(ctx context.Context, region regionInfo) (regionInfo, bool) {
 	bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
 	rpcCtx, err := s.regionCache.GetTiKVRPCContext(bo, region.verID, kvclientv2.ReplicaReadLeader, 0)
diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go
index 3a07d6624a..3dd4b90ce9 100644
--- a/logservice/logpuller/subscription_client_test.go
+++ b/logservice/logpuller/subscription_client_test.go
@@ -181,6 +181,41 @@ func TestStopTaskUsesSubscribedSpanFilterLoop(t *testing.T) {
 	require.True(t, region.filterLoop)
 }
 
+func TestEnqueueRegionToAllStoresRetryWhenCacheFull(t *testing.T) {
+	ctx := context.Background()
+	client := &subscriptionClient{}
+
+	worker := &regionRequestWorker{
+		requestCache: newRequestCache(1),
+	}
+	store := &requestedStore{storeAddr: "store-1"}
+	store.requestWorkers.s = []*regionRequestWorker{worker}
+	client.stores.Store(store.storeAddr, store)
+
+	dummyRegion := regionInfo{
+		subscribedSpan:   &subscribedSpan{subID: SubscriptionID(2)},
+		lockedRangeState: &regionlock.LockedRangeState{},
+	}
+	ok, err := worker.add(ctx, dummyRegion, true)
+	require.NoError(t, err)
+	require.True(t, ok)
+
+	stopRegion := regionInfo{
+		subscribedSpan: &subscribedSpan{subID: SubscriptionID(1)},
+	}
+	enqueued, err := client.enqueueRegionToAllStores(ctx, stopRegion)
+	require.NoError(t, err)
+	require.False(t, enqueued)
+
+	<-worker.requestCache.pendingQueue
+	worker.requestCache.markDone()
+
+	enqueued, err = client.enqueueRegionToAllStores(ctx, stopRegion)
+	require.NoError(t, err)
+	require.True(t, enqueued)
+	require.Equal(t, 1, len(worker.requestCache.pendingQueue))
+}
+
 func TestSubscriptionWithFailedTiKV(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	mockPDClock := pdutil.NewClock4Test()
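Three properties of enqueueRegionToAllStores are worth calling out: the worker slice is snapshotted under RLock so add() never runs while the lock is held, the first error aborts the whole Range, and a full cache reports enqueued=false so handleRegions can re-queue the task instead of silently dropping it (the old code ignored add's result entirely). A reduced sketch with hypothetical stand-in types; note tryAdd here fails fast, whereas the real add() blocks and retries on a ticker:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type worker struct{ queue chan int }

// tryAdd is a simplified stand-in for regionRequestWorker.add: it reports
// false when the worker's queue is full instead of blocking and retrying.
func (w *worker) tryAdd(ctx context.Context, task int) (bool, error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	case w.queue <- task:
		return true, nil
	default:
		return false, nil
	}
}

type store struct {
	mu      sync.RWMutex
	workers []*worker
}

// enqueueToAllStores mirrors the control flow of enqueueRegionToAllStores:
// snapshot the workers under RLock, stop on the first error, and report
// enqueued=false when any store could not accept the task.
func enqueueToAllStores(ctx context.Context, stores []*store, task int) (bool, error) {
	enqueued := true
	for _, st := range stores {
		st.mu.RLock()
		workers := st.workers // copy the slice header, then drop the lock
		st.mu.RUnlock()
		for _, w := range workers {
			ok, err := w.tryAdd(ctx, task)
			if err != nil {
				return false, err
			}
			if !ok {
				enqueued = false // store is likely busy; skip its remaining workers
				break
			}
		}
	}
	return enqueued, nil
}

func main() {
	busy := &store{workers: []*worker{{queue: make(chan int)}}} // unbuffered: never ready
	ok, err := enqueueToAllStores(context.Background(), []*store{busy}, 1)
	fmt.Println(ok, err) // false <nil>: the caller re-queues the task, as handleRegions does
}
```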