From 060b9c4f624916b66dbf156fc4c6d1c7c86e0db9 Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 27 Apr 2026 13:08:14 +0800 Subject: [PATCH 1/4] Add durable service-run registry for refresh-safe Studio Observe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves issue #429. Introduces an implementation-agnostic, durable service-run registry that all three invoke paths (Workflow / Static / Scripting) write to before returning the accepted receipt. /api/scopes/{scopeId}/services/{serviceId}/runs no longer depends on the workflow-specific IWorkflowRunBindingReader, and /runs/{runId} and /audit resolve via the unified registry first then fan out to implementation-specific detail readers (workflow audit only when the run is workflow-kind). The registry is owned by ServiceRunGAgent (one actor per run, business-named per "actor 即业务实体"), emits ServiceRunRegisteredEvent + ServiceRunStatusUpdatedEvent, and materializes ServiceRunCurrentStateReadModel via a current-state projector. The dispatcher calls IServiceRunRegistrationPort before invoking the target so the durable record is committed prior to receipt return. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Aevatar.GAgentService.Abstractions.csproj | 1 + .../IServiceRunCurrentStateProjectionPort.cs | 10 + .../Ports/IServiceRunQueryPort.cs | 26 +++ .../Ports/IServiceRunRegistrationPort.cs | 23 ++ .../Protos/service_runs.proto | 60 +++++ .../Queries/ServiceRunSnapshot.cs | 28 +++ .../GAgents/ServiceRunGAgent.cs | 121 +++++++++++ .../ServiceCollectionExtensions.cs | 4 + .../Endpoints/ScopeServiceEndpoints.cs | 192 +++++++++++----- .../Adapters/ServiceRunRegistrationAdapter.cs | 102 +++++++++ .../DefaultServiceInvocationDispatcher.cs | 54 ++++- ...ServiceRunCurrentStateProjectionContext.cs | 9 + .../ServiceCollectionExtensions.cs | 13 ++ ...unCurrentStateReadModelMetadataProvider.cs | 13 ++ .../Orchestration/ServiceProjectionNames.cs | 1 + .../ServiceRunCurrentStateProjectionPort.cs | 21 ++ .../ServiceRunCurrentStateProjector.cs | 73 +++++++ .../Queries/ServiceRunQueryReader.cs | 205 ++++++++++++++++++ .../ServiceProjectionReadModels.Partial.cs | 15 ++ .../service_projection_read_models.proto | 28 +++ .../Core/ServiceRunGAgentTests.cs | 165 ++++++++++++++ ...DefaultServiceInvocationDispatcherTests.cs | 151 +++++++++++-- .../ServiceRunCurrentStateProjectorTests.cs | 204 +++++++++++++++++ 23 files changed, 1444 insertions(+), 75 deletions(-) create mode 100644 src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs create mode 100644 src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs create mode 100644 src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs create mode 100644 src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto create mode 100644 src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs create mode 100644 src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs create mode 100644 src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs create mode 100644 src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs create mode 100644 src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs create mode 100644 src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs create mode 100644 src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs create mode 100644 src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs create mode 100644 test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs create mode 100644 test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj b/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj index 6bce22805..32973954e 100644 --- a/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj +++ b/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj @@ -26,5 +26,6 @@ + diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs new file mode 100644 index 000000000..d0f0fe77c --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs @@ -0,0 +1,10 @@ +namespace Aevatar.GAgentService.Abstractions.Ports; + +/// +/// Activation port for the durable service-run current-state projection. +/// Mirrors shape but scoped to service-run actors. +/// +public interface IServiceRunCurrentStateProjectionPort +{ + Task EnsureProjectionAsync(string actorId, CancellationToken ct = default); +} diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs new file mode 100644 index 000000000..fa0122a14 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs @@ -0,0 +1,26 @@ +using Aevatar.GAgentService.Abstractions.Queries; + +namespace Aevatar.GAgentService.Abstractions.Ports; + +/// +/// Read contract for the implementation-agnostic service-run registry. +/// Backed by the durable ServiceRunGAgent projection. +/// +public interface IServiceRunQueryPort +{ + Task> ListAsync( + ServiceRunQuery query, + CancellationToken ct = default); + + Task GetByRunIdAsync( + string scopeId, + string serviceId, + string runId, + CancellationToken ct = default); + + Task GetByCommandIdAsync( + string scopeId, + string serviceId, + string commandId, + CancellationToken ct = default); +} diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs new file mode 100644 index 000000000..aded0ff6b --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs @@ -0,0 +1,23 @@ +namespace Aevatar.GAgentService.Abstractions.Ports; + +/// +/// Write contract for the implementation-agnostic service-run registry. +/// Used by the invocation dispatcher to register a run before returning the accepted receipt, +/// so Studio Observe can query the run from the durable readmodel even on immediate refresh. +/// +public interface IServiceRunRegistrationPort +{ + Task RegisterAsync( + ServiceRunRecord record, + CancellationToken ct = default); + + Task UpdateStatusAsync( + string runActorId, + string runId, + ServiceRunStatus status, + CancellationToken ct = default); +} + +public sealed record ServiceRunRegistrationResult( + string RunActorId, + string RunId); diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto b/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto new file mode 100644 index 000000000..db770367d --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package aevatar.gagentservice; + +option csharp_namespace = "Aevatar.GAgentService.Abstractions"; + +import "google/protobuf/timestamp.proto"; +import "service_endpoint.proto"; +import "service_revision.proto"; + +enum ServiceRunStatus { + SERVICE_RUN_STATUS_UNSPECIFIED = 0; + SERVICE_RUN_STATUS_ACCEPTED = 1; + SERVICE_RUN_STATUS_COMPLETED = 2; + SERVICE_RUN_STATUS_FAILED = 3; + SERVICE_RUN_STATUS_STOPPED = 4; +} + +message ServiceRunRecord { + string scope_id = 1; + string service_id = 2; + string service_key = 3; + string run_id = 4; + string command_id = 5; + string correlation_id = 6; + string endpoint_id = 7; + ServiceImplementationKind implementation_kind = 8; + string target_actor_id = 9; + string revision_id = 10; + string deployment_id = 11; + ServiceRunStatus status = 12; + google.protobuf.Timestamp created_at = 13; + google.protobuf.Timestamp updated_at = 14; + ServiceIdentity identity = 15; +} + +message ServiceRunState { + ServiceRunRecord record = 1; + int64 last_applied_event_version = 2; + string last_event_id = 3; +} + +message RegisterServiceRunRequested { + ServiceRunRecord record = 1; +} + +message UpdateServiceRunStatusRequested { + string run_id = 1; + ServiceRunStatus status = 2; +} + +message ServiceRunRegisteredEvent { + ServiceRunRecord record = 1; +} + +message ServiceRunStatusUpdatedEvent { + string run_id = 1; + ServiceRunStatus status = 2; + google.protobuf.Timestamp updated_at = 3; +} diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs b/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs new file mode 100644 index 000000000..7a66f315b --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs @@ -0,0 +1,28 @@ +namespace Aevatar.GAgentService.Abstractions.Queries; + +public sealed record ServiceRunSnapshot( + string ScopeId, + string ServiceId, + string ServiceKey, + string RunId, + string CommandId, + string CorrelationId, + string EndpointId, + ServiceImplementationKind ImplementationKind, + string TargetActorId, + string RevisionId, + string DeploymentId, + ServiceRunStatus Status, + string ActorId, + string TenantId, + string AppId, + string Namespace, + long StateVersion, + string LastEventId, + DateTimeOffset CreatedAt, + DateTimeOffset UpdatedAt); + +public sealed record ServiceRunQuery( + string ScopeId, + string ServiceId, + int Take = 50); diff --git a/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs new file mode 100644 index 000000000..33b7bee27 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs @@ -0,0 +1,121 @@ +using Aevatar.Foundation.Abstractions.Attributes; +using Aevatar.Foundation.Core; +using Aevatar.Foundation.Core.EventSourcing; +using Aevatar.GAgentService.Abstractions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Core.GAgents; + +public sealed class ServiceRunGAgent : GAgentBase +{ + public ServiceRunGAgent() + { + InitializeId(); + } + + [EventHandler] + public async Task HandleRegisterAsync(RegisterServiceRunRequested command) + { + ArgumentNullException.ThrowIfNull(command); + ArgumentNullException.ThrowIfNull(command.Record); + ValidateRecord(command.Record); + + var existing = State.Record; + if (existing != null && !string.IsNullOrWhiteSpace(existing.RunId)) + { + if (!string.Equals(existing.RunId, command.Record.RunId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot register run '{command.Record.RunId}'."); + } + + return; + } + + var record = command.Record.Clone(); + if (record.CreatedAt == null) + record.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow); + record.UpdatedAt = record.CreatedAt; + if (record.Status == ServiceRunStatus.Unspecified) + record.Status = ServiceRunStatus.Accepted; + + await PersistDomainEventAsync(new ServiceRunRegisteredEvent + { + Record = record, + }); + } + + [EventHandler] + public async Task HandleUpdateStatusAsync(UpdateServiceRunStatusRequested command) + { + ArgumentNullException.ThrowIfNull(command); + var existing = State.Record; + if (existing == null || string.IsNullOrWhiteSpace(existing.RunId)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' has no registered run; status update rejected."); + } + + if (!string.IsNullOrWhiteSpace(command.RunId) && + !string.Equals(existing.RunId, command.RunId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot update run '{command.RunId}'."); + } + + if (command.Status == ServiceRunStatus.Unspecified) + return; + + if (existing.Status == command.Status) + return; + + await PersistDomainEventAsync(new ServiceRunStatusUpdatedEvent + { + RunId = existing.RunId, + Status = command.Status, + UpdatedAt = Timestamp.FromDateTime(DateTime.UtcNow), + }); + } + + protected override ServiceRunState TransitionState(ServiceRunState current, IMessage evt) => + StateTransitionMatcher + .Match(current, evt) + .On(ApplyRegistered) + .On(ApplyStatusUpdated) + .OrCurrent(); + + private static ServiceRunState ApplyRegistered(ServiceRunState state, ServiceRunRegisteredEvent evt) + { + var next = state.Clone(); + next.Record = evt.Record?.Clone() ?? new ServiceRunRecord(); + next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1; + next.LastEventId = $"{next.Record.RunId}:registered"; + return next; + } + + private static ServiceRunState ApplyStatusUpdated(ServiceRunState state, ServiceRunStatusUpdatedEvent evt) + { + var next = state.Clone(); + if (next.Record == null) + next.Record = new ServiceRunRecord(); + next.Record.Status = evt.Status; + next.Record.UpdatedAt = evt.UpdatedAt ?? Timestamp.FromDateTime(DateTime.UtcNow); + next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1; + next.LastEventId = $"{next.Record.RunId}:status:{(int)evt.Status}"; + return next; + } + + private static void ValidateRecord(ServiceRunRecord record) + { + ArgumentNullException.ThrowIfNull(record); + if (string.IsNullOrWhiteSpace(record.RunId)) + throw new InvalidOperationException("run_id is required."); + if (string.IsNullOrWhiteSpace(record.ScopeId)) + throw new InvalidOperationException("scope_id is required."); + if (string.IsNullOrWhiteSpace(record.ServiceId)) + throw new InvalidOperationException("service_id is required."); + if (string.IsNullOrWhiteSpace(record.CommandId)) + throw new InvalidOperationException("command_id is required."); + } +} diff --git a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs index 7bead3aa5..fbca4be28 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs @@ -58,6 +58,7 @@ public static IServiceCollection AddGAgentServiceCapability( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton()); services.TryAddEnumerable(ServiceDescriptor.Singleton()); @@ -117,6 +118,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders( TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); + TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); } else @@ -128,6 +130,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders( TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); + TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); } @@ -145,6 +148,7 @@ private static bool HasAllGAgentServiceProjectionReaders( && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind) + && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind); } diff --git a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs index 3cf576bf3..98d156652 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs @@ -22,6 +22,7 @@ using Aevatar.Scripting.Abstractions.Queries; using Aevatar.Scripting.Core.Ports; using Aevatar.Workflow.Application.Abstractions.Queries; +using Aevatar.GAgentService.Abstractions.Queries; using Aevatar.GAgentService.Hosting.Serialization; using Aevatar.Presentation.AGUI; using Aevatar.Workflow.Application.Abstractions.Runs; @@ -613,7 +614,7 @@ private static Task HandleListDefaultRunsAsync( string scopeId, int take, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) => @@ -623,7 +624,7 @@ private static Task HandleListDefaultRunsAsync( ResolveDefaultScopeServiceId(options.Value), take, lifecycleQueryPort, - workflowRunBindingReader, + serviceRunQueryPort, workflowExecutionQueryService, options, ct); @@ -634,7 +635,7 @@ private static Task HandleGetDefaultRunAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) => @@ -645,7 +646,7 @@ private static Task HandleGetDefaultRunAsync( runId, actorId, lifecycleQueryPort, - workflowRunBindingReader, + serviceRunQueryPort, workflowExecutionQueryService, options, ct); @@ -656,7 +657,7 @@ private static Task HandleGetDefaultRunAuditAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) => @@ -667,7 +668,7 @@ private static Task HandleGetDefaultRunAuditAsync( runId, actorId, lifecycleQueryPort, - workflowRunBindingReader, + serviceRunQueryPort, workflowExecutionQueryService, options, ct); @@ -744,7 +745,7 @@ private static async Task HandleListRunsAsync( string serviceId, int take, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) @@ -753,23 +754,17 @@ private static async Task HandleListRunsAsync( if (resolution.Failure != null) return resolution.Failure; - var bindings = await ListScopeServiceRunsAsync( - scopeId, - resolution.Service!, - resolution.Deployments, - workflowRunBindingReader, - take, + var snapshots = await serviceRunQueryPort.ListAsync( + new ServiceRunQuery(scopeId, serviceId, Math.Clamp(take <= 0 ? 50 : take, 1, 200)), ct); - var summaries = new List(bindings.Count); - foreach (var binding in bindings) + var summaries = new List(snapshots.Count); + foreach (var snapshot in snapshots) { - summaries.Add(await BuildScopeRunSummaryAsync( + summaries.Add(await BuildScopeRunSummaryFromRegistryAsync( scopeId, serviceId, - binding, - resolution.Service!, - resolution.Deployments, + snapshot, workflowExecutionQueryService, ct)); } @@ -789,30 +784,29 @@ private static async Task HandleGetRunAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) { - var resolution = await ResolveScopeServiceRunAsync( - http, - options.Value, - scopeId, - serviceId, - runId, - actorId, - lifecycleQueryPort, - workflowRunBindingReader, - ct); - if (resolution.Failure != null) - return resolution.Failure; + var serviceResolution = await ResolveScopeServiceAsync(http, scopeId, serviceId, lifecycleQueryPort, options.Value, ct); + if (serviceResolution.Failure != null) + return serviceResolution.Failure; - return Results.Ok(await BuildScopeRunSummaryAsync( + var snapshot = await ResolveServiceRunSnapshotAsync(scopeId, serviceId, runId, serviceRunQueryPort, ct); + if (snapshot == null) + { + return Results.NotFound(new + { + code = "SERVICE_RUN_NOT_FOUND", + message = BuildScopeServiceRunNotFoundMessage(scopeId, serviceId, runId?.Trim() ?? string.Empty), + }); + } + + return Results.Ok(await BuildScopeRunSummaryFromRegistryAsync( scopeId, serviceId, - resolution.Binding!, - resolution.Service!, - resolution.Deployments, + snapshot, workflowExecutionQueryService, ct)); } @@ -824,45 +818,73 @@ private static async Task HandleGetRunAuditAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) { - var resolution = await ResolveScopeServiceRunAsync( - http, - options.Value, - scopeId, - serviceId, - runId, - actorId, - lifecycleQueryPort, - workflowRunBindingReader, - ct); - if (resolution.Failure != null) - return resolution.Failure; + var serviceResolution = await ResolveScopeServiceAsync(http, scopeId, serviceId, lifecycleQueryPort, options.Value, ct); + if (serviceResolution.Failure != null) + return serviceResolution.Failure; - var summary = await BuildScopeRunSummaryAsync( + var snapshot = await ResolveServiceRunSnapshotAsync(scopeId, serviceId, runId, serviceRunQueryPort, ct); + if (snapshot == null) + { + return Results.NotFound(new + { + code = "SERVICE_RUN_NOT_FOUND", + message = BuildScopeServiceRunNotFoundMessage(scopeId, serviceId, runId?.Trim() ?? string.Empty), + }); + } + + var summary = await BuildScopeRunSummaryFromRegistryAsync( scopeId, serviceId, - resolution.Binding!, - resolution.Service!, - resolution.Deployments, + snapshot, workflowExecutionQueryService, ct); - var report = await workflowExecutionQueryService.GetActorReportAsync(resolution.Binding!.ActorId, ct); + + if (snapshot.ImplementationKind != ServiceImplementationKind.Workflow || + string.IsNullOrWhiteSpace(snapshot.TargetActorId)) + { + return Results.NotFound(new + { + code = "SERVICE_RUN_AUDIT_NOT_AVAILABLE", + message = $"Audit detail for run '{snapshot.RunId}' is not available for {snapshot.ImplementationKind} services.", + }); + } + + var report = await workflowExecutionQueryService.GetActorReportAsync(snapshot.TargetActorId, ct); if (report == null) { return Results.NotFound(new { code = "SERVICE_RUN_AUDIT_NOT_FOUND", - message = $"Audit report for run '{resolution.Binding.RunId}' was not found on service '{serviceId}' in scope '{scopeId}'.", + message = $"Audit report for run '{snapshot.RunId}' was not found on service '{serviceId}' in scope '{scopeId}'.", }); } return Results.Ok(new ScopeServiceRunAuditHttpResponse(summary, report)); } + private static async Task ResolveServiceRunSnapshotAsync( + string scopeId, + string serviceId, + string runId, + IServiceRunQueryPort serviceRunQueryPort, + CancellationToken ct) + { + var normalized = runId?.Trim() ?? string.Empty; + if (string.IsNullOrWhiteSpace(normalized)) + return null; + + var byRun = await serviceRunQueryPort.GetByRunIdAsync(scopeId, serviceId, normalized, ct); + if (byRun != null) + return byRun; + + return await serviceRunQueryPort.GetByCommandIdAsync(scopeId, serviceId, normalized, ct); + } + private static async Task HandleInvokeStreamAsync( HttpContext http, string scopeId, @@ -2025,7 +2047,56 @@ private static async Task BuildScopeRunSumma snapshot?.CompletedSteps ?? 0, snapshot?.RoleReplyCount ?? 0, snapshot?.LastOutput ?? string.Empty, - snapshot?.LastError ?? string.Empty); + snapshot?.LastError ?? string.Empty, + ServiceImplementationKind.Workflow.ToString(), + ServiceRunStatus.Accepted.ToString(), + string.Empty, + string.Empty, + string.Empty, + binding.ActorId, + binding.CreatedAt); + } + + private static async Task BuildScopeRunSummaryFromRegistryAsync( + string scopeId, + string serviceId, + ServiceRunSnapshot snapshot, + IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, + CancellationToken ct) + { + var workflowSnapshot = snapshot.ImplementationKind == ServiceImplementationKind.Workflow && + !string.IsNullOrWhiteSpace(snapshot.TargetActorId) + ? await workflowExecutionQueryService.GetActorSnapshotAsync(snapshot.TargetActorId, ct) + : null; + + return new ScopeServiceRunSummaryHttpResponse( + scopeId, + serviceId, + snapshot.RunId, + snapshot.ActorId, + string.Empty, + snapshot.RevisionId, + snapshot.DeploymentId, + workflowSnapshot?.WorkflowName ?? string.Empty, + workflowSnapshot?.CompletionStatus ?? WorkflowRunCompletionStatus.Unknown, + workflowSnapshot?.StateVersion ?? snapshot.StateVersion, + workflowSnapshot?.LastEventId ?? snapshot.LastEventId, + workflowSnapshot?.LastUpdatedAt ?? snapshot.UpdatedAt, + snapshot.CreatedAt, + snapshot.UpdatedAt, + workflowSnapshot?.LastSuccess, + workflowSnapshot?.TotalSteps ?? 0, + workflowSnapshot?.CompletedSteps ?? 0, + workflowSnapshot?.RoleReplyCount ?? 0, + workflowSnapshot?.LastOutput ?? string.Empty, + workflowSnapshot?.LastError ?? string.Empty, + snapshot.ImplementationKind.ToString(), + snapshot.Status.ToString(), + snapshot.CommandId, + snapshot.CorrelationId, + snapshot.EndpointId, + snapshot.TargetActorId, + snapshot.CreatedAt); } private static ServiceDeploymentSnapshot? ResolveRunDeployment( @@ -2700,7 +2771,14 @@ public sealed record ScopeServiceRunSummaryHttpResponse( int CompletedSteps, int RoleReplyCount, string LastOutput, - string LastError); + string LastError, + string ImplementationKind, + string Status, + string CommandId, + string CorrelationId, + string EndpointId, + string TargetActorId, + DateTimeOffset? CreatedAt = null); public sealed record ScopeServiceRunAuditHttpResponse( ScopeServiceRunSummaryHttpResponse Summary, diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs new file mode 100644 index 000000000..76b0c0229 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs @@ -0,0 +1,102 @@ +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Core.GAgents; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Infrastructure.Adapters; + +/// +/// Infrastructure adapter that registers and updates service runs by dispatching +/// commands to actors. The actor commits the events +/// and the current-state projection materializes them into the durable readmodel. +/// +public sealed class ServiceRunRegistrationAdapter : IServiceRunRegistrationPort +{ + private const string PublisherId = "gagent-service.runs"; + + private readonly IActorRuntime _runtime; + private readonly IActorDispatchPort _dispatchPort; + private readonly IServiceRunCurrentStateProjectionPort _projectionPort; + + public ServiceRunRegistrationAdapter( + IActorRuntime runtime, + IActorDispatchPort dispatchPort, + IServiceRunCurrentStateProjectionPort projectionPort) + { + _runtime = runtime ?? throw new ArgumentNullException(nameof(runtime)); + _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort)); + _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + } + + public async Task RegisterAsync( + ServiceRunRecord record, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(record); + if (string.IsNullOrWhiteSpace(record.RunId)) + throw new InvalidOperationException("run_id is required."); + + var actorId = BuildRunActorId(record.RunId); + var actor = await _runtime.CreateAsync(actorId, ct: ct); + await _projectionPort.EnsureProjectionAsync(actor.Id, ct); + + var prepared = record.Clone(); + if (prepared.CreatedAt == null) + prepared.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow); + prepared.UpdatedAt = prepared.CreatedAt; + if (prepared.Status == ServiceRunStatus.Unspecified) + prepared.Status = ServiceRunStatus.Accepted; + + var envelope = CreateEnvelope(actor.Id, Any.Pack(new RegisterServiceRunRequested + { + Record = prepared, + }), prepared.CommandId, prepared.CorrelationId); + + await _dispatchPort.DispatchAsync(actor.Id, envelope, ct); + return new ServiceRunRegistrationResult(actor.Id, prepared.RunId); + } + + public async Task UpdateStatusAsync( + string runActorId, + string runId, + ServiceRunStatus status, + CancellationToken ct = default) + { + if (string.IsNullOrWhiteSpace(runActorId)) + throw new ArgumentException("runActorId is required.", nameof(runActorId)); + if (status == ServiceRunStatus.Unspecified) + return; + + var commandId = Guid.NewGuid().ToString("N"); + var envelope = CreateEnvelope( + runActorId, + Any.Pack(new UpdateServiceRunStatusRequested + { + RunId = runId ?? string.Empty, + Status = status, + }), + commandId, + commandId); + await _dispatchPort.DispatchAsync(runActorId, envelope, ct); + } + + private static string BuildRunActorId(string runId) => $"service-run:{runId}"; + + private static EventEnvelope CreateEnvelope( + string actorId, + Any payload, + string commandId, + string correlationId) => + new() + { + Id = string.IsNullOrWhiteSpace(commandId) ? Guid.NewGuid().ToString("N") : commandId, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Payload = payload, + Route = EnvelopeRouteSemantics.CreateDirect(PublisherId, actorId), + Propagation = new EnvelopePropagation + { + CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? commandId : correlationId, + }, + }; +} diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs index 06f20d4c6..ee5a8bff9 100644 --- a/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs +++ b/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs @@ -14,15 +14,18 @@ public sealed class DefaultServiceInvocationDispatcher : IServiceInvocationDispa private readonly IActorDispatchPort _dispatchPort; private readonly IScriptRuntimeCommandPort _scriptRuntimeCommandPort; private readonly IWorkflowRunActorPort _workflowRunActorPort; + private readonly IServiceRunRegistrationPort _serviceRunRegistrationPort; public DefaultServiceInvocationDispatcher( IActorDispatchPort dispatchPort, IScriptRuntimeCommandPort scriptRuntimeCommandPort, - IWorkflowRunActorPort workflowRunActorPort) + IWorkflowRunActorPort workflowRunActorPort, + IServiceRunRegistrationPort serviceRunRegistrationPort) { _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort)); _scriptRuntimeCommandPort = scriptRuntimeCommandPort ?? throw new ArgumentNullException(nameof(scriptRuntimeCommandPort)); _workflowRunActorPort = workflowRunActorPort ?? throw new ArgumentNullException(nameof(workflowRunActorPort)); + _serviceRunRegistrationPort = serviceRunRegistrationPort ?? throw new ArgumentNullException(nameof(serviceRunRegistrationPort)); } public async Task DispatchAsync( @@ -49,9 +52,12 @@ private async Task DispatchStaticAsync( CancellationToken ct) { var commandId = ResolveCommandId(request); - var envelope = CreateEnvelope(target.Service.PrimaryActorId, request.Payload, commandId, ResolveCorrelationId(request, commandId)); + var correlationId = ResolveCorrelationId(request, commandId); + var runId = ResolveRunId(request, commandId); + await RegisterRunAsync(target, request, runId, commandId, correlationId, target.Service.PrimaryActorId, ServiceImplementationKind.Static, ct); + var envelope = CreateEnvelope(target.Service.PrimaryActorId, request.Payload, commandId, correlationId); await _dispatchPort.DispatchAsync(target.Service.PrimaryActorId, envelope, ct); - return CreateReceipt(target, target.Service.PrimaryActorId, commandId, ResolveCorrelationId(request, commandId)); + return CreateReceipt(target, target.Service.PrimaryActorId, commandId, correlationId); } private async Task DispatchScriptingAsync( @@ -61,6 +67,9 @@ private async Task DispatchScriptingAsync( { var plan = target.Artifact.DeploymentPlan.ScriptingPlan; var commandId = ResolveCommandId(request); + var correlationId = ResolveCorrelationId(request, commandId); + var runId = ResolveRunId(request, commandId); + await RegisterRunAsync(target, request, runId, commandId, correlationId, target.Service.PrimaryActorId, ServiceImplementationKind.Scripting, ct); await _scriptRuntimeCommandPort.RunRuntimeAsync( target.Service.PrimaryActorId, runId: commandId, @@ -70,7 +79,7 @@ await _scriptRuntimeCommandPort.RunRuntimeAsync( request.Payload?.TypeUrl ?? string.Empty, request.Identity?.TenantId, ct); - return CreateReceipt(target, target.Service.PrimaryActorId, commandId, ResolveCorrelationId(request, commandId)); + return CreateReceipt(target, target.Service.PrimaryActorId, commandId, correlationId); } private async Task DispatchWorkflowAsync( @@ -91,11 +100,42 @@ private async Task DispatchWorkflowAsync( ct); var commandId = ResolveCommandId(request); var correlationId = ResolveCorrelationId(request, commandId); + var runId = ResolveRunId(request, commandId); + await RegisterRunAsync(target, request, runId, commandId, correlationId, run.Actor.Id, ServiceImplementationKind.Workflow, ct); var envelope = CreateEnvelope(run.Actor.Id, Any.Pack(chatRequest), commandId, correlationId); await _dispatchPort.DispatchAsync(run.Actor.Id, envelope, ct); return CreateReceipt(target, run.Actor.Id, commandId, correlationId); } + private async Task RegisterRunAsync( + ServiceInvocationResolvedTarget target, + ServiceInvocationRequest request, + string runId, + string commandId, + string correlationId, + string targetActorId, + ServiceImplementationKind implementationKind, + CancellationToken ct) + { + var record = new ServiceRunRecord + { + ScopeId = request.Identity?.TenantId ?? string.Empty, + ServiceId = request.Identity?.ServiceId ?? string.Empty, + ServiceKey = target.Service.ServiceKey ?? string.Empty, + RunId = runId, + CommandId = commandId, + CorrelationId = correlationId, + EndpointId = target.Endpoint.EndpointId ?? string.Empty, + ImplementationKind = implementationKind, + TargetActorId = targetActorId ?? string.Empty, + RevisionId = target.Service.RevisionId ?? string.Empty, + DeploymentId = target.Service.DeploymentId ?? string.Empty, + Status = ServiceRunStatus.Accepted, + Identity = request.Identity?.Clone(), + }; + await _serviceRunRegistrationPort.RegisterAsync(record, ct); + } + private static void EnsureEndpointPayloadMatch(ServiceEndpointDescriptor endpoint, ServiceInvocationRequest request) { if (request.Payload == null) @@ -155,9 +195,13 @@ private static string ResolveCorrelationId(ServiceInvocationRequest request, str ? commandId : request.CorrelationId; + private static string ResolveRunId(ServiceInvocationRequest request, string commandId) => + string.IsNullOrWhiteSpace(request.CommandId) + ? commandId + : request.CommandId; + private static string ResolveAuthoritativeScopeId(ServiceInvocationRequest request, ChatRequestEvent chatRequest) { - // Path-level scope (Identity.TenantId) is authoritative; payload cannot override it. if (!string.IsNullOrWhiteSpace(request.Identity?.TenantId)) return request.Identity.TenantId.Trim(); return ResolveScopeId(chatRequest); diff --git a/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs b/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs new file mode 100644 index 000000000..c895ede6a --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs @@ -0,0 +1,9 @@ +namespace Aevatar.GAgentService.Projection.Contexts; + +public sealed class ServiceRunCurrentStateProjectionContext + : IProjectionMaterializationContext +{ + public required string RootActorId { get; init; } + + public required string ProjectionKind { get; init; } +} diff --git a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs index 869454b91..5b557a59c 100644 --- a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs @@ -73,6 +73,13 @@ public static IServiceCollection AddGAgentServiceProjection( ProjectionKind = scopeKey.ProjectionKind, }, static context => new ServiceProjectionRuntimeLease(context.RootActorId, context)); + services.AddServiceProjectionRuntime>( + static scopeKey => new ServiceRunCurrentStateProjectionContext + { + RootActorId = scopeKey.RootActorId, + ProjectionKind = scopeKey.ProjectionKind, + }, + static context => new ServiceProjectionRuntimeLease(context.RootActorId, context)); services.AddEventSinkProjectionRuntimeCore< GAgentDraftRunProjectionContext, GAgentDraftRunRuntimeLease, @@ -92,6 +99,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton, GAgentDraftRunSessionEventCodec>(); services.TryAddSingleton, ProjectionSessionEventHub>(); services.TryAddSingleton(); @@ -102,6 +110,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton, ServiceRolloutCommandObservationReadModelMetadataProvider>(); services.TryAddSingleton, ServiceTrafficViewReadModelMetadataProvider>(); services.TryAddSingleton, ServiceRevisionCatalogReadModelMetadataProvider>(); + services.TryAddSingleton, ServiceRunCurrentStateReadModelMetadataProvider>(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); @@ -109,6 +118,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.AddProjectionArtifactMaterializer< ServiceCatalogProjectionContext, ServiceCatalogProjector>(); @@ -130,6 +140,9 @@ public static IServiceCollection AddGAgentServiceProjection( services.AddProjectionArtifactMaterializer< ServiceRevisionCatalogProjectionContext, ServiceRevisionCatalogProjector>(); + services.AddCurrentStateProjectionMaterializer< + ServiceRunCurrentStateProjectionContext, + ServiceRunCurrentStateProjector>(); services.TryAddEnumerable(ServiceDescriptor.Singleton< IProjectionProjector, GAgentDraftRunSessionEventProjector>()); diff --git a/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs b/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs new file mode 100644 index 000000000..2b0bf8b27 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs @@ -0,0 +1,13 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Metadata; + +public sealed class ServiceRunCurrentStateReadModelMetadataProvider : IProjectionDocumentMetadataProvider +{ + public DocumentIndexMetadata Metadata { get; } = new( + "gagent-service-runs", + Mappings: new Dictionary(), + Settings: new Dictionary(), + Aliases: new Dictionary()); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs index 9ac1f658a..752c2d8ad 100644 --- a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs +++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs @@ -9,4 +9,5 @@ internal static class ServiceProjectionKinds public const string Rollouts = "service-rollouts"; public const string Traffic = "service-traffic"; public const string DraftRunSession = "service-draft-run-session"; + public const string Runs = "service-runs"; } diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs new file mode 100644 index 000000000..164b5c929 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs @@ -0,0 +1,21 @@ +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Projection.Configuration; +using Aevatar.GAgentService.Projection.Contexts; + +namespace Aevatar.GAgentService.Projection.Orchestration; + +public sealed class ServiceRunCurrentStateProjectionPort + : ServiceProjectionPortBase, + IServiceRunCurrentStateProjectionPort +{ + public ServiceRunCurrentStateProjectionPort( + ServiceProjectionOptions options, + IProjectionScopeActivationService> activationService, + IProjectionScopeReleaseService> releaseService) + : base(options, activationService, releaseService, ServiceProjectionKinds.Runs) + { + } + + public Task EnsureProjectionAsync(string actorId, CancellationToken ct = default) => + EnsureProjectionCoreAsync(actorId, ct); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs new file mode 100644 index 000000000..d1198a65f --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs @@ -0,0 +1,73 @@ +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.CQRS.Projection.Runtime.Abstractions; +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Projection.Contexts; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Projectors; + +public sealed class ServiceRunCurrentStateProjector + : ICurrentStateProjectionMaterializer +{ + private readonly IProjectionWriteDispatcher _writeDispatcher; + private readonly IProjectionClock _clock; + + public ServiceRunCurrentStateProjector( + IProjectionWriteDispatcher writeDispatcher, + IProjectionClock clock) + { + _writeDispatcher = writeDispatcher ?? throw new ArgumentNullException(nameof(writeDispatcher)); + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + } + + public async ValueTask ProjectAsync( + ServiceRunCurrentStateProjectionContext context, + EventEnvelope envelope, + CancellationToken ct = default) + { + if (!CommittedStateEventEnvelope.TryUnpackState( + envelope, + out _, + out var stateEvent, + out var state) || + stateEvent == null || + state?.Record == null) + { + return; + } + + var record = state.Record; + if (string.IsNullOrWhiteSpace(record.RunId)) + return; + + var observedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow); + var document = new ServiceRunCurrentStateReadModel + { + Id = record.RunId, + ActorId = context.RootActorId, + ScopeId = record.ScopeId ?? string.Empty, + ServiceId = record.ServiceId ?? string.Empty, + ServiceKey = record.ServiceKey ?? string.Empty, + RunId = record.RunId, + CommandId = record.CommandId ?? string.Empty, + CorrelationId = record.CorrelationId ?? string.Empty, + EndpointId = record.EndpointId ?? string.Empty, + ImplementationKind = (int)record.ImplementationKind, + TargetActorId = record.TargetActorId ?? string.Empty, + RevisionId = record.RevisionId ?? string.Empty, + DeploymentId = record.DeploymentId ?? string.Empty, + Status = (int)record.Status, + TenantId = record.Identity?.TenantId ?? string.Empty, + AppId = record.Identity?.AppId ?? string.Empty, + Namespace = record.Identity?.Namespace ?? string.Empty, + CreatedAt = record.CreatedAt?.ToDateTimeOffset() ?? observedAt, + UpdatedAt = record.UpdatedAt?.ToDateTimeOffset() ?? observedAt, + StateVersion = stateEvent.Version, + LastEventId = stateEvent.EventId ?? string.Empty, + }; + + await _writeDispatcher.UpsertAsync(document, ct); + } +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs new file mode 100644 index 000000000..7ee5c6044 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs @@ -0,0 +1,205 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Abstractions.Queries; +using Aevatar.GAgentService.Projection.Configuration; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Queries; + +public sealed class ServiceRunQueryReader : IServiceRunQueryPort +{ + private readonly IProjectionDocumentReader _documentStore; + private readonly bool _enabled; + + public ServiceRunQueryReader( + IProjectionDocumentReader documentStore, + ServiceProjectionOptions? options = null) + { + _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore)); + _enabled = options?.Enabled ?? true; + } + + public async Task> ListAsync( + ServiceRunQuery query, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(query); + if (!_enabled) + return []; + + var boundedTake = Math.Clamp(query.Take, 1, 200); + var filters = new List(2); + if (!string.IsNullOrWhiteSpace(query.ScopeId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ScopeId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(query.ScopeId), + }); + } + if (!string.IsNullOrWhiteSpace(query.ServiceId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ServiceId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(query.ServiceId), + }); + } + + var result = await _documentStore.QueryAsync( + new ProjectionDocumentQuery + { + Take = boundedTake, + Filters = filters, + Sorts = new[] + { + new ProjectionDocumentSort + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.UpdatedAt), + Direction = ProjectionDocumentSortDirection.Desc, + }, + new ProjectionDocumentSort + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.RunId), + Direction = ProjectionDocumentSortDirection.Asc, + }, + }, + }, + ct); + return result.Items.Take(boundedTake).Select(Map).ToList(); + } + + public async Task GetByRunIdAsync( + string scopeId, + string serviceId, + string runId, + CancellationToken ct = default) + { + if (!_enabled) + return null; + if (string.IsNullOrWhiteSpace(runId)) + return null; + + var direct = await _documentStore.GetAsync(runId.Trim(), ct); + if (direct != null && MatchesScopeAndService(direct, scopeId, serviceId)) + return Map(direct); + + var matches = await QueryByEqualityAsync( + scopeId, + serviceId, + nameof(ServiceRunCurrentStateReadModel.RunId), + runId.Trim(), + ct); + return matches.FirstOrDefault(); + } + + public async Task GetByCommandIdAsync( + string scopeId, + string serviceId, + string commandId, + CancellationToken ct = default) + { + if (!_enabled) + return null; + if (string.IsNullOrWhiteSpace(commandId)) + return null; + + var matches = await QueryByEqualityAsync( + scopeId, + serviceId, + nameof(ServiceRunCurrentStateReadModel.CommandId), + commandId.Trim(), + ct); + return matches.FirstOrDefault(); + } + + private async Task> QueryByEqualityAsync( + string scopeId, + string serviceId, + string fieldPath, + string value, + CancellationToken ct) + { + var filters = new List(3) + { + new ProjectionDocumentFilter + { + FieldPath = fieldPath, + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(value), + }, + }; + if (!string.IsNullOrWhiteSpace(scopeId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ScopeId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(scopeId), + }); + } + if (!string.IsNullOrWhiteSpace(serviceId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ServiceId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(serviceId), + }); + } + + var result = await _documentStore.QueryAsync( + new ProjectionDocumentQuery + { + Take = 5, + Filters = filters, + }, + ct); + return result.Items.Select(Map).ToList(); + } + + private static bool MatchesScopeAndService( + ServiceRunCurrentStateReadModel readModel, + string? scopeId, + string? serviceId) + { + if (!string.IsNullOrWhiteSpace(scopeId) && + !string.Equals(readModel.ScopeId, scopeId, StringComparison.Ordinal)) + { + return false; + } + if (!string.IsNullOrWhiteSpace(serviceId) && + !string.Equals(readModel.ServiceId, serviceId, StringComparison.Ordinal)) + { + return false; + } + + return true; + } + + private static ServiceRunSnapshot Map(ServiceRunCurrentStateReadModel readModel) => + new( + readModel.ScopeId, + readModel.ServiceId, + readModel.ServiceKey, + readModel.RunId, + readModel.CommandId, + readModel.CorrelationId, + readModel.EndpointId, + (ServiceImplementationKind)readModel.ImplementationKind, + readModel.TargetActorId, + readModel.RevisionId, + readModel.DeploymentId, + (ServiceRunStatus)readModel.Status, + readModel.ActorId, + readModel.TenantId, + readModel.AppId, + readModel.Namespace, + readModel.StateVersion, + readModel.LastEventId, + readModel.CreatedAt, + readModel.UpdatedAt); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs index a10c9a0c5..3347f068a 100644 --- a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs +++ b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs @@ -202,6 +202,21 @@ public IList Targets } } +public sealed partial class ServiceRunCurrentStateReadModel : IProjectionReadModel +{ + public DateTimeOffset CreatedAt + { + get => ServiceProjectionReadModelSupport.ToDateTimeOffset(CreatedAtUtcValue); + set => CreatedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value); + } + + public DateTimeOffset UpdatedAt + { + get => ServiceProjectionReadModelSupport.ToDateTimeOffset(UpdatedAtUtcValue); + set => UpdatedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value); + } +} + internal static class ServiceProjectionReadModelSupport { public static Timestamp ToTimestamp(DateTimeOffset value) => diff --git a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto index 1ebe6c536..684eb1c82 100644 --- a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto +++ b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto @@ -175,3 +175,31 @@ message ServiceTrafficTargetReadModel { int32 allocation_weight = 4; string serving_state = 5; } + +// --- ServiceRunCurrentStateReadModel --- + +message ServiceRunCurrentStateReadModel { + string id = 1; + string actor_id = 2; + int64 state_version = 3; + string last_event_id = 4; + + string scope_id = 5; + string service_id = 6; + string service_key = 7; + string run_id = 8; + string command_id = 9; + string correlation_id = 10; + string endpoint_id = 11; + int32 implementation_kind = 12; + string target_actor_id = 13; + string revision_id = 14; + string deployment_id = 15; + int32 status = 16; + string tenant_id = 17; + string app_id = 18; + string namespace = 19; + + google.protobuf.Timestamp created_at_utc_value = 20; + google.protobuf.Timestamp updated_at_utc_value = 21; +} diff --git a/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs new file mode 100644 index 000000000..149a55c63 --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs @@ -0,0 +1,165 @@ +using Aevatar.Foundation.Runtime.Persistence; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Core.GAgents; +using Aevatar.GAgentService.Tests.TestSupport; +using FluentAssertions; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Tests.Core; + +public sealed class ServiceRunGAgentTests +{ + [Fact] + public async Task HandleRegisterAsync_ShouldPersistRecord_AndDefaultStatusToAccepted() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.ActivateAsync(); + + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + actor.State.Record.Should().NotBeNull(); + actor.State.Record!.RunId.Should().Be("run-1"); + actor.State.Record.Status.Should().Be(ServiceRunStatus.Accepted); + actor.State.LastAppliedEventVersion.Should().Be(1); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldBeIdempotent_WhenRunIdAlreadyBound() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + actor.State.LastAppliedEventVersion.Should().Be(1); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectMismatchedRunId() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-2"), + }); + + await act.Should().ThrowAsync() + .WithMessage("*run-1*cannot register run 'run-2'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectMissingRequiredFields() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:bad", + static () => new ServiceRunGAgent()); + + var noRunId = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = new ServiceRunRecord { ScopeId = "t", ServiceId = "s", CommandId = "c" }, + }); + await noRunId.Should().ThrowAsync().WithMessage("run_id*"); + } + + [Fact] + public async Task HandleUpdateStatusAsync_ShouldAdvanceStatusAndStamp() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + await actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested + { + RunId = "run-1", + Status = ServiceRunStatus.Completed, + }); + + actor.State.Record!.Status.Should().Be(ServiceRunStatus.Completed); + actor.State.LastAppliedEventVersion.Should().Be(2); + } + + [Fact] + public async Task HandleUpdateStatusAsync_ShouldNoOp_WhenStatusUnchanged() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + await actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested + { + RunId = "run-1", + Status = ServiceRunStatus.Accepted, + }); + + actor.State.LastAppliedEventVersion.Should().Be(1); + } + + [Fact] + public async Task HandleUpdateStatusAsync_ShouldRejectWhenNotRegistered() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + + var act = () => actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested + { + RunId = "run-1", + Status = ServiceRunStatus.Completed, + }); + await act.Should().ThrowAsync() + .WithMessage("*has no registered run*"); + } + + private static ServiceRunRecord BuildRecord(string runId) => + new() + { + ScopeId = "tenant-1", + ServiceId = "svc-1", + ServiceKey = "tenant-1:svc-1", + RunId = runId, + CommandId = $"cmd-{runId}", + CorrelationId = $"corr-{runId}", + EndpointId = "run", + ImplementationKind = ServiceImplementationKind.Static, + TargetActorId = $"target-{runId}", + RevisionId = "r1", + DeploymentId = "dep-1", + Status = ServiceRunStatus.Unspecified, + CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow), + }; +} diff --git a/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs b/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs index 2af93dd78..60c849349 100644 --- a/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs +++ b/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs @@ -1,6 +1,7 @@ using Aevatar.AI.Abstractions; using Aevatar.Foundation.Abstractions; using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; using Aevatar.GAgentService.Infrastructure.Dispatch; using Aevatar.GAgentService.Tests.TestSupport; using Aevatar.Scripting.Core.Ports; @@ -19,7 +20,8 @@ public async Task DispatchAsync_ShouldDispatchStaticEnvelope() var dispatcher = new DefaultServiceInvocationDispatcher( dispatchPort, new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); var request = new ServiceInvocationRequest { @@ -46,7 +48,8 @@ public async Task DispatchAsync_ShouldDelegateScriptingRun() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), scriptPort, - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Scripting, endpointId: "run", @@ -83,7 +86,8 @@ public async Task DispatchAsync_ShouldCreateWorkflowRun_AndSendEnvelope() var dispatcher = new DefaultServiceInvocationDispatcher( dispatchPort, new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -127,7 +131,8 @@ public async Task DispatchAsync_ShouldPreferIdentityTenantIdOverPayloadScope() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -166,7 +171,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromRequestScopeBeforeMetada var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -205,7 +211,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromWorkflowMetadataKey_When var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -242,7 +249,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromLegacyMetadataKey_WhenOt var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -277,7 +285,8 @@ public async Task DispatchAsync_ShouldRejectPayloadTypeMismatch() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Static, endpointId: "run", @@ -302,7 +311,8 @@ public async Task DispatchAsync_ShouldGenerateCommandAndCorrelationIds_WhenMissi var dispatcher = new DefaultServiceInvocationDispatcher( dispatchPort, new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); var receipt = await dispatcher.DispatchAsync(target, new ServiceInvocationRequest @@ -325,7 +335,8 @@ public async Task DispatchAsync_ShouldRejectMissingPayload() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); var act = () => dispatcher.DispatchAsync(target, new ServiceInvocationRequest @@ -344,7 +355,8 @@ public async Task DispatchAsync_ShouldRejectWorkflowPayloadThatIsNotChatRequest( var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -372,7 +384,8 @@ public async Task DispatchAsync_ShouldPassRequestedEventTypeAndGeneratedRunIdToS var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), scriptPort, - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Scripting, endpointId: "run", @@ -398,13 +411,111 @@ public async Task DispatchAsync_ShouldPassRequestedEventTypeAndGeneratedRunIdToS scriptPort.Calls[0].payload!.TypeUrl.Should().Be(Any.Pack(new StringValue()).TypeUrl); } + [Fact] + public async Task DispatchAsync_ShouldRegisterServiceRun_ForStaticPath() + { + var registry = new RecordingServiceRunRegistrationPort(); + var dispatcher = new DefaultServiceInvocationDispatcher( + new RecordingDispatchPort(), + new RecordingScriptRuntimeCommandPort(), + new RecordingWorkflowRunActorPort(), + registry); + var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); + var request = new ServiceInvocationRequest + { + Identity = GAgentServiceTestKit.CreateIdentity(), + EndpointId = "run", + CommandId = "cmd-static", + Payload = Any.Pack(new StringValue { Value = "payload" }), + }; + + var receipt = await dispatcher.DispatchAsync(target, request); + + registry.Calls.Should().ContainSingle(); + registry.Calls[0].RunId.Should().Be(receipt.CommandId); + registry.Calls[0].CommandId.Should().Be("cmd-static"); + registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Static); + registry.Calls[0].TargetActorId.Should().Be("primary-actor"); + registry.Calls[0].ScopeId.Should().Be("tenant"); + registry.Calls[0].ServiceId.Should().Be("svc"); + } + + [Fact] + public async Task DispatchAsync_ShouldRegisterServiceRun_ForScriptingPath() + { + var registry = new RecordingServiceRunRegistrationPort(); + var dispatcher = new DefaultServiceInvocationDispatcher( + new RecordingDispatchPort(), + new RecordingScriptRuntimeCommandPort(), + new RecordingWorkflowRunActorPort(), + registry); + var target = CreateTarget( + ServiceImplementationKind.Scripting, + endpointId: "run", + requestTypeUrl: Any.Pack(new StringValue()).TypeUrl); + target.Artifact.DeploymentPlan.ScriptingPlan = new ScriptingServiceDeploymentPlan + { + Revision = "rev-1", + DefinitionActorId = "definition-1", + }; + var request = new ServiceInvocationRequest + { + Identity = GAgentServiceTestKit.CreateIdentity(), + EndpointId = "run", + CommandId = "cmd-script", + Payload = Any.Pack(new StringValue { Value = "payload" }), + }; + + await dispatcher.DispatchAsync(target, request); + + registry.Calls.Should().ContainSingle(); + registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Scripting); + registry.Calls[0].CommandId.Should().Be("cmd-script"); + } + + [Fact] + public async Task DispatchAsync_ShouldRegisterServiceRun_ForWorkflowPath() + { + var registry = new RecordingServiceRunRegistrationPort(); + var workflowPort = new RecordingWorkflowRunActorPort(); + var dispatcher = new DefaultServiceInvocationDispatcher( + new RecordingDispatchPort(), + new RecordingScriptRuntimeCommandPort(), + workflowPort, + registry); + var target = CreateTarget( + ServiceImplementationKind.Workflow, + endpointId: "chat", + requestTypeUrl: Any.Pack(new ChatRequestEvent()).TypeUrl); + target.Artifact.DeploymentPlan.WorkflowPlan = new WorkflowServiceDeploymentPlan + { + WorkflowName = "wf", + WorkflowYaml = "name: wf", + }; + var request = new ServiceInvocationRequest + { + Identity = GAgentServiceTestKit.CreateIdentity(), + EndpointId = "chat", + CommandId = "cmd-wf", + Payload = Any.Pack(new ChatRequestEvent { Prompt = "hi" }), + }; + + await dispatcher.DispatchAsync(target, request); + + registry.Calls.Should().ContainSingle(); + registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Workflow); + registry.Calls[0].TargetActorId.Should().Be(workflowPort.RunActor.Id); + registry.Calls[0].CommandId.Should().Be("cmd-wf"); + } + [Fact] public async Task DispatchAsync_ShouldRejectUnsupportedImplementationKind() { var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); target.Artifact.ImplementationKind = ServiceImplementationKind.Unspecified; @@ -453,6 +564,20 @@ private static ServiceInvocationResolvedTarget CreateTarget( }); } + private sealed class RecordingServiceRunRegistrationPort : IServiceRunRegistrationPort + { + public List Calls { get; } = []; + + public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) + { + Calls.Add(record.Clone()); + return Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId)); + } + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) => + Task.CompletedTask; + } + private sealed class RecordingDispatchPort : IActorDispatchPort { public List<(string actorId, EventEnvelope envelope)> Calls { get; } = []; diff --git a/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs new file mode 100644 index 000000000..8bf67ffa0 --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs @@ -0,0 +1,204 @@ +using Aevatar.CQRS.Projection.Core.Abstractions; +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Projection.Contexts; +using Aevatar.GAgentService.Projection.Projectors; +using Aevatar.GAgentService.Projection.Queries; +using Aevatar.GAgentService.Projection.ReadModels; +using Aevatar.GAgentService.Abstractions.Queries; +using FluentAssertions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Tests.Projection; + +public sealed class ServiceRunCurrentStateProjectorTests +{ + [Fact] + public async Task ProjectAsync_ShouldMaterializeCurrentState_FromCommittedStateRoot() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00"))); + var observedAt = DateTimeOffset.Parse("2026-04-27T01:00:00+00:00"); + var record = BuildRecord( + scopeId: "tenant-1", + serviceId: "svc-1", + runId: "run-1", + commandId: "cmd-1", + implementation: ServiceImplementationKind.Workflow, + targetActorId: "workflow-run:abc", + createdAt: observedAt); + var envelope = WrapCommittedRunState( + record, + stateVersion: 3, + eventId: "evt-registered", + observedAt: observedAt); + var context = new ServiceRunCurrentStateProjectionContext + { + RootActorId = "service-run:run-1", + ProjectionKind = "service-runs", + }; + + await projector.ProjectAsync(context, envelope); + + var doc = await store.GetAsync("run-1"); + doc.Should().NotBeNull(); + doc!.RunId.Should().Be("run-1"); + doc.CommandId.Should().Be("cmd-1"); + doc.ScopeId.Should().Be("tenant-1"); + doc.ServiceId.Should().Be("svc-1"); + doc.ActorId.Should().Be("service-run:run-1"); + doc.ImplementationKind.Should().Be((int)ServiceImplementationKind.Workflow); + doc.TargetActorId.Should().Be("workflow-run:abc"); + doc.Status.Should().Be((int)ServiceRunStatus.Accepted); + doc.StateVersion.Should().Be(3); + doc.LastEventId.Should().Be("evt-registered"); + } + + [Fact] + public async Task ProjectAsync_ShouldIgnoreEnvelope_WithoutCommittedStateRoot() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.UtcNow)); + var context = new ServiceRunCurrentStateProjectionContext + { + RootActorId = "service-run:run-x", + ProjectionKind = "service-runs", + }; + + await projector.ProjectAsync(context, new EventEnvelope + { + Id = "raw", + Payload = Any.Pack(new StringValue { Value = "noop" }), + }); + + (await store.ReadItemsAsync()).Should().BeEmpty(); + } + + [Fact] + public async Task QueryReader_ShouldFilterByScopeAndService_AndResolveByRunIdAndCommandId() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00"))); + var reader = new ServiceRunQueryReader(store); + await projector.ProjectAsync( + CreateContext("service-run:run-a"), + WrapCommittedRunState( + BuildRecord("tenant-1", "svc-1", "run-a", "cmd-a", ServiceImplementationKind.Static, "actor-a"), + stateVersion: 1, + eventId: "evt-a", + observedAt: DateTimeOffset.Parse("2026-04-27T01:00:00+00:00"))); + await projector.ProjectAsync( + CreateContext("service-run:run-b"), + WrapCommittedRunState( + BuildRecord("tenant-1", "svc-1", "run-b", "cmd-b", ServiceImplementationKind.Workflow, "actor-b"), + stateVersion: 1, + eventId: "evt-b", + observedAt: DateTimeOffset.Parse("2026-04-27T02:00:00+00:00"))); + await projector.ProjectAsync( + CreateContext("service-run:run-c"), + WrapCommittedRunState( + BuildRecord("tenant-2", "svc-1", "run-c", "cmd-c", ServiceImplementationKind.Scripting, "actor-c"), + stateVersion: 1, + eventId: "evt-c", + observedAt: DateTimeOffset.Parse("2026-04-27T03:00:00+00:00"))); + + var listForTenant1 = await reader.ListAsync(new ServiceRunQuery("tenant-1", "svc-1")); + listForTenant1.Should().HaveCount(2); + listForTenant1.Select(x => x.RunId).Should().BeEquivalentTo(new[] { "run-a", "run-b" }); + + var listForTenant2 = await reader.ListAsync(new ServiceRunQuery("tenant-2", "svc-1")); + listForTenant2.Select(x => x.RunId).Should().Equal("run-c"); + + var byRun = await reader.GetByRunIdAsync("tenant-1", "svc-1", "run-a"); + byRun.Should().NotBeNull(); + byRun!.CommandId.Should().Be("cmd-a"); + + var byCommand = await reader.GetByCommandIdAsync("tenant-1", "svc-1", "cmd-b"); + byCommand.Should().NotBeNull(); + byCommand!.RunId.Should().Be("run-b"); + + var byRunWrongScope = await reader.GetByRunIdAsync("tenant-1", "svc-1", "run-c"); + byRunWrongScope.Should().BeNull(); + } + + private static ServiceRunCurrentStateProjectionContext CreateContext(string rootActorId) => + new() + { + RootActorId = rootActorId, + ProjectionKind = "service-runs", + }; + + private static ServiceRunRecord BuildRecord( + string scopeId, + string serviceId, + string runId, + string commandId, + ServiceImplementationKind implementation, + string targetActorId, + DateTimeOffset? createdAt = null) => + new() + { + ScopeId = scopeId, + ServiceId = serviceId, + ServiceKey = $"{scopeId}:{serviceId}", + RunId = runId, + CommandId = commandId, + CorrelationId = commandId, + EndpointId = "run", + ImplementationKind = implementation, + TargetActorId = targetActorId, + RevisionId = "r1", + DeploymentId = "dep-1", + Status = ServiceRunStatus.Accepted, + CreatedAt = createdAt.HasValue ? Timestamp.FromDateTimeOffset(createdAt.Value) : null, + UpdatedAt = createdAt.HasValue ? Timestamp.FromDateTimeOffset(createdAt.Value) : null, + Identity = new ServiceIdentity + { + TenantId = scopeId, + AppId = "app", + Namespace = "default", + ServiceId = serviceId, + }, + }; + + private static EventEnvelope WrapCommittedRunState( + ServiceRunRecord record, + long stateVersion, + string eventId, + DateTimeOffset observedAt) + { + var state = new ServiceRunState + { + Record = record.Clone(), + LastAppliedEventVersion = stateVersion, + LastEventId = eventId, + }; + return new EventEnvelope + { + Id = $"outer-{eventId}", + Timestamp = Timestamp.FromDateTimeOffset(observedAt), + Route = EnvelopeRouteSemantics.CreateObserverPublication("root-actor"), + Payload = Any.Pack(new CommittedStateEventPublished + { + StateEvent = new StateEvent + { + EventId = eventId, + Version = stateVersion, + Timestamp = Timestamp.FromDateTimeOffset(observedAt), + EventData = Any.Pack(new ServiceRunRegisteredEvent + { + Record = record.Clone(), + }), + }, + StateRoot = Any.Pack(state), + }), + }; + } +} From 4c2bc7ca999533c8b6db3397ba45bd07d20ee27e Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 27 Apr 2026 13:44:03 +0800 Subject: [PATCH 2/4] Address PR #430 review feedback Three P1 review items from eanzhao + codex: 1. Registry actor and document keys are now scoped to {scopeId}:{serviceId}:{runId} via the new ServiceRunIds helper. Reusing the same command_id across different scope/service no longer collides on a single ServiceRunGAgent or overwrites another tenant's readmodel entry. 2. ServiceRunGAgent's idempotent re-register check now also validates scope_id, service_id, and target_actor_id against the existing record so silently accepting a foreign re-registration is impossible. 3. ScopeServiceRunSummaryHttpResponse.ActorId now returns the controllable target actor (workflow run actor for workflow runs) so existing resume/signal/stop callers that round-trip the field continue to resolve; the registry actor is internal infrastructure. 4. HandleInvokeStreamAsync (and its default-chat forwarder) now register the service run via IServiceRunRegistrationPort before delegating to workflow / static / scripting stream handlers, so refresh after a streamed invoke returns the run from /runs. Adds ServiceRunRegistrationAdapter unit tests, composite-key collision tests on the projector, and scope/service/target mismatch tests on the actor. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ServiceRunIds.cs | 23 +++ .../GAgents/ServiceRunGAgent.cs | 33 ++- .../Endpoints/ScopeServiceEndpoints.cs | 50 ++++- .../Adapters/ServiceRunRegistrationAdapter.cs | 8 +- .../ServiceRunCurrentStateProjector.cs | 8 +- .../Queries/ServiceRunQueryReader.cs | 38 +--- .../Core/ServiceRunGAgentTests.cs | 60 ++++++ .../ServiceRunRegistrationAdapterTests.cs | 193 ++++++++++++++++++ .../ServiceRunCurrentStateProjectorTests.cs | 51 ++++- 9 files changed, 421 insertions(+), 43 deletions(-) create mode 100644 src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs create mode 100644 test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs diff --git a/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs b/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs new file mode 100644 index 000000000..6e4a1070d --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs @@ -0,0 +1,23 @@ +namespace Aevatar.GAgentService.Abstractions; + +public static class ServiceRunIds +{ + public const string ActorPrefix = "service-run:"; + + public static string BuildKey(string scopeId, string serviceId, string runId) + { + if (string.IsNullOrWhiteSpace(scopeId)) + throw new ArgumentException("scopeId is required.", nameof(scopeId)); + if (string.IsNullOrWhiteSpace(serviceId)) + throw new ArgumentException("serviceId is required.", nameof(serviceId)); + if (string.IsNullOrWhiteSpace(runId)) + throw new ArgumentException("runId is required.", nameof(runId)); + + return $"{Normalize(scopeId)}:{Normalize(serviceId)}:{Normalize(runId)}"; + } + + public static string BuildActorId(string scopeId, string serviceId, string runId) => + ActorPrefix + BuildKey(scopeId, serviceId, runId); + + private static string Normalize(string value) => value.Trim(); +} diff --git a/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs index 33b7bee27..75dc14595 100644 --- a/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs +++ b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs @@ -24,12 +24,7 @@ public async Task HandleRegisterAsync(RegisterServiceRunRequested command) var existing = State.Record; if (existing != null && !string.IsNullOrWhiteSpace(existing.RunId)) { - if (!string.Equals(existing.RunId, command.Record.RunId, StringComparison.Ordinal)) - { - throw new InvalidOperationException( - $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot register run '{command.Record.RunId}'."); - } - + EnsureExistingMatches(existing, command.Record); return; } @@ -106,6 +101,32 @@ private static ServiceRunState ApplyStatusUpdated(ServiceRunState state, Service return next; } + private void EnsureExistingMatches(ServiceRunRecord existing, ServiceRunRecord incoming) + { + if (!string.Equals(existing.RunId, incoming.RunId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot register run '{incoming.RunId}'."); + } + if (!string.Equals(existing.ScopeId, incoming.ScopeId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to scope '{existing.ScopeId}' and cannot re-register under scope '{incoming.ScopeId}'."); + } + if (!string.Equals(existing.ServiceId, incoming.ServiceId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to service '{existing.ServiceId}' and cannot re-register under service '{incoming.ServiceId}'."); + } + if (!string.IsNullOrWhiteSpace(incoming.TargetActorId) && + !string.IsNullOrWhiteSpace(existing.TargetActorId) && + !string.Equals(existing.TargetActorId, incoming.TargetActorId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to target '{existing.TargetActorId}' and cannot re-register against target '{incoming.TargetActorId}'."); + } + } + private static void ValidateRecord(ServiceRunRecord record) { ArgumentNullException.ThrowIfNull(record); diff --git a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs index 98d156652..98a268875 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs @@ -496,6 +496,7 @@ private static async Task HandleInvokeDefaultChatStreamAsync( StreamScopeServiceHttpRequest request, [FromServices] ServiceInvocationResolutionService resolutionService, [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer, + [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort, [FromServices] ICommandInteractionService chatRunService, [FromServices] ICommandInteractionService gagentDraftRunService, [FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort, @@ -520,6 +521,7 @@ await HandleInvokeStreamAsync( appId: null, resolutionService, admissionAuthorizer, + serviceRunRegistrationPort, chatRunService, gagentDraftRunService, scriptRuntimeCommandPort, @@ -867,6 +869,42 @@ private static async Task HandleGetRunAuditAsync( return Results.Ok(new ScopeServiceRunAuditHttpResponse(summary, report)); } + private static async Task RegisterStreamServiceRunAsync( + IServiceRunRegistrationPort serviceRunRegistrationPort, + ServiceInvocationResolvedTarget target, + ServiceInvocationRequest invocationRequest, + string scopeId, + string serviceId, + CancellationToken ct) + { + var commandId = string.IsNullOrWhiteSpace(invocationRequest.CommandId) + ? Guid.NewGuid().ToString("N") + : invocationRequest.CommandId; + var record = new ServiceRunRecord + { + ScopeId = scopeId, + ServiceId = serviceId, + ServiceKey = target.Service.ServiceKey ?? string.Empty, + RunId = commandId, + CommandId = commandId, + CorrelationId = string.IsNullOrWhiteSpace(invocationRequest.CorrelationId) + ? commandId + : invocationRequest.CorrelationId, + EndpointId = target.Endpoint.EndpointId ?? string.Empty, + ImplementationKind = target.Artifact.ImplementationKind, + // Stream paths construct the implementation-specific run actor downstream + // (workflow run actor, draft-run session, scripting runtime); the registry + // stores the service primary actor so the directory entry is observable + // immediately after invoke returns. + TargetActorId = target.Service.PrimaryActorId ?? string.Empty, + RevisionId = target.Service.RevisionId ?? string.Empty, + DeploymentId = target.Service.DeploymentId ?? string.Empty, + Status = ServiceRunStatus.Accepted, + Identity = invocationRequest.Identity?.Clone(), + }; + await serviceRunRegistrationPort.RegisterAsync(record, ct); + } + private static async Task ResolveServiceRunSnapshotAsync( string scopeId, string serviceId, @@ -894,6 +932,7 @@ private static async Task HandleInvokeStreamAsync( string? appId, [FromServices] ServiceInvocationResolutionService resolutionService, [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer, + [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort, [FromServices] ICommandInteractionService chatRunService, [FromServices] ICommandInteractionService gagentDraftRunService, [FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort, @@ -925,6 +964,13 @@ await admissionAuthorizer.AuthorizeAsync( target.Endpoint, invocationRequest, ct); + await RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + ct); switch (target.Artifact.ImplementationKind) { @@ -2073,7 +2119,9 @@ private static async Task BuildScopeRunSumma scopeId, serviceId, snapshot.RunId, - snapshot.ActorId, + // ActorId stays the controllable target so existing resume/signal/stop + // round-trips keep working; the registry actor is internal infra. + snapshot.TargetActorId, string.Empty, snapshot.RevisionId, snapshot.DeploymentId, diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs index 76b0c0229..a93cffc76 100644 --- a/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs +++ b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs @@ -36,8 +36,12 @@ public async Task RegisterAsync( ArgumentNullException.ThrowIfNull(record); if (string.IsNullOrWhiteSpace(record.RunId)) throw new InvalidOperationException("run_id is required."); + if (string.IsNullOrWhiteSpace(record.ScopeId)) + throw new InvalidOperationException("scope_id is required."); + if (string.IsNullOrWhiteSpace(record.ServiceId)) + throw new InvalidOperationException("service_id is required."); - var actorId = BuildRunActorId(record.RunId); + var actorId = ServiceRunIds.BuildActorId(record.ScopeId, record.ServiceId, record.RunId); var actor = await _runtime.CreateAsync(actorId, ct: ct); await _projectionPort.EnsureProjectionAsync(actor.Id, ct); @@ -81,8 +85,6 @@ public async Task UpdateStatusAsync( await _dispatchPort.DispatchAsync(runActorId, envelope, ct); } - private static string BuildRunActorId(string runId) => $"service-run:{runId}"; - private static EventEnvelope CreateEnvelope( string actorId, Any payload, diff --git a/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs index d1198a65f..1d3740011 100644 --- a/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs +++ b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs @@ -39,13 +39,17 @@ public async ValueTask ProjectAsync( } var record = state.Record; - if (string.IsNullOrWhiteSpace(record.RunId)) + if (string.IsNullOrWhiteSpace(record.RunId) || + string.IsNullOrWhiteSpace(record.ScopeId) || + string.IsNullOrWhiteSpace(record.ServiceId)) + { return; + } var observedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow); var document = new ServiceRunCurrentStateReadModel { - Id = record.RunId, + Id = ServiceRunIds.BuildKey(record.ScopeId, record.ServiceId, record.RunId), ActorId = context.RootActorId, ScopeId = record.ScopeId ?? string.Empty, ServiceId = record.ServiceId ?? string.Empty, diff --git a/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs index 7ee5c6044..e84eb69fc 100644 --- a/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs +++ b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs @@ -80,20 +80,17 @@ public async Task> ListAsync( { if (!_enabled) return null; - if (string.IsNullOrWhiteSpace(runId)) + if (string.IsNullOrWhiteSpace(runId) || + string.IsNullOrWhiteSpace(scopeId) || + string.IsNullOrWhiteSpace(serviceId)) + { return null; + } - var direct = await _documentStore.GetAsync(runId.Trim(), ct); - if (direct != null && MatchesScopeAndService(direct, scopeId, serviceId)) - return Map(direct); - - var matches = await QueryByEqualityAsync( - scopeId, - serviceId, - nameof(ServiceRunCurrentStateReadModel.RunId), - runId.Trim(), + var direct = await _documentStore.GetAsync( + ServiceRunIds.BuildKey(scopeId, serviceId, runId), ct); - return matches.FirstOrDefault(); + return direct == null ? null : Map(direct); } public async Task GetByCommandIdAsync( @@ -161,25 +158,6 @@ private async Task> QueryByEqualityAsync( return result.Items.Select(Map).ToList(); } - private static bool MatchesScopeAndService( - ServiceRunCurrentStateReadModel readModel, - string? scopeId, - string? serviceId) - { - if (!string.IsNullOrWhiteSpace(scopeId) && - !string.Equals(readModel.ScopeId, scopeId, StringComparison.Ordinal)) - { - return false; - } - if (!string.IsNullOrWhiteSpace(serviceId) && - !string.Equals(readModel.ServiceId, serviceId, StringComparison.Ordinal)) - { - return false; - } - - return true; - } - private static ServiceRunSnapshot Map(ServiceRunCurrentStateReadModel readModel) => new( readModel.ScopeId, diff --git a/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs index 149a55c63..399ccfd4c 100644 --- a/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs +++ b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs @@ -70,6 +70,66 @@ await act.Should().ThrowAsync() .WithMessage("*run-1*cannot register run 'run-2'*"); } + [Fact] + public async Task HandleRegisterAsync_ShouldRejectScopeMismatchOnReRegister() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:tenant-1:svc-1:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var foreign = BuildRecord("run-1"); + foreign.ScopeId = "tenant-2"; + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign }); + + await act.Should().ThrowAsync() + .WithMessage("*tenant-1*cannot re-register under scope 'tenant-2'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectServiceMismatchOnReRegister() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:tenant-1:svc-1:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var foreign = BuildRecord("run-1"); + foreign.ServiceId = "svc-2"; + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign }); + + await act.Should().ThrowAsync() + .WithMessage("*svc-1*cannot re-register under service 'svc-2'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectTargetMismatchOnReRegister() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:tenant-1:svc-1:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var foreign = BuildRecord("run-1"); + foreign.TargetActorId = "different-target"; + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign }); + + await act.Should().ThrowAsync() + .WithMessage("*target-run-1*cannot re-register against target 'different-target'*"); + } + [Fact] public async Task HandleRegisterAsync_ShouldRejectMissingRequiredFields() { diff --git a/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs b/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs new file mode 100644 index 000000000..fa94d5049 --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs @@ -0,0 +1,193 @@ +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Core.GAgents; +using Aevatar.GAgentService.Infrastructure.Adapters; +using Aevatar.GAgentService.Tests.TestSupport; +using FluentAssertions; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Tests.Infrastructure; + +public sealed class ServiceRunRegistrationAdapterTests +{ + [Fact] + public async Task RegisterAsync_ShouldCreateActorWithCompositeId_AndDispatchRegisterEnvelope() + { + var runtime = new RecordingRunRegistryRuntime(); + var dispatchPort = new RecordingDispatchPort(); + var projectionPort = new RecordingServiceRunProjectionPort(); + var adapter = new ServiceRunRegistrationAdapter(runtime, dispatchPort, projectionPort); + + var record = BuildRecord(scopeId: "tenant-1", serviceId: "svc-1", runId: "run-1"); + var result = await adapter.RegisterAsync(record); + + var expectedActorId = ServiceRunIds.BuildActorId("tenant-1", "svc-1", "run-1"); + result.RunActorId.Should().Be(expectedActorId); + result.RunId.Should().Be("run-1"); + runtime.CreateCalls.Should().ContainSingle(); + runtime.CreateCalls[0].agentType.Should().Be(typeof(ServiceRunGAgent)); + runtime.CreateCalls[0].actorId.Should().Be(expectedActorId); + projectionPort.EnsureCalls.Should().Equal(expectedActorId); + dispatchPort.Calls.Should().ContainSingle(); + dispatchPort.Calls[0].actorId.Should().Be(expectedActorId); + dispatchPort.Calls[0].envelope.Payload.TypeUrl.Should().Contain("RegisterServiceRunRequested"); + } + + [Fact] + public async Task RegisterAsync_ShouldNotCollide_OnSameRunIdAcrossScopes() + { + var runtime = new RecordingRunRegistryRuntime(); + var adapter = new ServiceRunRegistrationAdapter( + runtime, + new RecordingDispatchPort(), + new RecordingServiceRunProjectionPort()); + + await adapter.RegisterAsync(BuildRecord("tenant-a", "svc", "run-shared")); + await adapter.RegisterAsync(BuildRecord("tenant-b", "svc", "run-shared")); + + runtime.CreateCalls.Should().HaveCount(2); + runtime.CreateCalls[0].actorId.Should().Be(ServiceRunIds.BuildActorId("tenant-a", "svc", "run-shared")); + runtime.CreateCalls[1].actorId.Should().Be(ServiceRunIds.BuildActorId("tenant-b", "svc", "run-shared")); + runtime.CreateCalls[0].actorId.Should().NotBe(runtime.CreateCalls[1].actorId); + } + + [Fact] + public async Task RegisterAsync_ShouldRejectMissingRequiredFields() + { + var adapter = new ServiceRunRegistrationAdapter( + new RecordingRunRegistryRuntime(), + new RecordingDispatchPort(), + new RecordingServiceRunProjectionPort()); + + var noRun = BuildRecord("tenant", "svc", string.Empty); + var act = () => adapter.RegisterAsync(noRun); + await act.Should().ThrowAsync().WithMessage("run_id*"); + + var noScope = BuildRecord(string.Empty, "svc", "run-1"); + var act2 = () => adapter.RegisterAsync(noScope); + await act2.Should().ThrowAsync().WithMessage("scope_id*"); + + var noService = BuildRecord("tenant", string.Empty, "run-1"); + var act3 = () => adapter.RegisterAsync(noService); + await act3.Should().ThrowAsync().WithMessage("service_id*"); + } + + [Fact] + public async Task UpdateStatusAsync_ShouldDispatchUpdateEnvelope() + { + var dispatchPort = new RecordingDispatchPort(); + var adapter = new ServiceRunRegistrationAdapter( + new RecordingRunRegistryRuntime(), + dispatchPort, + new RecordingServiceRunProjectionPort()); + + await adapter.UpdateStatusAsync("service-run:tenant:svc:run-1", "run-1", ServiceRunStatus.Completed); + + dispatchPort.Calls.Should().ContainSingle(); + dispatchPort.Calls[0].actorId.Should().Be("service-run:tenant:svc:run-1"); + dispatchPort.Calls[0].envelope.Payload.TypeUrl.Should().Contain("UpdateServiceRunStatusRequested"); + } + + [Fact] + public async Task UpdateStatusAsync_ShouldNoOp_WhenStatusUnspecified() + { + var dispatchPort = new RecordingDispatchPort(); + var adapter = new ServiceRunRegistrationAdapter( + new RecordingRunRegistryRuntime(), + dispatchPort, + new RecordingServiceRunProjectionPort()); + + await adapter.UpdateStatusAsync("service-run:tenant:svc:run-1", "run-1", ServiceRunStatus.Unspecified); + + dispatchPort.Calls.Should().BeEmpty(); + } + + private static ServiceRunRecord BuildRecord(string scopeId, string serviceId, string runId) => + new() + { + ScopeId = scopeId, + ServiceId = serviceId, + ServiceKey = $"{scopeId}:{serviceId}", + RunId = runId, + CommandId = $"cmd-{runId}", + CorrelationId = $"corr-{runId}", + EndpointId = "run", + ImplementationKind = ServiceImplementationKind.Static, + TargetActorId = "primary-actor", + RevisionId = "r1", + DeploymentId = "dep-1", + Status = ServiceRunStatus.Unspecified, + CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow), + }; + + private sealed class RecordingRunRegistryRuntime : IActorRuntime + { + public List<(System.Type agentType, string actorId)> CreateCalls { get; } = []; + + public Task CreateAsync(string? id = null, CancellationToken ct = default) + where TAgent : IAgent => + CreateAsync(typeof(TAgent), id, ct); + + public Task CreateAsync(System.Type agentType, string? id = null, CancellationToken ct = default) + { + var actorId = id ?? $"created:{agentType.Name}"; + CreateCalls.Add((agentType, actorId)); + return Task.FromResult(new RecordingActor(actorId)); + } + + public Task DestroyAsync(string id, CancellationToken ct = default) => Task.CompletedTask; + + public Task GetAsync(string id) => Task.FromResult(null); + + public Task ExistsAsync(string id) => Task.FromResult(false); + + public Task LinkAsync(string parentId, string childId, CancellationToken ct = default) => Task.CompletedTask; + + public Task UnlinkAsync(string childId, CancellationToken ct = default) => Task.CompletedTask; + } + + private sealed class RecordingDispatchPort : IActorDispatchPort + { + public List<(string actorId, EventEnvelope envelope)> Calls { get; } = []; + + public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationToken ct = default) + { + Calls.Add((actorId, envelope)); + return Task.CompletedTask; + } + } + + private sealed class RecordingServiceRunProjectionPort : IServiceRunCurrentStateProjectionPort + { + public List EnsureCalls { get; } = []; + + public Task EnsureProjectionAsync(string actorId, CancellationToken ct = default) + { + EnsureCalls.Add(actorId); + return Task.CompletedTask; + } + } + + private sealed class RecordingActor : IActor + { + public RecordingActor(string id) + { + Id = id; + } + + public string Id { get; } + + public IAgent Agent { get; } = new TestStaticServiceAgent(); + + public Task ActivateAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task DeactivateAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task HandleEventAsync(EventEnvelope envelope, CancellationToken ct = default) => Task.CompletedTask; + + public Task GetParentIdAsync() => Task.FromResult(null); + + public Task> GetChildrenIdsAsync() => Task.FromResult>([]); + } +} diff --git a/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs index 8bf67ffa0..88c661218 100644 --- a/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs +++ b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs @@ -43,7 +43,7 @@ public async Task ProjectAsync_ShouldMaterializeCurrentState_FromCommittedStateR await projector.ProjectAsync(context, envelope); - var doc = await store.GetAsync("run-1"); + var doc = await store.GetAsync(ServiceRunIds.BuildKey("tenant-1", "svc-1", "run-1")); doc.Should().NotBeNull(); doc!.RunId.Should().Be("run-1"); doc.CommandId.Should().Be("cmd-1"); @@ -128,6 +128,55 @@ await projector.ProjectAsync( byRunWrongScope.Should().BeNull(); } + [Fact] + public async Task ProjectAsync_ShouldNotCollide_WhenSameRunIdAcrossDifferentScopes() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00"))); + var observedAt = DateTimeOffset.Parse("2026-04-27T01:00:00+00:00"); + + await projector.ProjectAsync( + CreateContext("service-run:tenant-a:svc:run-shared"), + WrapCommittedRunState( + BuildRecord("tenant-a", "svc", "run-shared", "cmd-x", ServiceImplementationKind.Static, "actor-a", observedAt), + stateVersion: 1, + eventId: "evt-a", + observedAt: observedAt)); + await projector.ProjectAsync( + CreateContext("service-run:tenant-b:svc:run-shared"), + WrapCommittedRunState( + BuildRecord("tenant-b", "svc", "run-shared", "cmd-x", ServiceImplementationKind.Static, "actor-b", observedAt), + stateVersion: 1, + eventId: "evt-b", + observedAt: observedAt)); + + var docA = await store.GetAsync(ServiceRunIds.BuildKey("tenant-a", "svc", "run-shared")); + var docB = await store.GetAsync(ServiceRunIds.BuildKey("tenant-b", "svc", "run-shared")); + docA.Should().NotBeNull(); + docB.Should().NotBeNull(); + docA!.TargetActorId.Should().Be("actor-a"); + docB!.TargetActorId.Should().Be("actor-b"); + } + + [Fact] + public async Task ProjectAsync_ShouldIgnoreState_WithMissingScopeOrService() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.UtcNow)); + + var record = BuildRecord("tenant-1", "svc-1", "run-1", "cmd-1", ServiceImplementationKind.Static, "actor-1"); + record.ScopeId = string.Empty; + await projector.ProjectAsync( + CreateContext("service-run:bad"), + WrapCommittedRunState(record, stateVersion: 1, eventId: "evt-bad", observedAt: DateTimeOffset.UtcNow)); + + (await store.ReadItemsAsync()).Should().BeEmpty(); + } + private static ServiceRunCurrentStateProjectionContext CreateContext(string rootActorId) => new() { From b4213cd38429ef1a8d32ecfef94b47221ba6fc21 Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 27 Apr 2026 14:04:06 +0800 Subject: [PATCH 3/4] Wire member-stream invoke into service-run registry dev added HandleInvokeMemberStreamAsync (PR #427 member-first APIs) which forwards to HandleInvokeStreamAsync. Pass IServiceRunRegistrationPort through so the new member-stream entry point also registers a service-run record before delegating, matching the other stream entry points. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Endpoints/ScopeServiceEndpoints.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs index 1f4d990e1..3dfe774ae 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs @@ -772,6 +772,7 @@ private static async Task HandleInvokeMemberStreamAsync( [FromServices] IMemberPublishedServiceResolver memberPublishedServiceResolver, [FromServices] ServiceInvocationResolutionService resolutionService, [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer, + [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort, [FromServices] ICommandInteractionService chatRunService, [FromServices] ICommandInteractionService gagentDraftRunService, [FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort, @@ -799,6 +800,7 @@ await HandleInvokeStreamAsync( null, resolutionService, admissionAuthorizer, + serviceRunRegistrationPort, chatRunService, gagentDraftRunService, scriptRuntimeCommandPort, From 037873bdfccbda724084669ab2b35e56d3a0ba5b Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 27 Apr 2026 14:30:41 +0800 Subject: [PATCH 4/4] Register stream service-runs with the actual run id Two P1 review items from PR #430: 1. Stream invoke paths previously registered the run upfront with a freshly generated commandId, but the workflow / static / scripting pipelines each produced their own run id downstream. The id the SSE RunStarted frame carried did not match the id stored in the registry, so /runs/{runId} would return 404 after refresh. Each stream branch now registers via IServiceRunRegistrationPort at the moment the actual run id is established: - Workflow: passes an onAcceptedHook into WorkflowCapabilityEndpoints.HandleChat and registers using receipt.ActorId / receipt.CommandId before the SSE writer emits run-context. - Static: registers inside HandleStaticGAgentChatStreamAsync's existing OnAcceptedAsync, using receipt.CommandId (the same value that becomes RunStarted.RunId). - Scripting: registers inside HandleScriptingServiceChatStreamAsync once the local runId is generated, before RunRuntimeAsync. The shared RegisterStreamServiceRunAsync helper now takes runId / commandId / correlationId / targetActorId explicitly so each branch supplies its authentic values. 2. The integration test host did not register IServiceRunRegistrationPort, so seven stream-endpoint test cases failed with "No service for type 'IServiceRunRegistrationPort' has been registered". Added a recording port + a fake IServiceRunQueryPort to the test host, and bridged the existing FakeWorkflowRunBindingReader fixtures into the query port (with a deployment resolver that fills RevisionId/DeploymentId from FakeServiceLifecycleQueryPort.Deployments) so existing tests keep working without per-test fixture changes. Stream-helper reflection tests now pass through small typed wrappers so future signature changes only require updating two helper definitions instead of every call site. Adds an assertion to the workflow stream test verifying the registry RunId / CommandId / TargetActorId match what the workflow pipeline returns. Removes three obsolete assertions that checked the workflow binding reader was queried (the run-list/get endpoints now go through IServiceRunQueryPort). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Endpoints/ScopeServiceEndpoints.cs | 91 +++++--- .../CapabilityApi/ChatEndpoints.cs | 5 +- .../ScopeServiceEndpointsStreamTests.cs | 92 ++++++-- .../ScopeServiceEndpointsTests.cs | 204 +++++++++++++++++- 4 files changed, 337 insertions(+), 55 deletions(-) diff --git a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs index 3dfe774ae..a83087fb4 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs @@ -1461,40 +1461,41 @@ private static async Task HandleGetRunAuditAsync( return Results.Ok(new ScopeServiceRunAuditHttpResponse(summary, report)); } - private static async Task RegisterStreamServiceRunAsync( + // Registers a stream-invocation run with the durable service-run registry using the + // actual run id that the implementation pipeline produced (workflow run actor id / + // draft-run command id / scripting-generated run id). Called once the downstream + // run id is known so /runs/{runId} resolves the same id the client receives via SSE. + private static ValueTask RegisterStreamServiceRunAsync( IServiceRunRegistrationPort serviceRunRegistrationPort, ServiceInvocationResolvedTarget target, ServiceInvocationRequest invocationRequest, string scopeId, string serviceId, + string runId, + string commandId, + string correlationId, + string targetActorId, CancellationToken ct) { - var commandId = string.IsNullOrWhiteSpace(invocationRequest.CommandId) - ? Guid.NewGuid().ToString("N") - : invocationRequest.CommandId; var record = new ServiceRunRecord { ScopeId = scopeId, ServiceId = serviceId, ServiceKey = target.Service.ServiceKey ?? string.Empty, - RunId = commandId, - CommandId = commandId, - CorrelationId = string.IsNullOrWhiteSpace(invocationRequest.CorrelationId) - ? commandId - : invocationRequest.CorrelationId, + RunId = runId, + CommandId = string.IsNullOrWhiteSpace(commandId) ? runId : commandId, + CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? runId : correlationId, EndpointId = target.Endpoint.EndpointId ?? string.Empty, ImplementationKind = target.Artifact.ImplementationKind, - // Stream paths construct the implementation-specific run actor downstream - // (workflow run actor, draft-run session, scripting runtime); the registry - // stores the service primary actor so the directory entry is observable - // immediately after invoke returns. - TargetActorId = target.Service.PrimaryActorId ?? string.Empty, + TargetActorId = string.IsNullOrWhiteSpace(targetActorId) + ? target.Service.PrimaryActorId ?? string.Empty + : targetActorId, RevisionId = target.Service.RevisionId ?? string.Empty, DeploymentId = target.Service.DeploymentId ?? string.Empty, Status = ServiceRunStatus.Accepted, Identity = invocationRequest.Identity?.Clone(), }; - await serviceRunRegistrationPort.RegisterAsync(record, ct); + return new ValueTask(serviceRunRegistrationPort.RegisterAsync(record, ct)); } private static async Task ResolveServiceRunSnapshotAsync( @@ -1556,14 +1557,6 @@ await admissionAuthorizer.AuthorizeAsync( target.Endpoint, invocationRequest, ct); - await RegisterStreamServiceRunAsync( - serviceRunRegistrationPort, - target, - invocationRequest, - scopeId, - serviceId, - ct); - switch (target.Artifact.ImplementationKind) { case ServiceImplementationKind.Workflow: @@ -1580,7 +1573,20 @@ await WorkflowCapabilityEndpoints.HandleChat( Metadata = scopedHeaders, }, chatRunService, - ct); + ct, + onAcceptedHook: (receipt, token) => RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + // For workflow, the SSE RunStarted carries the workflow run actor id as the run identifier; + // use the same id so /runs/{runId} resolves to this run after refresh. + runId: receipt.ActorId, + commandId: receipt.CommandId, + correlationId: receipt.CorrelationId, + targetActorId: receipt.ActorId, + token)); break; case ServiceImplementationKind.Static: @@ -1591,9 +1597,12 @@ await HandleStaticGAgentChatStreamAsync( request.ActorId, request.SessionId, scopeId, + serviceId, scopedHeaders, request.InputParts, gagentDraftRunService, + invocationRequest, + serviceRunRegistrationPort, ct); break; @@ -1604,9 +1613,12 @@ await HandleScriptingServiceChatStreamAsync( normalizedPrompt, request.SessionId, scopeId, + serviceId, scopedHeaders, scriptRuntimeCommandPort, scriptExecutionProjectionPort, + invocationRequest, + serviceRunRegistrationPort, ct); break; @@ -1642,9 +1654,12 @@ private static async Task HandleStaticGAgentChatStreamAsync( string? actorId, string? sessionId, string scopeId, + string serviceId, IReadOnlyDictionary? headers, IReadOnlyList? inputParts, ICommandInteractionService interactionService, + ServiceInvocationRequest invocationRequest, + IServiceRunRegistrationPort serviceRunRegistrationPort, CancellationToken ct) { var plan = target.Artifact.DeploymentPlan.StaticPlan; @@ -1677,6 +1692,19 @@ async ValueTask EmitAsync(AGUIEvent aguiEvent, CancellationToken token) async ValueTask OnAcceptedAsync(GAgentDraftRunAcceptedReceipt receipt, CancellationToken token) { http.Response.Headers["X-Correlation-Id"] = receipt.CorrelationId; + // Register the service run with the same id we are about to send to the client + // so /runs/{runId} resolves immediately on refresh. + await RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + runId: receipt.CommandId, + commandId: receipt.CommandId, + correlationId: receipt.CorrelationId, + targetActorId: receipt.ActorId, + token); await EnsureSseStartedAsync(token); await writer.WriteAsync( new AGUIEvent @@ -1764,9 +1792,12 @@ private static async Task HandleScriptingServiceChatStreamAsync( string prompt, string? sessionId, string scopeId, + string serviceId, IReadOnlyDictionary? headers, IScriptRuntimeCommandPort scriptRuntimeCommandPort, IScriptExecutionProjectionPort scriptExecutionProjectionPort, + ServiceInvocationRequest invocationRequest, + IServiceRunRegistrationPort serviceRunRegistrationPort, CancellationToken ct) { var actorId = target.Service.PrimaryActorId; @@ -1775,6 +1806,18 @@ private static async Task HandleScriptingServiceChatStreamAsync( "Script runtime actor is not available. The service may not be activated."); var runId = Guid.NewGuid().ToString("N"); + // Register the service run with the same id the SSE RunStarted frame will carry. + await RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + runId: runId, + commandId: runId, + correlationId: runId, + targetActorId: actorId, + ct); var chatRequest = new ChatRequestEvent { Prompt = prompt, diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs index fc85a8ca0..f36803851 100644 --- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs @@ -38,7 +38,8 @@ public static async Task HandleChat( HttpContext http, ChatInput input, ICommandInteractionService chatRunService, - CancellationToken ct = default) + CancellationToken ct = default, + Func? onAcceptedHook = null) { using var scope = ApiRequestScope.BeginHttp(); var writer = new ChatSseResponseWriter(http.Response); @@ -73,6 +74,8 @@ public static async Task HandleChat( onAcceptedAsync: async (receipt, token) => { CapabilityTraceContext.ApplyCorrelationHeader(http.Response, receipt.CorrelationId); + if (onAcceptedHook != null) + await onAcceptedHook(receipt, token); await writer.StartAsync(token); await writer.WriteAsync(BuildRunContextFrame(receipt), token); scope.RecordFirstResponse(); diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs index ca5e595fb..9c64eda14 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs @@ -8,6 +8,7 @@ using Aevatar.Foundation.Abstractions; using Aevatar.Foundation.Abstractions.Streaming; using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; using Aevatar.GAgentService.Abstractions.ScopeGAgents; using Aevatar.GAgentService.Application.ScopeGAgents; using Aevatar.GAgentService.Hosting.Endpoints; @@ -68,8 +69,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldCreateActor_AndEmitSy }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -120,8 +120,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldReuseExistingActor_An }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -155,8 +154,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldMapAllInputPartKinds_ }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -215,8 +213,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldPreserveRunErrorWitho }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -350,8 +347,7 @@ public async Task ScriptExecutionSessionEventProjector_ShouldRouteOnlyMatchingRu [Fact] public async Task HandleGAgentServiceChatStreamAsync_ShouldThrow_WhenAgentTypeCannotBeResolved() { - var act = () => InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + var act = () => InvokeStaticStreamAsync( CreateHttpContext(), CreateStaticTarget("Missing.Agent, Missing.Assembly", primaryActorId: "actor-1"), "hello", @@ -370,8 +366,7 @@ await act.Should().ThrowAsync() [Fact] public async Task HandleScriptingServiceChatStreamAsync_ShouldThrow_WhenPrimaryActorMissing() { - var act = () => InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + var act = () => InvokeScriptingStreamAsync( CreateHttpContext(), CreateScriptingTarget(primaryActorId: string.Empty), "hello", @@ -389,8 +384,7 @@ await act.Should().ThrowAsync() [Fact] public async Task HandleScriptingServiceChatStreamAsync_ShouldThrow_WhenActorCannotBeResolved() { - var act = () => InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + var act = () => InvokeScriptingStreamAsync( CreateHttpContext(), CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -421,8 +415,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldEmitSyntheticFinis }, }; - await InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + await InvokeScriptingStreamAsync( http, CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -469,8 +462,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldPreserveRunErrorWi }, }; - await InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + await InvokeScriptingStreamAsync( http, CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -509,8 +501,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldAvoidSyntheticDupl }, }; - await InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + await InvokeScriptingStreamAsync( http, CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -620,6 +611,67 @@ private static ServiceInvocationResolvedTarget CreateScriptingTarget(string prim artifact.Endpoints[0]); } + private static Task InvokeStaticStreamAsync( + HttpContext http, + ServiceInvocationResolvedTarget target, + string prompt, + string? actorId, + string? sessionId, + string scopeId, + IReadOnlyDictionary? headers, + IReadOnlyList? inputParts, + ICommandInteractionService interactionService, + CancellationToken ct) => + InvokePrivateTaskAsync( + HandleGAgentStreamMethod, + http, + target, + prompt, + actorId, + sessionId, + scopeId, + "svc-default", + headers, + inputParts, + interactionService, + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), + ct); + + private static Task InvokeScriptingStreamAsync( + HttpContext http, + ServiceInvocationResolvedTarget target, + string prompt, + string? sessionId, + string scopeId, + IReadOnlyDictionary? headers, + IScriptRuntimeCommandPort scriptRuntimeCommandPort, + IScriptExecutionProjectionPort scriptExecutionProjectionPort, + CancellationToken ct) => + InvokePrivateTaskAsync( + HandleScriptingStreamMethod, + http, + target, + prompt, + sessionId, + scopeId, + "svc-default", + headers, + scriptRuntimeCommandPort, + scriptExecutionProjectionPort, + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), + ct); + + private sealed class NoOpServiceRunRegistrationPort : IServiceRunRegistrationPort + { + public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) => + Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId)); + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) => + Task.CompletedTask; + } + private static async Task InvokePrivateTaskAsync(MethodInfo method, params object?[] args) { var result = method.Invoke(null, args); diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs index 090aa5c15..26c816b9b 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs @@ -1490,6 +1490,13 @@ await host.ArtifactStore.SaveAsync( host.InteractionService.LastRequest!.ActorId.Should().Be("definition-actor-1"); host.InteractionService.LastRequest.ScopeId.Should().Be("scope-a"); host.InteractionService.LastRequest.Metadata.Should().ContainKey("source").WhoseValue.Should().Be("tests"); + // Service-run registry receives the actual workflow run actor id as the run id, so + // /runs/{runId} can resolve the same id the SSE RunStarted frame carries. + host.ServiceRunRegistrationPort.RegisterCalls.Should().ContainSingle(); + host.ServiceRunRegistrationPort.RegisterCalls[0].RunId.Should().Be("run-actor-1"); + host.ServiceRunRegistrationPort.RegisterCalls[0].CommandId.Should().Be("cmd-1"); + host.ServiceRunRegistrationPort.RegisterCalls[0].TargetActorId.Should().Be("run-actor-1"); + host.ServiceRunRegistrationPort.RegisterCalls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Workflow); } [Fact] @@ -1815,9 +1822,12 @@ public async Task ScopeServiceEndpointHelpers_ShouldRejectScriptingStream_WhenRu "hello", "session-1", "scope-a", + "default", new Dictionary(), new NoOpScriptRuntimeCommandPort(), new NoOpScriptExecutionProjectionPort(), + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), CancellationToken.None)) .Should() .ThrowAsync(); @@ -1876,9 +1886,12 @@ public async Task ScopeServiceEndpointHelpers_ShouldRejectScriptingStream_WhenRu "hello", "session-1", "scope-a", + "default", new Dictionary(), new ThrowingScriptRuntimeCommandPort(new InvalidOperationException("Script runtime actor 'script-runtime-1' could not be resolved. The service may not be activated.")), new NoOpScriptExecutionProjectionPort(), + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), CancellationToken.None)) .Should() .ThrowAsync(); @@ -2815,8 +2828,6 @@ public async Task ListDefaultRunsEndpoint_ShouldReturnDefaultServiceRunHistory() response.Runs[0].RevisionId.Should().Be("rev-1"); response.Runs[0].DeploymentId.Should().Be("dep-old"); response.Runs[0].WorkflowName.Should().Be("default-flow"); - host.RunBindingReader.Queries.Should().ContainSingle(); - host.RunBindingReader.Queries[0].ScopeId.Should().Be("scope-a"); } [Fact] @@ -2927,8 +2938,6 @@ public async Task ListMemberRunsEndpoint_ShouldReturnMemberScopedRunHistory() response.Runs[0].RevisionId.Should().Be("rev-1"); response.Runs[0].DeploymentId.Should().Be("dep-member-old"); response.Runs[0].StateVersion.Should().Be(13); - host.RunBindingReader.Queries.Should().ContainSingle(); - host.RunBindingReader.Queries[0].DefinitionActorIds.Should().BeEquivalentTo(["def-member-active", "def-member-old"]); } [Fact] @@ -3199,10 +3208,6 @@ public async Task ListRunsEndpoint_ShouldReturnScopeScopedRunHistory() response.Runs[0].CompletionStatus.Should().Be(WorkflowRunCompletionStatus.Completed); response.Runs[0].StateVersion.Should().Be(7); response.Runs[0].LastEventId.Should().Be("evt-7"); - host.RunBindingReader.Queries.Should().ContainSingle(); - host.RunBindingReader.Queries[0].ScopeId.Should().Be("scope-a"); - host.RunBindingReader.Queries[0].Take.Should().Be(5); - host.RunBindingReader.Queries[0].DefinitionActorIds.Should().BeEquivalentTo(["def-actor-active", "def-actor-old"]); } [Fact] @@ -4152,7 +4157,9 @@ private ScopeServiceEndpointTestHost( FakeWorkflowRunBindingReader runBindingReader, RecordingResumeDispatchService resumeDispatchService, RecordingSignalDispatchService signalDispatchService, - RecordingStopDispatchService stopDispatchService) + RecordingStopDispatchService stopDispatchService, + RecordingServiceRunRegistrationPort serviceRunRegistrationPort, + FakeServiceRunQueryPort serviceRunQueryPort) { _app = app; Client = client; @@ -4172,6 +4179,8 @@ private ScopeServiceEndpointTestHost( ResumeDispatchService = resumeDispatchService; SignalDispatchService = signalDispatchService; StopDispatchService = stopDispatchService; + ServiceRunRegistrationPort = serviceRunRegistrationPort; + ServiceRunQueryPort = serviceRunQueryPort; } public HttpClient Client { get; } @@ -4208,6 +4217,10 @@ private ScopeServiceEndpointTestHost( public RecordingStopDispatchService StopDispatchService { get; } + public RecordingServiceRunRegistrationPort ServiceRunRegistrationPort { get; } + + public FakeServiceRunQueryPort ServiceRunQueryPort { get; } + public static async Task StartAsync(bool authenticationEnabled = true) { var builder = WebApplication.CreateBuilder(new WebApplicationOptions @@ -4238,6 +4251,20 @@ public static async Task StartAsync(bool authentic var stopDispatchService = new RecordingStopDispatchService(); var actorRuntime = new NoOpActorRuntime(); var eventSubscriptionProvider = new NoOpActorEventSubscriptionProvider(); + var serviceRunQueryPort = new FakeServiceRunQueryPort + { + WorkflowBindingFallback = runBindingReader, + DeploymentResolver = binding => + { + var deployment = lifecycleQueryPort.Deployments?.Deployments.FirstOrDefault(d => + string.Equals(d.PrimaryActorId, binding.EffectiveDefinitionActorId, StringComparison.Ordinal)); + return (deployment?.DeploymentId ?? string.Empty, deployment?.RevisionId ?? string.Empty); + }, + }; + var serviceRunRegistrationPort = new RecordingServiceRunRegistrationPort + { + LinkedQueryPort = serviceRunQueryPort, + }; builder.Services.AddSingleton(commandPort); builder.Services.AddSingleton(queryPort); builder.Services.AddSingleton(scopeBindingPort); @@ -4262,6 +4289,8 @@ public static async Task StartAsync(bool authentic builder.Services.AddSingleton>(stopDispatchService); builder.Services.AddSingleton(actorRuntime); builder.Services.AddSingleton(eventSubscriptionProvider); + builder.Services.AddSingleton(serviceRunRegistrationPort); + builder.Services.AddSingleton(serviceRunQueryPort); builder.Services.AddSingleton>( Options.Create(new ScopeWorkflowCapabilityOptions { @@ -4369,7 +4398,9 @@ public static async Task StartAsync(bool authentic runBindingReader, resumeDispatchService, signalDispatchService, - stopDispatchService); + stopDispatchService, + serviceRunRegistrationPort, + serviceRunQueryPort); } private static bool TryGetRequestedScopeId(string? path, out string scopeId) @@ -4574,6 +4605,147 @@ private sealed class RecordingServiceGovernanceQueryPort : IServiceGovernanceQue throw new NotSupportedException(); } + private sealed class RecordingServiceRunRegistrationPort : IServiceRunRegistrationPort + { + public List RegisterCalls { get; } = []; + public List<(string runActorId, string runId, ServiceRunStatus status)> StatusCalls { get; } = []; + + public FakeServiceRunQueryPort? LinkedQueryPort { get; set; } + + public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) + { + RegisterCalls.Add(record.Clone()); + LinkedQueryPort?.Upsert(BuildSnapshot(record)); + return Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.ScopeId}:{record.ServiceId}:{record.RunId}", record.RunId)); + } + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) + { + StatusCalls.Add((runActorId, runId, status)); + return Task.CompletedTask; + } + + private static ServiceRunSnapshot BuildSnapshot(ServiceRunRecord record) => + new( + record.ScopeId, + record.ServiceId, + record.ServiceKey, + record.RunId, + record.CommandId, + record.CorrelationId, + record.EndpointId, + record.ImplementationKind, + record.TargetActorId, + record.RevisionId, + record.DeploymentId, + record.Status, + $"service-run:{record.ScopeId}:{record.ServiceId}:{record.RunId}", + record.Identity?.TenantId ?? string.Empty, + record.Identity?.AppId ?? string.Empty, + record.Identity?.Namespace ?? string.Empty, + StateVersion: 1, + LastEventId: $"{record.RunId}:registered", + CreatedAt: record.CreatedAt?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow, + UpdatedAt: record.UpdatedAt?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow); + } + + private sealed class FakeServiceRunQueryPort : IServiceRunQueryPort + { + private readonly List _snapshots = []; + + // Bridge to existing FakeWorkflowRunBindingReader fixtures so tests that pre-populate + // workflow run bindings also see the runs through the new IServiceRunQueryPort surface. + public FakeWorkflowRunBindingReader? WorkflowBindingFallback { get; set; } + + // Optional resolver that maps a workflow run binding to (deploymentId, revisionId) so the + // bridged snapshot mirrors what production projector would write from the dispatcher. + public Func? DeploymentResolver { get; set; } + + public IReadOnlyList Snapshots => _snapshots; + + public void Upsert(ServiceRunSnapshot snapshot) + { + _snapshots.RemoveAll(x => + string.Equals(x.ScopeId, snapshot.ScopeId, StringComparison.Ordinal) && + string.Equals(x.ServiceId, snapshot.ServiceId, StringComparison.Ordinal) && + string.Equals(x.RunId, snapshot.RunId, StringComparison.Ordinal)); + _snapshots.Add(snapshot); + } + + public Task> ListAsync(ServiceRunQuery query, CancellationToken ct = default) + { + var bridged = MaterializeForQuery(query.ScopeId, query.ServiceId).ToList(); + IEnumerable results = bridged; + if (!string.IsNullOrWhiteSpace(query.ScopeId)) + results = results.Where(s => string.Equals(s.ScopeId, query.ScopeId, StringComparison.Ordinal)); + if (!string.IsNullOrWhiteSpace(query.ServiceId)) + results = results.Where(s => string.Equals(s.ServiceId, query.ServiceId, StringComparison.Ordinal)); + return Task.FromResult>( + results.OrderByDescending(s => s.UpdatedAt).Take(query.Take).ToList()); + } + + public Task GetByRunIdAsync(string scopeId, string serviceId, string runId, CancellationToken ct = default) => + Task.FromResult(MaterializeForQuery(scopeId, serviceId).FirstOrDefault(s => + string.Equals(s.ScopeId, scopeId, StringComparison.Ordinal) && + string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal) && + string.Equals(s.RunId, runId, StringComparison.Ordinal))); + + public Task GetByCommandIdAsync(string scopeId, string serviceId, string commandId, CancellationToken ct = default) => + Task.FromResult(MaterializeForQuery(scopeId, serviceId).FirstOrDefault(s => + string.Equals(s.ScopeId, scopeId, StringComparison.Ordinal) && + string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal) && + string.Equals(s.CommandId, commandId, StringComparison.Ordinal))); + + // Materializes snapshots, treating any workflow binding fixtures as belonging to the queried service + // (workflow bindings predate the service-run registry and don't carry serviceId in the test fixtures). + private IEnumerable MaterializeForQuery(string scopeId, string serviceId) + { + foreach (var snapshot in _snapshots) + yield return snapshot; + if (WorkflowBindingFallback != null) + { + foreach (var binding in WorkflowBindingFallback.AllBindings()) + { + if (_snapshots.Any(s => string.Equals(s.RunId, binding.RunId, StringComparison.Ordinal) && + string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal))) + { + continue; + } + var (deploymentId, revisionId) = DeploymentResolver?.Invoke(binding) ?? (string.Empty, string.Empty); + yield return BuildSnapshotFromBinding(binding, scopeId, serviceId, deploymentId, revisionId); + } + } + } + + private static ServiceRunSnapshot BuildSnapshotFromBinding( + WorkflowActorBinding binding, + string scopeId, + string serviceId, + string deploymentId, + string revisionId) => + new( + ScopeId: string.IsNullOrWhiteSpace(scopeId) ? binding.ScopeId ?? string.Empty : scopeId, + ServiceId: serviceId ?? string.Empty, + ServiceKey: string.Empty, + RunId: binding.RunId, + CommandId: binding.RunId, + CorrelationId: binding.RunId, + EndpointId: string.Empty, + ImplementationKind: ServiceImplementationKind.Workflow, + TargetActorId: binding.ActorId, + RevisionId: revisionId, + DeploymentId: deploymentId, + Status: ServiceRunStatus.Accepted, + ActorId: binding.ActorId, + TenantId: binding.ScopeId ?? string.Empty, + AppId: string.Empty, + Namespace: string.Empty, + StateVersion: binding.SourceVersion, + LastEventId: binding.SourceEventId ?? string.Empty, + CreatedAt: binding.CreatedAt ?? DateTimeOffset.UtcNow, + UpdatedAt: binding.UpdatedAt ?? DateTimeOffset.UtcNow); + } + private sealed class RecordingServiceInvocationPort : IServiceInvocationPort { public ServiceInvocationRequest? LastRequest { get; private set; } @@ -4703,6 +4875,9 @@ private sealed class FakeWorkflowRunBindingReader : IWorkflowRunBindingReader public List Queries { get; } = []; + public IEnumerable AllBindings() => + BindingsByRunId.Values.SelectMany(x => x); + public Task> ListByRunIdAsync( string runId, int take = 20, @@ -4818,6 +4993,15 @@ public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) => + Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId)); + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) => + Task.CompletedTask; + } + private sealed class NoOpScriptRuntimeCommandPort : IScriptRuntimeCommandPort { public Task RunRuntimeAsync(