Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions agents/Aevatar.GAgents.NyxidChat/NyxIdChatEndpoints.Streaming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using Aevatar.AI.Abstractions.LLMProviders;
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Abstractions.Streaming;
using Aevatar.Studio.Application.Studio.Abstractions;
using Aevatar.GAgentService.Abstractions.ScopeGAgents;
using Google.Protobuf.WellKnownTypes;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
Expand All @@ -18,6 +20,7 @@ private static async Task HandleStreamMessageAsync(
string actorId,
NyxIdChatStreamRequest request,
[FromServices] IActorRuntime actorRuntime,
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] IActorEventSubscriptionProvider subscriptionProvider,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
Expand All @@ -43,8 +46,21 @@ private static async Task HandleStreamMessageAsync(
return;
}

actor = await actorRuntime.GetAsync(actorId)
?? await actorRuntime.CreateAsync<NyxIdChatGAgent>(actorId, ct);
if (!await TryAuthorizeConversationAsync(
http,
admissionPort,
scopeId,
actorId,
ScopeResourceOperation.Stream,
ct))
return;

actor = await actorRuntime.GetAsync(actorId);
if (actor == null)
{
http.Response.StatusCode = StatusCodes.Status404NotFound;
return;
}
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -189,6 +205,7 @@ private static async Task HandleApproveAsync(
string actorId,
NyxIdApprovalRequest request,
[FromServices] IActorRuntime actorRuntime,
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] IActorEventSubscriptionProvider subscriptionProvider,
[FromServices] ILoggerFactory loggerFactory,
CancellationToken ct)
Expand All @@ -211,6 +228,15 @@ private static async Task HandleApproveAsync(
return;
}

if (!await TryAuthorizeConversationAsync(
http,
admissionPort,
scopeId,
actorId,
ScopeResourceOperation.Approve,
ct))
return;

