importinto, external: run merge serially with 64-file groups #68135

joechenrh wants to merge 11 commits into pingcap:master
Conversation
Skipping CI for Draft Pull Request.
📝 Walkthrough

For merge-sort, concurrency is hardcoded to 1 and MaxMergingFilesPerThread is reduced (250 → 64); Prometheus histograms were added to observe merge-sort read and write durations, and engine send-to-out timing was added. Separately, import runtime stats were introduced and threaded through job generation and workers, with final summary logging.

Changes:
- Merge Concurrency Serialization & I/O Metrics
- Local Import Runtime Stats
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant doImport
    participant JobGenerator
    participant JobPool
    participant Worker
    participant RuntimeStats
    doImport->>RuntimeStats: create
    doImport->>JobGenerator: pass RuntimeStats
    JobGenerator->>JobPool: send jobs (timed)
    JobPool->>Worker: create with RuntimeStats
    Worker->>RuntimeStats: record durations
    doImport->>doImport: log runtime summary from RuntimeStats
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Hi @joechenrh. Thanks for your PR. PRs from untrusted users cannot be marked as trusted.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
35034c8 to 6ec846d (Compare); 6ec846d to afd283e (Compare)
This reverts commit db77b9d.
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: (none). The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files.
@joechenrh I've received your pull request and will start the review. I'll conduct a thorough review covering code quality, potential issues, and implementation details. ⏳ This process typically takes 10-30 minutes depending on the complexity of the changes. ℹ️ Learn more details on Pantheon AI.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/dxf/importinto/task_executor.go`:
- Around line 525-533: The call to external.MergeOverlappingFiles currently
hardcodes the merge-task concurrency to 1 (the third arg), which serializes
merges; change this to use a configurable or CPU-based value (e.g.,
mergeTaskConcurrency from config or runtime.NumCPU()/max(1, cpu/…)) and keep a
separate per-task reader-open cap (the parameter that controls reader-open
bursts) set to 1 to limit startup open pressure; update the call site of
external.MergeOverlappingFiles (the invocation with wctx, sm.DataFiles, 1, ...)
to pass mergeTaskConcurrency for task-level parallelism and the reader-open
concurrency value for per-task limit so merge parallelism is preserved while
bounding reader-open bursts.
In `@pkg/lightning/backend/external/merge.go`:
- Around line 40-41: Update the in-file worked memory estimate comment to
reflect the reduced MaxMergingFilesPerThread value (MaxMergingFilesPerThread =
64) so the example matches current constants; locate the existing worked memory
estimate comment in the same file (merge.go) and change the numeric example from
250 to an appropriate value derived from 64 (or recalc the example using
MaxMergingFilesPerThread) and clarify the units so future readers/operators see
an accurate, maintainable performance trade-off tied to
MaxMergingFilesPerThread.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: dccadb49-3c70-4be0-b39a-c0978ed7cf67
📒 Files selected for processing (2)
pkg/dxf/importinto/task_executor.go
pkg/lightning/backend/external/merge.go
```go
	1,
	false,
	onDup,
)

// ...

if err = external.MergeOverlappingFiles(
	wctx,
	sm.DataFiles,
	int(m.GetResource().CPU.Capacity()), // the concurrency used to split subtask
	1, // run merge tasks serially
```
This change serializes merge execution, not just reader-open bursts.
Setting both concurrencies to 1 fully disables merge parallelism and can significantly increase merge wall time. If the goal is to cap startup open pressure to 64, keep merge-task concurrency configurable (or CPU-based) and only bound per-task reader-open concurrency.
As per coding guidelines, important performance trade-offs should be explicit and maintainable for future readers.
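The reviewer's suggestion above (keep task-level parallelism CPU-based, and bound only the per-task reader-open cap) can be sketched as follows. This is an illustrative sketch, not the TiDB API: `mergeConcurrency`, `taskConcurrency`, and `readerOpenConcurrency` are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"runtime"
)

// mergeConcurrency derives task-level merge parallelism from the available
// CPUs, capped by the number of file groups so idle workers are never spawned.
// Illustrative only; the real value would come from config or resource limits.
func mergeConcurrency(fileGroups int) int {
	c := runtime.NumCPU()
	if c > fileGroups {
		c = fileGroups
	}
	if c < 1 {
		c = 1
	}
	return c
}

func main() {
	// Task-level parallelism stays CPU-based...
	taskConcurrency := mergeConcurrency(8)
	// ...while the per-task reader-open cap is pinned to 1 to limit
	// object-storage open bursts, as the comment suggests.
	readerOpenConcurrency := 1
	fmt.Println(taskConcurrency, readerOpenConcurrency)
}
```

The point of the split is that serializing reader opens bounds the storage burst without giving up merge-task parallelism.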
```go
// single thread.
MaxMergingFilesPerThread = 64
```
Update the in-file memory example to match the new merge-file cap.
After reducing MaxMergingFilesPerThread to 64, the worked memory estimate comment (later in this file) still uses 250, which is now misleading for operators/debugging.
As per coding guidelines, comments should capture important performance trade-offs and stay maintainable for future readers.
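One way to keep such a worked estimate from drifting is to derive it from the constant itself. The sketch below assumes a hypothetical 2 MiB per-reader buffer purely for illustration; the real per-reader buffer size lives in the external package and may differ.

```go
package main

import "fmt"

const (
	// maxMergingFilesPerThread mirrors the reduced cap from merge.go.
	maxMergingFilesPerThread = 64
	// readBufSizeMiB is a placeholder value; the actual per-reader
	// buffer size is defined elsewhere and may differ.
	readBufSizeMiB = 2
)

func main() {
	// Tie the worked memory example to the constant so the comment
	// cannot go stale when the cap changes again (as it did, 250 -> 64).
	fmt.Printf("~%d MiB per merging thread\n",
		maxMergingFilesPerThread*readBufSizeMiB)
}
```

With these assumed numbers the estimate comes to ~128 MiB per merging thread; the real figure should be recomputed from the package's actual buffer sizes.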
Codecov Report

❌ Patch coverage is … Additional details and impacted files:

```
@@             Coverage Diff              @@
##            master    #68135      +/-   ##
================================================
- Coverage   77.7620%  77.2375%   -0.5246%
================================================
  Files          1990      1985         -5
  Lines        551774    558722      +6948
================================================
+ Hits         429071    431543      +2472
- Misses       121783    126386      +4603
+ Partials        920       793       -127
================================================
```
Flags with carried forward coverage won't be shown. Click here to find out more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/lightning/backend/local/import_runtime_stats.go (1)
41-47: 💤 Low value

The `sendCnt` parameter is always identical to `jobCnt` at the call site; consider collapsing. In `local.go` the call is always `addGenerateStats(len(jobs), generateDur, time.Since(sendStart), len(jobs))`, meaning `generateJobCnt` and `sendJobToWorkerChCnt` in `zapFields()` will always be equal, which adds noise to the log output without providing additional signal. Either drop the separate `sendCnt` tracking or document why the two counts can diverge.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/lightning/backend/local/import_runtime_stats.go` around lines 41 - 47, The addGenerateStats function records both generateJobCnt and sendJobToWorkerChCnt but sendCnt is always passed as len(jobs) at the call site, so collapse the duplicate metric: remove the sendCnt parameter from addGenerateStats and either (a) increment sendJobToWorkerChCnt from jobCnt inside addGenerateStats (use the existing jobCnt value) or (b) drop sendJobToWorkerChCnt entirely and remove its usage in zapFields (and any related metric registration). Update all callers (e.g., the call in local.go that currently does addGenerateStats(len(jobs), generateDur, time.Since(sendStart), len(jobs))) to the new signature and adjust zapFields to avoid logging a redundant count or add a short comment if you intentionally keep both because they may diverge in future.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/lightning/backend/external/engine.go`:
- Around line 414-419: The logged "loadedKVs" value is taken from
len(e.memKVsAndBuffers.kvs) after that slice is cleared, so it will always be
zero; fix by capturing the count before the cleanup (e.g., assign loadedKVsCount
:= len(e.memKVsAndBuffers.kvs) before the slice is cleared) and then use that
captured loadedKVsCount in the log call that builds "load range batch done"
(replace the current len(...) usage with the saved variable).
In `@pkg/lightning/backend/local/import_runtime_stats.go`:
- Around line 1-88: Add the new source file import_runtime_stats.go to the
go_library srcs in the pkg/lightning/backend/local BUILD.bazel so Bazel can see
the definitions (e.g., importRuntimeStats, addGenerateStats, zapFields); open
the BUILD.bazel's go_library that currently lists "iterator.go" and
"job_worker.go" and insert "import_runtime_stats.go", into the srcs list in
alphabetical order between "iterator.go" and "job_worker.go" and then run bazel
build to verify the undefined symbol errors are resolved.
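The engine.go fix described above (capture the KV count before the cleanup clears the slice) can be sketched as follows. The names `loadAndCount` and `loadedKVsCount` are illustrative, not the real engine fields.

```go
package main

import "fmt"

// loadAndCount demonstrates the snapshot-before-cleanup pattern: the count
// is captured while the slice is still populated, then the cleanup clears
// the backing slice, and the saved value is what gets logged.
func loadAndCount(kvs []int) int {
	loadedKVsCount := len(kvs) // capture before cleanup
	kvs = kvs[:0]              // cleanup: the slice is cleared here
	_ = kvs
	// the "load range batch done" log line would use loadedKVsCount,
	// not len(kvs), which is now always zero
	return loadedKVsCount
}

func main() {
	fmt.Println(loadAndCount([]int{10, 20, 30}))
}
```

Without the snapshot, the logged value is taken after the clear and is always zero, which is exactly the bug the review comment flags.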
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 1e759cda-62cf-410a-8a6f-adf2758441aa
📒 Files selected for processing (4)
pkg/lightning/backend/external/engine.go
pkg/lightning/backend/local/import_runtime_stats.go
pkg/lightning/backend/local/job_worker.go
pkg/lightning/backend/local/local.go
pkg/lightning/backend/local/import_runtime_stats.go (new file under review):

```go
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"time"

	"go.uber.org/atomic"
	"go.uber.org/zap"
)

// importRuntimeStats tracks ingest runtime breakdown to locate bottlenecks.
type importRuntimeStats struct {
	generateBatchCnt        atomic.Int64
	generateJobCnt          atomic.Int64
	generateJobsDuration    atomic.Int64
	sendJobToWorkerChCnt    atomic.Int64
	sendJobToWorkerChDur    atomic.Int64
	workerJobRunCnt         atomic.Int64
	workerJobRunDuration    atomic.Int64
	workerWriteCnt          atomic.Int64
	workerWriteDuration     atomic.Int64
	workerIngestCnt         atomic.Int64
	workerIngestDuration    atomic.Int64
	retryableWriteErrorCnt  atomic.Int64
	retryableIngestErrorCnt atomic.Int64
}

func (s *importRuntimeStats) addGenerateStats(jobCnt int, genDur, sendDur time.Duration, sendCnt int) {
	s.generateBatchCnt.Inc()
	s.generateJobCnt.Add(int64(jobCnt))
	s.generateJobsDuration.Add(genDur.Nanoseconds())
	s.sendJobToWorkerChCnt.Add(int64(sendCnt))
	s.sendJobToWorkerChDur.Add(sendDur.Nanoseconds())
}

func (s *importRuntimeStats) addWorkerJobRunDur(d time.Duration) {
	s.workerJobRunCnt.Inc()
	s.workerJobRunDuration.Add(d.Nanoseconds())
}

func (s *importRuntimeStats) addWriteDur(d time.Duration) {
	s.workerWriteCnt.Inc()
	s.workerWriteDuration.Add(d.Nanoseconds())
}

func (s *importRuntimeStats) addIngestDur(d time.Duration) {
	s.workerIngestCnt.Inc()
	s.workerIngestDuration.Add(d.Nanoseconds())
}

func (s *importRuntimeStats) incRetryableWriteErr() {
	s.retryableWriteErrorCnt.Inc()
}

func (s *importRuntimeStats) incRetryableIngestErr() {
	s.retryableIngestErrorCnt.Inc()
}

func (s *importRuntimeStats) zapFields() []zap.Field {
	return []zap.Field{
		zap.Int64("generate-batches", s.generateBatchCnt.Load()),
		zap.Int64("generated-jobs", s.generateJobCnt.Load()),
		zap.Duration("generate-jobs-total", time.Duration(s.generateJobsDuration.Load())),
		zap.Int64("send-job-to-worker-ch-count", s.sendJobToWorkerChCnt.Load()),
		zap.Duration("send-job-to-worker-ch-total", time.Duration(s.sendJobToWorkerChDur.Load())),
		zap.Int64("worker-job-run-count", s.workerJobRunCnt.Load()),
		zap.Duration("worker-job-run-total", time.Duration(s.workerJobRunDuration.Load())),
		zap.Int64("worker-write-count", s.workerWriteCnt.Load()),
		zap.Duration("worker-write-total", time.Duration(s.workerWriteDuration.Load())),
		zap.Int64("worker-ingest-count", s.workerIngestCnt.Load()),
		zap.Duration("worker-ingest-total", time.Duration(s.workerIngestDuration.Load())),
		zap.Int64("retryable-write-errors", s.retryableWriteErrorCnt.Load()),
		zap.Int64("retryable-ingest-errors", s.retryableIngestErrorCnt.Load()),
	}
}
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Find BUILD.bazel in the local package and check if import_runtime_stats.go is listed
fd -e bazel "BUILD" pkg/lightning/backend/local/ | xargs grep -l "go_library" 2>/dev/null | head -5
echo "--- current srcs in BUILD.bazel ---"
fd -e bazel "BUILD" pkg/lightning/backend/local/ | xargs grep -A 80 "go_library" | grep "\.go"
```

Repository: pingcap/tidb
Length of output: 590

🏁 Script executed:

```shell
cat pkg/lightning/backend/local/BUILD.bazel | grep -A 200 "go_library" | head -150
```

Repository: pingcap/tidb
Length of output: 5023

🏁 Script executed:

```shell
ls -la pkg/lightning/backend/local/import_runtime_stats.go 2>&1
```

Repository: pingcap/tidb
Length of output: 157
Add import_runtime_stats.go to the srcs list in pkg/lightning/backend/local/BUILD.bazel.
The file exists but is missing from the go_library targets' srcs declaration, causing Bazel builds to fail with undefined symbol errors. Insert "import_runtime_stats.go", in alphabetical order between "iterator.go" and "job_worker.go".
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
@joechenrh: The following tests failed. Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
What problem does this PR solve?
Issue Number: ref #00000
Problem Summary:

For `IMPORT INTO` global-sort merge-sort workloads with many overlap files, concurrent merge workers can create a high burst of object-storage reads and cause unstable throughput.

What changed and how does it work?

- `NewMergeOperator(..., concurrency=1, ...)`
- `MergeOverlappingFiles(..., concurrency=1, ...)`
- `MaxMergingFilesPerThread` reduced from `250` to `64`

Net effect: merge runs serially over groups of at most 64 files, bounding the burst of concurrent object-storage reads.
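The grouping effect of the new cap can be sketched as below. `groupFiles` is an illustrative helper, not the external package's actual code; it only shows how a file list splits into at-most-64-file groups that are then merged one group at a time.

```go
package main

import "fmt"

// groupFiles splits data files into fixed-size groups. With the new
// MaxMergingFilesPerThread = 64, each serial merge step handles at most
// 64 files, so only that many readers are ever open at once.
func groupFiles(files []string, maxPerGroup int) [][]string {
	var groups [][]string
	for len(files) > 0 {
		n := maxPerGroup
		if n > len(files) {
			n = len(files)
		}
		groups = append(groups, files[:n])
		files = files[n:]
	}
	return groups
}

func main() {
	files := make([]string, 150) // e.g. 150 overlap files
	groups := groupFiles(files, 64)
	fmt.Println(len(groups), len(groups[0]), len(groups[2]))
}
```

With 150 files this yields three groups of 64, 64, and 22 files, merged serially rather than all at once.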
Check List
Tests
- `go test ./pkg/lightning/backend/external -run 'TestMergeKVIter|TestMergeKVIterPassWrongParam|TestSplitDataFiles|TestByteReader|TestByteReaderAuxBuf' -count=1`
- `go test ./pkg/dxf/importinto -run 'TestGetOnDupForKVGroup' -count=1`
- `IMPORT INTO` global-sort: compare merge-sort throughput/latency stability and object-storage retry/reset behavior before/after.

Side effects
Documentation
Release note