From 6bbebc0204d1aebfa671f21bb131183a89489c76 Mon Sep 17 00:00:00 2001 From: Ole Kristian Losvik Date: Thu, 21 May 2026 18:35:41 +0200 Subject: [PATCH] Api: Add async $archive-import endpoint ImportController accepts a zip upload, buffers it to memory, queues ingestion via BackgroundTaskQueue, and returns 202 Accepted with an OperationOutcome carrying the operation id. The background worker opens the zip as a ZipArchive, emits per-entry Progress events and an overall Completed event on the operations hub, and surfaces invalid zip content as a clean Error event. A per-action upload cap (default 50 MiB, configurable via ImportSettings:MaxUploadSizeBytes) is enforced by ImportRequestSizeLimitFilter, which raises the per-request limit above the global Kestrel default for this endpoint only. Only one import runs at a time, a second concurrent caller gets 429 Too Many Requests instead of stacking another upload buffer ahead of the single-reader drainer. Per-entry handling routes through an IngestEntryAsync stub that a later slice will replace with FHIR resource parsing and ingestion. --- src/Ignis.Api/Configuration/ImportSettings.cs | 21 ++++ src/Ignis.Api/Controllers/ImportController.cs | 91 +++++++++++--- .../Filters/ImportRequestSizeLimitFilter.cs | 39 ++++++ src/Ignis.Api/Program.cs | 5 + .../Services/Import/IImportService.cs | 7 +- .../Services/Import/ImportService.cs | 79 ++++++++++-- src/Ignis.Api/appsettings.json | 3 + .../Ignis.Api.Tests/ImportControllerTests.cs | 119 ++++++++++++++++++ 8 files changed, 334 insertions(+), 30 deletions(-) create mode 100644 src/Ignis.Api/Configuration/ImportSettings.cs create mode 100644 src/Ignis.Api/Filters/ImportRequestSizeLimitFilter.cs diff --git a/src/Ignis.Api/Configuration/ImportSettings.cs b/src/Ignis.Api/Configuration/ImportSettings.cs new file mode 100644 index 0000000..b7b70f6 --- /dev/null +++ b/src/Ignis.Api/Configuration/ImportSettings.cs @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +namespace Ignis.Api.Configuration; + +/// +/// Settings for archive import; bound from the ImportSettings +/// configuration section. +/// +public sealed class ImportSettings +{ + /// + /// Max upload size for $archive-import in bytes. Applied per-request + /// via ImportRequestSizeLimitFilter; opts up from the global + /// Kestrel default. Default 50 MiB. + /// + public long MaxUploadSizeBytes { get; set; } = 50 * 1024 * 1024; +} diff --git a/src/Ignis.Api/Controllers/ImportController.cs b/src/Ignis.Api/Controllers/ImportController.cs index 55f5d0c..be79442 100644 --- a/src/Ignis.Api/Controllers/ImportController.cs +++ b/src/Ignis.Api/Controllers/ImportController.cs @@ -10,6 +10,8 @@ using Ignis.Api.Configuration; using Ignis.Api.Extensions; +using Ignis.Api.Filters; +using Ignis.Api.Services.BackgroundTasks; using Ignis.Api.Services.Import; using Ignis.Auth.Authorization; @@ -27,23 +29,27 @@ namespace Ignis.Api.Controllers; [Route("fhir"), ApiController] [Authorize(AuthenticationSchemes = OpenIddictValidationAspNetCoreDefaults.AuthenticationScheme)] public class ImportController( - IImportService importService, + BackgroundTaskQueue backgroundTaskQueue, IOptions featureSettings, ILogger logger) : ControllerBase { + // One import at a time: the drainer is single-reader and uploads sit in + // memory until processed, so a second caller gets 429 instead of stacking + // another buffer. + private static readonly SemaphoreSlim _importSlot = new(initialCount: 1, maxCount: 1); + /// - /// Imports an archive (zip) of JSON-serialized FHIR resources. - /// Requires the operations.import scope and the - /// FeatureManagement:AllowImport flag to be enabled — otherwise the - /// endpoint responds with 503 Service Unavailable. - /// Returns 202 Accepted with an - /// carrying the operation id; subsequent progress, completion, or error is - /// reported via the operations hub. Archive parsing and ingestion are not - /// yet implemented — the service stub publishes an error event on the hub. + /// Imports a zip archive of JSON-serialized FHIR resources. Requires the + /// operations.import scope and FeatureManagement:AllowImport; + /// otherwise responds with 503. Returns 202 with an + /// carrying the operation id; the archive + /// is buffered, queued, and progress/completion/error is reported via the + /// operations hub. /// [HttpPost("$archive-import"), Tags("Operations")] [Authorize(Policy = OperationsPolicies.Import)] [Consumes("multipart/form-data")] + [ServiceFilter] public async Task ArchiveImport([FromForm] IFormFile file) { if (!featureSettings.Value.AllowImport) @@ -51,19 +57,64 @@ public async Task ArchiveImport([FromForm] IFormFile file) HttpStatusCode.ServiceUnavailable, "Archive import is not enabled on this server."); - var operationId = Guid.NewGuid(); - logger.LogInformation( - "Archive import requested by {Subject} (operation {OperationId}, {Bytes} bytes).", - User.FindFirst(OpenIddictConstants.Claims.Subject)?.Value ?? "unknown", - operationId, - file.Length); + if (!_importSlot.Wait(0)) + { + logger.LogInformation( + "Archive import rejected (already in progress) for {Subject}.", + User.FindFirst(OpenIddictConstants.Claims.Subject)?.Value ?? "unknown"); + return Respond.WithError( + HttpStatusCode.TooManyRequests, + "Another archive import is already in progress. Try again when it completes."); + } + + // Worker takes over the slot and the buffer once queued; until then the controller owns both. + MemoryStream? buffer = null; + var handedOffToWorker = false; + try + { + var operationId = Guid.NewGuid(); + logger.LogInformation( + "Archive import requested by {Subject} (operation {OperationId}, {Bytes} bytes).", + User.FindFirst(OpenIddictConstants.Claims.Subject)?.Value ?? "unknown", + operationId, + file.Length); + + // IFormFile closes with the request scope; worker reads after response. + buffer = new MemoryStream(); + await using (var stream = file.OpenReadStream()) + await stream.CopyToAsync(buffer, HttpContext.RequestAborted); + buffer.Position = 0; - await importService.ImportArchiveAsync(operationId); + await backgroundTaskQueue.QueueAsync(async (services, _) => + { + try + { + await using (buffer) + { + var importer = services.GetRequiredService(); + await importer.ImportZipArchiveAsync(operationId, buffer); + } + } + finally + { + _importSlot.Release(); + } + }); + handedOffToWorker = true; - var outcome = new OperationOutcome() - .WithOperationId(operationId) - .AddInformationIssue("Import accepted; progress will be reported via the operations hub."); + var outcome = new OperationOutcome() + .WithOperationId(operationId) + .AddInformationIssue("Import accepted; progress will be reported via the operations hub."); - return Respond.WithResource(StatusCodes.Status202Accepted, outcome); + return Respond.WithResource(StatusCodes.Status202Accepted, outcome); + } + finally + { + if (!handedOffToWorker) + { + _importSlot.Release(); + buffer?.Dispose(); + } + } } } diff --git a/src/Ignis.Api/Filters/ImportRequestSizeLimitFilter.cs b/src/Ignis.Api/Filters/ImportRequestSizeLimitFilter.cs new file mode 100644 index 0000000..df0edc5 --- /dev/null +++ b/src/Ignis.Api/Filters/ImportRequestSizeLimitFilter.cs @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2026, Incendi + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +using Ignis.Api.Configuration; + +using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Mvc.Filters; +using Microsoft.Extensions.Options; + +namespace Ignis.Api.Filters; + +/// +/// Resource filter that overrides the request's max body size from +/// . Runs before model +/// binding reads the body, so the limit applies to the upload itself. +/// +public sealed class ImportRequestSizeLimitFilter : IAsyncResourceFilter +{ + private readonly long _maxBytes; + + public ImportRequestSizeLimitFilter(IOptions options) + { + ArgumentNullException.ThrowIfNull(options); + _maxBytes = options.Value.MaxUploadSizeBytes; + } + + public Task OnResourceExecutionAsync( + ResourceExecutingContext context, + ResourceExecutionDelegate next) + { + var feature = context.HttpContext.Features.Get(); + if (feature is { IsReadOnly: false }) + feature.MaxRequestBodySize = _maxBytes; + return next(); + } +} diff --git a/src/Ignis.Api/Program.cs b/src/Ignis.Api/Program.cs index 78531bd..7049839 100644 --- a/src/Ignis.Api/Program.cs +++ b/src/Ignis.Api/Program.cs @@ -6,6 +6,7 @@ using Ignis.Api.Configuration; using Ignis.Api.Extensions; +using Ignis.Api.Filters; using Ignis.Api.Hubs; using Ignis.Api.Services.BackgroundTasks; using Ignis.Api.Services.Import; @@ -51,6 +52,10 @@ // Bind feature flags builder.Services.Configure(builder.Configuration.GetSection("FeatureManagement")); +// Bind import settings + per-action size filter used by $archive-import +builder.Services.Configure(builder.Configuration.GetSection("ImportSettings")); +builder.Services.AddScoped(); + // Bind forwarded headers settings // Middleware is only added if at least one of KnownProxies or KnownNetworks are configured. var forwardedHeadersSettings = new ForwardedHeadersSettings(); diff --git a/src/Ignis.Api/Services/Import/IImportService.cs b/src/Ignis.Api/Services/Import/IImportService.cs index f61552d..bd2bf32 100644 --- a/src/Ignis.Api/Services/Import/IImportService.cs +++ b/src/Ignis.Api/Services/Import/IImportService.cs @@ -14,8 +14,9 @@ namespace Ignis.Api.Services.Import; public interface IImportService { /// - /// Imports an archive (zip) of JSON-serialized FHIR resources. - /// Archive parsing, extraction limits, and resource ingestion are not yet implemented. + /// Reads a zip archive and reports the number of entries it contains via + /// the operations hub. Resource parsing and ingestion are out of scope for + /// the current slice; later slices will extend the implementation. /// - Task ImportArchiveAsync(Guid operationId); + Task ImportZipArchiveAsync(Guid operationId, Stream archive); } diff --git a/src/Ignis.Api/Services/Import/ImportService.cs b/src/Ignis.Api/Services/Import/ImportService.cs index 6c2923e..ab45f06 100644 --- a/src/Ignis.Api/Services/Import/ImportService.cs +++ b/src/Ignis.Api/Services/Import/ImportService.cs @@ -4,6 +4,8 @@ * SPDX-License-Identifier: BSD-3-Clause */ +using System.IO.Compression; + using Ignis.Api.Services.Operations; namespace Ignis.Api.Services.Import; @@ -12,14 +14,77 @@ public sealed class ImportService( IOperationProgressNotifier notifier, ILogger logger) : IImportService { - public async Task ImportArchiveAsync(Guid operationId) + public async Task ImportZipArchiveAsync(Guid operationId, Stream archive) { - logger.LogInformation( - "Archive import requested but not yet implemented (operation {OperationId}).", - operationId); + try + { + using var zip = new ZipArchive(archive, ZipArchiveMode.Read, leaveOpen: true); + var entryCount = zip.Entries.Count; + + logger.LogInformation( + "Archive opened (operation {OperationId}, {EntryCount} entries).", + operationId, entryCount); + + await notifier.ProgressAsync( + operationId, + $"Found {entryCount} entries in archive.", + new OperationProgress(Current: 0, Total: entryCount)) + .ConfigureAwait(false); + + var current = 0; + foreach (var entry in zip.Entries) + { + current++; + await IngestEntryAsync(operationId, entry).ConfigureAwait(false); + await notifier.ProgressAsync( + operationId, + $"Processed {TruncateName(entry.FullName)}", + new OperationProgress(Current: current, Total: entryCount)) + .ConfigureAwait(false); + } + + await notifier + .CompletedAsync(operationId, $"Enumerated {entryCount} entries.") + .ConfigureAwait(false); + } + catch (Exception ex) when ( + ex is InvalidDataException or + ArgumentOutOfRangeException or + EndOfStreamException) + { + logger.LogWarning( + ex, + "Archive could not be opened (operation {OperationId}).", + operationId); - await notifier - .ErrorAsync(operationId, "Archive import is not yet implemented.") - .ConfigureAwait(false); + await notifier + .ErrorAsync(operationId, "Not able to parse uploaded zip archive.") + .ConfigureAwait(false); + } + catch (Exception ex) + { + // Catch-all so the hub always emits an Error event; otherwise the client waits indefinitely. + logger.LogError( + ex, + "Unexpected error during archive import (operation {OperationId}).", + operationId); + + await notifier + .ErrorAsync(operationId, "Unexpected error while importing archive.") + .ConfigureAwait(false); + } } + + // Stub — will be replaced by FHIR parsing + write in a later slice. + private Task IngestEntryAsync(Guid operationId, ZipArchiveEntry entry) + { + logger.LogDebug( + "Entry stub (operation {OperationId}): {Name} ({Size} bytes).", + operationId, TruncateName(entry.FullName), entry.Length); + return Task.CompletedTask; + } + + // Cap names before they hit logs or hub events — zip permits arbitrarily long names. + private static string TruncateName(string name) => + name.Length > 200 ? name[..200] + "…" : name; } diff --git a/src/Ignis.Api/appsettings.json b/src/Ignis.Api/appsettings.json index f203dcd..be69bb2 100644 --- a/src/Ignis.Api/appsettings.json +++ b/src/Ignis.Api/appsettings.json @@ -10,6 +10,9 @@ "WriteTo": [{ "Name": "Console" }] }, "AllowedHosts": "localhost", + "ImportSettings": { + "MaxUploadSizeBytes": 52428800 + }, "StoreSettings": { "ConnectionString": "mongodb://localhost:27017/ignis" }, diff --git a/tests/Ignis.Api.Tests/ImportControllerTests.cs b/tests/Ignis.Api.Tests/ImportControllerTests.cs index 96d359d..077fb6c 100644 --- a/tests/Ignis.Api.Tests/ImportControllerTests.cs +++ b/tests/Ignis.Api.Tests/ImportControllerTests.cs @@ -4,16 +4,28 @@ * SPDX-License-Identifier: BSD-3-Clause */ +using System.Collections.Concurrent; +using System.IO.Compression; using System.Net; using System.Net.Http.Headers; +using System.Threading.Channels; using FluentAssertions; +using Hl7.Fhir.Model; +using Hl7.Fhir.Serialization; + using Ignis.Api.Configuration; +using Ignis.Api.Hubs; +using Ignis.Api.Services.Operations; using Ignis.Auth.Authorization; +using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.DependencyInjection; +// Avoid clash with Hl7.Fhir.Model.Task +using Task = System.Threading.Tasks.Task; + namespace Ignis.Api.Tests; [Collection("IntegrationTests")] @@ -28,6 +40,21 @@ public ImportControllerTests(IntegrationFixture fixture) private static CancellationToken CT => TestContext.Current.CancellationToken; + private static async Task<(Guid Id, string Message)> WaitForEventAsync( + ChannelReader<(Guid Id, string Message)> reader, + Guid expectedId, + TimeSpan timeout, + CancellationToken ct) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + cts.CancelAfter(timeout); + while (true) + { + var ev = await reader.ReadAsync(cts.Token); + if (ev.Id == expectedId) return ev; + } + } + private static MultipartFormDataContent BuildArchiveContent() { var content = new MultipartFormDataContent(); @@ -38,6 +65,21 @@ private static MultipartFormDataContent BuildArchiveContent() return content; } + private static MultipartFormDataContent BuildValidZipContent(int entryCount) + { + using var ms = new MemoryStream(); + using (var zip = new ZipArchive(ms, ZipArchiveMode.Create, leaveOpen: true)) + { + for (var i = 0; i < entryCount; i++) + zip.CreateEntry($"entry-{i}.json"); + } + var content = new MultipartFormDataContent(); + var fileContent = new ByteArrayContent(ms.ToArray()); + fileContent.Headers.ContentType = new MediaTypeHeaderValue("application/zip"); + content.Add(fileContent, "file", "archive.zip"); + return content; + } + [Fact] public async Task ArchiveImport_WithoutAuth_ReturnsUnauthorized() { @@ -93,4 +135,81 @@ public async Task ArchiveImport_WithFeatureDisabled_ReturnsServiceUnavailable() response.StatusCode.Should().Be(HttpStatusCode.ServiceUnavailable); } + + [Fact] + public async Task ArchiveImport_WithValidZip_PublishesCountSummary() + { + var token = await _fixture.GetClientCredentialsTokenAsync( + CT, $"{OperationsScopes.Import} {OperationsScopes.Read}"); + + await using var hub = _fixture.BuildHubConnection("/hubs/operations", token); + + // ConcurrentQueue + Channel — SignalR callbacks run on thread-pool threads. + var progressEvents = new ConcurrentQueue<(Guid Id, string Message)>(); + var completedEvents = Channel.CreateUnbounded<(Guid Id, string Message)>(); + hub.On( + OperationProgressHubMethods.Progress, + (id, msg, _) => progressEvents.Enqueue((id, msg))); + hub.On( + OperationProgressHubMethods.Completed, + (id, msg) => completedEvents.Writer.TryWrite((id, msg))); + + await hub.StartAsync(CT); + + using var client = _fixture.Factory.CreateClient(); + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token); + + using var content = BuildValidZipContent(entryCount: 3); + var response = await client.PostAsync("/fhir/$archive-import", content, CT); + + response.StatusCode.Should().Be(HttpStatusCode.Accepted); + + var outcome = new FhirJsonParser().Parse( + await response.Content.ReadAsStringAsync(CT)); + var operationId = Guid.Parse(outcome.Id); + + var completed = await WaitForEventAsync( + completedEvents.Reader, operationId, TimeSpan.FromSeconds(5), CT); + completed.Message.Should().Contain("3"); + + var ourProgress = progressEvents + .Where(e => e.Id == operationId) + .Select(e => e.Message) + .ToList(); + ourProgress.Should().Contain(m => m.Contains('3')); + ourProgress.Should().Contain(m => m.Contains("entry-0.json")); + ourProgress.Should().Contain(m => m.Contains("entry-2.json")); + } + + [Fact] + public async Task ArchiveImport_WithInvalidZipBytes_PublishesError() + { + var token = await _fixture.GetClientCredentialsTokenAsync( + CT, $"{OperationsScopes.Import} {OperationsScopes.Read}"); + + await using var hub = _fixture.BuildHubConnection("/hubs/operations", token); + + var errorEvents = Channel.CreateUnbounded<(Guid Id, string Message)>(); + hub.On( + OperationProgressHubMethods.Error, + (id, msg) => errorEvents.Writer.TryWrite((id, msg))); + + await hub.StartAsync(CT); + + using var client = _fixture.Factory.CreateClient(); + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token); + + using var content = BuildArchiveContent(); + var response = await client.PostAsync("/fhir/$archive-import", content, CT); + + response.StatusCode.Should().Be(HttpStatusCode.Accepted); + + var outcome = new FhirJsonParser().Parse( + await response.Content.ReadAsStringAsync(CT)); + var operationId = Guid.Parse(outcome.Id); + + var error = await WaitForEventAsync( + errorEvents.Reader, operationId, TimeSpan.FromSeconds(5), CT); + error.Message.Should().Contain("zip"); + } }