actor = await actorRuntime.GetAsync(actorId);
if (actor == null)
{
Expand Down
176 changes: 162 additions & 14 deletions agents/Aevatar.GAgents.NyxidChat/NyxIdChatEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Abstractions.Streaming;
using Aevatar.Studio.Application.Studio.Abstractions;
using Aevatar.GAgentService.Abstractions.ScopeGAgents;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
Expand Down Expand Up @@ -81,54 +82,154 @@ public static IEndpointRouteBuilder MapNyxIdChatEndpoints(this IEndpointRouteBui
private static async Task<IResult> HandleCreateConversationAsync(
HttpContext http,
string scopeId,
[FromServices] IGAgentActorStore actorStore,
[FromServices] IGAgentActorRegistryCommandPort registryCommandPort,
[FromServices] IActorRuntime actorRuntime,
CancellationToken ct)
{
// Conversation creation is fail-fast on IGAgentActorStore persistence.
// Conversation creation is fail-fast on registry persistence.
// NyxId chat depends on the registry being available; there is no
// degraded mode where a conversation can run without being registered.
var actorId = NyxIdChatServiceDefaults.GenerateActorId();
await actorStore.AddActorAsync(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId, ct);
await actorRuntime.CreateAsync<NyxIdChatGAgent>(actorId, ct);
var registrationAttempted = false;
try
{
registrationAttempted = true;
var receipt = await registryCommandPort.RegisterActorAsync(
new GAgentActorRegistration(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId),
ct);
if (!receipt.IsAdmissionVisible)
{
await TryRollbackConversationCreationAsync(
http,
scopeId,
actorId,
registryCommandPort,
actorRuntime,
registrationAttempted);
return Results.Json(
new { error = "Conversation registration is not admission-visible" },
statusCode: StatusCodes.Status503ServiceUnavailable);
}
}
catch
{
await TryRollbackConversationCreationAsync(
http,
scopeId,
actorId,
registryCommandPort,
actorRuntime,
registrationAttempted);
throw;
}

return Results.Ok(new { actorId });
}

private static async Task TryRollbackConversationCreationAsync(
HttpContext http,
string scopeId,
string actorId,
IGAgentActorRegistryCommandPort registryCommandPort,
IActorRuntime actorRuntime,
bool registrationAttempted)
{
var logger = http.RequestServices?.GetService<ILoggerFactory>()
?.CreateLogger("Aevatar.NyxId.Chat.CreateConversation");

if (registrationAttempted)
{
try
{
await registryCommandPort.UnregisterActorAsync(
new GAgentActorRegistration(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId),
CancellationToken.None);
}
catch (Exception ex)
{
logger?.LogWarning(
ex,
"Failed to unregister NyxId chat conversation during create rollback: scope={ScopeId}, actor={ActorId}",
scopeId,
actorId);
return;
}
}

try
{
await actorRuntime.DestroyAsync(actorId, CancellationToken.None);
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Failed to destroy NyxId chat actor {ActorId} during create rollback", actorId);
}
}

private static async Task<IResult> HandleListConversationsAsync(
HttpContext http,
string scopeId,
[FromServices] IGAgentActorStore actorStore,
[FromServices] IGAgentActorRegistryQueryPort registryQueryPort,
CancellationToken ct)
{
try
{
var groups = await actorStore.GetAsync(scopeId, ct);
var actorIds = groups
var snapshot = await registryQueryPort.ListActorsAsync(scopeId, ct);
var actorIds = snapshot.Groups
.FirstOrDefault(g => string.Equals(g.GAgentType, NyxIdChatServiceDefaults.GAgentTypeName, StringComparison.Ordinal))
?.ActorIds
?? [];
return Results.Ok(actorIds.Select(actorId => new { actorId }));
return Results.Ok(new
{
snapshot.ScopeId,
snapshot.StateVersion,
snapshot.UpdatedAt,
snapshot.ObservedAt,
Conversations = actorIds.Select(actorId => new { actorId }),
});
}
catch (InvalidOperationException)
{
return Results.Ok(Array.Empty<object>());
return Results.Ok(new
{
ScopeId = scopeId,
StateVersion = 0L,
UpdatedAt = DateTimeOffset.MinValue,
ObservedAt = DateTimeOffset.UtcNow,
Conversations = Array.Empty<object>(),
});
}
}

private static async Task<IResult> HandleDeleteConversationAsync(
HttpContext http,
string scopeId,
string actorId,
[FromServices] IGAgentActorStore actorStore,
[FromServices] IGAgentActorRegistryCommandPort registryCommandPort,
[FromServices] IScopeResourceAdmissionPort admissionPort,
[FromServices] IChatHistoryStore chatHistoryStore,
CancellationToken ct)
{
await actorStore.RemoveActorAsync(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId, ct);
var admissionError = await AuthorizeConversationAsync(
admissionPort,
scopeId,
actorId,
ScopeResourceOperation.Delete,
ct);
if (admissionError != null)
return admissionError;

await registryCommandPort.UnregisterActorAsync(
new GAgentActorRegistration(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId),
ct);
try
{
await chatHistoryStore.DeleteConversationAsync(scopeId, actorId, ct);
}
catch
{
await TryRestoreConversationRegistrationAsync(http, scopeId, actorId, actorStore);
await TryRestoreConversationRegistrationAsync(http, scopeId, actorId, registryCommandPort);
throw;
}

Expand All @@ -139,11 +240,13 @@ private static async Task TryRestoreConversationRegistrationAsync(
HttpContext http,
string scopeId,
string actorId,
IGAgentActorStore actorStore)
IGAgentActorRegistryCommandPort registryCommandPort)
{
try
{
await actorStore.AddActorAsync(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId, CancellationToken.None);
await registryCommandPort.RegisterActorAsync(
new GAgentActorRegistration(scopeId, NyxIdChatServiceDefaults.GAgentTypeName, actorId),
CancellationToken.None);
}
catch (Exception ex)
{
Expand All @@ -153,10 +256,55 @@ private static async Task TryRestoreConversationRegistrationAsync(
ex,
"Failed to restore NyxId chat conversation registration after history deletion failure: scope={ScopeId}, actor={ActorId}",
scopeId,
actorId);
actorId);
}
}

private static async Task<IResult?> AuthorizeConversationAsync(
IScopeResourceAdmissionPort admissionPort,
string scopeId,
string actorId,
ScopeResourceOperation operation,
CancellationToken ct)
{
var admission = await admissionPort.AuthorizeTargetAsync(
new ScopeResourceTarget(
scopeId,
ScopeResourceKind.GAgentActor,
NyxIdChatServiceDefaults.GAgentTypeName,
actorId,
operation),
ct);
return admission.Status switch
{
ScopeResourceAdmissionStatus.Allowed => null,
ScopeResourceAdmissionStatus.NotFound => Results.NotFound(new { error = "Conversation not found" }),
ScopeResourceAdmissionStatus.Denied or ScopeResourceAdmissionStatus.ScopeMismatch =>
Results.Json(new { error = "Conversation access denied" }, statusCode: StatusCodes.Status403Forbidden),
ScopeResourceAdmissionStatus.Unavailable =>
Results.Json(new { error = "Conversation admission unavailable" }, statusCode: StatusCodes.Status503ServiceUnavailable),
_ => Results.Json(new { error = "Conversation admission failed" }, statusCode: StatusCodes.Status503ServiceUnavailable),
};
}

private static async Task<bool> TryAuthorizeConversationAsync(
HttpContext http,
IScopeResourceAdmissionPort admissionPort,
string scopeId,
string actorId,
ScopeResourceOperation operation,
CancellationToken ct)
{
var admissionError = await AuthorizeConversationAsync(admissionPort, scopeId, actorId, operation, ct);
if (admissionError == null)
return true;

http.Response.StatusCode = admissionError is IStatusCodeHttpResult { StatusCode: { } statusCode }
? statusCode
: StatusCodes.Status500InternalServerError;
return false;
}

private static async Task InjectUserConfigMetadataAsync(
HttpContext http,
IDictionary<string, string> metadata,
Expand Down
24 changes: 24 additions & 0 deletions agents/Aevatar.GAgents.Registry/GAgentRegistryGAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ public async Task HandleActorRegistered(ActorRegisteredEvent evt)
await PersistDomainEventAsync(evt);
}

[EventHandler(EndpointName = "authorizeScopeResource")]
public Task HandleScopeResourceAdmissionRequested(ScopeResourceAdmissionRequested request)
{
if (string.IsNullOrWhiteSpace(request.ScopeId) ||
string.IsNullOrWhiteSpace(request.GagentType) ||
string.IsNullOrWhiteSpace(request.ActorId))
throw new GAgentRegistryAdmissionNotFoundException();

var group = State.Groups.FirstOrDefault(g =>
string.Equals(g.GagentType, request.GagentType, StringComparison.Ordinal));
if (group is null || !group.ActorIds.Contains(request.ActorId))
throw new GAgentRegistryAdmissionNotFoundException();

return Task.CompletedTask;
}

[EventHandler(EndpointName = "unregisterActor")]
public async Task HandleActorUnregistered(ActorUnregisteredEvent evt)
{
Expand Down Expand Up @@ -100,3 +116,11 @@ private static GAgentRegistryState ApplyUnregistered(
}

}

public sealed class GAgentRegistryAdmissionNotFoundException : Exception
{
public GAgentRegistryAdmissionNotFoundException()
: base("Registry target was not found.")
{
}
}
20 changes: 20 additions & 0 deletions agents/Aevatar.GAgents.Registry/gagent_registry_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,23 @@ message ActorUnregisteredEvent {
string actor_id = 2;
}

// ─── Admission ───

enum GAgentRegistryOperation {
UNKNOWN = 0;
USE = 1;
DELETE = 2;
CHAT = 3;
STREAM = 4;
APPROVE = 5;
JOIN = 6;
LIST_PARTICIPANTS = 7;
DRAFT_RUN_REUSE = 8;
}

message ScopeResourceAdmissionRequested {
string gagent_type = 1;
string actor_id = 2;
GAgentRegistryOperation operation = 3;
string scope_id = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<ProjectReference Include="..\..\src\Aevatar.CQRS.Projection.Stores.Abstractions\Aevatar.CQRS.Projection.Stores.Abstractions.csproj" />
<ProjectReference Include="..\..\src\Aevatar.Foundation.Abstractions\Aevatar.Foundation.Abstractions.csproj" />
<ProjectReference Include="..\..\src\Aevatar.Hosting\Aevatar.Hosting.csproj" />
<ProjectReference Include="..\..\src\platform\Aevatar.GAgentService.Abstractions\Aevatar.GAgentService.Abstractions.csproj" />
<ProjectReference Include="..\..\src\Aevatar.Studio.Application\Aevatar.Studio.Application.csproj" />
</ItemGroup>
<ItemGroup>
Expand Down
Loading
Loading