Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Abstractions.Attributes;
using Aevatar.Foundation.Abstractions.Persistence;
using Aevatar.Foundation.Core;
using Aevatar.Foundation.Core.EventSourcing;
using Google.Protobuf;
Expand All @@ -21,19 +22,22 @@ public sealed class ConnectorCatalogGAgent : GAgentBase<ConnectorCatalogState>,
[EventHandler(EndpointName = "saveCatalog")]
public async Task HandleCatalogSaved(ConnectorCatalogSavedEvent evt)
{
EnsureExpectedVersionMatches(evt.HasExpectedVersion, evt.ExpectedVersion);
await PersistDomainEventAsync(evt);
}

[EventHandler(EndpointName = "saveDraft")]
public async Task HandleDraftSaved(ConnectorDraftSavedEvent evt)
{
EnsureExpectedVersionMatches(evt.HasExpectedVersion, evt.ExpectedVersion);
await PersistDomainEventAsync(evt);
}

[EventHandler(EndpointName = "deleteDraft")]
public async Task HandleDraftDeleted(ConnectorDraftDeletedEvent evt)
{
// Idempotent: skip if no draft exists
EnsureExpectedVersionMatches(evt.HasExpectedVersion, evt.ExpectedVersion);

if (State.Draft is null)
return;

Expand All @@ -56,12 +60,27 @@ protected override ConnectorCatalogState TransitionState(
.OrCurrent();
}

private void EnsureExpectedVersionMatches(bool hasExpectedVersion, long expectedVersion)
{
if (!hasExpectedVersion)
return;

if (expectedVersion != State.LastAppliedEventVersion)
{
throw new EventStoreOptimisticConcurrencyException(
Id,
expectedVersion,
State.LastAppliedEventVersion);
}
}

private static ConnectorCatalogState ApplyCatalogSaved(
ConnectorCatalogState state, ConnectorCatalogSavedEvent evt)
{
var next = state.Clone();
next.Connectors.Clear();
next.Connectors.AddRange(evt.Connectors);
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
return next;
}

Expand All @@ -74,6 +93,7 @@ private static ConnectorCatalogState ApplyDraftSaved(
Draft = evt.Draft?.Clone(),
UpdatedAtUtc = evt.UpdatedAtUtc,
};
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
return next;
}

