Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/AISmart.GAgent.Core/GAgentBase.Observers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ private Task UpdateObserverList()
var observer = new EventWrapperBaseAsyncObserver(async item =>
{
var grainId = (GrainId)item.GetType().GetProperty(nameof(EventWrapper<EventBase>.GrainId))?.GetValue(item)!;
if (grainId == this.GetGrainId() && eventHandlerMethod.Name != nameof(ForwardEventAsync))
if (grainId == this.GetGrainId())
{
// Skip the event if it is sent by itself.
return;
Expand Down
47 changes: 6 additions & 41 deletions src/AISmart.GAgent.Core/GAgentBase.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,17 @@ public abstract partial class GAgentBase<TState, TEvent>
protected async Task PublishAsync<T>(EventWrapper<T> eventWrapper) where T : EventBase
{
await SendEventUpwardsAsync(eventWrapper);
await SendEventDownwardsAsync(eventWrapper);
}

protected async Task<Guid> PublishAsync<T>(T @event) where T : EventBase
{
var isTop = _correlationId == null;
_correlationId ??= Guid.NewGuid();
@event.CorrelationId = _correlationId;
@event.StreamId = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey());
var eventId = Guid.NewGuid();
switch (isTop)
{
case true:
// This event is the first time appeared to silo.
await SendEventToSelfAsync(new EventWrapper<T>(@event, eventId, this.GetGrainId()));
break;
case false when _streamIdDictionary.TryGetValue(_correlationId!.Value, out var streamIdValue):
@event.StreamId = streamIdValue;
await PublishEventUpwardsAsync(@event, eventId);
break;
}

await SendEventToSelfStreamAsync(new EventWrapper<T>(@event, eventId, this.GetGrainId()));
await PublishEventUpwardsAsync(@event, eventId);

return eventId;
}
Expand All @@ -44,38 +34,13 @@ private async Task PublishEventUpwardsAsync<T>(T @event, Guid eventId) where T :

private async Task SendEventUpwardsAsync<T>(EventWrapper<T> eventWrapper) where T : EventBase
{
var stream = StreamProvider.GetStream<EventWrapperBase>(eventWrapper.StreamId!.Value);
var stream = StreamProvider.GetStream<EventWrapperBase>(_parentStreamId.State);
await stream.OnNextAsync(eventWrapper);
}

private async Task PublishEventDownwardsAsync<T>(T @event, Guid eventId) where T : EventBase
{
@event.CorrelationId ??= Guid.NewGuid();
var streamOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey());
@event.StreamId ??= streamOfThisGAgent;
await SendEventDownwardsAsync(new EventWrapper<T>(@event, eventId, this.GetGrainId()));
}

private async Task SendEventToSelfAsync<T>(EventWrapper<T> eventWrapper) where T : EventBase
private async Task SendEventToSelfStreamAsync<T>(EventWrapper<T> eventWrapper) where T : EventBase
{
var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey());
var streamOfThisGAgent = StreamProvider.GetStream<EventWrapperBase>(streamIdOfThisGAgent);
var streamOfThisGAgent = StreamProvider.GetStream<EventWrapperBase>(eventWrapper.StreamId!.Value);
await streamOfThisGAgent.OnNextAsync(eventWrapper);
}

private async Task SendEventDownwardsAsync<T>(EventWrapper<T> eventWrapper) where T : EventBase
{
await LoadSubscribersAsync();
if (_subscribers.State.IsNullOrEmpty())
{
return;
}

foreach (var stream in _subscribers.State
.Select(subscriber => StreamId.Create(CommonConstants.StreamNamespace, subscriber.GetGuidKey()))
.Select(streamId => StreamProvider.GetStream<EventWrapperBase>(streamId)))
{
await stream.OnNextAsync(eventWrapper);
}
}
}
40 changes: 0 additions & 40 deletions src/AISmart.GAgent.Core/GAgentBase.Subscribers.cs

This file was deleted.

68 changes: 21 additions & 47 deletions src/AISmart.GAgent.Core/GAgentBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Collections.Concurrent;
using AISmart.Agents;
using AISmart.Dapr;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -27,6 +26,8 @@ public abstract partial class GAgentBase<TState, TEvent> : JournaledGrain<TState
/// </summary>
private readonly Dictionary<EventWrapperBaseAsyncObserver, Dictionary<StreamId, Guid>> Observers = new();

private IGrainState<StreamId> _parentStreamId = new GrainState<StreamId>();

