
logpuller: fix region leak after remove subscription#4267

Open
lidezhu wants to merge 12 commits into master from ldz/fix-log-puller0224

Conversation

@lidezhu
Collaborator

@lidezhu lidezhu commented Feb 25, 2026

What problem does this PR solve?

Issue Number: close #4217

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Refactor

    • Improved stop-request coordination across stores with explicit error propagation and retry/re-queue when enqueueing is busy or fails.
    • Centralized request flow-control with reliable capacity accounting and signaling to reduce redundant work and improve resource handling.
  • Tests

    • Added tests validating retry-to-enqueue when per-store caches are full and verifying correct slot-release behavior when duplicate requests are replaced.

@ti-chi-bot ti-chi-bot bot added release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/needs-triage-completed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Feb 25, 2026
@gemini-code-assist

Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a region leak that occurred when subscriptions were removed in the logpuller component. It refactors the handling of stopped regions to ensure that pending request counts are correctly decremented and resources are released, preventing resource exhaustion and improving system stability during subscription lifecycle management.

Highlights

  • Resource Release Mechanism: Introduced a markDone function in the request cache to consistently decrement pending request counts and signal available capacity, crucial for preventing resource leaks when regions are stopped or requests fail.
  • Stopped Region Handling: Modified region request workers to properly process and clear stopped regions from the cache, ensuring they do not block new requests or consume resources unnecessarily.
  • Robust Stop Request Enqueueing: Refactored the subscription client to use a new helper function, enqueueRegionToAllStores, which includes a retry mechanism to reliably enqueue stop requests to all relevant workers, even if their internal caches are temporarily full.
  • Error Handling Consistency: Updated error reporting for cancelled region requests to use a more specific requestCancelledErr instead of a generic send error.
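The slot-release mechanism the highlights describe can be sketched roughly as follows. This is an illustrative Go example, not the actual ticdc code: the names (requestCache, pendingCount, spaceAvailable, markDone) follow the review discussion, but the concrete implementation details are assumptions.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// requestCache sketches the slot accounting described in the PR:
// pendingCount tracks queued requests; spaceAvailable is a capacity
// signal consumed by blocked producers.
type requestCache struct {
	pendingCount    atomic.Int64
	maxPendingCount int64
	spaceAvailable  chan struct{}
}

func newRequestCache(max int64) *requestCache {
	return &requestCache{maxPendingCount: max, spaceAvailable: make(chan struct{}, 1)}
}

// markDone is the single release path: decrement the pending count
// (clamped at zero) and wake at most one waiting producer.
func (c *requestCache) markDone() {
	if c.pendingCount.Add(-1) < 0 {
		// defensive clamp against a double release
		c.pendingCount.Store(0)
	}
	select {
	case c.spaceAvailable <- struct{}{}:
	default: // a signal is already pending; never block here
	}
}

func main() {
	c := newRequestCache(2)
	c.pendingCount.Store(2)
	c.markDone()
	c.markDone()
	c.markDone() // duplicate release: clamped, no underflow
	fmt.Println(c.pendingCount.Load()) // 0
	fmt.Println(len(c.spaceAvailable)) // 1
}
```

Calling markDone on every completion, cancellation, and error path is what closes the leak: a stopped region always gives its slot back exactly once through this single function.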


Changelog
  • logservice/logpuller/region_req_cache.go
    • Simplified decPendingCount to use atomic decrement and added markDone to decrement count and notify for space.
  • logservice/logpuller/region_request_worker.go
    • Ensured markDone is called when a region is stopped or a request fails to send.
    • Updated the error type for failed region processing to requestCancelledErr.
    • Modified the region pop logic to skip stopped regions and mark them as done.
  • logservice/logpuller/subscription_client.go
    • Extracted the logic for enqueuing stop requests to all stores into a new enqueueRegionToAllStores function.
    • Implemented a retry mechanism in handleRegions for stop requests if worker caches are full.
  • logservice/logpuller/subscription_client_test.go
    • Added TestEnqueueRegionToAllStoresRetryWhenCacheFull to verify the retry logic for stop requests.


coderabbitai bot commented Feb 25, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough


Centralizes slot-based flow control in the region request cache (single release path via markDone/spaceAvailable), updates workers to release slots on all completion/error paths, adds enqueueRegionToAllStores with error/retry semantics, and adds tests for cache-full retry and duplicate-slot-release behavior.

Changes

  • Request Cache (logservice/logpuller/region_req_cache.go): Introduces pendingCount/spaceAvailable slot-based flow control; adds markDone() as the single slot-release path; removes ad-hoc inc/dec helpers; updates add/pop/markSent/markStopped/resolve/clear/clearStaleRequest to the new semantics and adds overwrite/stale logging.
  • Request Worker (logservice/logpuller/region_request_worker.go): waitForPreFetching now pops request objects; worker paths call requestCache.markDone() on sent/error/cancel/complete branches; batching added for the dispatchResolvedTsEvent flush.
  • Subscription Client (logservice/logpuller/subscription_client.go): Adds enqueueRegionToAllStores() to enqueue stop requests across stores/workers with explicit error propagation and busy-worker handling; handleRegions now retries or surfaces errors based on the enqueue outcome.
  • Tests, subscription client (logservice/logpuller/subscription_client_test.go): Adds TestEnqueueRegionToAllStoresRetryWhenCacheFull, validating retry/enqueue behavior when a worker's request cache is full and then partially drained.
  • Tests, request cache (logservice/logpuller/region_req_cache_test.go): Adds TestRequestCacheMarkSent_DuplicateReleaseSlot, verifying duplicate add/markSent interactions and correct pending-slot accounting.

Sequence Diagram

sequenceDiagram
    participant SC as SubscriptionClient
    participant Store as Store
    participant W as StoreWorker
    participant RC as RequestCache

    SC->>Store: iterate stores for region stop
    Store->>W: worker.add(ctx, region, stop=true)
    alt worker busy (cache full)
        W-->>Store: ok==false
        Note over Store: skip remaining workers in this store
    else enqueued
        W-->>Store: ok==true
    else error
        W-->>Store: err
    end
    Store-->>SC: propagate (enqueued / busy / err)
    alt err encountered
        SC-->>SC: return (false, err)
    else all enqueued
        SC-->>SC: return (true, nil)
    end

    Note over W,RC: when a queued request is removed/completed
    W->>RC: markDone()
    RC->>RC: pendingCount-- (clamped >=0), maybe signal `spaceAvailable`
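The producer side of the flow control shown in the diagram (admit a request when a slot is free, otherwise wait on spaceAvailable or a retry tick, and report busy after bounded retries) can be sketched as below. This is a simplified illustration with assumed names and semantics, not the actual ticdc implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cache sketches the bounded pending count plus capacity signal
// described in the walkthrough. All names are illustrative.
type cache struct {
	pendingCount   atomic.Int64
	maxPending     int64
	pendingQueue   chan int
	spaceAvailable chan struct{}
}

// tryAcquireSlot reserves a slot with a CAS loop so concurrent
// callers cannot jointly exceed maxPending.
func (c *cache) tryAcquireSlot() bool {
	for {
		cur := c.pendingCount.Load()
		if cur >= c.maxPending {
			return false
		}
		if c.pendingCount.CompareAndSwap(cur, cur+1) {
			return true
		}
	}
}

// add enqueues a request, retrying a bounded number of times while
// the cache is full; it returns false (with no error) once retries
// are exhausted so the caller can re-queue the task later.
func (c *cache) add(req int, retries int) bool {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		if c.tryAcquireSlot() {
			c.pendingQueue <- req
			return true
		}
		select {
		case <-c.spaceAvailable: // a consumer freed a slot; retry now
		case <-ticker.C:
			retries--
			if retries <= 0 {
				return false
			}
		}
	}
}

func main() {
	c := &cache{maxPending: 1, pendingQueue: make(chan int, 1), spaceAvailable: make(chan struct{}, 1)}
	fmt.Println(c.add(1, 3)) // true: a slot was free
	fmt.Println(c.add(2, 3)) // false: cache full, retries exhausted
}
```

The "busy" return (false, nil) is what lets the subscription client distinguish a full worker, which it retries, from a hard error, which it surfaces.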

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

Suggested labels

lgtm, approved, size/M

Suggested reviewers

  • hongyunyan
  • asddongmen
  • flowbehappy

Poem

🐰 I nudged the slots and rang the bell with glee,
Freed tiny spaces so queues move happily.
I hopped through stores and workers, light and fleet,
Enqueue, retry — then every task we meet.
A carrot cheer for tidy slots and beat.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check (⚠️ Warning): the PR description is largely incomplete. While it includes the linked issue number (#4217), it lacks implementation details under 'What is changed and how it works?' and the release note section is unfilled. Resolution: complete the 'What is changed and how it works?' section with implementation details and provide a substantive release note.
  • Docstring Coverage (⚠️ Warning): docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
  • Title check (✅ Passed): the title 'logpuller: fix region leak after remove subscription' is specific and directly related to the main objective of preventing region leaks during subscription removal.
  • Linked Issues check (✅ Passed): the code changes address the core objective of preventing region leaks: flow control via pendingCount ensures proper cleanup, markSent handles duplicates, and enqueueRegionToAllStores ensures reliable stop-request propagation across stores.
  • Out of Scope Changes check (✅ Passed): all changes are scoped to addressing the region leak: region_req_cache adds flow control, region_request_worker ensures request cleanup, subscription_client implements retry logic, and tests verify the new behaviors.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request effectively addresses a resource leak issue in the logpuller by ensuring the pending region request counter is correctly decremented in all relevant scenarios, particularly for stopped or canceled subscriptions. The introduction of the markDone helper function and its consistent application is a solid improvement. Additionally, the new logic to retry enqueuing stop requests enhances the system's robustness. The changes are logical and well-supported by a new unit test. I have one minor suggestion to optimize the process of enqueuing stop requests.

Comment on lines +651 to +653
if !ok {
enqueued = false
}


medium

When worker.add returns !ok, it indicates the worker's queue is full. In this case, it's more efficient to break out of the inner loop that iterates over workers for the current store. Since the entire enqueueRegionToAllStores operation will be retried later if enqueued is false, continuing to try other workers for the same (and likely busy) store is unnecessary. This change is a small optimization to avoid redundant work.

            if !ok {
                enqueued = false
                break
            }


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
logservice/logpuller/region_req_cache.go (1)

305-319: markStopped/resolve duplicate the markDone notification tail — consider extracting

markStopped (lines 179–184) and resolve (lines 204–216) both call decPendingCount() followed by an identical non-blocking spaceAvailable send — exactly what markDone does. Since both methods handle sentRequests removal separately, they could end with c.markDone() instead of inlining the same two-step pattern, eliminating the duplication.

♻️ Proposed refactor for `markStopped` and `resolve`
 func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
     ...
     delete(regionReqs, regionID)
-    c.decPendingCount()
-    select {
-    case c.spaceAvailable <- struct{}{}:
-    default:
-    }
+    c.markDone()
 }

 // inside resolve, replace the tail:
-    c.decPendingCount()
-    ...
-    select {
-    case c.spaceAvailable <- struct{}{}:
-    default:
-    }
+    c.markDone()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_req_cache.go` around lines 305 - 319, Replace the
duplicated tail logic in markStopped and resolve with a call to the existing
markDone method: both functions currently call decPendingCount() and then
perform the same non-blocking send to c.spaceAvailable; remove those two lines
in markStopped and resolve and invoke c.markDone() instead to centralize
pending-count decrement and notification logic (keep existing sentRequests
removal logic in markStopped/resolve intact).
logservice/logpuller/region_request_worker.go (1)

353-356: regionReq wraps a fresh createTime, discarding the original enqueue time

regionReq := newRegionReq(region) (line 355) re-timestamps the pre-fetched region with time.Now(). The original enqueue time from waitForPreFetching is lost, so RegionRequestFinishScanDuration metrics will slightly undercount latency (the time waiting in the prefetch queue is excluded).

This is minor, but if accurate latency tracking matters, you can pass the original req through preFetchForConnecting (or store both regionInfo and createTime):

💡 Preserving original timestamp
-// preFetchForConnecting *regionInfo
+// preFetchForConnecting *regionReq   // keep original createTime

 // in waitForPreFetching:
-worker.preFetchForConnecting = new(regionInfo)
-*worker.preFetchForConnecting = req.regionInfo
+worker.preFetchForConnecting = new(regionReq)
+*worker.preFetchForConnecting = req

 // in processRegionSendTask:
-region := *s.preFetchForConnecting
-s.preFetchForConnecting = nil
-regionReq := newRegionReq(region)
+regionReq := *s.preFetchForConnecting
+s.preFetchForConnecting = nil
+region := regionReq.regionInfo
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_request_worker.go` around lines 353 - 356,
preFetchForConnecting currently holds only a regionInfo which is re-wrapped with
newRegionReq(region) losing the original enqueue timestamp and skewing
RegionRequestFinishScanDuration; modify preFetchForConnecting to carry the
original request timestamp (or the entire RegionReq) from waitForPreFetching and
use that preserved createTime when constructing regionReq instead of calling
newRegionReq with a fresh time so the original enqueue time is retained for
accurate latency metrics (update references in waitForPreFetching,
preFetchForConnecting, and where regionReq is consumed).
logservice/logpuller/subscription_client_test.go (1)

219-261: TestEnqueueRegionToAllStoresNoRedundantStopRequests — LGTM; consider adding an error-path test

The deduplication invariant (worker1's already-enqueued stop is not re-sent on retry) is correctly validated. For completeness, consider a third test covering the worker.add error path (returning a non-nil error) to assert that enqueueRegionToAllStores returns (false, err) and that enqueuedWorkers entries are not leaked (since the cleanup at lines 690–695 of subscription_client.go is skipped on error).

Would you like me to draft a TestEnqueueRegionToAllStoresErrorPath test? I can also open a new issue to track this gap.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/subscription_client_test.go` around lines 219 - 261, Add
a new test (e.g., TestEnqueueRegionToAllStoresErrorPath) that simulates
regionRequestWorker.add returning a non-nil error and asserts that
enqueueRegionToAllStores(ctx, region) returns (false, err) and does not leak
enqueued worker state; specifically, create a subscriptionClient with two
regionRequestWorker instances, make one worker's add return an error (inject by
overriding or wrapping regionRequestWorker.add), call enqueueRegionToAllStores
and require a non-nil error and enqueued==false, and finally verify no workers
were left marked/enqueued (check pendingQueue lengths on both workers and that
client.stores still contains the store and that no per-worker enqueued
flags/entries were added).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@logservice/logpuller/subscription_client.go`:
- Around line 644-698: enqueueRegionToAllStores records worker IDs in
s.stopTasks.enqueuedWorkers[subID] but if worker.add returns an error (firstErr
set) the function returns immediately without removing the partially-populated
map entry; update enqueueRegionToAllStores to perform defensive cleanup before
returning on error: acquire s.stopTasks lock, delete
s.stopTasks.enqueuedWorkers[subID] (or undo only the entries added by this call)
and then release the lock, ensuring any early return paths that set firstErr
remove the subID entry so enqueuedWorkers is not left stale; reference the
symbols enqueueRegionToAllStores, s.stopTasks.enqueuedWorkers, subID, firstErr
and the worker.add error-return path to locate where to insert the cleanup.
- Around line 600-608: When enqueueRegionToAllStores returns enqueued==false we
currently re-push regionTask immediately (regionTaskQueue.Push(regionTask)),
which causes tight retry spinning and high CPU when workers are busy; modify the
retry branch in the function that calls enqueueRegionToAllStores to add a small
backoff before re-pushing (e.g. time.Sleep with a few milliseconds or use a
non-blocking time.After/select) so that regionTask isn't immediately popped
again; update the block around enqueueRegionToAllStores, regionTaskQueue.Push,
and regionTask to perform the delay only on the enqueued==false path and keep
existing logging intact.

---

Nitpick comments:
In `@logservice/logpuller/region_req_cache.go`:
- Around line 305-319: Replace the duplicated tail logic in markStopped and
resolve with a call to the existing markDone method: both functions currently
call decPendingCount() and then perform the same non-blocking send to
c.spaceAvailable; remove those two lines in markStopped and resolve and invoke
c.markDone() instead to centralize pending-count decrement and notification
logic (keep existing sentRequests removal logic in markStopped/resolve intact).

In `@logservice/logpuller/region_request_worker.go`:
- Around line 353-356: preFetchForConnecting currently holds only a regionInfo
which is re-wrapped with newRegionReq(region) losing the original enqueue
timestamp and skewing RegionRequestFinishScanDuration; modify
preFetchForConnecting to carry the original request timestamp (or the entire
RegionReq) from waitForPreFetching and use that preserved createTime when
constructing regionReq instead of calling newRegionReq with a fresh time so the
original enqueue time is retained for accurate latency metrics (update
references in waitForPreFetching, preFetchForConnecting, and where regionReq is
consumed).

In `@logservice/logpuller/subscription_client_test.go`:
- Around line 219-261: Add a new test (e.g.,
TestEnqueueRegionToAllStoresErrorPath) that simulates regionRequestWorker.add
returning a non-nil error and asserts that enqueueRegionToAllStores(ctx, region)
returns (false, err) and does not leak enqueued worker state; specifically,
create a subscriptionClient with two regionRequestWorker instances, make one
worker's add return an error (inject by overriding or wrapping
regionRequestWorker.add), call enqueueRegionToAllStores and require a non-nil
error and enqueued==false, and finally verify no workers were left
marked/enqueued (check pendingQueue lengths on both workers and that
client.stores still contains the store and that no per-worker enqueued
flags/entries were added).

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a13afb3 and ba00712.

📒 Files selected for processing (4)
  • logservice/logpuller/region_req_cache.go
  • logservice/logpuller/region_request_worker.go
  • logservice/logpuller/subscription_client.go
  • logservice/logpuller/subscription_client_test.go

Comment on lines +600 to +608
enqueued, err := s.enqueueRegionToAllStores(ctx, region)
if err != nil {
return err
}
if !enqueued {
log.Debug("enqueue stop request failed, retry later",
zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
s.regionTaskQueue.Push(regionTask)
}

⚠️ Potential issue | 🟠 Major

Tight retry spin when workers are busy may cause high CPU burn

When enqueueRegionToAllStores returns enqueued=false (all cache slots occupied), the task is re-pushed to regionTaskQueue with no delay. If the queue is otherwise empty, Pop returns it immediately, and the 3 ms of back-off inside add is the only throttle, resulting in a busy-polling loop that burns CPU while waiting for a worker to drain its cache.

Consider adding a brief sleep before re-pushing, or using a small time.After in the retry branch:

💡 Suggested mitigation
 			if !enqueued {
 				log.Debug("enqueue stop request failed, retry later",
 					zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+				if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+					return err
+				}
 				s.regionTaskQueue.Push(regionTask)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/subscription_client.go` around lines 600 - 608, When
enqueueRegionToAllStores returns enqueued==false we currently re-push regionTask
immediately (regionTaskQueue.Push(regionTask)), which causes tight retry
spinning and high CPU when workers are busy; modify the retry branch in the
function that calls enqueueRegionToAllStores to add a small backoff before
re-pushing (e.g. time.Sleep with a few milliseconds or use a non-blocking
time.After/select) so that regionTask isn't immediately popped again; update the
block around enqueueRegionToAllStores, regionTaskQueue.Push, and regionTask to
perform the delay only on the enqueued==false path and keep existing logging
intact.


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
logservice/logpuller/subscription_client.go (1)

597-601: ⚠️ Potential issue | 🟠 Major

No backoff before retry when workers are busy

When enqueueRegionToAllStores returns enqueued=false, the task is re-pushed immediately to regionTaskQueue. If the queue is otherwise empty, Pop returns it right away, causing a tight polling loop that burns CPU while waiting for workers to drain their caches.

Consider adding a brief delay before re-pushing:

 			if !enqueued {
 				log.Debug("enqueue stop request failed, retry later",
 					zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)))
+				if err := util.Hang(ctx, 10*time.Millisecond); err != nil {
+					return err
+				}
 				s.regionTaskQueue.Push(regionTask)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/subscription_client.go` around lines 597 - 601, When
enqueueRegionToAllStores returns enqueued == false the code immediately calls
s.regionTaskQueue.Push(regionTask) which can create a tight loop; modify the
handler in subscription_client.go (the code surrounding
enqueueRegionToAllStores, regionTaskQueue.Push and regionTask) to wait briefly
before re-pushing—e.g., apply a short fixed sleep or exponential backoff with
jitter (using time.Sleep and a small cap) when enqueued is false, then re-push
the task; ensure the backoff is per-task (not global) so Pop/Push semantics
remain unchanged and avoid blocking worker shutdown paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@logservice/logpuller/subscription_client.go`:
- Around line 597-601: When enqueueRegionToAllStores returns enqueued == false
the code immediately calls s.regionTaskQueue.Push(regionTask) which can create a
tight loop; modify the handler in subscription_client.go (the code surrounding
enqueueRegionToAllStores, regionTaskQueue.Push and regionTask) to wait briefly
before re-pushing—e.g., apply a short fixed sleep or exponential backoff with
jitter (using time.Sleep and a small cap) when enqueued is false, then re-push
the task; ensure the backoff is per-task (not global) so Pop/Push semantics
remain unchanged and avoid blocking worker shutdown paths.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba00712 and 98074a7.

📒 Files selected for processing (2)
  • logservice/logpuller/subscription_client.go
  • logservice/logpuller/subscription_client_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • logservice/logpuller/subscription_client_test.go


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache_test.go (1)

87-87: ⚠️ Potential issue | 🟡 Minor

Typo: "ann" should be "and".

📝 Fix
-	// consume the pending queue ann add with force
+	// consume the pending queue and add with force
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_req_cache_test.go` at line 87, Fix the typo in
the test comment "consume the pending queue ann add with force" by replacing
"ann" with "and" so it reads "consume the pending queue and add with force";
update the comment near the test that contains that exact string to correct the
wording.
🧹 Nitpick comments (1)
logservice/logpuller/region_req_cache.go (1)

100-100: Consider renaming local variable to avoid shadowing the package constant.

addReqRetryLimit := addReqRetryLimit shadows the package-level constant with the same name, which can be confusing. Consider using a distinct name like retriesRemaining.

♻️ Suggested rename
-	addReqRetryLimit := addReqRetryLimit
+	retriesRemaining := addReqRetryLimit

 	for {
 		current := c.pendingCount.Load()
 		if current < c.maxPendingCount || force {
 			// Try to add the request
 			req := newRegionReq(region)
 			select {
 			case <-ctx.Done():
 				return false, ctx.Err()
 			case c.pendingQueue <- req:
 				c.pendingCount.Inc()
 				cost := time.Since(start)
 				metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
 				return true, nil
 			case <-c.spaceAvailable:
 				continue
 			case <-ticker.C:
-				addReqRetryLimit--
-				if addReqRetryLimit <= 0 {
+				retriesRemaining--
+				if retriesRemaining <= 0 {
 					return false, nil
 				}
 				continue
 			}
 		}

 		// Wait for space to become available
 		select {
 		case <-ticker.C:
-			addReqRetryLimit--
-			if addReqRetryLimit <= 0 {
+			retriesRemaining--
+			if retriesRemaining <= 0 {
 				return false, nil
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_req_cache.go` at line 100, The local variable
currently declared as addReqRetryLimit := addReqRetryLimit shadows the
package-level constant and is confusing; rename the local variable (for example
to retriesRemaining or localAddReqRetryLimit) wherever it's declared/used in
region_req_cache.go (look for the identifier addReqRetryLimit in the scope
around the request-retry logic) and update all references to use the new name so
the package-level constant remains unshadowed and the intent is clear.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@logservice/logpuller/region_req_cache_test.go`:
- Line 87: Fix the typo in the test comment "consume the pending queue ann add
with force" by replacing "ann" with "and" so it reads "consume the pending queue
and add with force"; update the comment near the test that contains that exact
string to correct the wording.

---

Nitpick comments:
In `@logservice/logpuller/region_req_cache.go`:
- Line 100: The local variable currently declared as addReqRetryLimit :=
addReqRetryLimit shadows the package-level constant and is confusing; rename the
local variable (for example to retriesRemaining or localAddReqRetryLimit)
wherever it's declared/used in region_req_cache.go (look for the identifier
addReqRetryLimit in the scope around the request-retry logic) and update all
references to use the new name so the package-level constant remains unshadowed
and the intent is clear.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 98074a7 and 451baff.

📒 Files selected for processing (2)
  • logservice/logpuller/region_req_cache.go
  • logservice/logpuller/region_req_cache_test.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
logservice/logpuller/region_req_cache.go (1)

103-112: ⚠️ Potential issue | 🔴 Critical

Enforce slot admission atomically to preserve backpressure

The pendingCount.Load() check followed by the later pendingCount.Inc() is not atomic. Under concurrent add calls, multiple goroutines can pass the guard at lines 103–104 and then all increment after their channel sends, causing pendingCount to exceed maxPendingCount.

Additionally, markDone() has a race in its negative clamp: between pendingCount.Dec() at line 299 and the unconditional Store(0) at line 301, a concurrent Inc() can be lost, leaking slots.

Proposed fix
 func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) {
 	start := time.Now()
 	ticker := time.NewTicker(addReqRetryInterval)
 	defer ticker.Stop()
 	addReqRetryLimit := addReqRetryLimit
 
 	for {
-		current := c.pendingCount.Load()
-		if current < c.maxPendingCount || force {
+		if force || c.tryAcquireSlot() {
 			// Try to add the request
 			req := newRegionReq(region)
 			select {
 			case <-ctx.Done():
+				c.markDone() // rollback acquired slot
 				return false, ctx.Err()
 			case c.pendingQueue <- req:
-				c.pendingCount.Inc()
 				cost := time.Since(start)
 				metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
 				return true, nil
 			case <-ticker.C:
+				c.markDone() // rollback acquired slot
 				addReqRetryLimit--
 				if addReqRetryLimit <= 0 {
 					return false, nil
 				}
 				continue
 			}
 		}
@@
 	}
 }
+
+func (c *requestCache) tryAcquireSlot() bool {
+	for {
+		current := c.pendingCount.Load()
+		if current >= c.maxPendingCount {
+			return false
+		}
+		if c.pendingCount.CompareAndSwap(current, current+1) {
+			return true
+		}
+	}
+}
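The reserve-before-enqueue idea in the suggestion above can be exercised in isolation. The sketch below is hypothetical (the `slotLimiter` type and its methods are illustrative names, not the PR's actual `requestCache`), but it shows why a CAS loop keeps the count from ever exceeding the limit even when many goroutines race past a plain Load-then-Inc check:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// slotLimiter reserves a slot *before* enqueueing, so the count can
// never exceed max under concurrent callers. Hypothetical sketch of
// the tryAcquireSlot pattern proposed in the review.
type slotLimiter struct {
	count atomic.Int64
	max   int64
}

// tryAcquire reserves one slot with a CAS loop; returns false when full.
func (s *slotLimiter) tryAcquire() bool {
	for {
		cur := s.count.Load()
		if cur >= s.max {
			return false
		}
		if s.count.CompareAndSwap(cur, cur+1) {
			return true
		}
		// CAS lost to a concurrent caller; re-read and retry.
	}
}

// release returns a slot, clamping at zero via CAS instead of an
// unconditional Store(0), so concurrent acquires are never erased.
func (s *slotLimiter) release() {
	for {
		cur := s.count.Load()
		if cur <= 0 {
			return
		}
		if s.count.CompareAndSwap(cur, cur-1) {
			return
		}
	}
}

func main() {
	l := &slotLimiter{max: 4}
	var granted atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if l.tryAcquire() {
				granted.Add(1)
			}
		}()
	}
	wg.Wait()
	// Exactly max acquisitions succeed, never more.
	fmt.Println(granted.Load(), l.count.Load()) // 4 4
}
```

With the non-atomic Load-then-Inc version, the same 100-goroutine race can admit more than 4 requests; the CAS reservation makes over-admission impossible by construction.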
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_req_cache.go` around lines 103 - 112, The
admission check is racy because pendingCount.Load() followed by
pendingCount.Inc() isn't atomic; change the add path (the code that checks
pendingCount vs maxPendingCount and then sends to c.pendingQueue and calls
c.pendingCount.Inc()) to perform an atomic compare-and-set or atomic
add-with-check loop on c.pendingCount so a slot is reserved before enqueueing
(e.g., loop reading curr := c.pendingCount.Load(); if curr >= c.maxPendingCount
&& !force return; if atomic.CompareAndSwap(&c.pendingCount, curr, curr+1)
break). Likewise, fix markDone() to avoid the race between Dec() and Store(0) by
using an atomic CAS loop to decrement and clamp to zero (read curr, if curr==0
return, attempt CAS from curr to curr-1, retry on failure) instead of calling
Dec() then unconditionally Store(0). Ensure you update uses of
pendingCount.Inc()/Dec() to the new atomic patterns so increments are never lost
and the count never exceeds maxPendingCount.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@logservice/logpuller/region_req_cache.go`:
- Around line 298-302: In markDone(), avoid overwriting concurrent increments by
replacing the unconditional Store(0) with a compare-and-swap retry: after
newCount := c.pendingCount.Dec() returns < 0, loop reading the current value and
use c.pendingCount.CompareAndSwap(old, 0) only when old is still negative,
retrying until either CAS succeeds or the current value is >= 0; this ensures
you only correct negative counts and never clobber concurrent Inc() updates.
Reference: function markDone(), field pendingCount, and methods Dec(), Store(),
Inc() — use CompareAndSwap in a small retry loop to safely set to zero when
needed.


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 451baff and aceff2e.

📒 Files selected for processing (1)
  • logservice/logpuller/region_req_cache.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
logservice/logpuller/region_req_cache.go (1)

265-270: ⚠️ Potential issue | 🟠 Major

Avoid clobbering concurrent increments when correcting pendingCount

Both reset sites use unconditional pendingCount.Store(0) after detecting an abnormal value. Under concurrency, this can erase valid Inc() updates and break slot accounting.

💡 Proposed fix
diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go
@@
-	if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 {
+	if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 {
 		log.Info("region worker pending request count is not equal to actual region request count, correct it",
 			zap.Int("pendingCount", int(c.pendingCount.Load())),
 			zap.Int("actualReqCount", reqCount),
 			zap.Int("pendingQueueLen", len(c.pendingQueue)))
-		c.pendingCount.Store(0)
+		for {
+			cur := c.pendingCount.Load()
+			if cur == 0 {
+				break
+			}
+			if c.pendingCount.CompareAndSwap(cur, 0) {
+				break
+			}
+		}
@@
 func (c *requestCache) markDone() {
 	newCount := c.pendingCount.Dec()
 	if newCount < 0 {
-		c.pendingCount.Store(0)
+		for {
+			cur := c.pendingCount.Load()
+			if cur >= 0 {
+				break
+			}
+			if c.pendingCount.CompareAndSwap(cur, 0) {
+				break
+			}
+		}
 	}
🔍 Verification script

#!/bin/bash
set -euo pipefail

file="logservice/logpuller/region_req_cache.go"

echo "1) Find non-CAS pendingCount zeroing:"
rg -n 'pendingCount\.Store\(0\)' "$file"

echo
echo "2) Inspect stale-correction block:"
sed -n '260,276p' "$file"

echo
echo "3) Inspect markDone():"
sed -n '316,326p' "$file"

Expected: if Store(0) is still present in either block, the race concern remains.

Also applies to: 316-320

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_req_cache.go` around lines 265 - 270, The code
currently unconditionally calls pendingCount.Store(0) (seen near the block
checking reqCount==0 && len(c.pendingQueue)==0 and also in markDone) which can
clobber concurrent pendingCount.Inc() updates; replace those unconditional
stores with a compare-and-swap style correction: read cur :=
c.pendingCount.Load(), if cur != 0 then attempt
c.pendingCount.CompareAndSwap(cur, 0) (or loop retrying Load+CompareAndSwap
once) so you only reset when the value you observed is still current; reference
the symbols pendingCount.Store(0), c.pendingCount.Load(), pendingQueue, and
markDone and apply this CAS-based fix in both places where Store(0) is used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@logservice/logpuller/region_request_worker.go`:
- Around line 95-97: The code assigns req.regionInfo into
worker.preFetchForConnecting but if stream setup fails before
processRegionSendTask runs, clearPendingRegions drops that pre-fetched region
without calling markDone(), leaking a pendingCount slot; update the
error/reconnect and cleanup paths so any existing worker.preFetchForConnecting
is released by calling worker.markDone(...) before it is nilled or overwritten
(i.e., add markDone call in clearPendingRegions and any reconnect/setup failure
branches that discard preFetchForConnecting), ensuring pendingCount is
decremented and enqueue cannot block.


ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aceff2e and 4d03402.

📒 Files selected for processing (2)
  • logservice/logpuller/region_req_cache.go
  • logservice/logpuller/region_request_worker.go

@lidezhu
Collaborator Author

lidezhu commented Feb 27, 2026

@coderabbitai review

@coderabbitai

coderabbitai bot commented Feb 27, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
logservice/logpuller/region_request_worker.go (1)

493-501: ⚠️ Potential issue | 🟠 Major

Release slot when clearing pre-fetched region on reconnect/setup failure.

When clearPendingRegions() clears preFetchForConnecting, the slot consumed by pop() in waitForPreFetching() is not released. If stream setup fails before processRegionSendTask runs, this leaks pendingCount slots and can eventually block enqueue.

🐛 Proposed fix
 func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
 	var regions []regionInfo

 	// Clear pre-fetched region
 	if s.preFetchForConnecting != nil {
 		region := *s.preFetchForConnecting
 		s.preFetchForConnecting = nil
+		s.requestCache.markDone()
 		regions = append(regions, region)
 	}

 	// Clear all regions from cache
 	cacheRegions := s.requestCache.clear()
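The invariant the fix enforces — every slot consumed when a region is popped must be returned when that region is discarded — can be sketched with a toy worker. The types below are hypothetical simplifications, not the PR's actual `regionRequestWorker`:

```go
package main

import "fmt"

// worker holds at most one pre-fetched item whose slot stays accounted
// until markDone. Hypothetical sketch of the release-on-clear pattern.
type worker struct {
	pending  int  // slots currently accounted for
	prefetch *int // item popped but not yet processed
}

// pop takes an item and keeps its slot accounted until markDone.
func (w *worker) pop(v int) {
	w.pending++
	w.prefetch = &v
}

// markDone releases one accounted slot.
func (w *worker) markDone() {
	if w.pending > 0 {
		w.pending--
	}
}

// clearPending discards the pre-fetched item and releases its slot, so
// accounting cannot leak when setup fails before processing starts.
func (w *worker) clearPending() []int {
	var dropped []int
	if w.prefetch != nil {
		dropped = append(dropped, *w.prefetch)
		w.prefetch = nil
		w.markDone() // the fix: return the slot consumed by pop
	}
	return dropped
}

func main() {
	w := &worker{}
	w.pop(42)
	dropped := w.clearPending()
	fmt.Println(dropped, w.pending) // [42] 0
}
```

Without the markDone call inside clearPending, each failed stream setup would strand one slot, and repeated reconnects would eventually exhaust the budget and block every subsequent enqueue — the leak the review describes.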
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_request_worker.go` around lines 493 - 501,
clearPendingRegions currently nukes s.preFetchForConnecting but does not release
the slot that was consumed by pop() in waitForPreFetching(), leaking
pendingCount and eventually blocking enqueue; modify clearPendingRegions() to
call the corresponding release method (the partner of pop()) or decrement the
pending slot counter when it clears s.preFetchForConnecting so the slot is
returned if setup fails before processRegionSendTask runs; reference symbols to
change: clearPendingRegions(), preFetchForConnecting, waitForPreFetching(),
pop(), pendingCount, and ensure behavior aligns with the release mechanism used
elsewhere (e.g., the matching push/release function that waitForPreFetching/pop
uses).

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4d03402 and 4e48f79.

📒 Files selected for processing (1)
  • logservice/logpuller/region_request_worker.go

@lidezhu
Collaborator Author

lidezhu commented Feb 27, 2026

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request effectively addresses several resource leaks related to pendingCount in the requestCache. The core fix in markSent to handle overwritten requests is crucial and well-tested. The refactoring of decPendingCount into a race-free markDone function and its consistent application across various code paths (for stopped regions, send failures, etc.) significantly improves the robustness of the request management logic. Additionally, the introduction of batching for resolved-ts events is a good performance optimization, and making the stop-request enqueuing more robust by adding retry logic is a valuable improvement.

I've found one potential leak scenario related to worker connection failures that I've commented on. Overall, this is a high-quality contribution that enhances the stability and performance of the log puller.

@lidezhu
Copy link
Collaborator Author

lidezhu commented Feb 27, 2026

/run-check-issue-triage-complete

@ti-chi-bot ti-chi-bot bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Mar 1, 2026
@ti-chi-bot ti-chi-bot bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Mar 2, 2026
@ti-chi-bot ti-chi-bot bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Mar 3, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 3, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, asddongmen

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:
  • OWNERS [3AceShowHand,asddongmen]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

ti-chi-bot bot commented Mar 3, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-03-02 06:49:48.677522328 +0000 UTC m=+168033.255601512: ☑️ agreed by 3AceShowHand.
  • 2026-03-03 03:56:43.786897443 +0000 UTC m=+244048.364976636: ☑️ agreed by asddongmen.

@asddongmen
Collaborator

/test all


Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Some regions failed to be deregistered after unregistering the subscription

3 participants