Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
<Protobuf Include="Protos/service_revision.proto" GrpcServices="None" ProtoRoot="Protos" />
<Protobuf Include="Protos/service_deployment.proto" GrpcServices="None" ProtoRoot="Protos" />
<Protobuf Include="Protos/service_serving.proto" GrpcServices="None" ProtoRoot="Protos" />
<Protobuf Include="Protos/service_runs.proto" GrpcServices="None" ProtoRoot="Protos" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Aevatar.GAgentService.Abstractions.Ports;

/// <summary>
/// Activation port for the durable service-run current-state projection.
/// Mirrors <see cref="IServiceCatalogProjectionPort"/> shape but scoped to service-run actors.
/// </summary>
public interface IServiceRunCurrentStateProjectionPort
{
Task EnsureProjectionAsync(string actorId, CancellationToken ct = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Aevatar.GAgentService.Abstractions.Queries;

namespace Aevatar.GAgentService.Abstractions.Ports;

/// <summary>
/// Read contract for the implementation-agnostic service-run registry.
/// Backed by the durable <c>ServiceRunGAgent</c> projection.
/// </summary>
public interface IServiceRunQueryPort
{
Task<IReadOnlyList<ServiceRunSnapshot>> ListAsync(
ServiceRunQuery query,
CancellationToken ct = default);

Task<ServiceRunSnapshot?> GetByRunIdAsync(
string scopeId,
string serviceId,
string runId,
CancellationToken ct = default);

Task<ServiceRunSnapshot?> GetByCommandIdAsync(
string scopeId,
string serviceId,
string commandId,
CancellationToken ct = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Aevatar.GAgentService.Abstractions.Ports;

/// <summary>
/// Write contract for the implementation-agnostic service-run registry.
/// Used by the invocation dispatcher to register a run before returning the accepted receipt,
/// so Studio Observe can query the run from the durable readmodel even on immediate refresh.
/// </summary>
public interface IServiceRunRegistrationPort
{
Task<ServiceRunRegistrationResult> RegisterAsync(
ServiceRunRecord record,
CancellationToken ct = default);

Task UpdateStatusAsync(
string runActorId,
string runId,
ServiceRunStatus status,
CancellationToken ct = default);
}

public sealed record ServiceRunRegistrationResult(
string RunActorId,
string RunId);
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
syntax = "proto3";

package aevatar.gagentservice;

option csharp_namespace = "Aevatar.GAgentService.Abstractions";

import "google/protobuf/timestamp.proto";
import "service_endpoint.proto";
import "service_revision.proto";

enum ServiceRunStatus {
SERVICE_RUN_STATUS_UNSPECIFIED = 0;
SERVICE_RUN_STATUS_ACCEPTED = 1;
SERVICE_RUN_STATUS_COMPLETED = 2;
SERVICE_RUN_STATUS_FAILED = 3;
SERVICE_RUN_STATUS_STOPPED = 4;
}

message ServiceRunRecord {
string scope_id = 1;
string service_id = 2;
string service_key = 3;
string run_id = 4;
string command_id = 5;
string correlation_id = 6;
string endpoint_id = 7;
ServiceImplementationKind implementation_kind = 8;
string target_actor_id = 9;
string revision_id = 10;
string deployment_id = 11;
ServiceRunStatus status = 12;
google.protobuf.Timestamp created_at = 13;
google.protobuf.Timestamp updated_at = 14;
ServiceIdentity identity = 15;
}

message ServiceRunState {
ServiceRunRecord record = 1;
int64 last_applied_event_version = 2;
string last_event_id = 3;
}

message RegisterServiceRunRequested {
ServiceRunRecord record = 1;
}

message UpdateServiceRunStatusRequested {
string run_id = 1;
ServiceRunStatus status = 2;
}

message ServiceRunRegisteredEvent {
ServiceRunRecord record = 1;
}

message ServiceRunStatusUpdatedEvent {
string run_id = 1;
ServiceRunStatus status = 2;
google.protobuf.Timestamp updated_at = 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace Aevatar.GAgentService.Abstractions.Queries;

public sealed record ServiceRunSnapshot(
string ScopeId,
string ServiceId,
string ServiceKey,
string RunId,
string CommandId,
string CorrelationId,
string EndpointId,
ServiceImplementationKind ImplementationKind,
string TargetActorId,
string RevisionId,
string DeploymentId,
ServiceRunStatus Status,
string ActorId,
string TenantId,
string AppId,
string Namespace,
long StateVersion,
string LastEventId,
DateTimeOffset CreatedAt,
DateTimeOffset UpdatedAt);

public sealed record ServiceRunQuery(
string ScopeId,
string ServiceId,
int Take = 50);
23 changes: 23 additions & 0 deletions src/platform/Aevatar.GAgentService.Abstractions/ServiceRunIds.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Aevatar.GAgentService.Abstractions;

public static class ServiceRunIds
{
public const string ActorPrefix = "service-run:";

public static string BuildKey(string scopeId, string serviceId, string runId)
{
if (string.IsNullOrWhiteSpace(scopeId))
throw new ArgumentException("scopeId is required.", nameof(scopeId));
if (string.IsNullOrWhiteSpace(serviceId))
throw new ArgumentException("serviceId is required.", nameof(serviceId));
if (string.IsNullOrWhiteSpace(runId))
throw new ArgumentException("runId is required.", nameof(runId));

return $"{Normalize(scopeId)}:{Normalize(serviceId)}:{Normalize(runId)}";
}

public static string BuildActorId(string scopeId, string serviceId, string runId) =>
ActorPrefix + BuildKey(scopeId, serviceId, runId);

private static string Normalize(string value) => value.Trim();
}
142 changes: 142 additions & 0 deletions src/platform/Aevatar.GAgentService.Core/GAgents/ServiceRunGAgent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using Aevatar.Foundation.Abstractions.Attributes;
using Aevatar.Foundation.Core;
using Aevatar.Foundation.Core.EventSourcing;
using Aevatar.GAgentService.Abstractions;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

namespace Aevatar.GAgentService.Core.GAgents;

public sealed class ServiceRunGAgent : GAgentBase<ServiceRunState>
{
public ServiceRunGAgent()
{
InitializeId();
}

[EventHandler]
public async Task HandleRegisterAsync(RegisterServiceRunRequested command)
{
ArgumentNullException.ThrowIfNull(command);
ArgumentNullException.ThrowIfNull(command.Record);
ValidateRecord(command.Record);

var existing = State.Record;
if (existing != null && !string.IsNullOrWhiteSpace(existing.RunId))
{
EnsureExistingMatches(existing, command.Record);
return;
}

var record = command.Record.Clone();
if (record.CreatedAt == null)
record.CreatedAt = Timestamp.FromDateTime(DateTime.UtcNow);
record.UpdatedAt = record.CreatedAt;
if (record.Status == ServiceRunStatus.Unspecified)
record.Status = ServiceRunStatus.Accepted;

await PersistDomainEventAsync(new ServiceRunRegisteredEvent
{
Record = record,
});
}

[EventHandler]
public async Task HandleUpdateStatusAsync(UpdateServiceRunStatusRequested command)
{
ArgumentNullException.ThrowIfNull(command);
var existing = State.Record;
if (existing == null || string.IsNullOrWhiteSpace(existing.RunId))
{
throw new InvalidOperationException(
$"Service run actor '{Id}' has no registered run; status update rejected.");
}

if (!string.IsNullOrWhiteSpace(command.RunId) &&
!string.Equals(existing.RunId, command.RunId, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot update run '{command.RunId}'.");
}

if (command.Status == ServiceRunStatus.Unspecified)
return;

if (existing.Status == command.Status)
return;

await PersistDomainEventAsync(new ServiceRunStatusUpdatedEvent
{
RunId = existing.RunId,
Status = command.Status,
UpdatedAt = Timestamp.FromDateTime(DateTime.UtcNow),
});
}

protected override ServiceRunState TransitionState(ServiceRunState current, IMessage evt) =>
StateTransitionMatcher
.Match(current, evt)
.On<ServiceRunRegisteredEvent>(ApplyRegistered)
.On<ServiceRunStatusUpdatedEvent>(ApplyStatusUpdated)
.OrCurrent();

private static ServiceRunState ApplyRegistered(ServiceRunState state, ServiceRunRegisteredEvent evt)
{
var next = state.Clone();
next.Record = evt.Record?.Clone() ?? new ServiceRunRecord();
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
next.LastEventId = $"{next.Record.RunId}:registered";
return next;
}

private static ServiceRunState ApplyStatusUpdated(ServiceRunState state, ServiceRunStatusUpdatedEvent evt)
{
var next = state.Clone();
if (next.Record == null)
next.Record = new ServiceRunRecord();
next.Record.Status = evt.Status;
next.Record.UpdatedAt = evt.UpdatedAt ?? Timestamp.FromDateTime(DateTime.UtcNow);
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
next.LastEventId = $"{next.Record.RunId}:status:{(int)evt.Status}";
return next;
}

private void EnsureExistingMatches(ServiceRunRecord existing, ServiceRunRecord incoming)
{
if (!string.Equals(existing.RunId, incoming.RunId, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Service run actor '{Id}' is bound to run '{existing.RunId}' and cannot register run '{incoming.RunId}'.");
}
if (!string.Equals(existing.ScopeId, incoming.ScopeId, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Service run actor '{Id}' is bound to scope '{existing.ScopeId}' and cannot re-register under scope '{incoming.ScopeId}'.");
}
if (!string.Equals(existing.ServiceId, incoming.ServiceId, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Service run actor '{Id}' is bound to service '{existing.ServiceId}' and cannot re-register under service '{incoming.ServiceId}'.");
}
if (!string.IsNullOrWhiteSpace(incoming.TargetActorId) &&
!string.IsNullOrWhiteSpace(existing.TargetActorId) &&
!string.Equals(existing.TargetActorId, incoming.TargetActorId, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Service run actor '{Id}' is bound to target '{existing.TargetActorId}' and cannot re-register against target '{incoming.TargetActorId}'.");
}
}

private static void ValidateRecord(ServiceRunRecord record)
{
ArgumentNullException.ThrowIfNull(record);
if (string.IsNullOrWhiteSpace(record.RunId))
throw new InvalidOperationException("run_id is required.");
if (string.IsNullOrWhiteSpace(record.ScopeId))
throw new InvalidOperationException("scope_id is required.");
if (string.IsNullOrWhiteSpace(record.ServiceId))
throw new InvalidOperationException("service_id is required.");
if (string.IsNullOrWhiteSpace(record.CommandId))
throw new InvalidOperationException("command_id is required.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static IServiceCollection AddGAgentServiceCapability(
services.TryAddSingleton<IServiceCommandTargetProvisioner, DefaultServiceCommandTargetProvisioner>();
services.TryAddSingleton<IServiceRevisionArtifactStore, ConfiguredServiceRevisionArtifactStore>();
services.TryAddSingleton<IServiceRuntimeActivator, DefaultServiceRuntimeActivator>();
services.TryAddSingleton<IServiceRunRegistrationPort, ServiceRunRegistrationAdapter>();
services.TryAddSingleton<IServiceInvocationDispatcher, DefaultServiceInvocationDispatcher>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IServiceImplementationAdapter, StaticServiceImplementationAdapter>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IServiceImplementationAdapter, ScriptingServiceImplementationAdapter>());
Expand Down Expand Up @@ -118,6 +119,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders(
TryAddElasticsearchDocumentProjectionStore<ServiceRolloutReadModel>(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore<ServiceRolloutCommandObservationReadModel>(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore<ServiceTrafficViewReadModel>(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore<ServiceRunCurrentStateReadModel>(services, configuration, static readModel => readModel.Id);
TryAddElasticsearchDocumentProjectionStore<UserConfigCurrentStateDocument>(services, configuration, static readModel => readModel.Id);
}
else
Expand All @@ -129,6 +131,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders(
TryAddInMemoryDocumentProjectionStore<ServiceRolloutReadModel>(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore<ServiceRolloutCommandObservationReadModel>(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore<ServiceTrafficViewReadModel>(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore<ServiceRunCurrentStateReadModel>(services, static readModel => readModel.Id);
TryAddInMemoryDocumentProjectionStore<UserConfigCurrentStateDocument>(services, static readModel => readModel.Id);
}

Expand All @@ -146,6 +149,7 @@ private static bool HasAllGAgentServiceProjectionReaders(
&& HasProjectionDocumentReaderForProvider<ServiceRolloutReadModel>(services, providerKind)
&& HasProjectionDocumentReaderForProvider<ServiceRolloutCommandObservationReadModel>(services, providerKind)
&& HasProjectionDocumentReaderForProvider<ServiceTrafficViewReadModel>(services, providerKind)
&& HasProjectionDocumentReaderForProvider<ServiceRunCurrentStateReadModel>(services, providerKind)
&& HasProjectionDocumentReaderForProvider<UserConfigCurrentStateDocument>(services, providerKind);
}

Expand Down
Loading
Loading