feat: parallelize archive finalization #59
Conversation
📝 Walkthrough

This PR implements concurrent archive finalization by running chunk-index flushing and tree building/uploading in parallel instead of sequentially. It adds progress reporting for both operations via new events, parallelizes chunk-index shard flushing, and refactors tree building with a bounded producer/consumer upload pipeline.
Sequence Diagram

```mermaid
sequenceDiagram
    participant ACH as ArchiveCommandHandler
    participant CIS as ChunkIndexService
    participant MS as ManifestSorter
    participant FTB as FileTreeBuilder
    participant SS as SnapshotService
    participant M as Mediator
    ACH->>CIS: FlushAsync(progress)
    ACH->>MS: SortAsync()
    Note over CIS: Parallel by prefix<br/>(load/merge/serialize/<br/>upload/cache)
    Note over FTB: Compute tree hashes<br/>Spool missing trees<br/>Parallel upload workers
    CIS-->>M: ChunkIndexFlushProgressEvent
    FTB-->>M: TreeUploadProgressEvent
    par Concurrent
        Note over CIS: Uploading shards
    and
        Note over MS,FTB: Building/uploading trees
    end
    CIS->>ACH: Flush complete
    FTB->>ACH: BuildAsync returns rootHash
    ACH->>SS: CreateAsync(rootHash)
    Note over SS: Repository commit point
    SS-->>ACH: Snapshot created
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##           master      #59      +/-   ##
==========================================
+ Coverage   72.75%   73.41%   +0.65%
==========================================
  Files          66       66
  Lines        4831     4947     +116
  Branches      654      668      +14
==========================================
+ Hits         3515     3632     +117
+ Misses       1171     1169       -2
- Partials      145      146       +1
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Actionable comments posted: 5
🧹 Nitpick comments (3)
src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs (1)
188-208: Split the new handlers into their own files to match repository convention.

Lines 190-208 add two more top-level classes in a multi-class file; please move each handler to a dedicated file (filename == class name).

As per coding guidelines, `**/*.cs`: Prefer one top-level class per file, with the filename matching the class name.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs` around lines 188-208: the two new top-level classes ChunkIndexFlushProgressHandler and TreeUploadProgressHandler should each live in their own file. Create ChunkIndexFlushProgressHandler.cs and TreeUploadProgressHandler.cs, copy the respective class definitions (including the constructor parameter ProgressState and the implemented INotificationHandler<T> signature) into those files, keep the same namespace and usings as the original file, remove the classes from ArchiveProgressHandlers.cs, and ensure any project/file references or tests still compile (no API changes to SetChunkIndexFlushProgress/SetTreeUploadProgress usage).

src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs (1)
35-64: Minor: Decrement not executed if a simulated failure or cancellation occurs before the try block.

The `Interlocked.Decrement` at line 62 only executes if the `try` block at line 53 is entered. If `_throwOnFileTreeUpload` throws at line 48, or if `WaitAsync`/`Task.Delay` throws `OperationCanceledException`, the active counter remains incremented. For test purposes this is likely acceptable, since failure scenarios typically test single-upload failures, but if you want accurate concurrency tracking during failure tests:

🔧 Suggested fix

```diff
 var isFileTree = blobName.StartsWith(BlobPaths.FileTrees, StringComparison.Ordinal);
 if (isFileTree)
 {
     _firstFileTreeUploadStarted.TrySetResult();
     var active = Interlocked.Increment(ref _activeFileTreeUploads);
     UpdateMaxConcurrency(active);
-    await _allowFileTreeUploads.Task.WaitAsync(cancellationToken);
-    if (_throwOnFileTreeUpload)
-        throw new IOException("Simulated filetree upload failure");
-    if (_fileTreeUploadDelay > TimeSpan.Zero)
-        await Task.Delay(_fileTreeUploadDelay, cancellationToken);
+    try
+    {
+        await _allowFileTreeUploads.Task.WaitAsync(cancellationToken);
+        if (_throwOnFileTreeUpload)
+            throw new IOException("Simulated filetree upload failure");
+        if (_fileTreeUploadDelay > TimeSpan.Zero)
+            await Task.Delay(_fileTreeUploadDelay, cancellationToken);
+    }
+    catch
+    {
+        Interlocked.Decrement(ref _activeFileTreeUploads);
+        throw;
+    }
 }

 try
 {
     await using var ms = new MemoryStream();
     await content.CopyToAsync(ms, cancellationToken);
     _blobs[blobName] = new StoredBlob(ms.ToArray(), new Dictionary<string, string>(metadata), tier, contentType, false);
 }
 finally
 {
     if (isFileTree)
         Interlocked.Decrement(ref _activeFileTreeUploads);
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs` around lines 35-64: the UploadAsync method increments _activeFileTreeUploads for file-tree blobs but only decrements inside the try/finally that begins after the waits/throws, so exceptions from _allowFileTreeUploads.Task.WaitAsync, _throwOnFileTreeUpload, or Task.Delay leave the counter incremented. Wrap the file-tree pre-upload section (the _firstFileTreeUploadStarted.TrySetResult call, Interlocked.Increment(ref _activeFileTreeUploads), _allowFileTreeUploads.Task.WaitAsync, the _throwOnFileTreeUpload check, and the _fileTreeUploadDelay delay) in a try/finally, or move the decrementing finally to cover that entire region, so that Interlocked.Decrement(ref _activeFileTreeUploads) always runs when isFileTree is true.

src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs (1)
480-481: Consider an async-compatible progress reporting pattern.

The `.AsTask().GetAwaiter().GetResult()` call blocks synchronously inside the `Progress<T>` callback, which executes on thread pool threads from `Parallel.ForEachAsync`. While this works because `IMediator.Publish` for in-process handlers is typically fast, it introduces a blocking wait that could contribute to thread pool pressure under high concurrency.

If this becomes a concern, consider using `IProgress<T>` with a channel-based collector that drains asynchronously, or accept the trade-off given the low frequency of progress reports during finalization.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs` around lines 480 - 481, The current Progress<(int Completed,int Total)> callback blocks synchronously by calling _mediator.Publish(...).AsTask().GetAwaiter().GetResult(), which can cause thread-pool pressure; replace this with an async-friendly pattern: create a Channel<(int Completed,int Total)> (or ConcurrentQueue plus a signaling Task) and in the Progress callback only TryWrite the tuple to the channel (non-blocking), start a background async DrainProgressAsync task that reads from the channel and awaits _mediator.Publish(new ChunkIndexFlushProgressEvent(completed,total), cancellationToken) for each item, and ensure you await or cancel the drain task after the Parallel.ForEachAsync section completes; update references to flushProgress, _mediator.Publish, and ChunkIndexFlushProgressEvent accordingly.
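The channel-drain pattern this comment describes can be sketched as follows. This is a minimal illustration, not the actual handler code: `PublishAsync` is a hypothetical stand-in for `_mediator.Publish(new ChunkIndexFlushProgressEvent(...), ct)`, and the surrounding parallel work is elided.

```csharp
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<(int Completed, int Total)>(
    new UnboundedChannelOptions { SingleReader = true });

// Producer side: the Progress<T> callback only TryWrites, so the
// Parallel.ForEachAsync workers never block on publishing.
var flushProgress = new Progress<(int Completed, int Total)>(
    update => channel.Writer.TryWrite(update));

// Consumer side: a single background task drains the channel and
// awaits each publish asynchronously.
var drainTask = Task.Run(async () =>
{
    await foreach (var (completed, total) in channel.Reader.ReadAllAsync())
        await PublishAsync(completed, total);
});

// ... run the parallel flush that reports to flushProgress ...

channel.Writer.Complete(); // no more updates will arrive
await drainTask;           // ensure all buffered updates were published

// Hypothetical stand-in for _mediator.Publish(...).
static Task PublishAsync(int completed, int total)
{
    Console.WriteLine($"flushed {completed}/{total}");
    return Task.CompletedTask;
}
```

The key property is that the producer side is non-blocking (`TryWrite` on an unbounded channel always succeeds), while ordering and back-pressure concerns are confined to the single drain task.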
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs`:
- Around line 103-105: The test uses a non-thread-safe List<(int Completed,int
Total)> named updates fed by SynchronousProgress<(int Completed,int Total)>
callbacks from parallel workers (e.g., in FlushAsync), causing race conditions;
make the progress probe thread-safe by replacing the shared List with a
thread-safe collection (e.g., ConcurrentBag or ConcurrentQueue) or synchronizing
access with a lock around updates.Add in the SynchronousProgress callback, and
apply the same change to the other occurrence around lines 129-131 so all
progress callbacks are collected safely.
In
`@src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs`:
- Around line 30-36: The increment of _activeChunkIndexUploads in
RecordingChunkIndexBlobContainerService can be left undecremented if
cancellation occurs during Task.Delay; wrap the increment + any awaited delay in
a try/finally so Interlocked.Decrement(ref _activeChunkIndexUploads) runs in the
finally block (or register a cancellation callback that decrements) to guarantee
decrement on cancellation; apply the same change to the other similar block
(lines 44-48) and keep UpdateMaxConcurrency calls consistent (call before await
on increment and after decrement or update inside finally as appropriate).
In `@src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs`:
- Around line 147-156: The current FlushAsync logic eagerly drains
_pendingEntries into pending and groups by Shard.PrefixOf, which loses entries
if any per-prefix persistence fails; change FlushAsync so after taking a
snapshot (pending) you hold it as a temporary snapshot and, if any prefix
pipeline (the byPrefix processing) throws, requeue either the entire snapshot or
at minimum the unprocessed/failed prefix groups back into _pendingEntries before
rethrowing; locate the snapshot/while (_pendingEntries.TryTake...) block and the
per-prefix processing loop that iterates over byPrefix and add requeue logic in
the catch/finally path to push failed ShardEntry items back onto _pendingEntries
to make FlushAsync recoverable.
In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs`:
- Around line 57-90: The producer catch/finally must stop and await the
background upload workers before deleting spool files or rethrowing: on
producer-side failure, cancel the worker CancellationTokenSource used to drive
the consumers (the token passed into Task.Run and ReadAllAsync), call
uploadChannel.Writer.Complete(ex) if not already, then await
Task.WhenAll(consumers) to ensure all consumer tasks (the consumers array
created from Task.Run(...) reading from uploadChannel.Reader) have finished
before performing any File.Delete of spoolPaths or returning; update the
exception handling around the producer path that spools files so it cancels the
token and awaits consumers in the catch/finally, and consider switching to a
spool-then-publish flow where the final cache entry is published only after
_fileTreeService.WriteAsync completes successfully.
- Around line 154-171: The current loop that scans dirHashMap for children for
each directory (using GetDirectoryPath and GetLastSegment to form FileTreeEntry
items added to entries) is quadratic; instead maintain a parent→children map or
push each completed child into its parent's list when you compute it: create a
Dictionary<string, List<FileTreeEntry>> (or add to an existing map) keyed by
parent path, when finalizing a directory build create the FileTreeEntry for that
directory and Add it into parentChildrenMap[parentPath] (creating the list if
missing), and in the directory-finalization logic replace the foreach over
dirHashMap with a single parentChildrenMap.TryGetValue(dirPath, out children)
lookup to append those entries to entries; ensure root/no-parent cases are
handled.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3a57d4a4-9379-4f1a-8859-58f023a5444d
📒 Files selected for processing (23)
- AGENTS.md
- README.md
- openspec/changes/chunk-index-prefix-strategy/.openspec.yaml
- openspec/changes/chunk-index-prefix-strategy/proposal.md
- openspec/changes/parallel-flush-and-tree-upload/design.md
- openspec/changes/parallel-flush-and-tree-upload/proposal.md
- openspec/changes/parallel-flush-and-tree-upload/tasks.md
- src/Arius.Cli.Tests/Commands/Archive/BuildArchiveDisplayTests.cs
- src/Arius.Cli.Tests/Commands/Archive/NotificationHandlerTests.cs
- src/Arius.Cli.Tests/MediatorEventRoutingIntegrationTests.cs
- src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs
- src/Arius.Cli/Commands/Archive/ArchiveVerb.cs
- src/Arius.Cli/ProgressState.cs
- src/Arius.Core.Tests/Features/ArchiveCommand/ArchiveFinalizationTests.cs
- src/Arius.Core.Tests/Features/ArchiveCommand/Fakes/CoordinatedArchiveBlobContainerService.cs
- src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs
- src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs
- src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs
- src/Arius.Core.Tests/Shared/FileTree/FileTreeBuilderTests.cs
- src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs
- src/Arius.Core/Features/ArchiveCommand/Events.cs
- src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs
- src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs
```csharp
var uploadChannel = Channel.CreateBounded<PendingTreeUpload>(UploadQueueCapacity);
var totalUploadsKnown = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var bufferedProgress = new ConcurrentQueue<int>();
var spoolPaths = new ConcurrentBag<string>();
var uploadsQueued = 0;
var uploadsCompleted = 0;

// Stream through sorted manifest entries
await foreach (var manifestEntry in ReadManifestAsync(sortedManifestPath, cancellationToken))
{
    var filePath = manifestEntry.Path;        // e.g., "photos/2024/june/a.jpg"
    var dirPath = GetDirectoryPath(filePath); // e.g., "photos/2024/june"
    var name = Path.GetFileName(filePath);

var consumers = Enumerable.Range(0, UploadWorkers)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var upload in uploadChannel.Reader.ReadAllAsync(cancellationToken))
        {
            try
            {
                var plaintext = await File.ReadAllBytesAsync(upload.SpoolPath, cancellationToken);
                var tree = FileTreeBlobSerializer.Deserialize(plaintext);
                await _fileTreeService.WriteAsync(upload.Hash, tree, cancellationToken);

                var done = Interlocked.Increment(ref uploadsCompleted);
                if (progress is not null)
                {
                    if (totalUploadsKnown.Task.IsCompletedSuccessfully)
                        progress.Report((done, totalUploadsKnown.Task.Result));
                    else
                        bufferedProgress.Enqueue(done);
                }
            }
            finally
            {
                try { File.Delete(upload.SpoolPath); } catch { /* best-effort cleanup */ }
            }
        }
    }, cancellationToken))
    .ToArray();
```
Stop the upload workers before cleanup on producer-side failures.
If manifest parsing, hashing, or spooling throws before Task.WhenAll(consumers), the catch path only completes the channel and rethrows. The consumers can still be uploading while finally deletes the same spool files, so one failure turns into background uploads plus racy cleanup. Cancel the worker token and await the consumer tasks before deleting spool files or returning.
Based on learnings, when local cache publication depends on remote durability, prefer a spool-then-publish flow: write tentative data outside the final cache path, upload it, then publish the final cache entry only after upload succeeds.
Also applies to: 207-217
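A shutdown sequence along the lines the comment asks for might look like this. It is an illustrative sketch only: `consumers`, `uploadChannel`, and `spoolPaths` are the names from the excerpt above, and the linked cancellation token source is an assumption about how the workers would be wired.

```csharp
using var workerCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// Assumption: consumers are started with workerCts.Token rather than cancellationToken.

try
{
    // ... producer: parse manifest, hash directories, spool trees, write to uploadChannel ...
    uploadChannel.Writer.Complete();
    await Task.WhenAll(consumers);          // happy path: wait for uploads to drain
}
catch (Exception ex)
{
    uploadChannel.Writer.TryComplete(ex);   // stop feeding the workers
    workerCts.Cancel();                     // unblock in-flight reads/uploads
    try { await Task.WhenAll(consumers); }  // observe worker faults before cleanup
    catch { /* already failing; don't mask the original exception */ }
    throw;
}
finally
{
    // Only now is it safe to delete spool files: no worker can still be reading them.
    foreach (var path in spoolPaths)
    {
        try { File.Delete(path); } catch { /* best-effort cleanup */ }
    }
}
```

The essential ordering is: complete/fault the channel, cancel, await all consumers, and only then touch the spool files shared with those consumers.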
```csharp
// Inject already-computed child directory entries
// (any immediate child directories of dirPath that have been processed)
foreach (var (childDirPath, childHash) in dirHashMap)
{
    var childParent = GetDirectoryPath(childDirPath);
    if (childParent == dirPath)
    {
        var childName = GetLastSegment(childDirPath);
        entries.Add(new FileTreeEntry
        {
            Name = childName + "/",
            Type = FileTreeEntryType.Dir,
            Hash = childHash,
            Created = null,
            Modified = null
        });
    }
}
```
This child-directory lookup is quadratic in directory count.
Every directory walks the full dirHashMap to find its immediate children. On large trees that makes build time O(directories²), which can easily dominate the finalization tail this PR is trying to shorten. Push each completed child directly into its parent entry list, or keep a parent→children map, instead of rescanning all prior directories.
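The parent→children map the comment suggests turns the full scan into a single dictionary lookup. A schematic version follows; `GetDirectoryPath` and `GetLastSegment` are the helpers named in the excerpt, and the simplified `string`-valued children are a stand-in for the real `FileTreeEntry` items.

```csharp
// Key: parent directory path; value: names of its already-finalized child dirs.
var parentChildren = new Dictionary<string, List<string>>();

// When a directory's hash is finalized, register it with its parent once (O(1)
// amortized), instead of letting every later directory rescan all prior ones.
void OnDirectoryFinalized(string dirPath)
{
    var parent = GetDirectoryPath(dirPath);
    if (!parentChildren.TryGetValue(parent, out var siblings))
        parentChildren[parent] = siblings = new List<string>();
    siblings.Add(GetLastSegment(dirPath) + "/");
}

// When building a directory's entry list, one lookup replaces the foreach
// over the whole dirHashMap; the root ("" parent) case falls out naturally.
IReadOnlyList<string> ChildrenOf(string dirPath) =>
    parentChildren.TryGetValue(dirPath, out var children)
        ? children
        : Array.Empty<string>();
```

Total work becomes O(directories) instead of O(directories²), since each directory is inserted into exactly one parent list and read back exactly once.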
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs (1)
64-88: ⚠️ Potential issue | 🟠 Major

Await `uploadTask` before cleaning up spool files on producer failures.

Line 208 only completes the channel. If the producer throws after queueing work, Line 213 can delete a spool file while a worker is still reading it, and any later failure from `uploadTask` goes unobserved. Cancel the worker token and await `uploadTask` before cleanup/rethrow.

Based on learnings, when local cache publication depends on remote durability, prefer a spool-then-publish flow: write tentative data outside the final cache path, upload it, then publish the final cache entry only after upload succeeds.
Also applies to: 206-216
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs` around lines 64 - 88, The producer completes the channel but may delete spool files while Parallel.ForEachAsync workers (uploadTask) are still reading them and any exceptions from uploadTask go unobserved; fix by cancelling the worker CancellationTokenSource (the token passed into ParallelOptions), awaiting the uploadTask task to observe failures, then perform spool-file cleanup and rethrow if uploadTask faulted; specifically, when using uploadChannel.Reader and creating uploadTask via Parallel.ForEachAsync (with ParallelOptions { MaxDegreeOfParallelism = UploadWorkers, CancellationToken = cancellationToken }), ensure you call cancellationSource.Cancel(), await uploadTask, then delete spool files (or run best-effort cleanup) and propagate any exceptions from uploadTask before returning.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: e73c96fb-a0ef-4123-a670-5fb070ee5d77
📒 Files selected for processing (2)
- src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs
- src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs
```csharp
var uploadsQueued = 0;
var uploadsCompleted = 0;
```
Deduplicate scheduled tree hashes before handing them to the worker pool.
Two identical subtrees in the same build can compute the same treeHash and both pass ExistsInRemote before either worker finishes. That queues concurrent WriteAsync calls for the same hash, which then race on the same cache file in src/Arius.Core/Shared/FileTree/FileTreeService.cs:197-225.
💡 Suggested fix
```diff
@@
-    var uploadsQueued = 0;
+    var uploadsQueued = 0;
+    var queuedHashes = new HashSet<string>(StringComparer.Ordinal);
@@
-    await QueueUploadIfNeededAsync(hash, tree, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
+    await QueueUploadIfNeededAsync(hash, tree, queuedHashes, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
@@
-    rootHash = await BuildRootBlobAsync(dirHashMap, dirEntries, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
+    rootHash = await BuildRootBlobAsync(dirHashMap, dirEntries, queuedHashes, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
@@
-    ChannelWriter<PendingTreeUpload> uploadWriter,
+    HashSet<string> queuedHashes,
+    ChannelWriter<PendingTreeUpload> uploadWriter,
@@
-    if (_fileTreeService.ExistsInRemote(treeHash))
+    if (_fileTreeService.ExistsInRemote(treeHash) || !queuedHashes.Add(treeHash))
         return;
```

As per coding guidelines, `src/Arius.Core/**`: Core library. Pay close attention to deduplication logic, encryption correctness, and thread safety.
Also applies to: 175-175, 194-194, 271-287
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs` around lines 61 - 63, The
build currently can schedule multiple identical subtree uploads because it
checks ExistsInRemote per subtree and then enqueues WriteAsync tasks; fix this
by deduplicating tree hashes before enqueuing to the worker pool: in
FileTreeBuilder (the method that computes treeHash and increments
uploadsQueued/uploadsCompleted), maintain a thread-safe hash set (or local
HashSet before parallel scheduling) of scheduled treeHash values and only
schedule WriteAsync for a hash not already present, ensuring the same de-dup
logic is applied wherever tree hashes are gathered (also check the call sites
that use ExistsInRemote and scheduling at FileTreeService.WriteAsync to avoid
races), and ensure the dedupe set is synchronized (or built single-threaded) so
encryption paths and cache file writes remain thread-safe.
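The suggested fix relies on a plain `HashSet` because enqueuing happens on a single producer thread. If scheduling ever moves onto multiple threads, a `ConcurrentDictionary` gives the same first-writer-wins semantics without a lock. A generic sketch (stand-in names, not the actual Arius types):

```csharp
using System.Collections.Concurrent;

// One entry per tree hash ever scheduled; the byte value is unused.
var scheduled = new ConcurrentDictionary<string, byte>(StringComparer.Ordinal);

// TryAdd returns true only for the first caller presenting a given hash,
// so at most one upload is queued per tree hash even under concurrency.
bool TryClaim(string treeHash) => scheduled.TryAdd(treeHash, 0);

// Hypothetical enqueue site:
// if (ExistsInRemote(treeHash) || !TryClaim(treeHash))
//     return; // already durable remotely, or already claimed by another worker
```

Either way, the check-then-claim must be a single atomic step (`HashSet.Add` on one thread, or `TryAdd` across threads); a separate `Contains` followed by `Add` would reintroduce the race.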
…o parallel-flush-and-tree-upload-master
…into parallel-flush-and-tree-upload-master
Actionable comments posted: 2
🧹 Nitpick comments (1)
AGENTS.md (1)
45-51: Consolidate duplicate durability sections to prevent guidance drift.

`Scale And Durability` (Lines 35-44) and `Durability And Scale` (Lines 45-51) overlap heavily. Consider merging into one canonical section to keep agent instructions consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@AGENTS.md` around lines 45 - 51, There are two overlapping sections "Scale And Durability" and "Durability And Scale" in AGENTS.md; consolidate them into a single canonical section (pick one header name, e.g., "Durability and Scale") by merging unique guidance from both, removing duplicate bullets, preserving the authoritative lines about snapshot commit points, tentative partial state, parallelization constraints, and spool-then-publish flow, and update any internal references or links to point to the single section so agent instructions remain consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@AGENTS.md`:
- Line 48: Revise the sentence about snapshots to state that chunk-index flushes
and filetree uploads performed before snapshot creation are already durably
persisted but remain uncommitted until snapshot publication; specifically
replace the phrase "tentative partial state, not a completed archive" with
wording that clarifies these writes are "durable partial state (persisted
remotely) but uncommitted and not visible until snapshot publication commits the
repository); keep the guidance that snapshots are the repository commit point
and retain the note about parallelizing independent work only when it does not
weaken crash-recovery semantics. Reference phrases to update: "Snapshot
creation", "chunk-index flushes", "filetree uploads", and "snapshot
publication".
In
`@src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs`:
- Around line 31-62: UploadAsync currently always replaces _blobs entries
ignoring the overwrite parameter; update UploadAsync (the method named
UploadAsync) to check whether a blob already exists in _blobs for the given
blobName before writing and honor overwrite semantics: if a blob exists and
overwrite is false, do not replace it and throw an appropriate exception (e.g.,
IOException) to match IBlobContainerService.UploadAsync behavior; if overwrite
is true or the blob doesn't exist, proceed with the existing logic that copies
the stream and sets _blobs[blobName] (apply this check in both the chunk-index
branch and the non-chunk branch), ensuring the Interlocked increments/decrements
and delay/failure simulation remain unchanged.
---
Nitpick comments:
In `@AGENTS.md`:
- Around line 45-51: There are two overlapping sections "Scale And Durability"
and "Durability And Scale" in AGENTS.md; consolidate them into a single
canonical section (pick one header name, e.g., "Durability and Scale") by
merging unique guidance from both, removing duplicate bullets, preserving the
authoritative lines about snapshot commit points, tentative partial state,
parallelization constraints, and spool-then-publish flow, and update any
internal references or links to point to the single section so agent
instructions remain consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 64280643-5a14-4be4-89c8-47e911a29fda
📒 Files selected for processing (4)
- AGENTS.md
- src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs
- src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs
- src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs
In `AGENTS.md`:

```markdown
## Durability And Scale

- Arius is a backup/archive tool. Prefer recoverability and correctness over throughput.
- Snapshot creation is the only repository commit point. Chunk-index flushes and filetree uploads that complete before a snapshot exists are tentative partial state, not a completed archive.
```
Clarify that pre-snapshot state is durable but uncommitted.
Line 48 currently implies flush/upload outputs are merely tentative, but these writes are already persisted remotely; they are durable partial state until snapshot publication commits visibility. Please tighten wording to avoid misleading future implementations.
Based on learnings: “Snapshots are the repository commit point; do not publish a snapshot until all referenced repository data is durably available” and “Parallelize independent work when useful, but do not weaken crash recovery semantics to do it.”
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@AGENTS.md` at line 48, Revise the sentence about snapshots to state that
chunk-index flushes and filetree uploads performed before snapshot creation are
already durably persisted but remain uncommitted until snapshot publication;
specifically replace the phrase "tentative partial state, not a completed
archive" with wording that clarifies these writes are "durable partial state
(persisted remotely) but uncommitted and not visible until snapshot publication
commits the repository"; keep the guidance that snapshots are the repository
commit point and retain the note about parallelizing independent work only when
it does not weaken crash-recovery semantics. Reference phrases to update:
"Snapshot creation", "chunk-index flushes", "filetree uploads", and "snapshot
publication".
```csharp
public async Task UploadAsync(string blobName, Stream content, IReadOnlyDictionary<string, string> metadata, BlobTier tier, string? contentType = null, bool overwrite = false, CancellationToken cancellationToken = default)
{
    var isChunkIndex = blobName.StartsWith(BlobPaths.ChunkIndex, StringComparison.Ordinal);
    if (isChunkIndex)
    {
        var active = Interlocked.Increment(ref _activeChunkIndexUploads);
        UpdateMaxConcurrency(active);
        try
        {
            if (_chunkIndexUploadDelay > TimeSpan.Zero)
                await Task.Delay(_chunkIndexUploadDelay, cancellationToken);

            if (_failUploadForPrefix is not null && blobName == BlobPaths.ChunkIndexShard(_failUploadForPrefix))
                throw new IOException($"Simulated chunk-index upload failure for prefix {_failUploadForPrefix}.");

            await using var ms = new MemoryStream();
            await content.CopyToAsync(ms, cancellationToken);
            _blobs[blobName] = new StoredBlob(ms.ToArray(), new Dictionary<string, string>(metadata), tier, contentType, false);
        }
        finally
        {
            var remaining = Interlocked.Decrement(ref _activeChunkIndexUploads);
            UpdateMaxConcurrency(remaining);
        }
    }
    else
    {
        await using var ms = new MemoryStream();
        await content.CopyToAsync(ms, cancellationToken);
        _blobs[blobName] = new StoredBlob(ms.ToArray(), new Dictionary<string, string>(metadata), tier, contentType, false);
    }
}
```
Honor overwrite semantics in UploadAsync.
Line 31 accepts `overwrite`, but Lines 48 and 60 always replace existing entries. This diverges from `IBlobContainerService.UploadAsync` behavior and can mask bugs where callers expect non-overwrite writes to fail.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs`
around lines 31 - 62, UploadAsync currently always replaces _blobs entries
ignoring the overwrite parameter; update UploadAsync (the method named
UploadAsync) to check whether a blob already exists in _blobs for the given
blobName before writing and honor overwrite semantics: if a blob exists and
overwrite is false, do not replace it and throw an appropriate exception (e.g.,
IOException) to match IBlobContainerService.UploadAsync behavior; if overwrite
is true or the blob doesn't exist, proceed with the existing logic that copies
the stream and sets _blobs[blobName] (apply this check in both the chunk-index
branch and the non-chunk branch), ensuring the Interlocked increments/decrements
and delay/failure simulation remain unchanged.
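A minimal sketch of the suggested fix, as a stripped-down hypothetical fake (the class name and `IOException` choice are assumptions; the concurrency bookkeeping and delay/failure simulation of the real fake are omitted for brevity):

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified fake illustrating only the overwrite guard;
// the real RecordingChunkIndexBlobContainerService keeps its Interlocked
// counters and simulated delays/failures unchanged around this check.
public sealed class OverwriteHonoringFakeBlobStore
{
    private readonly ConcurrentDictionary<string, byte[]> _blobs = new();

    public async Task UploadAsync(string blobName, Stream content,
        bool overwrite = false, CancellationToken cancellationToken = default)
    {
        // Reject non-overwrite writes to an existing blob, matching the
        // reviewer's expectation for IBlobContainerService.UploadAsync.
        if (!overwrite && _blobs.ContainsKey(blobName))
            throw new IOException($"Blob '{blobName}' already exists and overwrite is false.");

        await using var ms = new MemoryStream();
        await content.CopyToAsync(ms, cancellationToken);
        _blobs[blobName] = ms.ToArray();
    }
}
```

Note the `ContainsKey`-then-assign pair is not atomic; in a fake exercised by truly concurrent writers, `_blobs.TryAdd` (falling back to indexer assignment only when `overwrite` is true) would close that race.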
Summary
Testing
Summary by CodeRabbit
Release Notes