private IEventDispatcher EventDispatcher { get; set; }

protected GAgentBase(ILogger logger)
Expand All @@ -44,14 +45,15 @@ public Task ActivateAsync()

public async Task RegisterAsync(IGAgent gAgent)
{
var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey());
var streamOfThisGAgent = StreamProvider.GetStream<EventWrapperBase>(streamIdOfThisGAgent);
await gAgent.SubscribeAsync(streamOfThisGAgent);
var guid = gAgent.GetPrimaryKey();
await AddSubscriberAsync(gAgent.GetGrainId());
await OnRegisterAgentAsync(guid);
}

public async Task UnregisterAsync(IGAgent gAgent)
{
await RemoveSubscriberAsync(gAgent.GetGrainId());
await OnUnregisterAgentAsync(gAgent.GetPrimaryKey());
}

Expand All @@ -73,47 +75,16 @@ public async Task UnregisterAsync(IGAgent gAgent)
public async Task<SubscribedEventListEvent> HandleRequestAllSubscriptionsEventAsync(
RequestAllSubscriptionsEvent request)
{
return await GetGroupSubscribedEventListEvent();
}

private async Task<SubscribedEventListEvent> GetGroupSubscribedEventListEvent()
{
await LoadSubscribersAsync();

var gAgentList = _subscribers.State.Select(grainId => GrainFactory.GetGrain<IGAgent>(grainId)).ToList();

if (gAgentList.IsNullOrEmpty())
{
return new SubscribedEventListEvent
{
GAgentType = GetType()
};
}

if (gAgentList.Any(grain => grain == null))
{
// Only happened on test environment.
throw new InvalidOperationException("One or more grains in gAgentList are null.");
}

var dict = new ConcurrentDictionary<Type, List<Type>>();
foreach (var gAgent in gAgentList.AsParallel())
{
var eventList = await gAgent.GetAllSubscribedEventsAsync();
dict[gAgent.GetType()] = eventList ?? [];
}

return new SubscribedEventListEvent
{
Value = dict.ToDictionary(),
GAgentType = GetType()
};
var streamOfThisGAgent = GetStreamOfCurrentGAgent();
var handles = await streamOfThisGAgent.GetAllSubscriptionHandles();
// TODO: Get event type list from handles.
return new SubscribedEventListEvent();
}

[AllEventHandler]
internal async Task ForwardEventAsync(EventWrapperBase eventWrapper)
{
await SendEventDownwardsAsync((EventWrapper<EventBase>)eventWrapper);
await SendEventToSelfStreamAsync((EventWrapper<EventBase>)eventWrapper);
}

protected virtual async Task OnRegisterAgentAsync(Guid agentGuid)
Expand All @@ -140,6 +111,8 @@ public async Task SubscribeAsync(IAsyncStream<EventWrapperBase> stream)
var handleId = handle.HandleId;
Observers[observer][streamId] = handleId;
}

_parentStreamId.State = streamId;
}

public sealed override async Task OnActivateAsync(CancellationToken cancellationToken)
Expand All @@ -159,25 +132,18 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken)
await UpdateObserverList();

// Register to itself.
var agentGuid = this.GetPrimaryKey();
var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, agentGuid);
var streamOfThisGAgent = StreamProvider.GetStream<EventWrapperBase>(streamIdOfThisGAgent);
var streamOfThisGAgent = GetStreamOfCurrentGAgent();
if ((await streamOfThisGAgent.GetAllSubscriptionHandles()).Count == 0)
{
foreach (var observer in Observers.Keys)
{
await streamOfThisGAgent.SubscribeAsync(observer);
}
}

_stateSaveTimer =
this.RegisterGrainTimer(SaveSubscriberAsync, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10));
}

public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
_stateSaveTimer.Dispose();
await SaveSubscriberAsync(cancellationToken);
await base.OnDeactivateAsync(reason, cancellationToken);
}

Expand All @@ -202,4 +168,12 @@ private async Task InternalOnStateChangedAsync()
//TODO: need optimize use kafka,ensure Es written successfully
await EventDispatcher.PublishAsync(State, this.GetGrainId().ToString());
}

private IAsyncStream<EventWrapperBase> GetStreamOfCurrentGAgent()
{
var agentGuid = this.GetPrimaryKey();
var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, agentGuid);
var streamOfThisGAgent = StreamProvider.GetStream<EventWrapperBase>(streamIdOfThisGAgent);
return streamOfThisGAgent;
}
}