Expand All @@ -82,6 +102,7 @@ private static ConnectorCatalogState ApplyDraftDeleted(
{
var next = state.Clone();
next.Draft = null;
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
return next;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,23 @@ message ConnectorDraftEntry {
message ConnectorCatalogState {
repeated ConnectorDefinitionEntry connectors = 1;
ConnectorDraftEntry draft = 2; // null when no draft exists
int64 last_applied_event_version = 3;
}

// ─── Events ───

message ConnectorCatalogSavedEvent {
repeated ConnectorDefinitionEntry connectors = 1;
optional int64 expected_version = 2; // unset = skip optimistic concurrency check
}

message ConnectorDraftSavedEvent {
ConnectorDefinitionEntry draft = 1;
google.protobuf.Timestamp updated_at_utc = 2;
optional int64 expected_version = 3; // unset = skip optimistic concurrency check
}

message ConnectorDraftDeletedEvent {
optional int64 expected_version = 1; // unset = skip optimistic concurrency check
}

22 changes: 22 additions & 0 deletions agents/Aevatar.GAgents.RoleCatalog/RoleCatalogGAgent.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Aevatar.Foundation.Abstractions;
using Aevatar.Foundation.Abstractions.Attributes;
using Aevatar.Foundation.Abstractions.Persistence;
using Aevatar.Foundation.Core;
using Aevatar.Foundation.Core.EventSourcing;
using Google.Protobuf;
Expand All @@ -21,18 +22,22 @@ public sealed class RoleCatalogGAgent : GAgentBase<RoleCatalogState>, IProjected
[EventHandler(EndpointName = "saveCatalog")]
public async Task HandleCatalogSaved(RoleCatalogSavedEvent evt)
{
EnsureExpectedVersionMatches(evt.HasExpectedVersion, evt.ExpectedVersion);
await PersistDomainEventAsync(evt);
}

[EventHandler(EndpointName = "saveDraft")]
public async Task HandleDraftSaved(RoleDraftSavedEvent evt)
{
EnsureExpectedVersionMatches(evt.HasExpectedVersion, evt.ExpectedVersion);
await PersistDomainEventAsync(evt);
}

[EventHandler(EndpointName = "deleteDraft")]
public async Task HandleDraftDeleted(RoleDraftDeletedEvent evt)
{
EnsureExpectedVersionMatches(evt.HasExpectedVersion, evt.ExpectedVersion);

if (State.Draft is null)
return;

Expand All @@ -55,12 +60,27 @@ protected override RoleCatalogState TransitionState(
.OrCurrent();
}

private void EnsureExpectedVersionMatches(bool hasExpectedVersion, long expectedVersion)
{
if (!hasExpectedVersion)
return;

if (expectedVersion != State.LastAppliedEventVersion)
{
throw new EventStoreOptimisticConcurrencyException(
Id,
expectedVersion,
State.LastAppliedEventVersion);
}
}

private static RoleCatalogState ApplyCatalogSaved(
RoleCatalogState state, RoleCatalogSavedEvent evt)
{
var next = state.Clone();
next.Roles.Clear();
next.Roles.AddRange(evt.Roles);
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
return next;
}

Expand All @@ -73,6 +93,7 @@ private static RoleCatalogState ApplyDraftSaved(
Draft = evt.Draft?.Clone(),
UpdatedAtUtc = evt.UpdatedAtUtc,
};
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
return next;
}

Expand All @@ -81,6 +102,7 @@ private static RoleCatalogState ApplyDraftDeleted(
{
var next = state.Clone();
next.Draft = null;
next.LastAppliedEventVersion = state.LastAppliedEventVersion + 1;
return next;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@ message RoleDraftEntry {
message RoleCatalogState {
repeated RoleDefinitionEntry roles = 1;
RoleDraftEntry draft = 2; // null when no draft exists
int64 last_applied_event_version = 3;
}

// ─── Events ───

message RoleCatalogSavedEvent {
repeated RoleDefinitionEntry roles = 1;
optional int64 expected_version = 2; // unset = skip optimistic concurrency check
}

message RoleDraftSavedEvent {
RoleDefinitionEntry draft = 1;
google.protobuf.Timestamp updated_at_utc = 2;
optional int64 expected_version = 3; // unset = skip optimistic concurrency check
}

message RoleDraftDeletedEvent {}
message RoleDraftDeletedEvent {
optional int64 expected_version = 1; // unset = skip optimistic concurrency check
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public interface IConnectorCatalogStore

Task<StoredConnectorCatalog> SaveConnectorCatalogAsync(
StoredConnectorCatalog catalog,
long? expectedVersion = null,
CancellationToken cancellationToken = default);

Task<ImportedConnectorCatalog> ImportLocalCatalogAsync(CancellationToken cancellationToken = default);
Expand All @@ -14,9 +15,12 @@ Task<StoredConnectorCatalog> SaveConnectorCatalogAsync(

Task<StoredConnectorDraft> SaveConnectorDraftAsync(
StoredConnectorDraft draft,
long? expectedVersion = null,
CancellationToken cancellationToken = default);

Task DeleteConnectorDraftAsync(CancellationToken cancellationToken = default);
Task DeleteConnectorDraftAsync(
long? expectedVersion = null,
CancellationToken cancellationToken = default);
}

public sealed record ImportedConnectorCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public interface IRoleCatalogStore

Task<StoredRoleCatalog> SaveRoleCatalogAsync(
StoredRoleCatalog catalog,
long? expectedVersion = null,
CancellationToken cancellationToken = default);

Task<ImportedRoleCatalog> ImportLocalCatalogAsync(CancellationToken cancellationToken = default);
Expand All @@ -14,9 +15,12 @@ Task<StoredRoleCatalog> SaveRoleCatalogAsync(

Task<StoredRoleDraft> SaveRoleDraftAsync(
StoredRoleDraft draft,
long? expectedVersion = null,
CancellationToken cancellationToken = default);

Task DeleteRoleDraftAsync(CancellationToken cancellationToken = default);
Task DeleteRoleDraftAsync(
long? expectedVersion = null,
CancellationToken cancellationToken = default);
}

public sealed record ImportedRoleCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,31 @@ public sealed record StoredConnectorCatalog(
string HomeDirectory,
string FilePath,
bool FileExists,
IReadOnlyList<StoredConnectorDefinition> Connectors);
IReadOnlyList<StoredConnectorDefinition> Connectors,
long Version = 0);

public sealed record StoredRoleCatalog(
string HomeDirectory,
string FilePath,
bool FileExists,
IReadOnlyList<StoredRoleDefinition> Roles);
IReadOnlyList<StoredRoleDefinition> Roles,
long Version = 0);

public sealed record StoredConnectorDraft(
string HomeDirectory,
string FilePath,
bool FileExists,
DateTimeOffset? UpdatedAtUtc,
StoredConnectorDefinition? Draft);
StoredConnectorDefinition? Draft,
long Version = 0);

public sealed record StoredRoleDraft(
string HomeDirectory,
string FilePath,
bool FileExists,
DateTimeOffset? UpdatedAtUtc,
StoredRoleDefinition? Draft);
StoredRoleDefinition? Draft,
long Version = 0);

public sealed record StoredConnectorDefinition(
string Name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ public sealed record ConnectorCatalogResponse(
string HomeDirectory,
string FilePath,
bool FileExists,
IReadOnlyList<ConnectorDefinitionDto> Connectors);
IReadOnlyList<ConnectorDefinitionDto> Connectors,
long Version = 0);

public sealed record ConnectorDraftResponse(
string HomeDirectory,
string FilePath,
bool FileExists,
DateTimeOffset? UpdatedAtUtc,
ConnectorDefinitionDto? Draft);
ConnectorDefinitionDto? Draft,
long Version = 0);

public sealed record SaveConnectorCatalogRequest(
IReadOnlyList<ConnectorDefinitionDto> Connectors);
IReadOnlyList<ConnectorDefinitionDto> Connectors,
long? ExpectedVersion = null);

public sealed record SaveConnectorDraftRequest(
ConnectorDefinitionDto? Draft);
ConnectorDefinitionDto? Draft,
long? ExpectedVersion = null);

