diff --git a/src/AISmart.GAgent.Core/GAgentBase.Observers.cs b/src/AISmart.GAgent.Core/GAgentBase.Observers.cs index e98f1234..634c271e 100644 --- a/src/AISmart.GAgent.Core/GAgentBase.Observers.cs +++ b/src/AISmart.GAgent.Core/GAgentBase.Observers.cs @@ -16,7 +16,7 @@ private Task UpdateObserverList() var observer = new EventWrapperBaseAsyncObserver(async item => { var grainId = (GrainId)item.GetType().GetProperty(nameof(EventWrapper.GrainId))?.GetValue(item)!; - if (grainId == this.GetGrainId() && eventHandlerMethod.Name != nameof(ForwardEventAsync)) + if (grainId == this.GetGrainId()) { // Skip the event if it is sent by itself. return; diff --git a/src/AISmart.GAgent.Core/GAgentBase.Publish.cs b/src/AISmart.GAgent.Core/GAgentBase.Publish.cs index 3d809ee6..45b988bf 100644 --- a/src/AISmart.GAgent.Core/GAgentBase.Publish.cs +++ b/src/AISmart.GAgent.Core/GAgentBase.Publish.cs @@ -12,27 +12,17 @@ public abstract partial class GAgentBase protected async Task PublishAsync(EventWrapper eventWrapper) where T : EventBase { await SendEventUpwardsAsync(eventWrapper); - await SendEventDownwardsAsync(eventWrapper); } protected async Task PublishAsync(T @event) where T : EventBase { - var isTop = _correlationId == null; _correlationId ??= Guid.NewGuid(); @event.CorrelationId = _correlationId; @event.StreamId = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey()); var eventId = Guid.NewGuid(); - switch (isTop) - { - case true: - // This event is the first time appeared to silo. - await SendEventToSelfAsync(new EventWrapper(@event, eventId, this.GetGrainId())); - break; - case false when _streamIdDictionary.TryGetValue(_correlationId!.Value, out var streamIdValue): - @event.StreamId = streamIdValue; - await PublishEventUpwardsAsync(@event, eventId); - break; - } + + await SendEventToSelfStreamAsync(new EventWrapper(@event, eventId, this.GetGrainId())); + await PublishEventUpwardsAsync(@event, eventId); return eventId; } @@ -44,38 +34,13 @@ private async Task PublishEventUpwardsAsync(T @event, Guid eventId) where T : private async Task SendEventUpwardsAsync(EventWrapper eventWrapper) where T : EventBase { - var stream = StreamProvider.GetStream(eventWrapper.StreamId!.Value); + var stream = StreamProvider.GetStream(_parentStreamId.State); await stream.OnNextAsync(eventWrapper); } - - private async Task PublishEventDownwardsAsync(T @event, Guid eventId) where T : EventBase - { - @event.CorrelationId ??= Guid.NewGuid(); - var streamOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey()); - @event.StreamId ??= streamOfThisGAgent; - await SendEventDownwardsAsync(new EventWrapper(@event, eventId, this.GetGrainId())); - } - private async Task SendEventToSelfAsync(EventWrapper eventWrapper) where T : EventBase + private async Task SendEventToSelfStreamAsync(EventWrapper eventWrapper) where T : EventBase { - var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey()); - var streamOfThisGAgent = StreamProvider.GetStream(streamIdOfThisGAgent); + var streamOfThisGAgent = StreamProvider.GetStream(eventWrapper.StreamId!.Value); await streamOfThisGAgent.OnNextAsync(eventWrapper); } - - private async Task SendEventDownwardsAsync(EventWrapper eventWrapper) where T : EventBase - { - await LoadSubscribersAsync(); - if (_subscribers.State.IsNullOrEmpty()) - { - return; - } - - foreach (var stream in _subscribers.State - .Select(subscriber => StreamId.Create(CommonConstants.StreamNamespace, subscriber.GetGuidKey())) - .Select(streamId => StreamProvider.GetStream(streamId))) - { - await stream.OnNextAsync(eventWrapper); - } - } } \ No newline at end of file diff --git a/src/AISmart.GAgent.Core/GAgentBase.Subscribers.cs b/src/AISmart.GAgent.Core/GAgentBase.Subscribers.cs deleted file mode 100644 index 449859ba..00000000 --- a/src/AISmart.GAgent.Core/GAgentBase.Subscribers.cs +++ /dev/null @@ -1,40 +0,0 @@ -namespace AISmart.GAgent.Core; - -public abstract partial class GAgentBase -{ - private readonly IGrainState> _subscribers = new GrainState>(); - private IDisposable _stateSaveTimer; - - private async Task LoadSubscribersAsync() - { - if (_subscribers.State.IsNullOrEmpty()) - { - await GrainStorage.ReadStateAsync(AISmartGAgentConstants.SubscribersStateName, this.GetGrainId(), - _subscribers); - } - } - - private async Task AddSubscriberAsync(GrainId grainId) - { - await LoadSubscribersAsync(); - _subscribers.State ??= []; - _subscribers.State.Add(grainId); - } - - private async Task RemoveSubscriberAsync(GrainId grainId) - { - await LoadSubscribersAsync(); - if (_subscribers.State.IsNullOrEmpty()) - { - return; - } - - _subscribers.State.Remove(grainId); - } - - private async Task SaveSubscriberAsync(CancellationToken cancellationToken) - { - await GrainStorage.WriteStateAsync(AISmartGAgentConstants.SubscribersStateName, this.GetGrainId(), - _subscribers); - } -} \ No newline at end of file diff --git a/src/AISmart.GAgent.Core/GAgentBase.cs b/src/AISmart.GAgent.Core/GAgentBase.cs index 302f0c22..5bf99e28 100644 --- a/src/AISmart.GAgent.Core/GAgentBase.cs +++ b/src/AISmart.GAgent.Core/GAgentBase.cs @@ -1,4 +1,3 @@ -using System.Collections.Concurrent; using AISmart.Agents; using AISmart.Dapr; using Microsoft.Extensions.DependencyInjection; @@ -27,6 +26,8 @@ public abstract partial class GAgentBase : JournaledGrain private readonly Dictionary> Observers = new(); + private IGrainState _parentStreamId = new GrainState(); + private IEventDispatcher EventDispatcher { get; set; } protected GAgentBase(ILogger logger) @@ -44,14 +45,15 @@ public Task ActivateAsync() public async Task RegisterAsync(IGAgent gAgent) { + var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, this.GetPrimaryKey()); + var streamOfThisGAgent = StreamProvider.GetStream(streamIdOfThisGAgent); + await gAgent.SubscribeAsync(streamOfThisGAgent); var guid = gAgent.GetPrimaryKey(); - await AddSubscriberAsync(gAgent.GetGrainId()); await OnRegisterAgentAsync(guid); } public async Task UnregisterAsync(IGAgent gAgent) { - await RemoveSubscriberAsync(gAgent.GetGrainId()); await OnUnregisterAgentAsync(gAgent.GetPrimaryKey()); } @@ -73,47 +75,16 @@ public async Task UnregisterAsync(IGAgent gAgent) public async Task HandleRequestAllSubscriptionsEventAsync( RequestAllSubscriptionsEvent request) { - return await GetGroupSubscribedEventListEvent(); - } - - private async Task GetGroupSubscribedEventListEvent() - { - await LoadSubscribersAsync(); - - var gAgentList = _subscribers.State.Select(grainId => GrainFactory.GetGrain(grainId)).ToList(); - - if (gAgentList.IsNullOrEmpty()) - { - return new SubscribedEventListEvent - { - GAgentType = GetType() - }; - } - - if (gAgentList.Any(grain => grain == null)) - { - // Only happened on test environment. - throw new InvalidOperationException("One or more grains in gAgentList are null."); - } - - var dict = new ConcurrentDictionary>(); - foreach (var gAgent in gAgentList.AsParallel()) - { - var eventList = await gAgent.GetAllSubscribedEventsAsync(); - dict[gAgent.GetType()] = eventList ?? []; - } - - return new SubscribedEventListEvent - { - Value = dict.ToDictionary(), - GAgentType = GetType() - }; + var streamOfThisGAgent = GetStreamOfCurrentGAgent(); + var handles = await streamOfThisGAgent.GetAllSubscriptionHandles(); + // TODO: Get event type list from handles. + return new SubscribedEventListEvent(); } [AllEventHandler] internal async Task ForwardEventAsync(EventWrapperBase eventWrapper) { - await SendEventDownwardsAsync((EventWrapper)eventWrapper); + await SendEventToSelfStreamAsync((EventWrapper)eventWrapper); } protected virtual async Task OnRegisterAgentAsync(Guid agentGuid) @@ -140,6 +111,8 @@ public async Task SubscribeAsync(IAsyncStream stream) var handleId = handle.HandleId; Observers[observer][streamId] = handleId; } + + _parentStreamId.State = streamId; } public sealed override async Task OnActivateAsync(CancellationToken cancellationToken) @@ -159,9 +132,7 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken) await UpdateObserverList(); // Register to itself. - var agentGuid = this.GetPrimaryKey(); - var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, agentGuid); - var streamOfThisGAgent = StreamProvider.GetStream(streamIdOfThisGAgent); + var streamOfThisGAgent = GetStreamOfCurrentGAgent(); if ((await streamOfThisGAgent.GetAllSubscriptionHandles()).Count == 0) { foreach (var observer in Observers.Keys) @@ -169,15 +140,10 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken) await streamOfThisGAgent.SubscribeAsync(observer); } } - - _stateSaveTimer = - this.RegisterGrainTimer(SaveSubscriberAsync, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10)); } public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken) { - _stateSaveTimer.Dispose(); - await SaveSubscriberAsync(cancellationToken); await base.OnDeactivateAsync(reason, cancellationToken); } @@ -202,4 +168,12 @@ private async Task InternalOnStateChangedAsync() //TODO: need optimize use kafka,ensure Es written successfully await EventDispatcher.PublishAsync(State, this.GetGrainId().ToString()); } + + private IAsyncStream GetStreamOfCurrentGAgent() + { + var agentGuid = this.GetPrimaryKey(); + var streamIdOfThisGAgent = StreamId.Create(CommonConstants.StreamNamespace, agentGuid); + var streamOfThisGAgent = StreamProvider.GetStream(streamIdOfThisGAgent); + return streamOfThisGAgent; + } } \ No newline at end of file