diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj b/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj index 6bce22805..32973954e 100644 --- a/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj +++ b/src/platform/Aevatar.GAgentService.Abstractions/Aevatar.GAgentService.Abstractions.csproj @@ -26,5 +26,6 @@ + diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs new file mode 100644 index 000000000..d0f0fe77c --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunCurrentStateProjectionPort.cs @@ -0,0 +1,10 @@ +namespace Aevatar.GAgentService.Abstractions.Ports; + +/// +/// Activation port for the durable service-run current-state projection. +/// Mirrors shape but scoped to service-run actors. +/// +public interface IServiceRunCurrentStateProjectionPort +{ + Task EnsureProjectionAsync(string actorId, CancellationToken ct = default); +} diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs new file mode 100644 index 000000000..fa0122a14 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunQueryPort.cs @@ -0,0 +1,26 @@ +using Aevatar.GAgentService.Abstractions.Queries; + +namespace Aevatar.GAgentService.Abstractions.Ports; + +/// +/// Read contract for the implementation-agnostic service-run registry. +/// Backed by the durable ServiceRunGAgent projection. +/// +public interface IServiceRunQueryPort +{ + Task> ListAsync( + ServiceRunQuery query, + CancellationToken ct = default); + + Task GetByRunIdAsync( + string scopeId, + string serviceId, + string runId, + CancellationToken ct = default); + + Task GetByCommandIdAsync( + string scopeId, + string serviceId, + string commandId, + CancellationToken ct = default); +} diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs new file mode 100644 index 000000000..aded0ff6b --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Ports/IServiceRunRegistrationPort.cs @@ -0,0 +1,23 @@ +namespace Aevatar.GAgentService.Abstractions.Ports; + +/// +/// Write contract for the implementation-agnostic service-run registry. +/// Used by the invocation dispatcher to register a run before returning the accepted receipt, +/// so Studio Observe can query the run from the durable readmodel even on immediate refresh. +/// +public interface IServiceRunRegistrationPort +{ + Task RegisterAsync( + ServiceRunRecord record, + CancellationToken ct = default); + + Task UpdateStatusAsync( + string runActorId, + string runId, + ServiceRunStatus status, + CancellationToken ct = default); +} + +public sealed record ServiceRunRegistrationResult( + string RunActorId, + string RunId); diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto b/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto new file mode 100644 index 000000000..db770367d --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Protos/service_runs.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package aevatar.gagentservice; + +option csharp_namespace = "Aevatar.GAgentService.Abstractions"; + +import "google/protobuf/timestamp.proto"; +import "service_endpoint.proto"; +import "service_revision.proto"; + +enum ServiceRunStatus { + SERVICE_RUN_STATUS_UNSPECIFIED = 0; + SERVICE_RUN_STATUS_ACCEPTED = 1; + SERVICE_RUN_STATUS_COMPLETED = 2; + SERVICE_RUN_STATUS_FAILED = 3; + SERVICE_RUN_STATUS_STOPPED = 4; +} + +message ServiceRunRecord { + string scope_id = 1; + string service_id = 2; + string service_key = 3; + string run_id = 4; + string command_id = 5; + string correlation_id = 6; + string endpoint_id = 7; + ServiceImplementationKind implementation_kind = 8; + string target_actor_id = 9; + string revision_id = 10; + string deployment_id = 11; + ServiceRunStatus status = 12; + google.protobuf.Timestamp created_at = 13; + google.protobuf.Timestamp updated_at = 14; + ServiceIdentity identity = 15; +} + +message ServiceRunState { + ServiceRunRecord record = 1; + int64 last_applied_event_version = 2; + string last_event_id = 3; +} + +message RegisterServiceRunRequested { + ServiceRunRecord record = 1; +} + +message UpdateServiceRunStatusRequested { + string run_id = 1; + ServiceRunStatus status = 2; +} + +message ServiceRunRegisteredEvent { + ServiceRunRecord record = 1; +} + +message ServiceRunStatusUpdatedEvent { + string run_id = 1; + ServiceRunStatus status = 2; + google.protobuf.Timestamp updated_at = 3; +} diff --git a/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs b/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs new file mode 100644 index 000000000..7a66f315b --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/Queries/ServiceRunSnapshot.cs @@ -0,0 +1,28 @@ +namespace Aevatar.GAgentService.Abstractions.Queries; + +public sealed record ServiceRunSnapshot( + string ScopeId, + string ServiceId, + string ServiceKey, + string RunId, + string CommandId, + string CorrelationId, + string EndpointId, + ServiceImplementationKind ImplementationKind, + string TargetActorId, + string RevisionId, + string DeploymentId, + ServiceRunStatus Status, + string ActorId, + string TenantId, + string AppId, + string Namespace, + long StateVersion, + string LastEventId, + DateTimeOffset CreatedAt, + DateTimeOffset UpdatedAt); + +public sealed record ServiceRunQuery( + string ScopeId, + string ServiceId, + int Take = 50); diff --git a/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs b/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs new file mode 100644 index 000000000..6e4a1070d --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs @@ -0,0 +1,23 @@ +namespace Aevatar.GAgentService.Abstractions; + +public static class ServiceRunIds +{ + public const string ActorPrefix = "service-run:"; + + public static string BuildKey(string scopeId, string serviceId, string runId) + { + if (string.IsNullOrWhiteSpace(scopeId)) + throw new ArgumentException("scopeId is required.", nameof(scopeId)); + if (string.IsNullOrWhiteSpace(serviceId)) + throw new ArgumentException("serviceId is required.", nameof(serviceId)); + if (string.IsNullOrWhiteSpace(runId)) + throw new ArgumentException("runId is required.", nameof(runId)); + + return $"{Normalize(scopeId)}:{Normalize(serviceId)}:{Normalize(runId)}"; + } + + public static string BuildActorId(string scopeId, string serviceId, string runId) => + ActorPrefix + BuildKey(scopeId, serviceId, runId); + + private static string Normalize(string value) => value.Trim(); +} diff --git a/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs new file mode 100644 index 000000000..75dc14595 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs @@ -0,0 +1,142 @@ +using Aevatar.Foundation.Abstractions.Attributes; +using Aevatar.Foundation.Core; +using Aevatar.Foundation.Core.EventSourcing; +using Aevatar.GAgentService.Abstractions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Core.GAgents; + +public sealed class ServiceRunGAgent : GAgentBase +{ + public ServiceRunGAgent() + { + InitializeId(); + } + + [EventHandler] + public async Task HandleRegisterAsync(RegisterServiceRunRequested command) + { + ArgumentNullException.ThrowIfNull(command); + ArgumentNullException.ThrowIfNull(command.Record); + ValidateRecord(command.Record); + + var existing = State.Record; + if (existing != null && !string.IsNullOrWhiteSpace(existing.RunId)) + { + EnsureExistingMatches(existing, command.Record); + return; + } + + var record = command.Record.Clone(); + if (record.CreatedAt == null) + record.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow); + record.UpdatedAt = record.CreatedAt; + if (record.Status == ServiceRunStatus.Unspecified) + record.Status = ServiceRunStatus.Accepted; + + await PersistDomainEventAsync(new ServiceRunRegisteredEvent + { + Record = record, + }); + } + + [EventHandler] + public async Task HandleUpdateStatusAsync(UpdateServiceRunStatusRequested command) + { + ArgumentNullException.ThrowIfNull(command); + var existing = State.Record; + if (existing == null || string.IsNullOrWhiteSpace(existing.RunId)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' has no registered run; status update rejected."); + } + + if (!string.IsNullOrWhiteSpace(command.RunId) && + !string.Equals(existing.RunId, command.RunId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot update run '{command.RunId}'."); + } + + if (command.Status == ServiceRunStatus.Unspecified) + return; + + if (existing.Status == command.Status) + return; + + await PersistDomainEventAsync(new ServiceRunStatusUpdatedEvent + { + RunId = existing.RunId, + Status = command.Status, + UpdatedAt = Timestamp.FromDateTime(DateTime.UtcNow), + }); + } + + protected override ServiceRunState TransitionState(ServiceRunState current, IMessage evt) => + StateTransitionMatcher + .Match(current, evt) + .On(ApplyRegistered) + .On(ApplyStatusUpdated) + .OrCurrent(); + + private static ServiceRunState ApplyRegistered(ServiceRunState state, ServiceRunRegisteredEvent evt) + { + var next = state.Clone(); + next.Record = evt.Record?.Clone() ?? new ServiceRunRecord(); + next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1; + next.LastEventId = $"{next.Record.RunId}:registered"; + return next; + } + + private static ServiceRunState ApplyStatusUpdated(ServiceRunState state, ServiceRunStatusUpdatedEvent evt) + { + var next = state.Clone(); + if (next.Record == null) + next.Record = new ServiceRunRecord(); + next.Record.Status = evt.Status; + next.Record.UpdatedAt = evt.UpdatedAt ?? Timestamp.FromDateTime(DateTime.UtcNow); + next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1; + next.LastEventId = $"{next.Record.RunId}:status:{(int)evt.Status}"; + return next; + } + + private void EnsureExistingMatches(ServiceRunRecord existing, ServiceRunRecord incoming) + { + if (!string.Equals(existing.RunId, incoming.RunId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot register run '{incoming.RunId}'."); + } + if (!string.Equals(existing.ScopeId, incoming.ScopeId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to scope '{existing.ScopeId}' and cannot re-register under scope '{incoming.ScopeId}'."); + } + if (!string.Equals(existing.ServiceId, incoming.ServiceId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to service '{existing.ServiceId}' and cannot re-register under service '{incoming.ServiceId}'."); + } + if (!string.IsNullOrWhiteSpace(incoming.TargetActorId) && + !string.IsNullOrWhiteSpace(existing.TargetActorId) && + !string.Equals(existing.TargetActorId, incoming.TargetActorId, StringComparison.Ordinal)) + { + throw new InvalidOperationException( + $"Service run actor '{Id}' is bound to target '{existing.TargetActorId}' and cannot re-register against target '{incoming.TargetActorId}'."); + } + } + + private static void ValidateRecord(ServiceRunRecord record) + { + ArgumentNullException.ThrowIfNull(record); + if (string.IsNullOrWhiteSpace(record.RunId)) + throw new InvalidOperationException("run_id is required."); + if (string.IsNullOrWhiteSpace(record.ScopeId)) + throw new InvalidOperationException("scope_id is required."); + if (string.IsNullOrWhiteSpace(record.ServiceId)) + throw new InvalidOperationException("service_id is required."); + if (string.IsNullOrWhiteSpace(record.CommandId)) + throw new InvalidOperationException("command_id is required."); + } +} diff --git a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs index 73d1de118..c8d4d1515 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs @@ -58,6 +58,7 @@ public static IServiceCollection AddGAgentServiceCapability( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton()); services.TryAddEnumerable(ServiceDescriptor.Singleton()); @@ -118,6 +119,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders( TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); + TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); } else @@ -129,6 +131,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders( TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); + TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); } @@ -146,6 +149,7 @@ private static bool HasAllGAgentServiceProjectionReaders( && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind) + && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind); } diff --git a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs index f9c260126..a83087fb4 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/Endpoints/ScopeServiceEndpoints.cs @@ -22,6 +22,7 @@ using Aevatar.Scripting.Abstractions.Queries; using Aevatar.Scripting.Core.Ports; using Aevatar.Workflow.Application.Abstractions.Queries; +using Aevatar.GAgentService.Abstractions.Queries; using Aevatar.GAgentService.Hosting.Serialization; using Aevatar.Presentation.AGUI; using Aevatar.Workflow.Application.Abstractions.Runs; @@ -647,6 +648,7 @@ private static async Task HandleInvokeDefaultChatStreamAsync( StreamScopeServiceHttpRequest request, [FromServices] ServiceInvocationResolutionService resolutionService, [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer, + [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort, [FromServices] ICommandInteractionService chatRunService, [FromServices] ICommandInteractionService gagentDraftRunService, [FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort, @@ -671,6 +673,7 @@ await HandleInvokeStreamAsync( appId: null, resolutionService, admissionAuthorizer, + serviceRunRegistrationPort, chatRunService, gagentDraftRunService, scriptRuntimeCommandPort, @@ -769,6 +772,7 @@ private static async Task HandleInvokeMemberStreamAsync( [FromServices] IMemberPublishedServiceResolver memberPublishedServiceResolver, [FromServices] ServiceInvocationResolutionService resolutionService, [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer, + [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort, [FromServices] ICommandInteractionService chatRunService, [FromServices] ICommandInteractionService gagentDraftRunService, [FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort, @@ -796,6 +800,7 @@ await HandleInvokeStreamAsync( null, resolutionService, admissionAuthorizer, + serviceRunRegistrationPort, chatRunService, gagentDraftRunService, scriptRuntimeCommandPort, @@ -863,7 +868,7 @@ private static Task HandleListDefaultRunsAsync( string scopeId, int take, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) => @@ -873,7 +878,7 @@ private static Task HandleListDefaultRunsAsync( ResolveDefaultScopeServiceId(options.Value), take, lifecycleQueryPort, - workflowRunBindingReader, + serviceRunQueryPort, workflowExecutionQueryService, options, ct); @@ -884,7 +889,7 @@ private static Task HandleGetDefaultRunAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) => @@ -895,7 +900,7 @@ private static Task HandleGetDefaultRunAsync( runId, actorId, lifecycleQueryPort, - workflowRunBindingReader, + serviceRunQueryPort, workflowExecutionQueryService, options, ct); @@ -906,7 +911,7 @@ private static Task HandleGetDefaultRunAuditAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) => @@ -917,7 +922,7 @@ private static Task HandleGetDefaultRunAuditAsync( runId, actorId, lifecycleQueryPort, - workflowRunBindingReader, + serviceRunQueryPort, workflowExecutionQueryService, options, ct); @@ -1334,7 +1339,7 @@ private static async Task HandleListRunsAsync( string serviceId, int take, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) @@ -1343,23 +1348,17 @@ private static async Task HandleListRunsAsync( if (resolution.Failure != null) return resolution.Failure; - var bindings = await ListScopeServiceRunsAsync( - scopeId, - resolution.Service!, - resolution.Deployments, - workflowRunBindingReader, - take, + var snapshots = await serviceRunQueryPort.ListAsync( + new ServiceRunQuery(scopeId, serviceId, Math.Clamp(take <= 0 ? 50 : take, 1, 200)), ct); - var summaries = new List(bindings.Count); - foreach (var binding in bindings) + var summaries = new List(snapshots.Count); + foreach (var snapshot in snapshots) { - summaries.Add(await BuildScopeRunSummaryAsync( + summaries.Add(await BuildScopeRunSummaryFromRegistryAsync( scopeId, serviceId, - binding, - resolution.Service!, - resolution.Deployments, + snapshot, workflowExecutionQueryService, ct)); } @@ -1379,30 +1378,29 @@ private static async Task HandleGetRunAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) { - var resolution = await ResolveScopeServiceRunAsync( - http, - options.Value, - scopeId, - serviceId, - runId, - actorId, - lifecycleQueryPort, - workflowRunBindingReader, - ct); - if (resolution.Failure != null) - return resolution.Failure; + var serviceResolution = await ResolveScopeServiceAsync(http, scopeId, serviceId, lifecycleQueryPort, options.Value, ct); + if (serviceResolution.Failure != null) + return serviceResolution.Failure; - return Results.Ok(await BuildScopeRunSummaryAsync( + var snapshot = await ResolveServiceRunSnapshotAsync(scopeId, serviceId, runId, serviceRunQueryPort, ct); + if (snapshot == null) + { + return Results.NotFound(new + { + code = "SERVICE_RUN_NOT_FOUND", + message = BuildScopeServiceRunNotFoundMessage(scopeId, serviceId, runId?.Trim() ?? string.Empty), + }); + } + + return Results.Ok(await BuildScopeRunSummaryFromRegistryAsync( scopeId, serviceId, - resolution.Binding!, - resolution.Service!, - resolution.Deployments, + snapshot, workflowExecutionQueryService, ct)); } @@ -1414,45 +1412,110 @@ private static async Task HandleGetRunAuditAsync( string runId, string? actorId, [FromServices] IServiceLifecycleQueryPort lifecycleQueryPort, - [FromServices] IWorkflowRunBindingReader workflowRunBindingReader, + [FromServices] IServiceRunQueryPort serviceRunQueryPort, [FromServices] IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, [FromServices] IOptions options, CancellationToken ct) { - var resolution = await ResolveScopeServiceRunAsync( - http, - options.Value, - scopeId, - serviceId, - runId, - actorId, - lifecycleQueryPort, - workflowRunBindingReader, - ct); - if (resolution.Failure != null) - return resolution.Failure; + var serviceResolution = await ResolveScopeServiceAsync(http, scopeId, serviceId, lifecycleQueryPort, options.Value, ct); + if (serviceResolution.Failure != null) + return serviceResolution.Failure; + + var snapshot = await ResolveServiceRunSnapshotAsync(scopeId, serviceId, runId, serviceRunQueryPort, ct); + if (snapshot == null) + { + return Results.NotFound(new + { + code = "SERVICE_RUN_NOT_FOUND", + message = BuildScopeServiceRunNotFoundMessage(scopeId, serviceId, runId?.Trim() ?? string.Empty), + }); + } - var summary = await BuildScopeRunSummaryAsync( + var summary = await BuildScopeRunSummaryFromRegistryAsync( scopeId, serviceId, - resolution.Binding!, - resolution.Service!, - resolution.Deployments, + snapshot, workflowExecutionQueryService, ct); - var report = await workflowExecutionQueryService.GetActorReportAsync(resolution.Binding!.ActorId, ct); + + if (snapshot.ImplementationKind != ServiceImplementationKind.Workflow || + string.IsNullOrWhiteSpace(snapshot.TargetActorId)) + { + return Results.NotFound(new + { + code = "SERVICE_RUN_AUDIT_NOT_AVAILABLE", + message = $"Audit detail for run '{snapshot.RunId}' is not available for {snapshot.ImplementationKind} services.", + }); + } + + var report = await workflowExecutionQueryService.GetActorReportAsync(snapshot.TargetActorId, ct); if (report == null) { return Results.NotFound(new { code = "SERVICE_RUN_AUDIT_NOT_FOUND", - message = $"Audit report for run '{resolution.Binding.RunId}' was not found on service '{serviceId}' in scope '{scopeId}'.", + message = $"Audit report for run '{snapshot.RunId}' was not found on service '{serviceId}' in scope '{scopeId}'.", }); } return Results.Ok(new ScopeServiceRunAuditHttpResponse(summary, report)); } + // Registers a stream-invocation run with the durable service-run registry using the + // actual run id that the implementation pipeline produced (workflow run actor id / + // draft-run command id / scripting-generated run id). Called once the downstream + // run id is known so /runs/{runId} resolves the same id the client receives via SSE. + private static ValueTask RegisterStreamServiceRunAsync( + IServiceRunRegistrationPort serviceRunRegistrationPort, + ServiceInvocationResolvedTarget target, + ServiceInvocationRequest invocationRequest, + string scopeId, + string serviceId, + string runId, + string commandId, + string correlationId, + string targetActorId, + CancellationToken ct) + { + var record = new ServiceRunRecord + { + ScopeId = scopeId, + ServiceId = serviceId, + ServiceKey = target.Service.ServiceKey ?? string.Empty, + RunId = runId, + CommandId = string.IsNullOrWhiteSpace(commandId) ? runId : commandId, + CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? runId : correlationId, + EndpointId = target.Endpoint.EndpointId ?? string.Empty, + ImplementationKind = target.Artifact.ImplementationKind, + TargetActorId = string.IsNullOrWhiteSpace(targetActorId) + ? target.Service.PrimaryActorId ?? string.Empty + : targetActorId, + RevisionId = target.Service.RevisionId ?? string.Empty, + DeploymentId = target.Service.DeploymentId ?? string.Empty, + Status = ServiceRunStatus.Accepted, + Identity = invocationRequest.Identity?.Clone(), + }; + return new ValueTask(serviceRunRegistrationPort.RegisterAsync(record, ct)); + } + + private static async Task ResolveServiceRunSnapshotAsync( + string scopeId, + string serviceId, + string runId, + IServiceRunQueryPort serviceRunQueryPort, + CancellationToken ct) + { + var normalized = runId?.Trim() ?? string.Empty; + if (string.IsNullOrWhiteSpace(normalized)) + return null; + + var byRun = await serviceRunQueryPort.GetByRunIdAsync(scopeId, serviceId, normalized, ct); + if (byRun != null) + return byRun; + + return await serviceRunQueryPort.GetByCommandIdAsync(scopeId, serviceId, normalized, ct); + } + private static async Task HandleInvokeStreamAsync( HttpContext http, string scopeId, @@ -1462,6 +1525,7 @@ private static async Task HandleInvokeStreamAsync( string? appId, [FromServices] ServiceInvocationResolutionService resolutionService, [FromServices] IInvokeAdmissionAuthorizer admissionAuthorizer, + [FromServices] IServiceRunRegistrationPort serviceRunRegistrationPort, [FromServices] ICommandInteractionService chatRunService, [FromServices] ICommandInteractionService gagentDraftRunService, [FromServices] IScriptRuntimeCommandPort scriptRuntimeCommandPort, @@ -1493,7 +1557,6 @@ await admissionAuthorizer.AuthorizeAsync( target.Endpoint, invocationRequest, ct); - switch (target.Artifact.ImplementationKind) { case ServiceImplementationKind.Workflow: @@ -1510,7 +1573,20 @@ await WorkflowCapabilityEndpoints.HandleChat( Metadata = scopedHeaders, }, chatRunService, - ct); + ct, + onAcceptedHook: (receipt, token) => RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + // For workflow, the SSE RunStarted carries the workflow run actor id as the run identifier; + // use the same id so /runs/{runId} resolves to this run after refresh. + runId: receipt.ActorId, + commandId: receipt.CommandId, + correlationId: receipt.CorrelationId, + targetActorId: receipt.ActorId, + token)); break; case ServiceImplementationKind.Static: @@ -1521,9 +1597,12 @@ await HandleStaticGAgentChatStreamAsync( request.ActorId, request.SessionId, scopeId, + serviceId, scopedHeaders, request.InputParts, gagentDraftRunService, + invocationRequest, + serviceRunRegistrationPort, ct); break; @@ -1534,9 +1613,12 @@ await HandleScriptingServiceChatStreamAsync( normalizedPrompt, request.SessionId, scopeId, + serviceId, scopedHeaders, scriptRuntimeCommandPort, scriptExecutionProjectionPort, + invocationRequest, + serviceRunRegistrationPort, ct); break; @@ -1572,9 +1654,12 @@ private static async Task HandleStaticGAgentChatStreamAsync( string? actorId, string? sessionId, string scopeId, + string serviceId, IReadOnlyDictionary? headers, IReadOnlyList? inputParts, ICommandInteractionService interactionService, + ServiceInvocationRequest invocationRequest, + IServiceRunRegistrationPort serviceRunRegistrationPort, CancellationToken ct) { var plan = target.Artifact.DeploymentPlan.StaticPlan; @@ -1607,6 +1692,19 @@ async ValueTask EmitAsync(AGUIEvent aguiEvent, CancellationToken token) async ValueTask OnAcceptedAsync(GAgentDraftRunAcceptedReceipt receipt, CancellationToken token) { http.Response.Headers["X-Correlation-Id"] = receipt.CorrelationId; + // Register the service run with the same id we are about to send to the client + // so /runs/{runId} resolves immediately on refresh. + await RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + runId: receipt.CommandId, + commandId: receipt.CommandId, + correlationId: receipt.CorrelationId, + targetActorId: receipt.ActorId, + token); await EnsureSseStartedAsync(token); await writer.WriteAsync( new AGUIEvent @@ -1694,9 +1792,12 @@ private static async Task HandleScriptingServiceChatStreamAsync( string prompt, string? sessionId, string scopeId, + string serviceId, IReadOnlyDictionary? headers, IScriptRuntimeCommandPort scriptRuntimeCommandPort, IScriptExecutionProjectionPort scriptExecutionProjectionPort, + ServiceInvocationRequest invocationRequest, + IServiceRunRegistrationPort serviceRunRegistrationPort, CancellationToken ct) { var actorId = target.Service.PrimaryActorId; @@ -1705,6 +1806,18 @@ private static async Task HandleScriptingServiceChatStreamAsync( "Script runtime actor is not available. The service may not be activated."); var runId = Guid.NewGuid().ToString("N"); + // Register the service run with the same id the SSE RunStarted frame will carry. + await RegisterStreamServiceRunAsync( + serviceRunRegistrationPort, + target, + invocationRequest, + scopeId, + serviceId, + runId: runId, + commandId: runId, + correlationId: runId, + targetActorId: actorId, + ct); var chatRequest = new ChatRequestEvent { Prompt = prompt, @@ -2726,7 +2839,58 @@ private static async Task BuildScopeRunSumma snapshot?.CompletedSteps ?? 0, snapshot?.RoleReplyCount ?? 0, snapshot?.LastOutput ?? string.Empty, - snapshot?.LastError ?? string.Empty); + snapshot?.LastError ?? string.Empty, + ServiceImplementationKind.Workflow.ToString(), + ServiceRunStatus.Accepted.ToString(), + string.Empty, + string.Empty, + string.Empty, + binding.ActorId, + binding.CreatedAt); + } + + private static async Task BuildScopeRunSummaryFromRegistryAsync( + string scopeId, + string serviceId, + ServiceRunSnapshot snapshot, + IWorkflowExecutionQueryApplicationService workflowExecutionQueryService, + CancellationToken ct) + { + var workflowSnapshot = snapshot.ImplementationKind == ServiceImplementationKind.Workflow && + !string.IsNullOrWhiteSpace(snapshot.TargetActorId) + ? await workflowExecutionQueryService.GetActorSnapshotAsync(snapshot.TargetActorId, ct) + : null; + + return new ScopeServiceRunSummaryHttpResponse( + scopeId, + serviceId, + snapshot.RunId, + // ActorId stays the controllable target so existing resume/signal/stop + // round-trips keep working; the registry actor is internal infra. + snapshot.TargetActorId, + string.Empty, + snapshot.RevisionId, + snapshot.DeploymentId, + workflowSnapshot?.WorkflowName ?? string.Empty, + workflowSnapshot?.CompletionStatus ?? WorkflowRunCompletionStatus.Unknown, + workflowSnapshot?.StateVersion ?? snapshot.StateVersion, + workflowSnapshot?.LastEventId ?? snapshot.LastEventId, + workflowSnapshot?.LastUpdatedAt ?? snapshot.UpdatedAt, + snapshot.CreatedAt, + snapshot.UpdatedAt, + workflowSnapshot?.LastSuccess, + workflowSnapshot?.TotalSteps ?? 0, + workflowSnapshot?.CompletedSteps ?? 0, + workflowSnapshot?.RoleReplyCount ?? 0, + workflowSnapshot?.LastOutput ?? string.Empty, + workflowSnapshot?.LastError ?? string.Empty, + snapshot.ImplementationKind.ToString(), + snapshot.Status.ToString(), + snapshot.CommandId, + snapshot.CorrelationId, + snapshot.EndpointId, + snapshot.TargetActorId, + snapshot.CreatedAt); } private static MemberScopeServiceRunSummaryHttpResponse BuildMemberRunSummaryResponse( @@ -3489,7 +3653,14 @@ public sealed record ScopeServiceRunSummaryHttpResponse( int CompletedSteps, int RoleReplyCount, string LastOutput, - string LastError); + string LastError, + string ImplementationKind, + string Status, + string CommandId, + string CorrelationId, + string EndpointId, + string TargetActorId, + DateTimeOffset? CreatedAt = null); public sealed record MemberScopeServiceRunSummaryHttpResponse( string ScopeId, diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs new file mode 100644 index 000000000..a93cffc76 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Infrastructure/Adapters/ServiceRunRegistrationAdapter.cs @@ -0,0 +1,104 @@ +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Core.GAgents; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Infrastructure.Adapters; + +/// +/// Infrastructure adapter that registers and updates service runs by dispatching +/// commands to actors. The actor commits the events +/// and the current-state projection materializes them into the durable readmodel. +/// +public sealed class ServiceRunRegistrationAdapter : IServiceRunRegistrationPort +{ + private const string PublisherId = "gagent-service.runs"; + + private readonly IActorRuntime _runtime; + private readonly IActorDispatchPort _dispatchPort; + private readonly IServiceRunCurrentStateProjectionPort _projectionPort; + + public ServiceRunRegistrationAdapter( + IActorRuntime runtime, + IActorDispatchPort dispatchPort, + IServiceRunCurrentStateProjectionPort projectionPort) + { + _runtime = runtime ?? throw new ArgumentNullException(nameof(runtime)); + _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort)); + _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + } + + public async Task RegisterAsync( + ServiceRunRecord record, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(record); + if (string.IsNullOrWhiteSpace(record.RunId)) + throw new InvalidOperationException("run_id is required."); + if (string.IsNullOrWhiteSpace(record.ScopeId)) + throw new InvalidOperationException("scope_id is required."); + if (string.IsNullOrWhiteSpace(record.ServiceId)) + throw new InvalidOperationException("service_id is required."); + + var actorId = ServiceRunIds.BuildActorId(record.ScopeId, record.ServiceId, record.RunId); + var actor = await _runtime.CreateAsync(actorId, ct: ct); + await _projectionPort.EnsureProjectionAsync(actor.Id, ct); + + var prepared = record.Clone(); + if (prepared.CreatedAt == null) + prepared.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow); + prepared.UpdatedAt = prepared.CreatedAt; + if (prepared.Status == ServiceRunStatus.Unspecified) + prepared.Status = ServiceRunStatus.Accepted; + + var envelope = CreateEnvelope(actor.Id, Any.Pack(new RegisterServiceRunRequested + { + Record = prepared, + }), prepared.CommandId, prepared.CorrelationId); + + await _dispatchPort.DispatchAsync(actor.Id, envelope, ct); + return new ServiceRunRegistrationResult(actor.Id, prepared.RunId); + } + + public async Task UpdateStatusAsync( + string runActorId, + string runId, + ServiceRunStatus status, + CancellationToken ct = default) + { + if (string.IsNullOrWhiteSpace(runActorId)) + throw new ArgumentException("runActorId is required.", nameof(runActorId)); + if (status == ServiceRunStatus.Unspecified) + return; + + var commandId = Guid.NewGuid().ToString("N"); + var envelope = CreateEnvelope( + runActorId, + Any.Pack(new UpdateServiceRunStatusRequested + { + RunId = runId ?? string.Empty, + Status = status, + }), + commandId, + commandId); + await _dispatchPort.DispatchAsync(runActorId, envelope, ct); + } + + private static EventEnvelope CreateEnvelope( + string actorId, + Any payload, + string commandId, + string correlationId) => + new() + { + Id = string.IsNullOrWhiteSpace(commandId) ? Guid.NewGuid().ToString("N") : commandId, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Payload = payload, + Route = EnvelopeRouteSemantics.CreateDirect(PublisherId, actorId), + Propagation = new EnvelopePropagation + { + CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? commandId : correlationId, + }, + }; +} diff --git a/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs b/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs index 06f20d4c6..ee5a8bff9 100644 --- a/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs +++ b/src/platform/Aevatar.GAgentService.Infrastructure/Dispatch/DefaultServiceInvocationDispatcher.cs @@ -14,15 +14,18 @@ public sealed class DefaultServiceInvocationDispatcher : IServiceInvocationDispa private readonly IActorDispatchPort _dispatchPort; private readonly IScriptRuntimeCommandPort _scriptRuntimeCommandPort; private readonly IWorkflowRunActorPort _workflowRunActorPort; + private readonly IServiceRunRegistrationPort _serviceRunRegistrationPort; public DefaultServiceInvocationDispatcher( IActorDispatchPort dispatchPort, IScriptRuntimeCommandPort scriptRuntimeCommandPort, - IWorkflowRunActorPort workflowRunActorPort) + IWorkflowRunActorPort workflowRunActorPort, + IServiceRunRegistrationPort serviceRunRegistrationPort) { _dispatchPort = dispatchPort ?? throw new ArgumentNullException(nameof(dispatchPort)); _scriptRuntimeCommandPort = scriptRuntimeCommandPort ?? throw new ArgumentNullException(nameof(scriptRuntimeCommandPort)); _workflowRunActorPort = workflowRunActorPort ?? throw new ArgumentNullException(nameof(workflowRunActorPort)); + _serviceRunRegistrationPort = serviceRunRegistrationPort ?? throw new ArgumentNullException(nameof(serviceRunRegistrationPort)); } public async Task DispatchAsync( @@ -49,9 +52,12 @@ private async Task DispatchStaticAsync( CancellationToken ct) { var commandId = ResolveCommandId(request); - var envelope = CreateEnvelope(target.Service.PrimaryActorId, request.Payload, commandId, ResolveCorrelationId(request, commandId)); + var correlationId = ResolveCorrelationId(request, commandId); + var runId = ResolveRunId(request, commandId); + await RegisterRunAsync(target, request, runId, commandId, correlationId, target.Service.PrimaryActorId, ServiceImplementationKind.Static, ct); + var envelope = CreateEnvelope(target.Service.PrimaryActorId, request.Payload, commandId, correlationId); await _dispatchPort.DispatchAsync(target.Service.PrimaryActorId, envelope, ct); - return CreateReceipt(target, target.Service.PrimaryActorId, commandId, ResolveCorrelationId(request, commandId)); + return CreateReceipt(target, target.Service.PrimaryActorId, commandId, correlationId); } private async Task DispatchScriptingAsync( @@ -61,6 +67,9 @@ private async Task DispatchScriptingAsync( { var plan = target.Artifact.DeploymentPlan.ScriptingPlan; var commandId = ResolveCommandId(request); + var correlationId = ResolveCorrelationId(request, commandId); + var runId = ResolveRunId(request, commandId); + await RegisterRunAsync(target, request, runId, commandId, correlationId, target.Service.PrimaryActorId, ServiceImplementationKind.Scripting, ct); await _scriptRuntimeCommandPort.RunRuntimeAsync( target.Service.PrimaryActorId, runId: commandId, @@ -70,7 +79,7 @@ await _scriptRuntimeCommandPort.RunRuntimeAsync( request.Payload?.TypeUrl ?? string.Empty, request.Identity?.TenantId, ct); - return CreateReceipt(target, target.Service.PrimaryActorId, commandId, ResolveCorrelationId(request, commandId)); + return CreateReceipt(target, target.Service.PrimaryActorId, commandId, correlationId); } private async Task DispatchWorkflowAsync( @@ -91,11 +100,42 @@ private async Task DispatchWorkflowAsync( ct); var commandId = ResolveCommandId(request); var correlationId = ResolveCorrelationId(request, commandId); + var runId = ResolveRunId(request, commandId); + await RegisterRunAsync(target, request, runId, commandId, correlationId, run.Actor.Id, ServiceImplementationKind.Workflow, ct); var envelope = CreateEnvelope(run.Actor.Id, Any.Pack(chatRequest), commandId, correlationId); await _dispatchPort.DispatchAsync(run.Actor.Id, envelope, ct); return CreateReceipt(target, run.Actor.Id, commandId, correlationId); } + private async Task RegisterRunAsync( + ServiceInvocationResolvedTarget target, + ServiceInvocationRequest request, + string runId, + string commandId, + string correlationId, + string targetActorId, + ServiceImplementationKind implementationKind, + CancellationToken ct) + { + var record = new ServiceRunRecord + { + ScopeId = request.Identity?.TenantId ?? string.Empty, + ServiceId = request.Identity?.ServiceId ?? string.Empty, + ServiceKey = target.Service.ServiceKey ?? string.Empty, + RunId = runId, + CommandId = commandId, + CorrelationId = correlationId, + EndpointId = target.Endpoint.EndpointId ?? string.Empty, + ImplementationKind = implementationKind, + TargetActorId = targetActorId ?? string.Empty, + RevisionId = target.Service.RevisionId ?? string.Empty, + DeploymentId = target.Service.DeploymentId ?? string.Empty, + Status = ServiceRunStatus.Accepted, + Identity = request.Identity?.Clone(), + }; + await _serviceRunRegistrationPort.RegisterAsync(record, ct); + } + private static void EnsureEndpointPayloadMatch(ServiceEndpointDescriptor endpoint, ServiceInvocationRequest request) { if (request.Payload == null) @@ -155,9 +195,13 @@ private static string ResolveCorrelationId(ServiceInvocationRequest request, str ? commandId : request.CorrelationId; + private static string ResolveRunId(ServiceInvocationRequest request, string commandId) => + string.IsNullOrWhiteSpace(request.CommandId) + ? commandId + : request.CommandId; + private static string ResolveAuthoritativeScopeId(ServiceInvocationRequest request, ChatRequestEvent chatRequest) { - // Path-level scope (Identity.TenantId) is authoritative; payload cannot override it. if (!string.IsNullOrWhiteSpace(request.Identity?.TenantId)) return request.Identity.TenantId.Trim(); return ResolveScopeId(chatRequest); diff --git a/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs b/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs new file mode 100644 index 000000000..c895ede6a --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Contexts/ServiceRunCurrentStateProjectionContext.cs @@ -0,0 +1,9 @@ +namespace Aevatar.GAgentService.Projection.Contexts; + +public sealed class ServiceRunCurrentStateProjectionContext + : IProjectionMaterializationContext +{ + public required string RootActorId { get; init; } + + public required string ProjectionKind { get; init; } +} diff --git a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs index 869454b91..5b557a59c 100644 --- a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs @@ -73,6 +73,13 @@ public static IServiceCollection AddGAgentServiceProjection( ProjectionKind = scopeKey.ProjectionKind, }, static context => new ServiceProjectionRuntimeLease(context.RootActorId, context)); + services.AddServiceProjectionRuntime>( + static scopeKey => new ServiceRunCurrentStateProjectionContext + { + RootActorId = scopeKey.RootActorId, + ProjectionKind = scopeKey.ProjectionKind, + }, + static context => new ServiceProjectionRuntimeLease(context.RootActorId, context)); services.AddEventSinkProjectionRuntimeCore< GAgentDraftRunProjectionContext, GAgentDraftRunRuntimeLease, @@ -92,6 +99,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton, GAgentDraftRunSessionEventCodec>(); services.TryAddSingleton, ProjectionSessionEventHub>(); services.TryAddSingleton(); @@ -102,6 +110,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton, ServiceRolloutCommandObservationReadModelMetadataProvider>(); services.TryAddSingleton, ServiceTrafficViewReadModelMetadataProvider>(); services.TryAddSingleton, ServiceRevisionCatalogReadModelMetadataProvider>(); + services.TryAddSingleton, ServiceRunCurrentStateReadModelMetadataProvider>(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); @@ -109,6 +118,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.AddProjectionArtifactMaterializer< ServiceCatalogProjectionContext, ServiceCatalogProjector>(); @@ -130,6 +140,9 @@ public static IServiceCollection AddGAgentServiceProjection( services.AddProjectionArtifactMaterializer< ServiceRevisionCatalogProjectionContext, ServiceRevisionCatalogProjector>(); + services.AddCurrentStateProjectionMaterializer< + ServiceRunCurrentStateProjectionContext, + ServiceRunCurrentStateProjector>(); services.TryAddEnumerable(ServiceDescriptor.Singleton< IProjectionProjector, GAgentDraftRunSessionEventProjector>()); diff --git a/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs b/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs new file mode 100644 index 000000000..2b0bf8b27 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Metadata/ServiceRunCurrentStateReadModelMetadataProvider.cs @@ -0,0 +1,13 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Metadata; + +public sealed class ServiceRunCurrentStateReadModelMetadataProvider : IProjectionDocumentMetadataProvider +{ + public DocumentIndexMetadata Metadata { get; } = new( + "gagent-service-runs", + Mappings: new Dictionary(), + Settings: new Dictionary(), + Aliases: new Dictionary()); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs index 9ac1f658a..752c2d8ad 100644 --- a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs +++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs @@ -9,4 +9,5 @@ internal static class ServiceProjectionKinds public const string Rollouts = "service-rollouts"; public const string Traffic = "service-traffic"; public const string DraftRunSession = "service-draft-run-session"; + public const string Runs = "service-runs"; } diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs new file mode 100644 index 000000000..164b5c929 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceRunCurrentStateProjectionPort.cs @@ -0,0 +1,21 @@ +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Projection.Configuration; +using Aevatar.GAgentService.Projection.Contexts; + +namespace Aevatar.GAgentService.Projection.Orchestration; + +public sealed class ServiceRunCurrentStateProjectionPort + : ServiceProjectionPortBase, + IServiceRunCurrentStateProjectionPort +{ + public ServiceRunCurrentStateProjectionPort( + ServiceProjectionOptions options, + IProjectionScopeActivationService> activationService, + IProjectionScopeReleaseService> releaseService) + : base(options, activationService, releaseService, ServiceProjectionKinds.Runs) + { + } + + public Task EnsureProjectionAsync(string actorId, CancellationToken ct = default) => + EnsureProjectionCoreAsync(actorId, ct); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs new file mode 100644 index 000000000..1d3740011 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Projectors/ServiceRunCurrentStateProjector.cs @@ -0,0 +1,77 @@ +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.CQRS.Projection.Runtime.Abstractions; +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Projection.Contexts; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Projectors; + +public sealed class ServiceRunCurrentStateProjector + : ICurrentStateProjectionMaterializer +{ + private readonly IProjectionWriteDispatcher _writeDispatcher; + private readonly IProjectionClock _clock; + + public ServiceRunCurrentStateProjector( + IProjectionWriteDispatcher writeDispatcher, + IProjectionClock clock) + { + _writeDispatcher = writeDispatcher ?? throw new ArgumentNullException(nameof(writeDispatcher)); + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + } + + public async ValueTask ProjectAsync( + ServiceRunCurrentStateProjectionContext context, + EventEnvelope envelope, + CancellationToken ct = default) + { + if (!CommittedStateEventEnvelope.TryUnpackState( + envelope, + out _, + out var stateEvent, + out var state) || + stateEvent == null || + state?.Record == null) + { + return; + } + + var record = state.Record; + if (string.IsNullOrWhiteSpace(record.RunId) || + string.IsNullOrWhiteSpace(record.ScopeId) || + string.IsNullOrWhiteSpace(record.ServiceId)) + { + return; + } + + var observedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow); + var document = new ServiceRunCurrentStateReadModel + { + Id = ServiceRunIds.BuildKey(record.ScopeId, record.ServiceId, record.RunId), + ActorId = context.RootActorId, + ScopeId = record.ScopeId ?? string.Empty, + ServiceId = record.ServiceId ?? string.Empty, + ServiceKey = record.ServiceKey ?? string.Empty, + RunId = record.RunId, + CommandId = record.CommandId ?? string.Empty, + CorrelationId = record.CorrelationId ?? string.Empty, + EndpointId = record.EndpointId ?? string.Empty, + ImplementationKind = (int)record.ImplementationKind, + TargetActorId = record.TargetActorId ?? string.Empty, + RevisionId = record.RevisionId ?? string.Empty, + DeploymentId = record.DeploymentId ?? string.Empty, + Status = (int)record.Status, + TenantId = record.Identity?.TenantId ?? string.Empty, + AppId = record.Identity?.AppId ?? string.Empty, + Namespace = record.Identity?.Namespace ?? string.Empty, + CreatedAt = record.CreatedAt?.ToDateTimeOffset() ?? observedAt, + UpdatedAt = record.UpdatedAt?.ToDateTimeOffset() ?? observedAt, + StateVersion = stateEvent.Version, + LastEventId = stateEvent.EventId ?? string.Empty, + }; + + await _writeDispatcher.UpsertAsync(document, ct); + } +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs new file mode 100644 index 000000000..e84eb69fc --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Queries/ServiceRunQueryReader.cs @@ -0,0 +1,183 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Abstractions.Queries; +using Aevatar.GAgentService.Projection.Configuration; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Queries; + +public sealed class ServiceRunQueryReader : IServiceRunQueryPort +{ + private readonly IProjectionDocumentReader _documentStore; + private readonly bool _enabled; + + public ServiceRunQueryReader( + IProjectionDocumentReader documentStore, + ServiceProjectionOptions? options = null) + { + _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore)); + _enabled = options?.Enabled ?? true; + } + + public async Task> ListAsync( + ServiceRunQuery query, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(query); + if (!_enabled) + return []; + + var boundedTake = Math.Clamp(query.Take, 1, 200); + var filters = new List(2); + if (!string.IsNullOrWhiteSpace(query.ScopeId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ScopeId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(query.ScopeId), + }); + } + if (!string.IsNullOrWhiteSpace(query.ServiceId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ServiceId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(query.ServiceId), + }); + } + + var result = await _documentStore.QueryAsync( + new ProjectionDocumentQuery + { + Take = boundedTake, + Filters = filters, + Sorts = new[] + { + new ProjectionDocumentSort + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.UpdatedAt), + Direction = ProjectionDocumentSortDirection.Desc, + }, + new ProjectionDocumentSort + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.RunId), + Direction = ProjectionDocumentSortDirection.Asc, + }, + }, + }, + ct); + return result.Items.Take(boundedTake).Select(Map).ToList(); + } + + public async Task GetByRunIdAsync( + string scopeId, + string serviceId, + string runId, + CancellationToken ct = default) + { + if (!_enabled) + return null; + if (string.IsNullOrWhiteSpace(runId) || + string.IsNullOrWhiteSpace(scopeId) || + string.IsNullOrWhiteSpace(serviceId)) + { + return null; + } + + var direct = await _documentStore.GetAsync( + ServiceRunIds.BuildKey(scopeId, serviceId, runId), + ct); + return direct == null ? null : Map(direct); + } + + public async Task GetByCommandIdAsync( + string scopeId, + string serviceId, + string commandId, + CancellationToken ct = default) + { + if (!_enabled) + return null; + if (string.IsNullOrWhiteSpace(commandId)) + return null; + + var matches = await QueryByEqualityAsync( + scopeId, + serviceId, + nameof(ServiceRunCurrentStateReadModel.CommandId), + commandId.Trim(), + ct); + return matches.FirstOrDefault(); + } + + private async Task> QueryByEqualityAsync( + string scopeId, + string serviceId, + string fieldPath, + string value, + CancellationToken ct) + { + var filters = new List(3) + { + new ProjectionDocumentFilter + { + FieldPath = fieldPath, + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(value), + }, + }; + if (!string.IsNullOrWhiteSpace(scopeId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ScopeId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(scopeId), + }); + } + if (!string.IsNullOrWhiteSpace(serviceId)) + { + filters.Add(new ProjectionDocumentFilter + { + FieldPath = nameof(ServiceRunCurrentStateReadModel.ServiceId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(serviceId), + }); + } + + var result = await _documentStore.QueryAsync( + new ProjectionDocumentQuery + { + Take = 5, + Filters = filters, + }, + ct); + return result.Items.Select(Map).ToList(); + } + + private static ServiceRunSnapshot Map(ServiceRunCurrentStateReadModel readModel) => + new( + readModel.ScopeId, + readModel.ServiceId, + readModel.ServiceKey, + readModel.RunId, + readModel.CommandId, + readModel.CorrelationId, + readModel.EndpointId, + (ServiceImplementationKind)readModel.ImplementationKind, + readModel.TargetActorId, + readModel.RevisionId, + readModel.DeploymentId, + (ServiceRunStatus)readModel.Status, + readModel.ActorId, + readModel.TenantId, + readModel.AppId, + readModel.Namespace, + readModel.StateVersion, + readModel.LastEventId, + readModel.CreatedAt, + readModel.UpdatedAt); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs index a10c9a0c5..3347f068a 100644 --- a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs +++ b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs @@ -202,6 +202,21 @@ public IList Targets } } +public sealed partial class ServiceRunCurrentStateReadModel : IProjectionReadModel +{ + public DateTimeOffset CreatedAt + { + get => ServiceProjectionReadModelSupport.ToDateTimeOffset(CreatedAtUtcValue); + set => CreatedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value); + } + + public DateTimeOffset UpdatedAt + { + get => ServiceProjectionReadModelSupport.ToDateTimeOffset(UpdatedAtUtcValue); + set => UpdatedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value); + } +} + internal static class ServiceProjectionReadModelSupport { public static Timestamp ToTimestamp(DateTimeOffset value) => diff --git a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto index 1ebe6c536..684eb1c82 100644 --- a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto +++ b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto @@ -175,3 +175,31 @@ message ServiceTrafficTargetReadModel { int32 allocation_weight = 4; string serving_state = 5; } + +// --- ServiceRunCurrentStateReadModel --- + +message ServiceRunCurrentStateReadModel { + string id = 1; + string actor_id = 2; + int64 state_version = 3; + string last_event_id = 4; + + string scope_id = 5; + string service_id = 6; + string service_key = 7; + string run_id = 8; + string command_id = 9; + string correlation_id = 10; + string endpoint_id = 11; + int32 implementation_kind = 12; + string target_actor_id = 13; + string revision_id = 14; + string deployment_id = 15; + int32 status = 16; + string tenant_id = 17; + string app_id = 18; + string namespace = 19; + + google.protobuf.Timestamp created_at_utc_value = 20; + google.protobuf.Timestamp updated_at_utc_value = 21; +} diff --git a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs index fc85a8ca0..f36803851 100644 --- a/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs +++ b/src/workflow/Aevatar.Workflow.Infrastructure/CapabilityApi/ChatEndpoints.cs @@ -38,7 +38,8 @@ public static async Task HandleChat( HttpContext http, ChatInput input, ICommandInteractionService chatRunService, - CancellationToken ct = default) + CancellationToken ct = default, + Func? onAcceptedHook = null) { using var scope = ApiRequestScope.BeginHttp(); var writer = new ChatSseResponseWriter(http.Response); @@ -73,6 +74,8 @@ public static async Task HandleChat( onAcceptedAsync: async (receipt, token) => { CapabilityTraceContext.ApplyCorrelationHeader(http.Response, receipt.CorrelationId); + if (onAcceptedHook != null) + await onAcceptedHook(receipt, token); await writer.StartAsync(token); await writer.WriteAsync(BuildRunContextFrame(receipt), token); scope.RecordFirstResponse(); diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs index ca5e595fb..9c64eda14 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs @@ -8,6 +8,7 @@ using Aevatar.Foundation.Abstractions; using Aevatar.Foundation.Abstractions.Streaming; using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; using Aevatar.GAgentService.Abstractions.ScopeGAgents; using Aevatar.GAgentService.Application.ScopeGAgents; using Aevatar.GAgentService.Hosting.Endpoints; @@ -68,8 +69,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldCreateActor_AndEmitSy }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -120,8 +120,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldReuseExistingActor_An }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -155,8 +154,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldMapAllInputPartKinds_ }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -215,8 +213,7 @@ public async Task HandleGAgentServiceChatStreamAsync_ShouldPreserveRunErrorWitho }; var interactionService = CreateStaticStreamInteractionService(runtime, projectionPort); - await InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + await InvokeStaticStreamAsync( http, CreateStaticTarget(typeof(StreamTestAgent).AssemblyQualifiedName!, primaryActorId: "actor-1"), "hello", @@ -350,8 +347,7 @@ public async Task ScriptExecutionSessionEventProjector_ShouldRouteOnlyMatchingRu [Fact] public async Task HandleGAgentServiceChatStreamAsync_ShouldThrow_WhenAgentTypeCannotBeResolved() { - var act = () => InvokePrivateTaskAsync( - HandleGAgentStreamMethod, + var act = () => InvokeStaticStreamAsync( CreateHttpContext(), CreateStaticTarget("Missing.Agent, Missing.Assembly", primaryActorId: "actor-1"), "hello", @@ -370,8 +366,7 @@ await act.Should().ThrowAsync() [Fact] public async Task HandleScriptingServiceChatStreamAsync_ShouldThrow_WhenPrimaryActorMissing() { - var act = () => InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + var act = () => InvokeScriptingStreamAsync( CreateHttpContext(), CreateScriptingTarget(primaryActorId: string.Empty), "hello", @@ -389,8 +384,7 @@ await act.Should().ThrowAsync() [Fact] public async Task HandleScriptingServiceChatStreamAsync_ShouldThrow_WhenActorCannotBeResolved() { - var act = () => InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + var act = () => InvokeScriptingStreamAsync( CreateHttpContext(), CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -421,8 +415,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldEmitSyntheticFinis }, }; - await InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + await InvokeScriptingStreamAsync( http, CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -469,8 +462,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldPreserveRunErrorWi }, }; - await InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + await InvokeScriptingStreamAsync( http, CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -509,8 +501,7 @@ public async Task HandleScriptingServiceChatStreamAsync_ShouldAvoidSyntheticDupl }, }; - await InvokePrivateTaskAsync( - HandleScriptingStreamMethod, + await InvokeScriptingStreamAsync( http, CreateScriptingTarget(primaryActorId: "actor-1"), "hello", @@ -620,6 +611,67 @@ private static ServiceInvocationResolvedTarget CreateScriptingTarget(string prim artifact.Endpoints[0]); } + private static Task InvokeStaticStreamAsync( + HttpContext http, + ServiceInvocationResolvedTarget target, + string prompt, + string? actorId, + string? sessionId, + string scopeId, + IReadOnlyDictionary? headers, + IReadOnlyList? inputParts, + ICommandInteractionService interactionService, + CancellationToken ct) => + InvokePrivateTaskAsync( + HandleGAgentStreamMethod, + http, + target, + prompt, + actorId, + sessionId, + scopeId, + "svc-default", + headers, + inputParts, + interactionService, + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), + ct); + + private static Task InvokeScriptingStreamAsync( + HttpContext http, + ServiceInvocationResolvedTarget target, + string prompt, + string? sessionId, + string scopeId, + IReadOnlyDictionary? headers, + IScriptRuntimeCommandPort scriptRuntimeCommandPort, + IScriptExecutionProjectionPort scriptExecutionProjectionPort, + CancellationToken ct) => + InvokePrivateTaskAsync( + HandleScriptingStreamMethod, + http, + target, + prompt, + sessionId, + scopeId, + "svc-default", + headers, + scriptRuntimeCommandPort, + scriptExecutionProjectionPort, + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), + ct); + + private sealed class NoOpServiceRunRegistrationPort : IServiceRunRegistrationPort + { + public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) => + Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId)); + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) => + Task.CompletedTask; + } + private static async Task InvokePrivateTaskAsync(MethodInfo method, params object?[] args) { var result = method.Invoke(null, args); diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs index 090aa5c15..26c816b9b 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsTests.cs @@ -1490,6 +1490,13 @@ await host.ArtifactStore.SaveAsync( host.InteractionService.LastRequest!.ActorId.Should().Be("definition-actor-1"); host.InteractionService.LastRequest.ScopeId.Should().Be("scope-a"); host.InteractionService.LastRequest.Metadata.Should().ContainKey("source").WhoseValue.Should().Be("tests"); + // Service-run registry receives the actual workflow run actor id as the run id, so + // /runs/{runId} can resolve the same id the SSE RunStarted frame carries. + host.ServiceRunRegistrationPort.RegisterCalls.Should().ContainSingle(); + host.ServiceRunRegistrationPort.RegisterCalls[0].RunId.Should().Be("run-actor-1"); + host.ServiceRunRegistrationPort.RegisterCalls[0].CommandId.Should().Be("cmd-1"); + host.ServiceRunRegistrationPort.RegisterCalls[0].TargetActorId.Should().Be("run-actor-1"); + host.ServiceRunRegistrationPort.RegisterCalls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Workflow); } [Fact] @@ -1815,9 +1822,12 @@ public async Task ScopeServiceEndpointHelpers_ShouldRejectScriptingStream_WhenRu "hello", "session-1", "scope-a", + "default", new Dictionary(), new NoOpScriptRuntimeCommandPort(), new NoOpScriptExecutionProjectionPort(), + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), CancellationToken.None)) .Should() .ThrowAsync(); @@ -1876,9 +1886,12 @@ public async Task ScopeServiceEndpointHelpers_ShouldRejectScriptingStream_WhenRu "hello", "session-1", "scope-a", + "default", new Dictionary(), new ThrowingScriptRuntimeCommandPort(new InvalidOperationException("Script runtime actor 'script-runtime-1' could not be resolved. The service may not be activated.")), new NoOpScriptExecutionProjectionPort(), + new ServiceInvocationRequest(), + new NoOpServiceRunRegistrationPort(), CancellationToken.None)) .Should() .ThrowAsync(); @@ -2815,8 +2828,6 @@ public async Task ListDefaultRunsEndpoint_ShouldReturnDefaultServiceRunHistory() response.Runs[0].RevisionId.Should().Be("rev-1"); response.Runs[0].DeploymentId.Should().Be("dep-old"); response.Runs[0].WorkflowName.Should().Be("default-flow"); - host.RunBindingReader.Queries.Should().ContainSingle(); - host.RunBindingReader.Queries[0].ScopeId.Should().Be("scope-a"); } [Fact] @@ -2927,8 +2938,6 @@ public async Task ListMemberRunsEndpoint_ShouldReturnMemberScopedRunHistory() response.Runs[0].RevisionId.Should().Be("rev-1"); response.Runs[0].DeploymentId.Should().Be("dep-member-old"); response.Runs[0].StateVersion.Should().Be(13); - host.RunBindingReader.Queries.Should().ContainSingle(); - host.RunBindingReader.Queries[0].DefinitionActorIds.Should().BeEquivalentTo(["def-member-active", "def-member-old"]); } [Fact] @@ -3199,10 +3208,6 @@ public async Task ListRunsEndpoint_ShouldReturnScopeScopedRunHistory() response.Runs[0].CompletionStatus.Should().Be(WorkflowRunCompletionStatus.Completed); response.Runs[0].StateVersion.Should().Be(7); response.Runs[0].LastEventId.Should().Be("evt-7"); - host.RunBindingReader.Queries.Should().ContainSingle(); - host.RunBindingReader.Queries[0].ScopeId.Should().Be("scope-a"); - host.RunBindingReader.Queries[0].Take.Should().Be(5); - host.RunBindingReader.Queries[0].DefinitionActorIds.Should().BeEquivalentTo(["def-actor-active", "def-actor-old"]); } [Fact] @@ -4152,7 +4157,9 @@ private ScopeServiceEndpointTestHost( FakeWorkflowRunBindingReader runBindingReader, RecordingResumeDispatchService resumeDispatchService, RecordingSignalDispatchService signalDispatchService, - RecordingStopDispatchService stopDispatchService) + RecordingStopDispatchService stopDispatchService, + RecordingServiceRunRegistrationPort serviceRunRegistrationPort, + FakeServiceRunQueryPort serviceRunQueryPort) { _app = app; Client = client; @@ -4172,6 +4179,8 @@ private ScopeServiceEndpointTestHost( ResumeDispatchService = resumeDispatchService; SignalDispatchService = signalDispatchService; StopDispatchService = stopDispatchService; + ServiceRunRegistrationPort = serviceRunRegistrationPort; + ServiceRunQueryPort = serviceRunQueryPort; } public HttpClient Client { get; } @@ -4208,6 +4217,10 @@ private ScopeServiceEndpointTestHost( public RecordingStopDispatchService StopDispatchService { get; } + public RecordingServiceRunRegistrationPort ServiceRunRegistrationPort { get; } + + public FakeServiceRunQueryPort ServiceRunQueryPort { get; } + public static async Task StartAsync(bool authenticationEnabled = true) { var builder = WebApplication.CreateBuilder(new WebApplicationOptions @@ -4238,6 +4251,20 @@ public static async Task StartAsync(bool authentic var stopDispatchService = new RecordingStopDispatchService(); var actorRuntime = new NoOpActorRuntime(); var eventSubscriptionProvider = new NoOpActorEventSubscriptionProvider(); + var serviceRunQueryPort = new FakeServiceRunQueryPort + { + WorkflowBindingFallback = runBindingReader, + DeploymentResolver = binding => + { + var deployment = lifecycleQueryPort.Deployments?.Deployments.FirstOrDefault(d => + string.Equals(d.PrimaryActorId, binding.EffectiveDefinitionActorId, StringComparison.Ordinal)); + return (deployment?.DeploymentId ?? string.Empty, deployment?.RevisionId ?? string.Empty); + }, + }; + var serviceRunRegistrationPort = new RecordingServiceRunRegistrationPort + { + LinkedQueryPort = serviceRunQueryPort, + }; builder.Services.AddSingleton(commandPort); builder.Services.AddSingleton(queryPort); builder.Services.AddSingleton(scopeBindingPort); @@ -4262,6 +4289,8 @@ public static async Task StartAsync(bool authentic builder.Services.AddSingleton>(stopDispatchService); builder.Services.AddSingleton(actorRuntime); builder.Services.AddSingleton(eventSubscriptionProvider); + builder.Services.AddSingleton(serviceRunRegistrationPort); + builder.Services.AddSingleton(serviceRunQueryPort); builder.Services.AddSingleton>( Options.Create(new ScopeWorkflowCapabilityOptions { @@ -4369,7 +4398,9 @@ public static async Task StartAsync(bool authentic runBindingReader, resumeDispatchService, signalDispatchService, - stopDispatchService); + stopDispatchService, + serviceRunRegistrationPort, + serviceRunQueryPort); } private static bool TryGetRequestedScopeId(string? path, out string scopeId) @@ -4574,6 +4605,147 @@ private sealed class RecordingServiceGovernanceQueryPort : IServiceGovernanceQue throw new NotSupportedException(); } + private sealed class RecordingServiceRunRegistrationPort : IServiceRunRegistrationPort + { + public List RegisterCalls { get; } = []; + public List<(string runActorId, string runId, ServiceRunStatus status)> StatusCalls { get; } = []; + + public FakeServiceRunQueryPort? LinkedQueryPort { get; set; } + + public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) + { + RegisterCalls.Add(record.Clone()); + LinkedQueryPort?.Upsert(BuildSnapshot(record)); + return Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.ScopeId}:{record.ServiceId}:{record.RunId}", record.RunId)); + } + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) + { + StatusCalls.Add((runActorId, runId, status)); + return Task.CompletedTask; + } + + private static ServiceRunSnapshot BuildSnapshot(ServiceRunRecord record) => + new( + record.ScopeId, + record.ServiceId, + record.ServiceKey, + record.RunId, + record.CommandId, + record.CorrelationId, + record.EndpointId, + record.ImplementationKind, + record.TargetActorId, + record.RevisionId, + record.DeploymentId, + record.Status, + $"service-run:{record.ScopeId}:{record.ServiceId}:{record.RunId}", + record.Identity?.TenantId ?? string.Empty, + record.Identity?.AppId ?? string.Empty, + record.Identity?.Namespace ?? string.Empty, + StateVersion: 1, + LastEventId: $"{record.RunId}:registered", + CreatedAt: record.CreatedAt?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow, + UpdatedAt: record.UpdatedAt?.ToDateTimeOffset() ?? DateTimeOffset.UtcNow); + } + + private sealed class FakeServiceRunQueryPort : IServiceRunQueryPort + { + private readonly List _snapshots = []; + + // Bridge to existing FakeWorkflowRunBindingReader fixtures so tests that pre-populate + // workflow run bindings also see the runs through the new IServiceRunQueryPort surface. + public FakeWorkflowRunBindingReader? WorkflowBindingFallback { get; set; } + + // Optional resolver that maps a workflow run binding to (deploymentId, revisionId) so the + // bridged snapshot mirrors what production projector would write from the dispatcher. + public Func? DeploymentResolver { get; set; } + + public IReadOnlyList Snapshots => _snapshots; + + public void Upsert(ServiceRunSnapshot snapshot) + { + _snapshots.RemoveAll(x => + string.Equals(x.ScopeId, snapshot.ScopeId, StringComparison.Ordinal) && + string.Equals(x.ServiceId, snapshot.ServiceId, StringComparison.Ordinal) && + string.Equals(x.RunId, snapshot.RunId, StringComparison.Ordinal)); + _snapshots.Add(snapshot); + } + + public Task> ListAsync(ServiceRunQuery query, CancellationToken ct = default) + { + var bridged = MaterializeForQuery(query.ScopeId, query.ServiceId).ToList(); + IEnumerable results = bridged; + if (!string.IsNullOrWhiteSpace(query.ScopeId)) + results = results.Where(s => string.Equals(s.ScopeId, query.ScopeId, StringComparison.Ordinal)); + if (!string.IsNullOrWhiteSpace(query.ServiceId)) + results = results.Where(s => string.Equals(s.ServiceId, query.ServiceId, StringComparison.Ordinal)); + return Task.FromResult>( + results.OrderByDescending(s => s.UpdatedAt).Take(query.Take).ToList()); + } + + public Task GetByRunIdAsync(string scopeId, string serviceId, string runId, CancellationToken ct = default) => + Task.FromResult(MaterializeForQuery(scopeId, serviceId).FirstOrDefault(s => + string.Equals(s.ScopeId, scopeId, StringComparison.Ordinal) && + string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal) && + string.Equals(s.RunId, runId, StringComparison.Ordinal))); + + public Task GetByCommandIdAsync(string scopeId, string serviceId, string commandId, CancellationToken ct = default) => + Task.FromResult(MaterializeForQuery(scopeId, serviceId).FirstOrDefault(s => + string.Equals(s.ScopeId, scopeId, StringComparison.Ordinal) && + string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal) && + string.Equals(s.CommandId, commandId, StringComparison.Ordinal))); + + // Materializes snapshots, treating any workflow binding fixtures as belonging to the queried service + // (workflow bindings predate the service-run registry and don't carry serviceId in the test fixtures). + private IEnumerable MaterializeForQuery(string scopeId, string serviceId) + { + foreach (var snapshot in _snapshots) + yield return snapshot; + if (WorkflowBindingFallback != null) + { + foreach (var binding in WorkflowBindingFallback.AllBindings()) + { + if (_snapshots.Any(s => string.Equals(s.RunId, binding.RunId, StringComparison.Ordinal) && + string.Equals(s.ServiceId, serviceId, StringComparison.Ordinal))) + { + continue; + } + var (deploymentId, revisionId) = DeploymentResolver?.Invoke(binding) ?? (string.Empty, string.Empty); + yield return BuildSnapshotFromBinding(binding, scopeId, serviceId, deploymentId, revisionId); + } + } + } + + private static ServiceRunSnapshot BuildSnapshotFromBinding( + WorkflowActorBinding binding, + string scopeId, + string serviceId, + string deploymentId, + string revisionId) => + new( + ScopeId: string.IsNullOrWhiteSpace(scopeId) ? binding.ScopeId ?? string.Empty : scopeId, + ServiceId: serviceId ?? string.Empty, + ServiceKey: string.Empty, + RunId: binding.RunId, + CommandId: binding.RunId, + CorrelationId: binding.RunId, + EndpointId: string.Empty, + ImplementationKind: ServiceImplementationKind.Workflow, + TargetActorId: binding.ActorId, + RevisionId: revisionId, + DeploymentId: deploymentId, + Status: ServiceRunStatus.Accepted, + ActorId: binding.ActorId, + TenantId: binding.ScopeId ?? string.Empty, + AppId: string.Empty, + Namespace: string.Empty, + StateVersion: binding.SourceVersion, + LastEventId: binding.SourceEventId ?? string.Empty, + CreatedAt: binding.CreatedAt ?? DateTimeOffset.UtcNow, + UpdatedAt: binding.UpdatedAt ?? DateTimeOffset.UtcNow); + } + private sealed class RecordingServiceInvocationPort : IServiceInvocationPort { public ServiceInvocationRequest? LastRequest { get; private set; } @@ -4703,6 +4875,9 @@ private sealed class FakeWorkflowRunBindingReader : IWorkflowRunBindingReader public List Queries { get; } = []; + public IEnumerable AllBindings() => + BindingsByRunId.Values.SelectMany(x => x); + public Task> ListByRunIdAsync( string runId, int take = 20, @@ -4818,6 +4993,15 @@ public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) => + Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId)); + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) => + Task.CompletedTask; + } + private sealed class NoOpScriptRuntimeCommandPort : IScriptRuntimeCommandPort { public Task RunRuntimeAsync( diff --git a/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs new file mode 100644 index 000000000..399ccfd4c --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Core/ServiceRunGAgentTests.cs @@ -0,0 +1,225 @@ +using Aevatar.Foundation.Runtime.Persistence; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Core.GAgents; +using Aevatar.GAgentService.Tests.TestSupport; +using FluentAssertions; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Tests.Core; + +public sealed class ServiceRunGAgentTests +{ + [Fact] + public async Task HandleRegisterAsync_ShouldPersistRecord_AndDefaultStatusToAccepted() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.ActivateAsync(); + + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + actor.State.Record.Should().NotBeNull(); + actor.State.Record!.RunId.Should().Be("run-1"); + actor.State.Record.Status.Should().Be(ServiceRunStatus.Accepted); + actor.State.LastAppliedEventVersion.Should().Be(1); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldBeIdempotent_WhenRunIdAlreadyBound() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + actor.State.LastAppliedEventVersion.Should().Be(1); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectMismatchedRunId() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-2"), + }); + + await act.Should().ThrowAsync() + .WithMessage("*run-1*cannot register run 'run-2'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectScopeMismatchOnReRegister() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:tenant-1:svc-1:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var foreign = BuildRecord("run-1"); + foreign.ScopeId = "tenant-2"; + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign }); + + await act.Should().ThrowAsync() + .WithMessage("*tenant-1*cannot re-register under scope 'tenant-2'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectServiceMismatchOnReRegister() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:tenant-1:svc-1:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var foreign = BuildRecord("run-1"); + foreign.ServiceId = "svc-2"; + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign }); + + await act.Should().ThrowAsync() + .WithMessage("*svc-1*cannot re-register under service 'svc-2'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectTargetMismatchOnReRegister() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:tenant-1:svc-1:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + var foreign = BuildRecord("run-1"); + foreign.TargetActorId = "different-target"; + var act = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested { Record = foreign }); + + await act.Should().ThrowAsync() + .WithMessage("*target-run-1*cannot re-register against target 'different-target'*"); + } + + [Fact] + public async Task HandleRegisterAsync_ShouldRejectMissingRequiredFields() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:bad", + static () => new ServiceRunGAgent()); + + var noRunId = () => actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = new ServiceRunRecord { ScopeId = "t", ServiceId = "s", CommandId = "c" }, + }); + await noRunId.Should().ThrowAsync().WithMessage("run_id*"); + } + + [Fact] + public async Task HandleUpdateStatusAsync_ShouldAdvanceStatusAndStamp() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + await actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested + { + RunId = "run-1", + Status = ServiceRunStatus.Completed, + }); + + actor.State.Record!.Status.Should().Be(ServiceRunStatus.Completed); + actor.State.LastAppliedEventVersion.Should().Be(2); + } + + [Fact] + public async Task HandleUpdateStatusAsync_ShouldNoOp_WhenStatusUnchanged() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + await actor.HandleRegisterAsync(new RegisterServiceRunRequested + { + Record = BuildRecord("run-1"), + }); + + await actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested + { + RunId = "run-1", + Status = ServiceRunStatus.Accepted, + }); + + actor.State.LastAppliedEventVersion.Should().Be(1); + } + + [Fact] + public async Task HandleUpdateStatusAsync_ShouldRejectWhenNotRegistered() + { + var actor = GAgentServiceTestKit.CreateStatefulAgent( + new InMemoryEventStore(), + "service-run:run-1", + static () => new ServiceRunGAgent()); + + var act = () => actor.HandleUpdateStatusAsync(new UpdateServiceRunStatusRequested + { + RunId = "run-1", + Status = ServiceRunStatus.Completed, + }); + await act.Should().ThrowAsync() + .WithMessage("*has no registered run*"); + } + + private static ServiceRunRecord BuildRecord(string runId) => + new() + { + ScopeId = "tenant-1", + ServiceId = "svc-1", + ServiceKey = "tenant-1:svc-1", + RunId = runId, + CommandId = $"cmd-{runId}", + CorrelationId = $"corr-{runId}", + EndpointId = "run", + ImplementationKind = ServiceImplementationKind.Static, + TargetActorId = $"target-{runId}", + RevisionId = "r1", + DeploymentId = "dep-1", + Status = ServiceRunStatus.Unspecified, + CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow), + }; +} diff --git a/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs b/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs index 2af93dd78..60c849349 100644 --- a/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs +++ b/test/Aevatar.GAgentService.Tests/Infrastructure/DefaultServiceInvocationDispatcherTests.cs @@ -1,6 +1,7 @@ using Aevatar.AI.Abstractions; using Aevatar.Foundation.Abstractions; using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; using Aevatar.GAgentService.Infrastructure.Dispatch; using Aevatar.GAgentService.Tests.TestSupport; using Aevatar.Scripting.Core.Ports; @@ -19,7 +20,8 @@ public async Task DispatchAsync_ShouldDispatchStaticEnvelope() var dispatcher = new DefaultServiceInvocationDispatcher( dispatchPort, new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); var request = new ServiceInvocationRequest { @@ -46,7 +48,8 @@ public async Task DispatchAsync_ShouldDelegateScriptingRun() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), scriptPort, - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Scripting, endpointId: "run", @@ -83,7 +86,8 @@ public async Task DispatchAsync_ShouldCreateWorkflowRun_AndSendEnvelope() var dispatcher = new DefaultServiceInvocationDispatcher( dispatchPort, new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -127,7 +131,8 @@ public async Task DispatchAsync_ShouldPreferIdentityTenantIdOverPayloadScope() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -166,7 +171,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromRequestScopeBeforeMetada var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -205,7 +211,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromWorkflowMetadataKey_When var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -242,7 +249,8 @@ public async Task DispatchAsync_ShouldResolveScopeIdFromLegacyMetadataKey_WhenOt var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - workflowPort); + workflowPort, + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -277,7 +285,8 @@ public async Task DispatchAsync_ShouldRejectPayloadTypeMismatch() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Static, endpointId: "run", @@ -302,7 +311,8 @@ public async Task DispatchAsync_ShouldGenerateCommandAndCorrelationIds_WhenMissi var dispatcher = new DefaultServiceInvocationDispatcher( dispatchPort, new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); var receipt = await dispatcher.DispatchAsync(target, new ServiceInvocationRequest @@ -325,7 +335,8 @@ public async Task DispatchAsync_ShouldRejectMissingPayload() var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); var act = () => dispatcher.DispatchAsync(target, new ServiceInvocationRequest @@ -344,7 +355,8 @@ public async Task DispatchAsync_ShouldRejectWorkflowPayloadThatIsNotChatRequest( var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Workflow, endpointId: "chat", @@ -372,7 +384,8 @@ public async Task DispatchAsync_ShouldPassRequestedEventTypeAndGeneratedRunIdToS var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), scriptPort, - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget( ServiceImplementationKind.Scripting, endpointId: "run", @@ -398,13 +411,111 @@ public async Task DispatchAsync_ShouldPassRequestedEventTypeAndGeneratedRunIdToS scriptPort.Calls[0].payload!.TypeUrl.Should().Be(Any.Pack(new StringValue()).TypeUrl); } + [Fact] + public async Task DispatchAsync_ShouldRegisterServiceRun_ForStaticPath() + { + var registry = new RecordingServiceRunRegistrationPort(); + var dispatcher = new DefaultServiceInvocationDispatcher( + new RecordingDispatchPort(), + new RecordingScriptRuntimeCommandPort(), + new RecordingWorkflowRunActorPort(), + registry); + var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); + var request = new ServiceInvocationRequest + { + Identity = GAgentServiceTestKit.CreateIdentity(), + EndpointId = "run", + CommandId = "cmd-static", + Payload = Any.Pack(new StringValue { Value = "payload" }), + }; + + var receipt = await dispatcher.DispatchAsync(target, request); + + registry.Calls.Should().ContainSingle(); + registry.Calls[0].RunId.Should().Be(receipt.CommandId); + registry.Calls[0].CommandId.Should().Be("cmd-static"); + registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Static); + registry.Calls[0].TargetActorId.Should().Be("primary-actor"); + registry.Calls[0].ScopeId.Should().Be("tenant"); + registry.Calls[0].ServiceId.Should().Be("svc"); + } + + [Fact] + public async Task DispatchAsync_ShouldRegisterServiceRun_ForScriptingPath() + { + var registry = new RecordingServiceRunRegistrationPort(); + var dispatcher = new DefaultServiceInvocationDispatcher( + new RecordingDispatchPort(), + new RecordingScriptRuntimeCommandPort(), + new RecordingWorkflowRunActorPort(), + registry); + var target = CreateTarget( + ServiceImplementationKind.Scripting, + endpointId: "run", + requestTypeUrl: Any.Pack(new StringValue()).TypeUrl); + target.Artifact.DeploymentPlan.ScriptingPlan = new ScriptingServiceDeploymentPlan + { + Revision = "rev-1", + DefinitionActorId = "definition-1", + }; + var request = new ServiceInvocationRequest + { + Identity = GAgentServiceTestKit.CreateIdentity(), + EndpointId = "run", + CommandId = "cmd-script", + Payload = Any.Pack(new StringValue { Value = "payload" }), + }; + + await dispatcher.DispatchAsync(target, request); + + registry.Calls.Should().ContainSingle(); + registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Scripting); + registry.Calls[0].CommandId.Should().Be("cmd-script"); + } + + [Fact] + public async Task DispatchAsync_ShouldRegisterServiceRun_ForWorkflowPath() + { + var registry = new RecordingServiceRunRegistrationPort(); + var workflowPort = new RecordingWorkflowRunActorPort(); + var dispatcher = new DefaultServiceInvocationDispatcher( + new RecordingDispatchPort(), + new RecordingScriptRuntimeCommandPort(), + workflowPort, + registry); + var target = CreateTarget( + ServiceImplementationKind.Workflow, + endpointId: "chat", + requestTypeUrl: Any.Pack(new ChatRequestEvent()).TypeUrl); + target.Artifact.DeploymentPlan.WorkflowPlan = new WorkflowServiceDeploymentPlan + { + WorkflowName = "wf", + WorkflowYaml = "name: wf", + }; + var request = new ServiceInvocationRequest + { + Identity = GAgentServiceTestKit.CreateIdentity(), + EndpointId = "chat", + CommandId = "cmd-wf", + Payload = Any.Pack(new ChatRequestEvent { Prompt = "hi" }), + }; + + await dispatcher.DispatchAsync(target, request); + + registry.Calls.Should().ContainSingle(); + registry.Calls[0].ImplementationKind.Should().Be(ServiceImplementationKind.Workflow); + registry.Calls[0].TargetActorId.Should().Be(workflowPort.RunActor.Id); + registry.Calls[0].CommandId.Should().Be("cmd-wf"); + } + [Fact] public async Task DispatchAsync_ShouldRejectUnsupportedImplementationKind() { var dispatcher = new DefaultServiceInvocationDispatcher( new RecordingDispatchPort(), new RecordingScriptRuntimeCommandPort(), - new RecordingWorkflowRunActorPort()); + new RecordingWorkflowRunActorPort(), + new RecordingServiceRunRegistrationPort()); var target = CreateTarget(ServiceImplementationKind.Static, endpointId: "run"); target.Artifact.ImplementationKind = ServiceImplementationKind.Unspecified; @@ -453,6 +564,20 @@ private static ServiceInvocationResolvedTarget CreateTarget( }); } + private sealed class RecordingServiceRunRegistrationPort : IServiceRunRegistrationPort + { + public List Calls { get; } = []; + + public Task RegisterAsync(ServiceRunRecord record, CancellationToken ct = default) + { + Calls.Add(record.Clone()); + return Task.FromResult(new ServiceRunRegistrationResult($"service-run:{record.RunId}", record.RunId)); + } + + public Task UpdateStatusAsync(string runActorId, string runId, ServiceRunStatus status, CancellationToken ct = default) => + Task.CompletedTask; + } + private sealed class RecordingDispatchPort : IActorDispatchPort { public List<(string actorId, EventEnvelope envelope)> Calls { get; } = []; diff --git a/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs b/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs new file mode 100644 index 000000000..fa94d5049 --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Infrastructure/ServiceRunRegistrationAdapterTests.cs @@ -0,0 +1,193 @@ +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Core.GAgents; +using Aevatar.GAgentService.Infrastructure.Adapters; +using Aevatar.GAgentService.Tests.TestSupport; +using FluentAssertions; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Tests.Infrastructure; + +public sealed class ServiceRunRegistrationAdapterTests +{ + [Fact] + public async Task RegisterAsync_ShouldCreateActorWithCompositeId_AndDispatchRegisterEnvelope() + { + var runtime = new RecordingRunRegistryRuntime(); + var dispatchPort = new RecordingDispatchPort(); + var projectionPort = new RecordingServiceRunProjectionPort(); + var adapter = new ServiceRunRegistrationAdapter(runtime, dispatchPort, projectionPort); + + var record = BuildRecord(scopeId: "tenant-1", serviceId: "svc-1", runId: "run-1"); + var result = await adapter.RegisterAsync(record); + + var expectedActorId = ServiceRunIds.BuildActorId("tenant-1", "svc-1", "run-1"); + result.RunActorId.Should().Be(expectedActorId); + result.RunId.Should().Be("run-1"); + runtime.CreateCalls.Should().ContainSingle(); + runtime.CreateCalls[0].agentType.Should().Be(typeof(ServiceRunGAgent)); + runtime.CreateCalls[0].actorId.Should().Be(expectedActorId); + projectionPort.EnsureCalls.Should().Equal(expectedActorId); + dispatchPort.Calls.Should().ContainSingle(); + dispatchPort.Calls[0].actorId.Should().Be(expectedActorId); + dispatchPort.Calls[0].envelope.Payload.TypeUrl.Should().Contain("RegisterServiceRunRequested"); + } + + [Fact] + public async Task RegisterAsync_ShouldNotCollide_OnSameRunIdAcrossScopes() + { + var runtime = new RecordingRunRegistryRuntime(); + var adapter = new ServiceRunRegistrationAdapter( + runtime, + new RecordingDispatchPort(), + new RecordingServiceRunProjectionPort()); + + await adapter.RegisterAsync(BuildRecord("tenant-a", "svc", "run-shared")); + await adapter.RegisterAsync(BuildRecord("tenant-b", "svc", "run-shared")); + + runtime.CreateCalls.Should().HaveCount(2); + runtime.CreateCalls[0].actorId.Should().Be(ServiceRunIds.BuildActorId("tenant-a", "svc", "run-shared")); + runtime.CreateCalls[1].actorId.Should().Be(ServiceRunIds.BuildActorId("tenant-b", "svc", "run-shared")); + runtime.CreateCalls[0].actorId.Should().NotBe(runtime.CreateCalls[1].actorId); + } + + [Fact] + public async Task RegisterAsync_ShouldRejectMissingRequiredFields() + { + var adapter = new ServiceRunRegistrationAdapter( + new RecordingRunRegistryRuntime(), + new RecordingDispatchPort(), + new RecordingServiceRunProjectionPort()); + + var noRun = BuildRecord("tenant", "svc", string.Empty); + var act = () => adapter.RegisterAsync(noRun); + await act.Should().ThrowAsync().WithMessage("run_id*"); + + var noScope = BuildRecord(string.Empty, "svc", "run-1"); + var act2 = () => adapter.RegisterAsync(noScope); + await act2.Should().ThrowAsync().WithMessage("scope_id*"); + + var noService = BuildRecord("tenant", string.Empty, "run-1"); + var act3 = () => adapter.RegisterAsync(noService); + await act3.Should().ThrowAsync().WithMessage("service_id*"); + } + + [Fact] + public async Task UpdateStatusAsync_ShouldDispatchUpdateEnvelope() + { + var dispatchPort = new RecordingDispatchPort(); + var adapter = new ServiceRunRegistrationAdapter( + new RecordingRunRegistryRuntime(), + dispatchPort, + new RecordingServiceRunProjectionPort()); + + await adapter.UpdateStatusAsync("service-run:tenant:svc:run-1", "run-1", ServiceRunStatus.Completed); + + dispatchPort.Calls.Should().ContainSingle(); + dispatchPort.Calls[0].actorId.Should().Be("service-run:tenant:svc:run-1"); + dispatchPort.Calls[0].envelope.Payload.TypeUrl.Should().Contain("UpdateServiceRunStatusRequested"); + } + + [Fact] + public async Task UpdateStatusAsync_ShouldNoOp_WhenStatusUnspecified() + { + var dispatchPort = new RecordingDispatchPort(); + var adapter = new ServiceRunRegistrationAdapter( + new RecordingRunRegistryRuntime(), + dispatchPort, + new RecordingServiceRunProjectionPort()); + + await adapter.UpdateStatusAsync("service-run:tenant:svc:run-1", "run-1", ServiceRunStatus.Unspecified); + + dispatchPort.Calls.Should().BeEmpty(); + } + + private static ServiceRunRecord BuildRecord(string scopeId, string serviceId, string runId) => + new() + { + ScopeId = scopeId, + ServiceId = serviceId, + ServiceKey = $"{scopeId}:{serviceId}", + RunId = runId, + CommandId = $"cmd-{runId}", + CorrelationId = $"corr-{runId}", + EndpointId = "run", + ImplementationKind = ServiceImplementationKind.Static, + TargetActorId = "primary-actor", + RevisionId = "r1", + DeploymentId = "dep-1", + Status = ServiceRunStatus.Unspecified, + CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow), + }; + + private sealed class RecordingRunRegistryRuntime : IActorRuntime + { + public List<(System.Type agentType, string actorId)> CreateCalls { get; } = []; + + public Task CreateAsync(string? id = null, CancellationToken ct = default) + where TAgent : IAgent => + CreateAsync(typeof(TAgent), id, ct); + + public Task CreateAsync(System.Type agentType, string? id = null, CancellationToken ct = default) + { + var actorId = id ?? $"created:{agentType.Name}"; + CreateCalls.Add((agentType, actorId)); + return Task.FromResult(new RecordingActor(actorId)); + } + + public Task DestroyAsync(string id, CancellationToken ct = default) => Task.CompletedTask; + + public Task GetAsync(string id) => Task.FromResult(null); + + public Task ExistsAsync(string id) => Task.FromResult(false); + + public Task LinkAsync(string parentId, string childId, CancellationToken ct = default) => Task.CompletedTask; + + public Task UnlinkAsync(string childId, CancellationToken ct = default) => Task.CompletedTask; + } + + private sealed class RecordingDispatchPort : IActorDispatchPort + { + public List<(string actorId, EventEnvelope envelope)> Calls { get; } = []; + + public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationToken ct = default) + { + Calls.Add((actorId, envelope)); + return Task.CompletedTask; + } + } + + private sealed class RecordingServiceRunProjectionPort : IServiceRunCurrentStateProjectionPort + { + public List EnsureCalls { get; } = []; + + public Task EnsureProjectionAsync(string actorId, CancellationToken ct = default) + { + EnsureCalls.Add(actorId); + return Task.CompletedTask; + } + } + + private sealed class RecordingActor : IActor + { + public RecordingActor(string id) + { + Id = id; + } + + public string Id { get; } + + public IAgent Agent { get; } = new TestStaticServiceAgent(); + + public Task ActivateAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task DeactivateAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task HandleEventAsync(EventEnvelope envelope, CancellationToken ct = default) => Task.CompletedTask; + + public Task GetParentIdAsync() => Task.FromResult(null); + + public Task> GetChildrenIdsAsync() => Task.FromResult>([]); + } +} diff --git a/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs new file mode 100644 index 000000000..88c661218 --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Projection/ServiceRunCurrentStateProjectorTests.cs @@ -0,0 +1,253 @@ +using Aevatar.CQRS.Projection.Core.Abstractions; +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions; +using Aevatar.GAgentService.Projection.Contexts; +using Aevatar.GAgentService.Projection.Projectors; +using Aevatar.GAgentService.Projection.Queries; +using Aevatar.GAgentService.Projection.ReadModels; +using Aevatar.GAgentService.Abstractions.Queries; +using FluentAssertions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Aevatar.GAgentService.Tests.Projection; + +public sealed class ServiceRunCurrentStateProjectorTests +{ + [Fact] + public async Task ProjectAsync_ShouldMaterializeCurrentState_FromCommittedStateRoot() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00"))); + var observedAt = DateTimeOffset.Parse("2026-04-27T01:00:00+00:00"); + var record = BuildRecord( + scopeId: "tenant-1", + serviceId: "svc-1", + runId: "run-1", + commandId: "cmd-1", + implementation: ServiceImplementationKind.Workflow, + targetActorId: "workflow-run:abc", + createdAt: observedAt); + var envelope = WrapCommittedRunState( + record, + stateVersion: 3, + eventId: "evt-registered", + observedAt: observedAt); + var context = new ServiceRunCurrentStateProjectionContext + { + RootActorId = "service-run:run-1", + ProjectionKind = "service-runs", + }; + + await projector.ProjectAsync(context, envelope); + + var doc = await store.GetAsync(ServiceRunIds.BuildKey("tenant-1", "svc-1", "run-1")); + doc.Should().NotBeNull(); + doc!.RunId.Should().Be("run-1"); + doc.CommandId.Should().Be("cmd-1"); + doc.ScopeId.Should().Be("tenant-1"); + doc.ServiceId.Should().Be("svc-1"); + doc.ActorId.Should().Be("service-run:run-1"); + doc.ImplementationKind.Should().Be((int)ServiceImplementationKind.Workflow); + doc.TargetActorId.Should().Be("workflow-run:abc"); + doc.Status.Should().Be((int)ServiceRunStatus.Accepted); + doc.StateVersion.Should().Be(3); + doc.LastEventId.Should().Be("evt-registered"); + } + + [Fact] + public async Task ProjectAsync_ShouldIgnoreEnvelope_WithoutCommittedStateRoot() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.UtcNow)); + var context = new ServiceRunCurrentStateProjectionContext + { + RootActorId = "service-run:run-x", + ProjectionKind = "service-runs", + }; + + await projector.ProjectAsync(context, new EventEnvelope + { + Id = "raw", + Payload = Any.Pack(new StringValue { Value = "noop" }), + }); + + (await store.ReadItemsAsync()).Should().BeEmpty(); + } + + [Fact] + public async Task QueryReader_ShouldFilterByScopeAndService_AndResolveByRunIdAndCommandId() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00"))); + var reader = new ServiceRunQueryReader(store); + await projector.ProjectAsync( + CreateContext("service-run:run-a"), + WrapCommittedRunState( + BuildRecord("tenant-1", "svc-1", "run-a", "cmd-a", ServiceImplementationKind.Static, "actor-a"), + stateVersion: 1, + eventId: "evt-a", + observedAt: DateTimeOffset.Parse("2026-04-27T01:00:00+00:00"))); + await projector.ProjectAsync( + CreateContext("service-run:run-b"), + WrapCommittedRunState( + BuildRecord("tenant-1", "svc-1", "run-b", "cmd-b", ServiceImplementationKind.Workflow, "actor-b"), + stateVersion: 1, + eventId: "evt-b", + observedAt: DateTimeOffset.Parse("2026-04-27T02:00:00+00:00"))); + await projector.ProjectAsync( + CreateContext("service-run:run-c"), + WrapCommittedRunState( + BuildRecord("tenant-2", "svc-1", "run-c", "cmd-c", ServiceImplementationKind.Scripting, "actor-c"), + stateVersion: 1, + eventId: "evt-c", + observedAt: DateTimeOffset.Parse("2026-04-27T03:00:00+00:00"))); + + var listForTenant1 = await reader.ListAsync(new ServiceRunQuery("tenant-1", "svc-1")); + listForTenant1.Should().HaveCount(2); + listForTenant1.Select(x => x.RunId).Should().BeEquivalentTo(new[] { "run-a", "run-b" }); + + var listForTenant2 = await reader.ListAsync(new ServiceRunQuery("tenant-2", "svc-1")); + listForTenant2.Select(x => x.RunId).Should().Equal("run-c"); + + var byRun = await reader.GetByRunIdAsync("tenant-1", "svc-1", "run-a"); + byRun.Should().NotBeNull(); + byRun!.CommandId.Should().Be("cmd-a"); + + var byCommand = await reader.GetByCommandIdAsync("tenant-1", "svc-1", "cmd-b"); + byCommand.Should().NotBeNull(); + byCommand!.RunId.Should().Be("run-b"); + + var byRunWrongScope = await reader.GetByRunIdAsync("tenant-1", "svc-1", "run-c"); + byRunWrongScope.Should().BeNull(); + } + + [Fact] + public async Task ProjectAsync_ShouldNotCollide_WhenSameRunIdAcrossDifferentScopes() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-04-27T00:00:00+00:00"))); + var observedAt = DateTimeOffset.Parse("2026-04-27T01:00:00+00:00"); + + await projector.ProjectAsync( + CreateContext("service-run:tenant-a:svc:run-shared"), + WrapCommittedRunState( + BuildRecord("tenant-a", "svc", "run-shared", "cmd-x", ServiceImplementationKind.Static, "actor-a", observedAt), + stateVersion: 1, + eventId: "evt-a", + observedAt: observedAt)); + await projector.ProjectAsync( + CreateContext("service-run:tenant-b:svc:run-shared"), + WrapCommittedRunState( + BuildRecord("tenant-b", "svc", "run-shared", "cmd-x", ServiceImplementationKind.Static, "actor-b", observedAt), + stateVersion: 1, + eventId: "evt-b", + observedAt: observedAt)); + + var docA = await store.GetAsync(ServiceRunIds.BuildKey("tenant-a", "svc", "run-shared")); + var docB = await store.GetAsync(ServiceRunIds.BuildKey("tenant-b", "svc", "run-shared")); + docA.Should().NotBeNull(); + docB.Should().NotBeNull(); + docA!.TargetActorId.Should().Be("actor-a"); + docB!.TargetActorId.Should().Be("actor-b"); + } + + [Fact] + public async Task ProjectAsync_ShouldIgnoreState_WithMissingScopeOrService() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new ServiceRunCurrentStateProjector( + store, + new FixedProjectionClock(DateTimeOffset.UtcNow)); + + var record = BuildRecord("tenant-1", "svc-1", "run-1", "cmd-1", ServiceImplementationKind.Static, "actor-1"); + record.ScopeId = string.Empty; + await projector.ProjectAsync( + CreateContext("service-run:bad"), + WrapCommittedRunState(record, stateVersion: 1, eventId: "evt-bad", observedAt: DateTimeOffset.UtcNow)); + + (await store.ReadItemsAsync()).Should().BeEmpty(); + } + + private static ServiceRunCurrentStateProjectionContext CreateContext(string rootActorId) => + new() + { + RootActorId = rootActorId, + ProjectionKind = "service-runs", + }; + + private static ServiceRunRecord BuildRecord( + string scopeId, + string serviceId, + string runId, + string commandId, + ServiceImplementationKind implementation, + string targetActorId, + DateTimeOffset? createdAt = null) => + new() + { + ScopeId = scopeId, + ServiceId = serviceId, + ServiceKey = $"{scopeId}:{serviceId}", + RunId = runId, + CommandId = commandId, + CorrelationId = commandId, + EndpointId = "run", + ImplementationKind = implementation, + TargetActorId = targetActorId, + RevisionId = "r1", + DeploymentId = "dep-1", + Status = ServiceRunStatus.Accepted, + CreatedAt = createdAt.HasValue ? Timestamp.FromDateTimeOffset(createdAt.Value) : null, + UpdatedAt = createdAt.HasValue ? Timestamp.FromDateTimeOffset(createdAt.Value) : null, + Identity = new ServiceIdentity + { + TenantId = scopeId, + AppId = "app", + Namespace = "default", + ServiceId = serviceId, + }, + }; + + private static EventEnvelope WrapCommittedRunState( + ServiceRunRecord record, + long stateVersion, + string eventId, + DateTimeOffset observedAt) + { + var state = new ServiceRunState + { + Record = record.Clone(), + LastAppliedEventVersion = stateVersion, + LastEventId = eventId, + }; + return new EventEnvelope + { + Id = $"outer-{eventId}", + Timestamp = Timestamp.FromDateTimeOffset(observedAt), + Route = EnvelopeRouteSemantics.CreateObserverPublication("root-actor"), + Payload = Any.Pack(new CommittedStateEventPublished + { + StateEvent = new StateEvent + { + EventId = eventId, + Version = stateVersion, + Timestamp = Timestamp.FromDateTimeOffset(observedAt), + EventData = Any.Pack(new ServiceRunRegisteredEvent + { + Record = record.Clone(), + }), + }, + StateRoot = Any.Pack(state), + }), + }; + } +}