Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 62 additions & 38 deletions logservice/logpuller/region_req_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ type requestCache struct {
regionReqs map[SubscriptionID]map[uint64]regionReq
}

// counter for sent but not initialized requests
// pendingCount is a flow control slot counter.
// A slot is acquired when a request is successfully enqueued into pendingQueue (see add),
// and is released when the request is finished/removed (resolve/markStopped/markDone/clear).
// pop and markSent don't change it. If markSent overwrites an existing request for the same region,
// it will release a slot for the replaced request to avoid leaking pendingCount.
pendingCount atomic.Int64
// maximum number of pending requests allowed
maxPendingCount int64
Expand Down Expand Up @@ -104,12 +108,10 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
case <-ctx.Done():
return false, ctx.Err()
case c.pendingQueue <- req:
c.incPendingCount()
c.pendingCount.Inc()
cost := time.Since(start)
metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
return true, nil
case <-c.spaceAvailable:
continue
case <-ticker.C:
addReqRetryLimit--
if addReqRetryLimit <= 0 {
Expand All @@ -135,7 +137,9 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
}
}

// pop gets the next pending request, returns nil if queue is empty
// pop gets the next pending request.
// Note: it doesn't change pendingCount. The slot acquired in add() should be released later
// (e.g. resolve/markStopped/markDone).
func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
select {
case req := <-c.pendingQueue:
Expand All @@ -145,7 +149,8 @@ func (c *requestCache) pop(ctx context.Context) (regionReq, error) {
}
}

// markSent marks a request as sent and adds it to sent requests
// markSent marks a request as sent and adds it to sent requests.
// It doesn't change pendingCount: the slot is released when the request is finished/removed.
func (c *requestCache) markSent(req regionReq) {
c.sentRequests.Lock()
defer c.sentRequests.Unlock()
Expand All @@ -157,10 +162,20 @@ func (c *requestCache) markSent(req regionReq) {
c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] = m
}

if oldReq, exists := m[req.regionInfo.verID.GetID()]; exists {
log.Warn("region request overwritten",
zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)),
zap.Uint64("regionID", req.regionInfo.verID.GetID()),
zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()),
zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()),
zap.Int("pendingCount", int(c.pendingCount.Load())),
zap.Int("pendingQueueLen", len(c.pendingQueue)))
c.markDone()
}
m[req.regionInfo.verID.GetID()] = req
}

// markStopped removes a sent request without changing pending count (for stopped regions)
// markStopped removes a sent request and releases a slot.
func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
c.sentRequests.Lock()
defer c.sentRequests.Unlock()
Expand All @@ -176,12 +191,7 @@ func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
}

delete(regionReqs, regionID)
c.decPendingCount()
// Notify waiting add operations that there's space available
select {
case c.spaceAvailable <- struct{}{}:
default: // If channel is full, skip notification
}
c.markDone()
}

// resolve marks a region as initialized and removes it from sent requests
Expand All @@ -201,19 +211,14 @@ func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) b
// Check if the subscription ID matches
if req.regionInfo.subscribedSpan.subID == subscriptionID {
delete(regionReqs, regionID)
c.decPendingCount()
c.markDone()
cost := time.Since(req.createTime).Seconds()
if cost > 0 && cost < abnormalRequestDurationInSec {
log.Debug("cdc resolve region request", zap.Uint64("subID", uint64(subscriptionID)), zap.Uint64("regionID", regionID), zap.Float64("cost", cost), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue)))
metrics.RegionRequestFinishScanDuration.Observe(cost)
} else {
log.Info("region request duration abnormal, skip metric", zap.Float64("cost", cost), zap.Uint64("regionID", regionID))
}
// Notify waiting add operations that there's space available
select {
case c.spaceAvailable <- struct{}{}:
default: // If channel is full, skip notification
}
return true
}

