logpuller: fix region leak after remove subscription#4267
logpuller: fix region leak after remove subscription#4267
Conversation
Summary of ChangesHello @lidezhu, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a region leak that occurred when subscriptions were removed in the logpuller component. It refactors the handling of stopped regions to ensure that pending request counts are correctly decremented and resources are released, preventing resource exhaustion and improving system stability during subscription lifecycle management. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughCentralizes slot-based flow control in the region request cache (single release path via Changes
Sequence DiagramsequenceDiagram
participant SC as SubscriptionClient
participant Store as Store
participant W as StoreWorker
participant RC as RequestCache
SC->>Store: iterate stores for region stop
Store->>W: worker.add(ctx, region, stop=true)
alt worker busy (cache full)
W-->>Store: ok==false
Note over Store: skip remaining workers in this store
else enqueued
W-->>Store: ok==true
else error
W-->>Store: err
end
Store-->>SC: propagate (enqueued / busy / err)
alt err encountered
SC-->>SC: return (false, err)
else all enqueued
SC-->>SC: return (true, nil)
end
Note over W,RC: when a queued request is removed/completed
W->>RC: markDone()
RC->>RC: pendingCount-- (clamped >=0), maybe signal `spaceAvailable`
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~30 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request effectively addresses a resource leak issue in the logpuller by ensuring the pending region request counter is correctly decremented in all relevant scenarios, particularly for stopped or canceled subscriptions. The introduction of the markDone helper function and its consistent application is a solid improvement. Additionally, the new logic to retry enqueuing stop requests enhances the system's robustness. The changes are logical and well-supported by a new unit test. I have one minor suggestion to optimize the process of enqueuing stop requests.
| if !ok { | ||
| enqueued = false | ||
| } |
There was a problem hiding this comment.
When worker.add returns !ok, it indicates the worker's queue is full. In this case, it's more efficient to break out of the inner loop that iterates over workers for the current store. Since the entire enqueueRegionToAllStores operation will be retried later if enqueued is false, continuing to try other workers for the same (and likely busy) store is unnecessary. This change is a small optimization to avoid redundant work.
if !ok {
enqueued = false
break
}There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
logservice/logpuller/region_req_cache.go (1)
305-319:markStopped/resolveduplicate themarkDonenotification tail — consider extracting
markStopped(lines 179–184) andresolve(lines 204–216) both calldecPendingCount()followed by an identical non-blockingspaceAvailablesend — exactly whatmarkDonedoes. Since both methods handlesentRequestsremoval separately, they could end withc.markDone()instead of inlining the same two-step pattern, eliminating the duplication.♻️ Proposed refactor for `markStopped` and `resolve`
func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) { ... delete(regionReqs, regionID) - c.decPendingCount() - select { - case c.spaceAvailable <- struct{}{}: - default: - } + c.markDone() } // inside resolve, replace the tail: - c.decPendingCount() - ... - select { - case c.spaceAvailable <- struct{}{}: - default: - } + c.markDone()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_req_cache.go` around lines 305 - 319, Replace the duplicated tail logic in markStopped and resolve with a call to the existing markDone method: both functions currently call decPendingCount() and then perform the same non-blocking send to c.spaceAvailable; remove those two lines in markStopped and resolve and invoke c.markDone() instead to centralize pending-count decrement and notification logic (keep existing sentRequests removal logic in markStopped/resolve intact).logservice/logpuller/region_request_worker.go (1)
353-356:regionReqwraps a freshcreateTime, discarding the original enqueue time
regionReq := newRegionReq(region)(line 355) re-timestamps the pre-fetched region withtime.Now(). The original enqueue time fromwaitForPreFetchingis lost, soRegionRequestFinishScanDurationmetrics will slightly undercount latency (the time waiting in the prefetch queue is excluded).This is minor, but if accurate latency tracking matters, you can pass the original
reqthroughpreFetchForConnecting(or store bothregionInfoandcreateTime):💡 Preserving original timestamp
-// preFetchForConnecting *regionInfo +// preFetchForConnecting *regionReq // keep original createTime // in waitForPreFetching: -worker.preFetchForConnecting = new(regionInfo) -*worker.preFetchForConnecting = req.regionInfo +worker.preFetchForConnecting = new(regionReq) +*worker.preFetchForConnecting = req // in processRegionSendTask: -region := *s.preFetchForConnecting -s.preFetchForConnecting = nil -regionReq := newRegionReq(region) +regionReq := *s.preFetchForConnecting +s.preFetchForConnecting = nil +region := regionReq.regionInfo🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_request_worker.go` around lines 353 - 356, preFetchForConnecting currently holds only a regionInfo which is re-wrapped with newRegionReq(region) losing the original enqueue timestamp and skewing RegionRequestFinishScanDuration; modify preFetchForConnecting to carry the original request timestamp (or the entire RegionReq) from waitForPreFetching and use that preserved createTime when constructing regionReq instead of calling newRegionReq with a fresh time so the original enqueue time is retained for accurate latency metrics (update references in waitForPreFetching, preFetchForConnecting, and where regionReq is consumed).logservice/logpuller/subscription_client_test.go (1)
219-261:TestEnqueueRegionToAllStoresNoRedundantStopRequests— LGTM; consider adding an error-path testThe deduplication invariant (worker1's already-enqueued stop is not re-sent on retry) is correctly validated. For completeness, consider a third test covering the
worker.adderror path (returning a non-nil error) to assert thatenqueueRegionToAllStoresreturns(false, err)and thatenqueuedWorkersentries are not leaked (since the cleanup at lines 690–695 ofsubscription_client.gois skipped on error).Would you like me to draft a
TestEnqueueRegionToAllStoresErrorPathtest? I can also open a new issue to track this gap.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client_test.go` around lines 219 - 261, Add a new test (e.g., TestEnqueueRegionToAllStoresErrorPath) that simulates regionRequestWorker.add returning a non-nil error and asserts that enqueueRegionToAllStores(ctx, region) returns (false, err) and does not leak enqueued worker state; specifically, create a subscriptionClient with two regionRequestWorker instances, make one worker's add return an error (inject by overriding or wrapping regionRequestWorker.add), call enqueueRegionToAllStores and require a non-nil error and enqueued==false, and finally verify no workers were left marked/enqueued (check pendingQueue lengths on both workers and that client.stores still contains the store and that no per-worker enqueued flags/entries were added).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@logservice/logpuller/subscription_client.go`:
- Around line 644-698: enqueueRegionToAllStores records worker IDs in
s.stopTasks.enqueuedWorkers[subID] but if worker.add returns an error (firstErr
set) the function returns immediately without removing the partially-populated
map entry; update enqueueRegionToAllStores to perform defensive cleanup before
returning on error: acquire s.stopTasks lock, delete
s.stopTasks.enqueuedWorkers[subID] (or undo only the entries added by this call)
and then release the lock, ensuring any early return paths that set firstErr
remove the subID entry so enqueuedWorkers is not left stale; reference the
symbols enqueueRegionToAllStores, s.stopTasks.enqueuedWorkers, subID, firstErr
and the worker.add error-return path to locate where to insert the cleanup.
- Around line 600-608: When enqueueRegionToAllStores returns enqueued==false we
currently re-push regionTask immediately (regionTaskQueue.Push(regionTask)),
which causes tight retry spinning and high CPU when workers are busy; modify the
retry branch in the function that calls enqueueRegionToAllStores to add a small
backoff before re-pushing (e.g. time.Sleep with a few milliseconds or use a
non-blocking time.After/select) so that regionTask isn't immediately popped
again; update the block around enqueueRegionToAllStores, regionTaskQueue.Push,
and regionTask to perform the delay only on the enqueued==false path and keep
existing logging intact.
---
Nitpick comments:
In `@logservice/logpuller/region_req_cache.go`:
- Around line 305-319: Replace the duplicated tail logic in markStopped and
resolve with a call to the existing markDone method: both functions currently
call decPendingCount() and then perform the same non-blocking send to
c.spaceAvailable; remove those two lines in markStopped and resolve and invoke
c.markDone() instead to centralize pending-count decrement and notification
logic (keep existing sentRequests removal logic in markStopped/resolve intact).
In `@logservice/logpuller/region_request_worker.go`:
- Around line 353-356: preFetchForConnecting currently holds only a regionInfo
which is re-wrapped with newRegionReq(region) losing the original enqueue
timestamp and skewing RegionRequestFinishScanDuration; modify
preFetchForConnecting to carry the original request timestamp (or the entire
RegionReq) from waitForPreFetching and use that preserved createTime when
constructing regionReq instead of calling newRegionReq with a fresh time so the
original enqueue time is retained for accurate latency metrics (update
references in waitForPreFetching, preFetchForConnecting, and where regionReq is
consumed).
In `@logservice/logpuller/subscription_client_test.go`:
- Around line 219-261: Add a new test (e.g.,
TestEnqueueRegionToAllStoresErrorPath) that simulates regionRequestWorker.add
returning a non-nil error and asserts that enqueueRegionToAllStores(ctx, region)
returns (false, err) and does not leak enqueued worker state; specifically,
create a subscriptionClient with two regionRequestWorker instances, make one
worker's add return an error (inject by overriding or wrapping
regionRequestWorker.add), call enqueueRegionToAllStores and require a non-nil
error and enqueued==false, and finally verify no workers were left
marked/enqueued (check pendingQueue lengths on both workers and that
client.stores still contains the store and that no per-worker enqueued
flags/entries were added).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
logservice/logpuller/region_req_cache.gologservice/logpuller/region_request_worker.gologservice/logpuller/subscription_client.gologservice/logpuller/subscription_client_test.go
| enqueued, err := s.enqueueRegionToAllStores(ctx, region) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if !enqueued { | ||
| log.Debug("enqueue stop request failed, retry later", | ||
| zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID))) | ||
| s.regionTaskQueue.Push(regionTask) | ||
| } |
There was a problem hiding this comment.
Tight retry spin when workers are busy may cause high CPU burn
When enqueueRegionToAllStores returns enqueued=false (all cache slots occupied), the task is re-pushed to regionTaskQueue with no delay. If the queue is otherwise empty, Pop returns it immediately, and the 3 ms of back-off inside add is the only throttle, resulting in a busy-polling loop that burns CPU while waiting for a worker to drain its cache.
Consider adding a brief sleep before re-pushing, or using a small time.After in the retry branch:
💡 Suggested mitigation
if !enqueued {
log.Debug("enqueue stop request failed, retry later",
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+ if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+ return err
+ }
s.regionTaskQueue.Push(regionTask)
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@logservice/logpuller/subscription_client.go` around lines 600 - 608, When
enqueueRegionToAllStores returns enqueued==false we currently re-push regionTask
immediately (regionTaskQueue.Push(regionTask)), which causes tight retry
spinning and high CPU when workers are busy; modify the retry branch in the
function that calls enqueueRegionToAllStores to add a small backoff before
re-pushing (e.g. time.Sleep with a few milliseconds or use a non-blocking
time.After/select) so that regionTask isn't immediately popped again; update the
block around enqueueRegionToAllStores, regionTaskQueue.Push, and regionTask to
perform the delay only on the enqueued==false path and keep existing logging
intact.
There was a problem hiding this comment.
♻️ Duplicate comments (1)
logservice/logpuller/subscription_client.go (1)
597-601:⚠️ Potential issue | 🟠 MajorNo backoff before retry when workers are busy
When
enqueueRegionToAllStoresreturnsenqueued=false, the task is re-pushed immediately toregionTaskQueue. If the queue is otherwise empty,Popreturns it right away, causing a tight polling loop that burns CPU while waiting for workers to drain their caches.Consider adding a brief delay before re-pushing:
if !enqueued { log.Debug("enqueue stop request failed, retry later", zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID))) + if err := util.Hang(ctx, 10*time.Millisecond); err != nil { + return err + } s.regionTaskQueue.Push(regionTask) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client.go` around lines 597 - 601, When enqueueRegionToAllStores returns enqueued == false the code immediately calls s.regionTaskQueue.Push(regionTask) which can create a tight loop; modify the handler in subscription_client.go (the code surrounding enqueueRegionToAllStores, regionTaskQueue.Push and regionTask) to wait briefly before re-pushing—e.g., apply a short fixed sleep or exponential backoff with jitter (using time.Sleep and a small cap) when enqueued is false, then re-push the task; ensure the backoff is per-task (not global) so Pop/Push semantics remain unchanged and avoid blocking worker shutdown paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@logservice/logpuller/subscription_client.go`:
- Around line 597-601: When enqueueRegionToAllStores returns enqueued == false
the code immediately calls s.regionTaskQueue.Push(regionTask) which can create a
tight loop; modify the handler in subscription_client.go (the code surrounding
enqueueRegionToAllStores, regionTaskQueue.Push and regionTask) to wait briefly
before re-pushing—e.g., apply a short fixed sleep or exponential backoff with
jitter (using time.Sleep and a small cap) when enqueued is false, then re-push
the task; ensure the backoff is per-task (not global) so Pop/Push semantics
remain unchanged and avoid blocking worker shutdown paths.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
logservice/logpuller/subscription_client.gologservice/logpuller/subscription_client_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- logservice/logpuller/subscription_client_test.go
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache_test.go (1)
87-87:⚠️ Potential issue | 🟡 MinorTypo: "ann" should be "and".
📝 Fix
- // consume the pending queue ann add with force + // consume the pending queue and add with force🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_req_cache_test.go` at line 87, Fix the typo in the test comment "consume the pending queue ann add with force" by replacing "ann" with "and" so it reads "consume the pending queue and add with force"; update the comment near the test that contains that exact string to correct the wording.
🧹 Nitpick comments (1)
logservice/logpuller/region_req_cache.go (1)
100-100: Consider renaming local variable to avoid shadowing the package constant.
addReqRetryLimit := addReqRetryLimitshadows the package-level constant with the same name, which can be confusing. Consider using a distinct name likeretriesRemaining.♻️ Suggested rename
- addReqRetryLimit := addReqRetryLimit + retriesRemaining := addReqRetryLimit for { current := c.pendingCount.Load() if current < c.maxPendingCount || force { // Try to add the request req := newRegionReq(region) select { case <-ctx.Done(): return false, ctx.Err() case c.pendingQueue <- req: c.pendingCount.Inc() cost := time.Since(start) metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) return true, nil case <-c.spaceAvailable: continue case <-ticker.C: - addReqRetryLimit-- - if addReqRetryLimit <= 0 { + retriesRemaining-- + if retriesRemaining <= 0 { return false, nil } continue } } // Wait for space to become available select { case <-ticker.C: - addReqRetryLimit-- - if addReqRetryLimit <= 0 { + retriesRemaining-- + if retriesRemaining <= 0 { return false, nil }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_req_cache.go` at line 100, The local variable currently declared as addReqRetryLimit := addReqRetryLimit shadows the package-level constant and is confusing; rename the local variable (for example to retriesRemaining or localAddReqRetryLimit) wherever it's declared/used in region_req_cache.go (look for the identifier addReqRetryLimit in the scope around the request-retry logic) and update all references to use the new name so the package-level constant remains unshadowed and the intent is clear.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@logservice/logpuller/region_req_cache_test.go`:
- Line 87: Fix the typo in the test comment "consume the pending queue ann add
with force" by replacing "ann" with "and" so it reads "consume the pending queue
and add with force"; update the comment near the test that contains that exact
string to correct the wording.
---
Nitpick comments:
In `@logservice/logpuller/region_req_cache.go`:
- Line 100: The local variable currently declared as addReqRetryLimit :=
addReqRetryLimit shadows the package-level constant and is confusing; rename the
local variable (for example to retriesRemaining or localAddReqRetryLimit)
wherever it's declared/used in region_req_cache.go (look for the identifier
addReqRetryLimit in the scope around the request-retry logic) and update all
references to use the new name so the package-level constant remains unshadowed
and the intent is clear.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache.go (1)
103-112:⚠️ Potential issue | 🔴 CriticalEnforce slot admission atomically to preserve backpressure
pendingCount.Load()+ laterpendingCount.Inc()is not atomic. Under concurrentaddcalls, multiple goroutines can pass the guard at line 103–104 and all successfully increment after their channel sends, causingpendingCountto exceedmaxPendingCount.Additionally,
markDone()has a race in its negative clamp: betweenpendingCount.Dec()at line 299 and the unconditionalStore(0)at line 301, a concurrentInc()can be lost, leaking slots.Proposed fix
func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) { start := time.Now() ticker := time.NewTicker(addReqRetryInterval) defer ticker.Stop() addReqRetryLimit := addReqRetryLimit for { - current := c.pendingCount.Load() - if current < c.maxPendingCount || force { + if force || c.tryAcquireSlot() { // Try to add the request req := newRegionReq(region) select { case <-ctx.Done(): + c.markDone() // rollback acquired slot return false, ctx.Err() case c.pendingQueue <- req: - c.pendingCount.Inc() cost := time.Since(start) metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) return true, nil case <-ticker.C: + c.markDone() // rollback acquired slot addReqRetryLimit-- if addReqRetryLimit <= 0 { return false, nil } continue } } @@ } } + +func (c *requestCache) tryAcquireSlot() bool { + for { + current := c.pendingCount.Load() + if current >= c.maxPendingCount { + return false + } + if c.pendingCount.CompareAndSwap(current, current+1) { + return true + } + } +}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_req_cache.go` around lines 103 - 112, The admission check is racy because pendingCount.Load() followed by pendingCount.Inc() isn't atomic; change the add path (the code that checks pendingCount vs maxPendingCount and then sends to c.pendingQueue and calls c.pendingCount.Inc()) to perform an atomic compare-and-set or atomic add-with-check loop on c.pendingCount so a slot is reserved before enqueueing (e.g., loop reading curr := c.pendingCount.Load(); if curr >= c.maxPendingCount && !force return; if atomic.CompareAndSwap(&c.pendingCount, curr, curr+1) break). Likewise, fix markDone() to avoid the race between Dec() and Store(0) by using an atomic CAS loop to decrement and clamp to zero (read curr, if curr==0 return, attempt CAS from curr to curr-1, retry on failure) instead of calling Dec() then unconditionally Store(0). Ensure you update uses of pendingCount.Inc()/Dec() to the new atomic patterns so increments are never lost and the count never exceeds maxPendingCount.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@logservice/logpuller/region_req_cache.go`:
- Around line 298-302: In markDone(), avoid overwriting concurrent increments by
replacing the unconditional Store(0) with a compare-and-swap retry: after
newCount := c.pendingCount.Dec() returns < 0, loop reading the current value and
use c.pendingCount.CompareAndSwap(old, 0) only when old is still negative,
retrying until either CAS succeeds or the current value is >= 0; this ensures
you only correct negative counts and never clobber concurrent Inc() updates.
Reference: function markDone(), field pendingCount, and methods Dec(), Store(),
Inc() — use CompareAndSwap in a small retry loop to safely set to zero when
needed.
---
Outside diff comments:
In `@logservice/logpuller/region_req_cache.go`:
- Around line 103-112: The admission check is racy because pendingCount.Load()
followed by pendingCount.Inc() isn't atomic; change the add path (the code that
checks pendingCount vs maxPendingCount and then sends to c.pendingQueue and
calls c.pendingCount.Inc()) to perform an atomic compare-and-set or atomic
add-with-check loop on c.pendingCount so a slot is reserved before enqueueing
(e.g., loop reading curr := c.pendingCount.Load(); if curr >= c.maxPendingCount
&& !force return; if atomic.CompareAndSwap(&c.pendingCount, curr, curr+1)
break). Likewise, fix markDone() to avoid the race between Dec() and Store(0) by
using an atomic CAS loop to decrement and clamp to zero (read curr, if curr==0
return, attempt CAS from curr to curr-1, retry on failure) instead of calling
Dec() then unconditionally Store(0). Ensure you update uses of
pendingCount.Inc()/Dec() to the new atomic patterns so increments are never lost
and the count never exceeds maxPendingCount.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
logservice/logpuller/region_req_cache.go (1)
265-270:⚠️ Potential issue | 🟠 MajorAvoid clobbering concurrent increments when correcting
pendingCountBoth reset sites use unconditional
pendingCount.Store(0)after detecting an abnormal value. Under concurrency, this can erase validInc()updates and break slot accounting.💡 Proposed fix
diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go @@ - if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 { + if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 { log.Info("region worker pending request count is not equal to actual region request count, correct it", zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("actualReqCount", reqCount), zap.Int("pendingQueueLen", len(c.pendingQueue))) - c.pendingCount.Store(0) + for { + cur := c.pendingCount.Load() + if cur == 0 { + break + } + if c.pendingCount.CompareAndSwap(cur, 0) { + break + } + } @@ func (c *requestCache) markDone() { newCount := c.pendingCount.Dec() if newCount < 0 { - c.pendingCount.Store(0) + for { + cur := c.pendingCount.Load() + if cur >= 0 { + break + } + if c.pendingCount.CompareAndSwap(cur, 0) { + break + } + } }#!/bin/bash set -euo pipefail file="logservice/logpuller/region_req_cache.go" echo "1) Find non-CAS pendingCount zeroing:" rg -n 'pendingCount\.Store\(0\)' "$file" echo echo "2) Inspect stale-correction block:" sed -n '260,276p' "$file" echo echo "3) Inspect markDone():" sed -n '316,326p' "$file"Expected: if
Store(0)is still present in either block, the race concern remains.Also applies to: 316-320
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_req_cache.go` around lines 265 - 270, The code currently unconditionally calls pendingCount.Store(0) (seen near the block checking reqCount==0 && len(c.pendingQueue)==0 and also in markDone) which can clobber concurrent pendingCount.Inc() updates; replace those unconditional stores with a compare-and-swap style correction: read cur := c.pendingCount.Load(), if cur != 0 then attempt c.pendingCount.CompareAndSwap(cur, 0) (or loop retrying Load+CompareAndSwap once) so you only reset when the value you observed is still current; reference the symbols pendingCount.Store(0), c.pendingCount.Load(), pendingQueue, and markDone and apply this CAS-based fix in both places where Store(0) is used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@logservice/logpuller/region_request_worker.go`:
- Around line 95-97: The code assigns req.regionInfo into
worker.preFetchForConnecting but if stream setup fails before
processRegionSendTask runs, clearPendingRegions drops that pre-fetched region
without calling markDone(), leaking a pendingCount slot; update the
error/reconnect and cleanup paths so any existing worker.preFetchForConnecting
is released by calling worker.markDone(...) before it is nilled or overwritten
(i.e., add markDone call in clearPendingRegions and any reconnect/setup failure
branches that discard preFetchForConnecting), ensuring pendingCount is
decremented and enqueue cannot block.
---
Duplicate comments:
In `@logservice/logpuller/region_req_cache.go`:
- Around line 265-270: The code currently unconditionally calls
pendingCount.Store(0) (seen near the block checking reqCount==0 &&
len(c.pendingQueue)==0 and also in markDone) which can clobber concurrent
pendingCount.Inc() updates; replace those unconditional stores with a
compare-and-swap style correction: read cur := c.pendingCount.Load(), if cur !=
0 then attempt c.pendingCount.CompareAndSwap(cur, 0) (or loop retrying
Load+CompareAndSwap once) so you only reset when the value you observed is still
current; reference the symbols pendingCount.Store(0), c.pendingCount.Load(),
pendingQueue, and markDone and apply this CAS-based fix in both places where
Store(0) is used.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
logservice/logpuller/region_req_cache.gologservice/logpuller/region_request_worker.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
♻️ Duplicate comments (1)
logservice/logpuller/region_request_worker.go (1)
493-501:⚠️ Potential issue | 🟠 MajorRelease slot when clearing pre-fetched region on reconnect/setup failure.
When
clearPendingRegions()clearspreFetchForConnecting, the slot consumed bypop()inwaitForPreFetching()is not released. If stream setup fails beforeprocessRegionSendTaskruns, this leakspendingCountslots and can eventually block enqueue.🐛 Proposed fix
func (s *regionRequestWorker) clearPendingRegions() []regionInfo { var regions []regionInfo // Clear pre-fetched region if s.preFetchForConnecting != nil { region := *s.preFetchForConnecting s.preFetchForConnecting = nil + s.requestCache.markDone() regions = append(regions, region) } // Clear all regions from cache cacheRegions := s.requestCache.clear()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/region_request_worker.go` around lines 493 - 501, clearPendingRegions currently nukes s.preFetchForConnecting but does not release the slot that was consumed by pop() in waitForPreFetching(), leaking pendingCount and eventually blocking enqueue; modify clearPendingRegions() to call the corresponding release method (the partner of pop()) or decrement the pending slot counter when it clears s.preFetchForConnecting so the slot is returned if setup fails before processRegionSendTask runs; reference symbols to change: clearPendingRegions(), preFetchForConnecting, waitForPreFetching(), pop(), pendingCount, and ensure behavior aligns with the release mechanism used elsewhere (e.g., the matching push/release function that waitForPreFetching/pop uses).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@logservice/logpuller/region_request_worker.go`:
- Around line 493-501: clearPendingRegions currently nukes
s.preFetchForConnecting but does not release the slot that was consumed by pop()
in waitForPreFetching(), leaking pendingCount and eventually blocking enqueue;
modify clearPendingRegions() to call the corresponding release method (the
partner of pop()) or decrement the pending slot counter when it clears
s.preFetchForConnecting so the slot is returned if setup fails before
processRegionSendTask runs; reference symbols to change: clearPendingRegions(),
preFetchForConnecting, waitForPreFetching(), pop(), pendingCount, and ensure
behavior aligns with the release mechanism used elsewhere (e.g., the matching
push/release function that waitForPreFetching/pop uses).
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request effectively addresses several resource leaks related to pendingCount in the requestCache. The core fix in markSent to handle overwritten requests is crucial and well-tested. The refactoring of decPendingCount into a race-free markDone function and its consistent application across various code paths (for stopped regions, send failures, etc.) significantly improves the robustness of the request management logic. Additionally, the introduction of batching for resolved-ts events is a good performance optimization, and making the stop-request enqueuing more robust by adding retry logic is a valuable improvement.
I've found one potential leak scenario related to worker connection failures that I've commented on. Overall, this is a high-quality contribution that enhances the stability and performance of the log puller.
|
/run-check-issue-triage-complete |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: 3AceShowHand, asddongmen The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/test all |
What problem does this PR solve?
Issue Number: close #4217
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Refactor
Tests