
feat: parallelize archive finalization#59

Open
woutervanranst wants to merge 10 commits into master from parallel-flush-and-tree-upload-master

Conversation

@woutervanranst
Owner

@woutervanranst woutervanranst commented Apr 12, 2026

Summary

  • Overlap archive finalization so the chunk-index flush runs in parallel with the manifest sort and filetree build/upload, while snapshot creation still waits for both branches
  • Parallelize chunk-index shard flushes and refactor filetree building into a bounded, disk-spooled upload pipeline with CLI progress and regression coverage for snapshot commit-point semantics
  • Update OpenSpec tasks plus README/AGENTS guidance to reflect the durability rules and the archive-manifest versus snapshot terminology
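The overlap in the first bullet can be sketched as follows. This is a minimal stand-in, not the actual Arius code: the method names (`FlushChunkIndexAsync`, `BuildAndUploadTreesAsync`) and delays are illustrative assumptions; the key point is that the snapshot (the commit point) is created only after both branches complete.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var log = new ConcurrentQueue<string>();

async Task FlushChunkIndexAsync()
{
    await Task.Delay(50);               // stand-in for shard uploads
    log.Enqueue("index-flushed");
}

async Task<string> BuildAndUploadTreesAsync()
{
    await Task.Delay(30);               // stand-in for manifest sort + tree upload
    log.Enqueue("trees-uploaded");
    return "rootHash";
}

// Start both branches so they overlap; snapshot creation waits for BOTH,
// preserving the commit-point semantics described above.
var flushTask = FlushChunkIndexAsync();
var treeTask  = BuildAndUploadTreesAsync();
await Task.WhenAll(flushTask, treeTask);

var rootHash = treeTask.Result;
log.Enqueue($"snapshot({rootHash})");   // the repository commit point comes last

Console.WriteLine(string.Join(",", log));
```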

Testing

  • dotnet test --project src/Arius.Core.Tests/Arius.Core.Tests.csproj
  • dotnet test --project src/Arius.Cli.Tests/Arius.Cli.Tests.csproj
  • dotnet test --project src/Arius.Integration.Tests/Arius.Integration.Tests.csproj
  • dotnet test --project src/Arius.AzureBlob.Tests/Arius.AzureBlob.Tests.csproj
  • dotnet test --project src/Arius.Architecture.Tests/Arius.Architecture.Tests.csproj
  • dotnet test --project src/Arius.E2E.Tests/Arius.E2E.Tests.csproj

Summary by CodeRabbit

Release Notes

  • New Features
    • Added real-time progress reporting during archive finalization with separate displays for "Finalizing Index" (showing shard counts) and "Uploading Trees" (showing blob counts), each with visual completion indicators.
    • Optimized archive finalization pipeline to execute index flush and tree operations concurrently rather than sequentially, improving completion time.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 12, 2026

Warning

Rate limit exceeded

@woutervanranst has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 5 minutes and 25 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 5 minutes and 25 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: d126d881-d336-45a9-a65f-296d806fdcf2

📥 Commits

Reviewing files that changed from the base of the PR and between 747f39d and 49b33c0.

📒 Files selected for processing (21)
  • .agents/dotnet-benchmark-designer.md
  • .agents/dotnet-concurrency-specialist.md
  • .agents/dotnet-performance-analyst.md
  • .agents/skills/crap-analysis/SKILL.md
  • .agents/skills/csharp-api-design/SKILL.md
  • .agents/skills/csharp-coding-standards/SKILL.md
  • .agents/skills/csharp-coding-standards/anti-patterns-and-reflection.md
  • .agents/skills/csharp-coding-standards/composition-and-error-handling.md
  • .agents/skills/csharp-coding-standards/performance-and-api-design.md
  • .agents/skills/csharp-coding-standards/value-objects-and-patterns.md
  • .agents/skills/csharp-type-design-performance/SKILL.md
  • .agents/skills/ilspy-decompile/SKILL.md
  • .agents/skills/microsoft-extensions-configuration/SKILL.md
  • .agents/skills/microsoft-extensions-configuration/advanced-patterns.md
  • .agents/skills/package-management/SKILL.md
  • .agents/skills/project-structure/SKILL.md
  • .agents/skills/serialization/SKILL.md
  • .agents/skills/slopwatch/SKILL.md
  • .agents/skills/testcontainers/SKILL.md
  • .agents/skills/testcontainers/database-patterns.md
  • .agents/skills/testcontainers/infrastructure-patterns.md
📝 Walkthrough

Walkthrough

This PR implements concurrent archive finalization by running chunk-index flushing and tree building/uploading in parallel instead of sequentially. It adds progress reporting for both operations via new events, parallelizes chunk-index shard flushing, and refactors tree building with a bounded producer/consumer upload pipeline.
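The bounded producer/consumer upload pipeline mentioned above can be sketched with System.Threading.Channels. The capacity, worker count, and integer payload here are illustrative assumptions; the PR's real pipeline spools tree blobs to temp files and uploads them.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int UploadWorkers = 4;
const int QueueCapacity = 8;            // bounds how far the producer can run ahead

var channel = Channel.CreateBounded<int>(QueueCapacity);
var uploaded = 0;

// Consumers: a fixed worker pool draining the channel.
var workers = Enumerable.Range(0, UploadWorkers)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(1);        // stand-in for the blob upload
            Interlocked.Increment(ref uploaded);
        }
    }))
    .ToArray();

// Producer: queues work and waits (asynchronously) when the channel is full.
for (var i = 0; i < 100; i++)
    await channel.Writer.WriteAsync(i);

channel.Writer.Complete();              // signal no more work
await Task.WhenAll(workers);            // wait for all uploads to finish

Console.WriteLine(uploaded);            // prints 100
```

The bounded capacity is what keeps memory and disk-spool pressure in check: a slow upload path back-pressures the producer instead of letting it queue unbounded work.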

Changes

  • Documentation (AGENTS.md, openspec/changes/chunk-index-prefix-strategy/..., openspec/changes/parallel-flush-and-tree-upload/...): Updated durability model and snapshot commit semantics; added proposals and a design doc for the concurrent finalization overlap and prefix-strategy generalization; added a task checklist for implementation work.
  • CLI Progress State & Rendering (src/Arius.Cli/ProgressState.cs, src/Arius.Cli/Commands/Archive/ArchiveVerb.cs, src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs): Added finalization progress fields and setters for chunk-index flush (shards) and tree upload (blobs); added two new notification handlers to update progress state; extended BuildDisplay() to show "Finalizing Index" and "Uploading Trees" status lines with progress counters.
  • CLI Progress Tests (src/Arius.Cli.Tests/Commands/Archive/BuildArchiveDisplayTests.cs, src/Arius.Cli.Tests/Commands/Archive/NotificationHandlerTests.cs, src/Arius.Cli.Tests/MediatorEventRoutingIntegrationTests.cs): Added UI rendering tests for the finalization progress display with open/filled circle indicators; added handler tests for chunk-index flush and tree upload progress events; extended an integration test to verify progress-state updates from the new events.
  • Archive Events (src/Arius.Core/Features/ArchiveCommand/Events.cs): Added ChunkIndexFlushProgressEvent and TreeUploadProgressEvent notification records for concurrent finalization progress reporting.
  • Archive Finalization Orchestration (src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs): Refactored to execute chunk-index flush and tree-build tasks concurrently via Parallel.ForEachAsync; removed redundant pre-validation; changed manifest-writer disposal ordering; added progress-event publishing for both parallel branches.
  • ChunkIndex Flush (src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs, src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs, src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs): FlushAsync() now parallelizes by shard prefix with bounded concurrency and reports per-prefix progress; added tests covering parallel uploads, cache merging, progress reporting, and failure recovery; added a recording fake service for tracking uploads and concurrency.
  • FileTree Building (src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs, src/Arius.Core.Tests/Shared/FileTree/FileTreeBuilderTests.cs, src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs): Refactored BuildAsync() to implement a bounded producer/consumer pipeline for tree uploads (compute locally, spool to temp, upload with a worker pool, publish to cache only after upload succeeds); added progress reporting; added tests for parallel uploads, progress events, and failure handling; added a recording fake service.
  • Archive Finalization Integration Tests (src/Arius.Core.Tests/Features/ArchiveCommand/ArchiveFinalizationTests.cs, src/Arius.Core.Tests/Features/ArchiveCommand/Fakes/CoordinatedArchiveBlobContainerService.cs): Added integration tests verifying snapshot creation does not occur before the index flush completes; added a fake service with coordination points to control tree and index upload ordering and detect premature snapshot uploads.
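The per-shard-prefix flush with bounded concurrency described for ChunkIndexService can be sketched with Parallel.ForEachAsync. The one-character prefix scheme, worker cap of 4, and Task.Delay are assumptions for illustration, not Arius's actual values.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var entries = Enumerable.Range(0, 64).Select(i => $"{i % 16:x}-chunk{i}").ToList();

// Group pending entries by shard prefix (here: the first character).
var byPrefix = entries.GroupBy(e => e[..1]).ToList();

var total = byPrefix.Count;
var flushed = 0;

await Parallel.ForEachAsync(
    byPrefix,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },   // bounded concurrency
    async (group, ct) =>
    {
        await Task.Delay(1, ct);        // stand-in for load/merge/serialize/upload
        var done = Interlocked.Increment(ref flushed);
        Console.WriteLine($"flushed shard '{group.Key}' ({done}/{total})");
    });
```

Each prefix is an independent unit of work, so a failure in one shard pipeline surfaces from Parallel.ForEachAsync without affecting the bookkeeping of the others.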

Sequence Diagram

sequenceDiagram
    participant ACH as ArchiveCommandHandler
    participant CIS as ChunkIndexService
    participant MS as ManifestSorter
    participant FTB as FileTreeBuilder
    participant SS as SnapshotService
    participant M as Mediator

    ACH->>CIS: FlushAsync(progress)
    ACH->>MS: SortAsync()
    Note over CIS: Parallel by prefix<br/>(load/merge/serialize/<br/>upload/cache)
    Note over FTB: Compute tree hashes<br/>Spool missing trees<br/>Parallel upload workers
    CIS-->>M: ChunkIndexFlushProgressEvent
    FTB-->>M: TreeUploadProgressEvent
    par Concurrent
        Note over CIS: Uploading shards
        and
        Note over MS,FTB: Building/uploading trees
    end
    CIS->>ACH: Flush complete
    FTB->>ACH: BuildAsync returns rootHash
    ACH->>SS: CreateAsync(rootHash)
    Note over SS: Repository commit point
    SS-->>ACH: Snapshot created

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~70 minutes

Possibly related PRs

  • PR #50: Both PRs modify the same archive finalization surfaces—ArchiveCommandHandler, chunk-index flush, tree-builder/filetree upload flow, and new progress events—making them directly related at the code level.
  • PR #28: Both PRs extend the CLI progress-tracking surface (ProgressState, display rendering, notification handlers) and introduce new progress-related events and handlers for archive operations.
  • PR #55: Both PRs touch the same file-tree components (ArchiveCommandHandler, FileTreeBuilder, FileTreeService types and related file-tree code paths), with concurrent changes to implementation and control flow.
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 11.39%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the title 'feat: parallelize archive finalization' directly and accurately summarizes the main objective of the changeset: enabling parallel execution of chunk-index flushing and file-tree building/upload operations during archive finalization.


@codecov

codecov Bot commented Apr 12, 2026

Codecov Report

❌ Patch coverage is 99.51220% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 73.41%. Comparing base (339696a) to head (49b33c0).

Files with missing lines:
  • src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs — 99.09% patch coverage (0 missing, 1 partial ⚠️)
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #59      +/-   ##
==========================================
+ Coverage   72.75%   73.41%   +0.65%     
==========================================
  Files          66       66              
  Lines        4831     4947     +116     
  Branches      654      668      +14     
==========================================
+ Hits         3515     3632     +117     
+ Misses       1171     1169       -2     
- Partials      145      146       +1     
Flag coverage:
  • linux: 82.41% (patch 99.51%), +0.51% ⬆️
  • windows: 70.34% (patch 97.56%), +1.27% ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (3)
src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs (1)

188-208: Split the new handlers into their own files to match repository convention.

Lines 190-208 add two more top-level classes in a multi-class file; please move each handler to a dedicated file (filename == class name).

As per coding guidelines **/*.cs: Prefer one top-level class per file, with the filename matching the class name.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs` around lines 188 -
208, The two new top-level classes ChunkIndexFlushProgressHandler and
TreeUploadProgressHandler in ArchiveProgressHandlers.cs should each live in
their own file: create ChunkIndexFlushProgressHandler.cs and
TreeUploadProgressHandler.cs, copy the respective class definitions (including
the constructor parameter ProgressState and implemented INotificationHandler<T>
signature) into those files, keep the same namespace and usings as the original
file, remove the classes from ArchiveProgressHandlers.cs, and ensure any
project/file references or tests still compile (no API changes to
SetChunkIndexFlushProgress/SetTreeUploadProgress usage).
src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs (1)

35-64: Minor: Decrement not executed if simulated failure or cancellation occurs before try block.

The Interlocked.Decrement at line 62 only executes if the try block at line 53 is entered. If _throwOnFileTreeUpload throws at line 48, or if WaitAsync/Task.Delay throws OperationCanceledException, the active counter remains incremented.

For test purposes this is likely acceptable since failure scenarios typically test single-upload failures, but if you want accurate concurrency tracking during failure tests:

🔧 Suggested fix
     var isFileTree = blobName.StartsWith(BlobPaths.FileTrees, StringComparison.Ordinal);
     if (isFileTree)
     {
         _firstFileTreeUploadStarted.TrySetResult();
         var active = Interlocked.Increment(ref _activeFileTreeUploads);
         UpdateMaxConcurrency(active);
-        await _allowFileTreeUploads.Task.WaitAsync(cancellationToken);
-        if (_throwOnFileTreeUpload)
-            throw new IOException("Simulated filetree upload failure");
-        if (_fileTreeUploadDelay > TimeSpan.Zero)
-            await Task.Delay(_fileTreeUploadDelay, cancellationToken);
+        try
+        {
+            await _allowFileTreeUploads.Task.WaitAsync(cancellationToken);
+            if (_throwOnFileTreeUpload)
+                throw new IOException("Simulated filetree upload failure");
+            if (_fileTreeUploadDelay > TimeSpan.Zero)
+                await Task.Delay(_fileTreeUploadDelay, cancellationToken);
+        }
+        catch
+        {
+            Interlocked.Decrement(ref _activeFileTreeUploads);
+            throw;
+        }
     }

     try
     {
         await using var ms = new MemoryStream();
         await content.CopyToAsync(ms, cancellationToken);
         _blobs[blobName] = new StoredBlob(ms.ToArray(), new Dictionary<string, string>(metadata), tier, contentType, false);
     }
     finally
     {
         if (isFileTree)
             Interlocked.Decrement(ref _activeFileTreeUploads);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs`
around lines 35 - 64, The UploadAsync method increments _activeFileTreeUploads
for file-tree blobs but only decrements inside the try/finally that begins after
the waits/throws, so exceptions from _allowFileTreeUploads.Task.WaitAsync,
_throwOnFileTreeUpload, or Task.Delay leave the counter incremented; wrap the
file-tree pre-upload section (the Interlocked.Increment call and any
waits/throws: _firstFileTreeUploadStarted.TrySetResult,
Interlocked.Increment(ref _activeFileTreeUploads),
_allowFileTreeUploads.Task.WaitAsync, _throwOnFileTreeUpload check, and
_fileTreeUploadDelay) in a try/finally (or move the decrement finally to cover
that entire region) so that Interlocked.Decrement(ref _activeFileTreeUploads)
always runs when isFileTree is true, referencing UploadAsync,
_activeFileTreeUploads, _firstFileTreeUploadStarted, _allowFileTreeUploads,
_throwOnFileTreeUpload, and _fileTreeUploadDelay.
src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs (1)

480-481: Consider async-compatible progress reporting pattern.

The .AsTask().GetAwaiter().GetResult() call blocks synchronously inside the Progress<T> callback, which executes on thread pool threads from Parallel.ForEachAsync. While this works because IMediator.Publish for in-process handlers is typically fast, it introduces a blocking wait that could contribute to thread pool pressure under high concurrency.

If this becomes a concern, consider using IProgress<T> with a channel-based collector that drains asynchronously, or accept the trade-off given the low frequency of progress reports during finalization.
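The channel-based collector suggested above can be sketched as follows. `PublishAsync` is a hypothetical stand-in for `_mediator.Publish`; the essential property is that the progress callback only does a non-blocking `TryWrite`, while a single drain task is the only place that awaits the publish.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var progressChannel = Channel.CreateUnbounded<(int Completed, int Total)>();
var published = 0;

// Hypothetical stand-in for awaiting _mediator.Publish(new ChunkIndexFlushProgressEvent(...)).
Task PublishAsync((int Completed, int Total) p)
{
    published++;
    return Task.CompletedTask;
}

// Single drain task: the only place that awaits the potentially slow publish.
var drain = Task.Run(async () =>
{
    await foreach (var p in progressChannel.Reader.ReadAllAsync())
        await PublishAsync(p);
});

// The callback never blocks: TryWrite on an unbounded channel always succeeds.
// In real code this Action would sit behind an IProgress<T> adapter.
Action<(int Completed, int Total)> reportProgress =
    p => progressChannel.Writer.TryWrite(p);

for (var i = 1; i <= 5; i++)
    reportProgress((i, 5));

progressChannel.Writer.Complete();      // flush remaining items and stop the drain
await drain;

Console.WriteLine(published);           // prints 5
```

Completing the writer and awaiting the drain after the Parallel.ForEachAsync section guarantees no progress events are dropped at shutdown.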

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs` around lines
480 - 481, The current Progress<(int Completed,int Total)> callback blocks
synchronously by calling
_mediator.Publish(...).AsTask().GetAwaiter().GetResult(), which can cause
thread-pool pressure; replace this with an async-friendly pattern: create a
Channel<(int Completed,int Total)> (or ConcurrentQueue plus a signaling Task)
and in the Progress callback only TryWrite the tuple to the channel
(non-blocking), start a background async DrainProgressAsync task that reads from
the channel and awaits _mediator.Publish(new
ChunkIndexFlushProgressEvent(completed,total), cancellationToken) for each item,
and ensure you await or cancel the drain task after the Parallel.ForEachAsync
section completes; update references to flushProgress, _mediator.Publish, and
ChunkIndexFlushProgressEvent accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs`:
- Around line 103-105: The test uses a non-thread-safe List<(int Completed,int
Total)> named updates fed by SynchronousProgress<(int Completed,int Total)>
callbacks from parallel workers (e.g., in FlushAsync), causing race conditions;
make the progress probe thread-safe by replacing the shared List with a
thread-safe collection (e.g., ConcurrentBag or ConcurrentQueue) or synchronizing
access with a lock around updates.Add in the SynchronousProgress callback, and
apply the same change to the other occurrence around lines 129-131 so all
progress callbacks are collected safely.

In
`@src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs`:
- Around line 30-36: The increment of _activeChunkIndexUploads in
RecordingChunkIndexBlobContainerService can be left undecremented if
cancellation occurs during Task.Delay; wrap the increment + any awaited delay in
a try/finally so Interlocked.Decrement(ref _activeChunkIndexUploads) runs in the
finally block (or register a cancellation callback that decrements) to guarantee
decrement on cancellation; apply the same change to the other similar block
(lines 44-48) and keep UpdateMaxConcurrency calls consistent (call before await
on increment and after decrement or update inside finally as appropriate).

In `@src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs`:
- Around line 147-156: The current FlushAsync logic eagerly drains
_pendingEntries into pending and groups by Shard.PrefixOf, which loses entries
if any per-prefix persistence fails; change FlushAsync so after taking a
snapshot (pending) you hold it as a temporary snapshot and, if any prefix
pipeline (the byPrefix processing) throws, requeue either the entire snapshot or
at minimum the unprocessed/failed prefix groups back into _pendingEntries before
rethrowing; locate the snapshot/while (_pendingEntries.TryTake...) block and the
per-prefix processing loop that iterates over byPrefix and add requeue logic in
the catch/finally path to push failed ShardEntry items back onto _pendingEntries
to make FlushAsync recoverable.

In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs`:
- Around line 57-90: The producer catch/finally must stop and await the
background upload workers before deleting spool files or rethrowing: on
producer-side failure, cancel the worker CancellationTokenSource used to drive
the consumers (the token passed into Task.Run and ReadAllAsync), call
uploadChannel.Writer.Complete(ex) if not already, then await
Task.WhenAll(consumers) to ensure all consumer tasks (the consumers array
created from Task.Run(...) reading from uploadChannel.Reader) have finished
before performing any File.Delete of spoolPaths or returning; update the
exception handling around the producer path that spools files so it cancels the
token and awaits consumers in the catch/finally, and consider switching to a
spool-then-publish flow where the final cache entry is published only after
_fileTreeService.WriteAsync completes successfully.
- Around line 154-171: The current loop that scans dirHashMap for children for
each directory (using GetDirectoryPath and GetLastSegment to form FileTreeEntry
items added to entries) is quadratic; instead maintain a parent→children map or
push each completed child into its parent's list when you compute it: create a
Dictionary<string, List<FileTreeEntry>> (or add to an existing map) keyed by
parent path, when finalizing a directory build create the FileTreeEntry for that
directory and Add it into parentChildrenMap[parentPath] (creating the list if
missing), and in the directory-finalization logic replace the foreach over
dirHashMap with a single parentChildrenMap.TryGetValue(dirPath, out children)
lookup to append those entries to entries; ensure root/no-parent cases are
handled.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3a57d4a4-9379-4f1a-8859-58f023a5444d

📥 Commits

Reviewing files that changed from the base of the PR and between 26bf43b and 1fd77d4.

📒 Files selected for processing (23)
  • AGENTS.md
  • README.md
  • openspec/changes/chunk-index-prefix-strategy/.openspec.yaml
  • openspec/changes/chunk-index-prefix-strategy/proposal.md
  • openspec/changes/parallel-flush-and-tree-upload/design.md
  • openspec/changes/parallel-flush-and-tree-upload/proposal.md
  • openspec/changes/parallel-flush-and-tree-upload/tasks.md
  • src/Arius.Cli.Tests/Commands/Archive/BuildArchiveDisplayTests.cs
  • src/Arius.Cli.Tests/Commands/Archive/NotificationHandlerTests.cs
  • src/Arius.Cli.Tests/MediatorEventRoutingIntegrationTests.cs
  • src/Arius.Cli/Commands/Archive/ArchiveProgressHandlers.cs
  • src/Arius.Cli/Commands/Archive/ArchiveVerb.cs
  • src/Arius.Cli/ProgressState.cs
  • src/Arius.Core.Tests/Features/ArchiveCommand/ArchiveFinalizationTests.cs
  • src/Arius.Core.Tests/Features/ArchiveCommand/Fakes/CoordinatedArchiveBlobContainerService.cs
  • src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs
  • src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs
  • src/Arius.Core.Tests/Shared/FileTree/Fakes/RecordingFileTreeBlobContainerService.cs
  • src/Arius.Core.Tests/Shared/FileTree/FileTreeBuilderTests.cs
  • src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs
  • src/Arius.Core/Features/ArchiveCommand/Events.cs
  • src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs
  • src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs

Comment thread src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs Outdated
Comment thread src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs
Comment on lines +57 to +90
var uploadChannel = Channel.CreateBounded<PendingTreeUpload>(UploadQueueCapacity);
var totalUploadsKnown = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
var bufferedProgress = new ConcurrentQueue<int>();
var spoolPaths = new ConcurrentBag<string>();
var uploadsQueued = 0;
var uploadsCompleted = 0;

// Stream through sorted manifest entries
await foreach (var manifestEntry in ReadManifestAsync(sortedManifestPath, cancellationToken))
{
    var filePath = manifestEntry.Path;        // e.g., "photos/2024/june/a.jpg"
    var dirPath = GetDirectoryPath(filePath); // e.g., "photos/2024/june"
    var name = Path.GetFileName(filePath);

var consumers = Enumerable.Range(0, UploadWorkers)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var upload in uploadChannel.Reader.ReadAllAsync(cancellationToken))
        {
            try
            {
                var plaintext = await File.ReadAllBytesAsync(upload.SpoolPath, cancellationToken);
                var tree = FileTreeBlobSerializer.Deserialize(plaintext);
                await _fileTreeService.WriteAsync(upload.Hash, tree, cancellationToken);

                var done = Interlocked.Increment(ref uploadsCompleted);
                if (progress is not null)
                {
                    if (totalUploadsKnown.Task.IsCompletedSuccessfully)
                        progress.Report((done, totalUploadsKnown.Task.Result));
                    else
                        bufferedProgress.Enqueue(done);
                }
            }
            finally
            {
                try { File.Delete(upload.SpoolPath); } catch { /* best-effort cleanup */ }
            }
        }
    }, cancellationToken))
    .ToArray();
Contributor


⚠️ Potential issue | 🟠 Major

Stop the upload workers before cleanup on producer-side failures.

If manifest parsing, hashing, or spooling throws before Task.WhenAll(consumers), the catch path only completes the channel and rethrows. The consumers can still be uploading while finally deletes the same spool files, so one failure turns into background uploads plus racy cleanup. Cancel the worker token and await the consumer tasks before deleting spool files or returning.

Based on learnings, when local cache publication depends on remote durability, prefer a spool-then-publish flow: write tentative data outside the final cache path, upload it, then publish the final cache entry only after upload succeeds.

Also applies to: 207-217
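The shutdown ordering the reviewer asks for can be sketched as follows: complete the channel, cancel the workers' token, await the worker tasks, and only then clean up spool files. Names here (`workerCts`, the string "spool paths") are illustrative, not the PR's actual identifiers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(4);
var spoolPaths = new ConcurrentBag<string>();
using var workerCts = new CancellationTokenSource();

var worker = Task.Run(async () =>
{
    await foreach (var path in channel.Reader.ReadAllAsync(workerCts.Token))
        await Task.Delay(1, workerCts.Token);   // stand-in for reading/uploading the spool file
});

var cleanedUpSafely = false;
try
{
    await channel.Writer.WriteAsync("spool-0");
    spoolPaths.Add("spool-0");
    throw new InvalidOperationException("simulated producer failure");
}
catch (InvalidOperationException)
{
    channel.Writer.TryComplete();       // no more work will arrive
    workerCts.Cancel();                 // stop in-flight reads/uploads
    try { await worker; } catch (OperationCanceledException) { /* expected on cancel */ }

    // Only now is it safe to delete spool files: no worker can still be reading them.
    foreach (var path in spoolPaths) { /* File.Delete(path) in real code */ }
    cleanedUpSafely = true;
}

Console.WriteLine(cleanedUpSafely);     // prints True
```

Awaiting the worker task before cleanup also observes any exception it raised, so a failed upload cannot go unnoticed.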


Comment on lines +154 to 171
// Inject already-computed child directory entries
// (any immediate child directories of dirPath that have been processed)
foreach (var (childDirPath, childHash) in dirHashMap)
{
    var childParent = GetDirectoryPath(childDirPath);
    if (childParent == dirPath)
    {
        var childName = GetLastSegment(childDirPath);
        entries.Add(new FileTreeEntry
        {
            Name = childName + "/",
            Type = FileTreeEntryType.Dir,
            Hash = childHash,
            Created = null,
            Modified = null
        });
    }
}
Contributor


⚠️ Potential issue | 🟠 Major

This child-directory lookup is quadratic in directory count.

Every directory walks the full dirHashMap to find its immediate children. On large trees that makes build time O(directories²), which can easily dominate the finalization tail this PR is trying to shorten. Push each completed child directly into its parent entry list, or keep a parent→children map, instead of rescanning all prior directories.
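The parent→children map the reviewer suggests can be sketched as below. The helper names (`GetDirectoryPath`, `GetLastSegment`, `OnDirectoryFinalized`) and the string-valued child entries are illustrative; the point is that each finalized directory is pushed into its parent's list once, so finalization does a single dictionary lookup instead of rescanning every prior directory.

```csharp
using System;
using System.Collections.Generic;

static string GetDirectoryPath(string path)
{
    var idx = path.LastIndexOf('/');
    return idx < 0 ? "" : path[..idx];      // "" means the root
}

static string GetLastSegment(string path)
{
    var idx = path.LastIndexOf('/');
    return idx < 0 ? path : path[(idx + 1)..];
}

var parentChildren = new Dictionary<string, List<string>>();

// Called once per finalized directory, in bottom-up order.
void OnDirectoryFinalized(string dirPath, string hash)
{
    var parent = GetDirectoryPath(dirPath);
    if (!parentChildren.TryGetValue(parent, out var children))
        parentChildren[parent] = children = new List<string>();
    children.Add($"{GetLastSegment(dirPath)}/ -> {hash}");
}

OnDirectoryFinalized("photos/2024/june", "h1");
OnDirectoryFinalized("photos/2024", "h2");
OnDirectoryFinalized("photos", "h3");

// Finalizing "photos/2024": one O(1) lookup yields its immediate child dirs.
var found = parentChildren.TryGetValue("photos/2024", out var kids);
Console.WriteLine($"{found}:{string.Join(",", kids!)}");   // prints True:june/ -> h1
```

This turns the O(directories²) rescan into O(directories) total work across the whole build.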


@coderabbitai coderabbitai Bot left a comment
Actionable comments posted: 1

♻️ Duplicate comments (1)
src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs (1)

64-88: ⚠️ Potential issue | 🟠 Major

Await uploadTask before cleaning up spool files on producer failures.

Line 208 only completes the channel. If the producer throws after queueing work, Line 213 can delete a spool file while a worker is still reading it, and any later failure from uploadTask goes unobserved. Cancel the worker token and await uploadTask before cleanup/rethrow.

Based on learnings, when local cache publication depends on remote durability, prefer a spool-then-publish flow: write tentative data outside the final cache path, upload it, then publish the final cache entry only after upload succeeds.

Also applies to: 206-216
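The cancel-then-await-then-cleanup ordering can be sketched as follows; the pipeline shape (produceAsync delegate, a Channel of spool paths, a worker task) mirrors the description above but all names are hypothetical, not the repo's actual API:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

static class SpoolPipeline
{
    // Hypothetical shape of the fix: on producer failure, cancel the workers,
    // await uploadTask so its outcome is observed, and only then delete spool files.
    public static async Task RunAsync(
        Func<ChannelWriter<string>, Task> produceAsync,
        Channel<string> uploadChannel,
        Task uploadTask,
        CancellationTokenSource workerCts,
        List<string> spoolPaths)
    {
        try
        {
            await produceAsync(uploadChannel.Writer);
            uploadChannel.Writer.Complete();
        }
        catch (Exception producerError)
        {
            workerCts.Cancel();                                // stop workers reading spool files
            uploadChannel.Writer.TryComplete(producerError);   // unblock any waiting reader
            try { await uploadTask; } catch { /* producerError is the primary failure */ }
            TryCleanup(spoolPaths);
            throw;
        }

        await uploadTask;        // observe worker failures before touching spool files
        TryCleanup(spoolPaths);  // safe: no worker still holds a spool file open
    }

    private static void TryCleanup(List<string> spoolPaths)
    {
        foreach (var path in spoolPaths)
            try { File.Delete(path); } catch { /* best-effort cleanup */ }
    }
}
```

The key property is that cleanup and rethrow happen strictly after uploadTask has been awaited on every path, so worker exceptions are never left unobserved.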

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs` around lines 64 - 88, The
producer completes the channel but may delete spool files while
Parallel.ForEachAsync workers (uploadTask) are still reading them and any
exceptions from uploadTask go unobserved; fix by cancelling the worker
CancellationTokenSource (the token passed into ParallelOptions), awaiting the
uploadTask task to observe failures, then perform spool-file cleanup and rethrow
if uploadTask faulted; specifically, when using uploadChannel.Reader and
creating uploadTask via Parallel.ForEachAsync (with ParallelOptions {
MaxDegreeOfParallelism = UploadWorkers, CancellationToken = cancellationToken
}), ensure you call cancellationSource.Cancel(), await uploadTask, then delete
spool files (or run best-effort cleanup) and propagate any exceptions from
uploadTask before returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs`:
- Around line 61-63: The build currently can schedule multiple identical subtree
uploads because it checks ExistsInRemote per subtree and then enqueues
WriteAsync tasks; fix this by deduplicating tree hashes before enqueuing to the
worker pool: in FileTreeBuilder (the method that computes treeHash and
increments uploadsQueued/uploadsCompleted), maintain a thread-safe hash set (or
local HashSet before parallel scheduling) of scheduled treeHash values and only
schedule WriteAsync for a hash not already present, ensuring the same de-dup
logic is applied wherever tree hashes are gathered (also check the call sites
that use ExistsInRemote and scheduling at FileTreeService.WriteAsync to avoid
races), and ensure the dedupe set is synchronized (or built single-threaded) so
encryption paths and cache file writes remain thread-safe.

---

Duplicate comments:
In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs`:
- Around line 64-88: The producer completes the channel but may delete spool
files while Parallel.ForEachAsync workers (uploadTask) are still reading them
and any exceptions from uploadTask go unobserved; fix by cancelling the worker
CancellationTokenSource (the token passed into ParallelOptions), awaiting the
uploadTask task to observe failures, then perform spool-file cleanup and rethrow
if uploadTask faulted; specifically, when using uploadChannel.Reader and
creating uploadTask via Parallel.ForEachAsync (with ParallelOptions {
MaxDegreeOfParallelism = UploadWorkers, CancellationToken = cancellationToken
}), ensure you call cancellationSource.Cancel(), await uploadTask, then delete
spool files (or run best-effort cleanup) and propagate any exceptions from
uploadTask before returning.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: e73c96fb-a0ef-4123-a670-5fb070ee5d77

📥 Commits

Reviewing files that changed from the base of the PR and between 1fd77d4 and c3083cf.

📒 Files selected for processing (2)
  • src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs
  • src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/Arius.Core/Features/ArchiveCommand/ArchiveCommandHandler.cs

Comment on lines +61 to +63
var uploadsQueued = 0;
var uploadsCompleted = 0;

⚠️ Potential issue | 🟠 Major

Deduplicate scheduled tree hashes before handing them to the worker pool.

Two identical subtrees in the same build can compute the same treeHash and both pass ExistsInRemote before either worker finishes. That queues concurrent WriteAsync calls for the same hash, which then race on the same cache file in src/Arius.Core/Shared/FileTree/FileTreeService.cs:197-225.

💡 Suggested fix
@@
-        var uploadsQueued = 0;
+        var uploadsQueued = 0;
+        var queuedHashes = new HashSet<string>(StringComparer.Ordinal);
@@
-                await QueueUploadIfNeededAsync(hash, tree, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
+                await QueueUploadIfNeededAsync(hash, tree, queuedHashes, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
@@
-                rootHash = await BuildRootBlobAsync(dirHashMap, dirEntries, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
+                rootHash = await BuildRootBlobAsync(dirHashMap, dirEntries, queuedHashes, uploadChannel.Writer, spoolPaths, () => uploadsQueued++, cancellationToken);
@@
-        ChannelWriter<PendingTreeUpload> uploadWriter,
+        HashSet<string>                  queuedHashes,
+        ChannelWriter<PendingTreeUpload> uploadWriter,
@@
-        if (_fileTreeService.ExistsInRemote(treeHash))
+        if (_fileTreeService.ExistsInRemote(treeHash) || !queuedHashes.Add(treeHash))
             return;

As per coding guidelines, src/Arius.Core/**: Core library. Pay close attention to deduplication logic, encryption correctness, and thread safety.

Also applies to: 175-175, 194-194, 271-287
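If queueing can ever happen from more than one thread, an atomic "first scheduler wins" check avoids the external synchronization a plain HashSet needs; this sketch uses hypothetical names (ScheduledTreeHashes, TryClaim) rather than anything in the repo:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical alternative to the HashSet in the diff above: TryAdd is atomic,
// so two workers computing the same treeHash cannot both claim the upload slot.
sealed class ScheduledTreeHashes
{
    private readonly ConcurrentDictionary<string, byte> _scheduled =
        new(StringComparer.Ordinal);

    // Returns true only for the first caller presenting this hash.
    public bool TryClaim(string treeHash) => _scheduled.TryAdd(treeHash, 0);
}
```

Usage would mirror the diff: check the remote first, then claim, and only enqueue when both pass, e.g. `if (!ExistsInRemote(hash) && scheduled.TryClaim(hash)) { /* enqueue */ }`.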

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Arius.Core/Shared/FileTree/FileTreeBuilder.cs` around lines 61 - 63, The
build currently can schedule multiple identical subtree uploads because it
checks ExistsInRemote per subtree and then enqueues WriteAsync tasks; fix this
by deduplicating tree hashes before enqueuing to the worker pool: in
FileTreeBuilder (the method that computes treeHash and increments
uploadsQueued/uploadsCompleted), maintain a thread-safe hash set (or local
HashSet before parallel scheduling) of scheduled treeHash values and only
schedule WriteAsync for a hash not already present, ensuring the same de-dup
logic is applied wherever tree hashes are gathered (also check the call sites
that use ExistsInRemote and scheduling at FileTreeService.WriteAsync to avoid
races), and ensure the dedupe set is synchronized (or built single-threaded) so
encryption paths and cache file writes remain thread-safe.

@coderabbitai coderabbitai Bot left a comment
Actionable comments posted: 2

🧹 Nitpick comments (1)
AGENTS.md (1)

45-51: Consolidate duplicate durability sections to prevent guidance drift.

Scale And Durability (Lines 35-44) and Durability And Scale (Lines 45-51) overlap heavily. Consider merging into one canonical section to keep agent instructions consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AGENTS.md` around lines 45 - 51, There are two overlapping sections "Scale
And Durability" and "Durability And Scale" in AGENTS.md; consolidate them into a
single canonical section (pick one header name, e.g., "Durability and Scale") by
merging unique guidance from both, removing duplicate bullets, preserving the
authoritative lines about snapshot commit points, tentative partial state,
parallelization constraints, and spool-then-publish flow, and update any
internal references or links to point to the single section so agent
instructions remain consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@AGENTS.md`:
- Line 48: Revise the sentence about snapshots to state that chunk-index flushes
and filetree uploads performed before snapshot creation are already durably
persisted but remain uncommitted until snapshot publication; specifically
replace the phrase "tentative partial state, not a completed archive" with
wording that clarifies these writes are "durable partial state (persisted
remotely) but uncommitted and not visible until snapshot publication commits the
repository"; keep the guidance that snapshots are the repository commit point
and retain the note about parallelizing independent work only when it does not
weaken crash-recovery semantics. Reference phrases to update: "Snapshot
creation", "chunk-index flushes", "filetree uploads", and "snapshot
publication".

In
`@src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs`:
- Around line 31-62: UploadAsync currently always replaces _blobs entries
ignoring the overwrite parameter; update UploadAsync (the method named
UploadAsync) to check whether a blob already exists in _blobs for the given
blobName before writing and honor overwrite semantics: if a blob exists and
overwrite is false, do not replace it and throw an appropriate exception (e.g.,
IOException) to match IBlobContainerService.UploadAsync behavior; if overwrite
is true or the blob doesn't exist, proceed with the existing logic that copies
the stream and sets _blobs[blobName] (apply this check in both the chunk-index
branch and the non-chunk branch), ensuring the Interlocked increments/decrements
and delay/failure simulation remain unchanged.

---

Nitpick comments:
In `@AGENTS.md`:
- Around line 45-51: There are two overlapping sections "Scale And Durability"
and "Durability And Scale" in AGENTS.md; consolidate them into a single
canonical section (pick one header name, e.g., "Durability and Scale") by
merging unique guidance from both, removing duplicate bullets, preserving the
authoritative lines about snapshot commit points, tentative partial state,
parallelization constraints, and spool-then-publish flow, and update any
internal references or links to point to the single section so agent
instructions remain consistent.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 64280643-5a14-4be4-89c8-47e911a29fda

📥 Commits

Reviewing files that changed from the base of the PR and between c3083cf and 747f39d.

📒 Files selected for processing (4)
  • AGENTS.md
  • src/Arius.Core.Tests/Shared/ChunkIndex/ChunkIndexServiceTests.cs
  • src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs
  • src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/Arius.Core/Shared/ChunkIndex/ChunkIndexService.cs

Comment thread: AGENTS.md
## Durability And Scale

- Arius is a backup/archive tool. Prefer recoverability and correctness over throughput.
- Snapshot creation is the only repository commit point. Chunk-index flushes and filetree uploads that complete before a snapshot exists are tentative partial state, not a completed archive.
⚠️ Potential issue | 🟠 Major

Clarify that pre-snapshot state is durable but uncommitted.

Line 48 currently implies flush/upload outputs are merely tentative, but these writes are already persisted remotely; they are durable partial state until snapshot publication commits visibility. Please tighten wording to avoid misleading future implementations.

Based on learnings: “Snapshots are the repository commit point; do not publish a snapshot until all referenced repository data is durably available” and “Parallelize independent work when useful, but do not weaken crash recovery semantics to do it.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AGENTS.md` at line 48, Revise the sentence about snapshots to state that
chunk-index flushes and filetree uploads performed before snapshot creation are
already durably persisted but remain uncommitted until snapshot publication;
specifically replace the phrase "tentative partial state, not a completed
archive" with wording that clarifies these writes are "durable partial state
(persisted remotely) but uncommitted and not visible until snapshot publication
commits the repository"; keep the guidance that snapshots are the repository
commit point and retain the note about parallelizing independent work only when
it does not weaken crash-recovery semantics. Reference phrases to update:
"Snapshot creation", "chunk-index flushes", "filetree uploads", and "snapshot
publication".

Comment on lines +31 to +62
public async Task UploadAsync(string blobName, Stream content, IReadOnlyDictionary<string, string> metadata, BlobTier tier, string? contentType = null, bool overwrite = false, CancellationToken cancellationToken = default)
{
    var isChunkIndex = blobName.StartsWith(BlobPaths.ChunkIndex, StringComparison.Ordinal);
    if (isChunkIndex)
    {
        var active = Interlocked.Increment(ref _activeChunkIndexUploads);
        UpdateMaxConcurrency(active);
        try
        {
            if (_chunkIndexUploadDelay > TimeSpan.Zero)
                await Task.Delay(_chunkIndexUploadDelay, cancellationToken);

            if (_failUploadForPrefix is not null && blobName == BlobPaths.ChunkIndexShard(_failUploadForPrefix))
                throw new IOException($"Simulated chunk-index upload failure for prefix {_failUploadForPrefix}.");

            await using var ms = new MemoryStream();
            await content.CopyToAsync(ms, cancellationToken);
            _blobs[blobName] = new StoredBlob(ms.ToArray(), new Dictionary<string, string>(metadata), tier, contentType, false);
        }
        finally
        {
            var remaining = Interlocked.Decrement(ref _activeChunkIndexUploads);
            UpdateMaxConcurrency(remaining);
        }
    }
    else
    {
        await using var ms = new MemoryStream();
        await content.CopyToAsync(ms, cancellationToken);
        _blobs[blobName] = new StoredBlob(ms.ToArray(), new Dictionary<string, string>(metadata), tier, contentType, false);
    }
}
⚠️ Potential issue | 🟠 Major

Honor overwrite semantics in UploadAsync.

Line 31 accepts overwrite, but Lines 48 and 60 always replace existing entries. This diverges from IBlobContainerService.UploadAsync behavior and can mask bugs where callers expect non-overwrite writes to fail.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/Arius.Core.Tests/Shared/ChunkIndex/Fakes/RecordingChunkIndexBlobContainerService.cs`
around lines 31 - 62, UploadAsync currently always replaces _blobs entries
ignoring the overwrite parameter; update UploadAsync (the method named
UploadAsync) to check whether a blob already exists in _blobs for the given
blobName before writing and honor overwrite semantics: if a blob exists and
overwrite is false, do not replace it and throw an appropriate exception (e.g.,
IOException) to match IBlobContainerService.UploadAsync behavior; if overwrite
is true or the blob doesn't exist, proceed with the existing logic that copies
the stream and sets _blobs[blobName] (apply this check in both the chunk-index
branch and the non-chunk branch), ensuring the Interlocked increments/decrements
and delay/failure simulation remain unchanged.