Expand All @@ -235,8 +240,8 @@ func (c *requestCache) clearStaleRequest() {
regionReq.regionInfo.subscribedSpan.stopped.Load() ||
regionReq.regionInfo.lockedRangeState.Initialized.Load() ||
regionReq.isStale() {
c.decPendingCount()
log.Info("region worker delete stale region request",
c.markDone()
log.Warn("region worker delete stale region request",
zap.Uint64("subID", uint64(subID)),
zap.Uint64("regionID", regionID),
zap.Int("pendingCount", int(c.pendingCount.Load())),
Expand All @@ -247,17 +252,27 @@ func (c *requestCache) clearStaleRequest() {
zap.Time("createTime", regionReq.createTime))
delete(regionReqs, regionID)
} else {
reqCount += 1
reqCount++
}
}
if len(regionReqs) == 0 {
delete(c.sentRequests.regionReqs, subID)
}
}

if reqCount == 0 && c.pendingCount.Load() != 0 {
log.Info("region worker pending request count is not equal to actual region request count, correct it", zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("actualReqCount", reqCount))
// If there are no in-cache region requests but pendingCount isn't 0, it means pendingCount is stale.
// Reset it to avoid blocking add() forever.
if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 {
log.Info("region worker pending request count is not equal to actual region request count, correct it",
zap.Int("pendingCount", int(c.pendingCount.Load())),
zap.Int("actualReqCount", reqCount),
zap.Int("pendingQueueLen", len(c.pendingQueue)))
c.pendingCount.Store(0)
// Notify waiting add operations that there's space available.
select {
case c.spaceAvailable <- struct{}{}:
default:
}
}

c.lastCheckStaleRequestTime.Store(time.Now())
Expand All @@ -273,7 +288,7 @@ LOOP:
select {
case req := <-c.pendingQueue:
regions = append(regions, req.regionInfo)
c.decPendingCount()
c.markDone()
default:
break LOOP
}
Expand All @@ -286,7 +301,7 @@ LOOP:
for regionID := range regionReqs {
regions = append(regions, regionReqs[regionID].regionInfo)
delete(regionReqs, regionID)
c.decPendingCount()
c.markDone()
}
delete(c.sentRequests.regionReqs, subID)
}
Expand All @@ -298,17 +313,26 @@ func (c *requestCache) getPendingCount() int {
return int(c.pendingCount.Load())
}

func (c *requestCache) incPendingCount() {
c.pendingCount.Inc()
}

func (c *requestCache) decPendingCount() {
// Ensure pendingCount doesn't go below 0
current := c.pendingCount.Load()
newCount := current - int64(1)
if newCount < 0 {
c.pendingCount.Store(0)
return
func (c *requestCache) markDone() {
// Decrement pendingCount by 1, but never let it go below 0.
// Do it with CAS to avoid clobbering concurrent Inc() calls.
for {
old := c.pendingCount.Load()
if old == 0 {
break
} else if old < 0 {
if c.pendingCount.CompareAndSwap(old, 0) {
break
}
} else {
if c.pendingCount.CompareAndSwap(old, old-1) {
break
}
}
}
// Notify waiting add operations that there's space available.
select {
case c.spaceAvailable <- struct{}{}:
default: // If channel is full, skip notification
}
c.pendingCount.Dec()
}
32 changes: 32 additions & 0 deletions logservice/logpuller/region_req_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,35 @@ func TestRequestCacheAdd_WithStoppedRegion(t *testing.T) {
// The stopped region should be cleaned up
require.Equal(t, 0, cache.getPendingCount())
}

// TestRequestCacheMarkSent_DuplicateReleaseSlot verifies that when markSent
// overwrites an already-tracked request for the same region, the flow-control
// slot held by the replaced request is released rather than leaked.
func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) {
	ctx := context.Background()
	cache := newRequestCache(10)
	region := createTestRegionInfo(1, 1)

	// Enqueue the same region twice; each successful add acquires one slot.
	added, err := cache.add(ctx, region, false)
	require.True(t, added)
	require.NoError(t, err)
	added, err = cache.add(ctx, region, false)
	require.True(t, added)
	require.NoError(t, err)
	require.Equal(t, 2, cache.getPendingCount())

	// Marking the first copy as sent keeps both slots: markSent alone
	// does not release anything.
	first, err := cache.pop(ctx)
	require.NoError(t, err)
	cache.markSent(first)
	require.Equal(t, 2, cache.getPendingCount())

	// Marking the duplicate as sent overwrites the tracked entry for the
	// same region and must free exactly one slot.
	second, err := cache.pop(ctx)
	require.NoError(t, err)
	cache.markSent(second)
	require.Equal(t, 1, cache.getPendingCount())

	// Resolving the surviving tracked request releases the last slot.
	require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID()))
	require.Equal(t, 0, cache.getPendingCount())
}
46 changes: 31 additions & 15 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ func newRegionRequestWorker(
zap.String("addr", store.storeAddr))
}
for {
region, err := worker.requestCache.pop(ctx)
req, err := worker.requestCache.pop(ctx)
if err != nil {
return err
}
if !region.regionInfo.isStopped() {
worker.preFetchForConnecting = new(regionInfo)
*worker.preFetchForConnecting = region.regionInfo
return nil
} else {
if req.regionInfo.isStopped() {
worker.requestCache.markDone()
continue
}
worker.preFetchForConnecting = new(regionInfo)
*worker.preFetchForConnecting = req.regionInfo
return nil
}
}