public sealed record ImportConnectorCatalogResponse(
string SourceFilePath,
Expand Down
12 changes: 8 additions & 4 deletions src/Aevatar.Studio.Application/Studio/Contracts/RoleContracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ public sealed record RoleCatalogResponse(
string HomeDirectory,
string FilePath,
bool FileExists,
IReadOnlyList<RoleDefinitionDto> Roles);
IReadOnlyList<RoleDefinitionDto> Roles,
long Version = 0);

public sealed record RoleDraftResponse(
string HomeDirectory,
string FilePath,
bool FileExists,
DateTimeOffset? UpdatedAtUtc,
RoleDefinitionDto? Draft);
RoleDefinitionDto? Draft,
long Version = 0);

public sealed record SaveRoleCatalogRequest(
IReadOnlyList<RoleDefinitionDto> Roles);
IReadOnlyList<RoleDefinitionDto> Roles,
long? ExpectedVersion = null);

public sealed record SaveRoleDraftRequest(
RoleDefinitionDto? Draft);
RoleDefinitionDto? Draft,
long? ExpectedVersion = null);

public sealed record ImportRoleCatalogResponse(
string SourceFilePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public async Task<ConnectorCatalogResponse> SaveCatalogAsync(
.Where(connector => !string.IsNullOrWhiteSpace(connector.Name))
.Select(ToStoredConnector)
.ToList()),
request.ExpectedVersion,
cancellationToken);

return ToResponse(saved);
Expand Down Expand Up @@ -103,7 +104,7 @@ public async Task<ConnectorDraftResponse> SaveDraftAsync(
{
if (request.Draft is null)
{
await _store.DeleteConnectorDraftAsync(cancellationToken);
await _store.DeleteConnectorDraftAsync(request.ExpectedVersion, cancellationToken);
return await GetDraftAsync(cancellationToken);
}

Expand All @@ -114,13 +115,14 @@ public async Task<ConnectorDraftResponse> SaveDraftAsync(
FileExists: false,
UpdatedAtUtc: DateTimeOffset.UtcNow,
Draft: ToStoredConnectorDraft(request.Draft)),
request.ExpectedVersion,
cancellationToken);

return ToDraftResponse(saved);
}

public Task DeleteDraftAsync(CancellationToken cancellationToken = default) =>
_store.DeleteConnectorDraftAsync(cancellationToken);
public Task DeleteDraftAsync(long? expectedVersion = null, CancellationToken cancellationToken = default) =>
_store.DeleteConnectorDraftAsync(expectedVersion, cancellationToken);

private static void EnsureUniqueNames(IEnumerable<ConnectorDefinitionDto> connectors)
{
Expand Down Expand Up @@ -279,7 +281,8 @@ private static ConnectorCatalogResponse ToResponse(StoredConnectorCatalog catalo
catalog.HomeDirectory,
catalog.FilePath,
catalog.FileExists,
catalog.Connectors.Select(ToDto).ToList());
catalog.Connectors.Select(ToDto).ToList(),
catalog.Version);

private static ImportConnectorCatalogResponse ToImportResponse(ImportedConnectorCatalog imported) =>
new(
Expand All @@ -297,7 +300,8 @@ private static ConnectorDraftResponse ToDraftResponse(StoredConnectorDraft draft
draft.FilePath,
draft.FileExists,
draft.UpdatedAtUtc,
draft.Draft is null ? null : ToDto(draft.Draft));
draft.Draft is null ? null : ToDto(draft.Draft),
draft.Version);

private static ConnectorDefinitionDto ToDto(StoredConnectorDefinition connector) =>
new(
Expand Down
Loading
Loading