Expand Down Expand Up @@ -298,10 +298,27 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
zap.Any("regionIDs", resolvedTsEvent.Regions))
return
}
resolvedStates := make([]*regionFeedState, 0, len(resolvedTsEvent.Regions))
// Avoid allocating a huge states slice when resolvedTsEvent.Regions is large.
// Push resolved-ts events in batches to reduce peak memory usage and improve GC behavior.
const resolvedTsStateBatchSize = 1024
resolvedStates := make([]*regionFeedState, 0, resolvedTsStateBatchSize)
flush := func() {
if len(resolvedStates) == 0 {
return
}
states := resolvedStates
s.client.pushRegionEventToDS(subscriptionID, regionEvent{
resolvedTs: resolvedTsEvent.Ts,
states: states,
})
resolvedStates = make([]*regionFeedState, 0, resolvedTsStateBatchSize)
}
for _, regionID := range resolvedTsEvent.Regions {
if state := s.getRegionState(subscriptionID, regionID); state != nil {
resolvedStates = append(resolvedStates, state)
if len(resolvedStates) >= resolvedTsStateBatchSize {
flush()
}
continue
}
log.Warn("region request worker receives a resolved ts event for an untracked region",
Expand All @@ -310,13 +327,7 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", resolvedTsEvent.Ts))
}
if len(resolvedStates) == 0 {
return
}
s.client.pushRegionEventToDS(subscriptionID, regionEvent{
resolvedTs: resolvedTsEvent.Ts,
states: resolvedStates,
})
flush()
}

// processRegionSendTask receives region requests from the channel and sends them to the remote store.
Expand Down Expand Up @@ -374,6 +385,7 @@ func (s *regionRequestWorker) processRegionSendTask(
},
FilterLoop: region.filterLoop,
}
s.requestCache.markDone()
if err := doSend(req); err != nil {
return err
}
Expand All @@ -384,17 +396,18 @@ func (s *regionRequestWorker) processRegionSendTask(
}
s.client.pushRegionEventToDS(subID, regionEvent)
}

} else if region.subscribedSpan.stopped.Load() {
// It can be skipped directly because there must be no pending states from
// the stopped subscribedTable, or the special singleRegionInfo for stopping
// the table will be handled later.
s.client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
s.requestCache.markDone()
} else {
state := newRegionFeedState(region, uint64(subID), s)
state.start()
s.addRegionState(subID, region.verID.GetID(), state)
if err := doSend(s.createRegionRequest(region)); err != nil {
s.requestCache.markDone()
return err
}
s.requestCache.markSent(regionReq)
Expand Down Expand Up @@ -485,6 +498,9 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
region := *s.preFetchForConnecting
s.preFetchForConnecting = nil
regions = append(regions, region)
// The pre-fetched region was popped from pendingQueue but hasn't been marked as sent or done yet.
// Release its pendingCount slot to avoid leaking flow control credits on worker failures.
s.requestCache.markDone()
}

// Clear all regions from cache
Expand Down
26 changes: 26 additions & 0 deletions logservice/logpuller/region_request_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package logpuller

import (
"context"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -38,3 +39,28 @@ func TestRegionStatesOperation(t *testing.T) {
require.Nil(t, worker.getRegionState(1, 2))
require.Equal(t, 0, len(worker.requestedRegions.subscriptions))
}

// TestClearPendingRegionsReleaseSlotForPreFetchedRegion checks that a region
// which was popped from the pending queue and stashed in preFetchForConnecting
// (so never marked sent or done) still has its pendingCount slot released when
// clearPendingRegions drains the worker.
func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) {
	ctx := context.Background()
	w := &regionRequestWorker{requestCache: newRequestCache(10)}
	region := createTestRegionInfo(1, 1)

	added, err := w.requestCache.add(ctx, region, false)
	require.NoError(t, err)
	require.True(t, added)

	// Popping does not release the slot acquired by add.
	popped, err := w.requestCache.pop(ctx)
	require.NoError(t, err)
	require.Equal(t, 1, w.requestCache.getPendingCount())

	// Simulate the pre-fetched-for-connecting state.
	w.preFetchForConnecting = new(regionInfo)
	*w.preFetchForConnecting = popped.regionInfo

	// Draining the worker must hand the region back and free its slot.
	cleared := w.clearPendingRegions()
	require.Len(t, cleared, 1)
	require.Nil(t, w.preFetchForConnecting)
	require.Equal(t, 0, w.requestCache.getPendingCount())
}
Loading