diff --git a/src/AISmart.Application.Contracts/Sender/IPublishingAgent.cs b/src/AISmart.Application.Contracts/Sender/IPublishingAgent.cs index 615208e1..7610e666 100644 --- a/src/AISmart.Application.Contracts/Sender/IPublishingAgent.cs +++ b/src/AISmart.Application.Contracts/Sender/IPublishingAgent.cs @@ -1,10 +1,11 @@ +using System.Collections.Generic; using System.Threading.Tasks; using AISmart.Agents; -using Orleans; namespace AISmart.Sender; public interface IPublishingAgent : IAgent { Task PublishEventAsync(T @event) where T : EventBase; + Task PublishEventAsync(List events) where T : EventBase; } \ No newline at end of file diff --git a/src/AISmart.Application.Grains/Agents/Publisher/PublishingGAgent.cs b/src/AISmart.Application.Grains/Agents/Publisher/PublishingGAgent.cs index 6014fb3e..61bf0c0d 100644 --- a/src/AISmart.Application.Grains/Agents/Publisher/PublishingGAgent.cs +++ b/src/AISmart.Application.Grains/Agents/Publisher/PublishingGAgent.cs @@ -34,4 +34,14 @@ public async Task PublishEventAsync(T @event) where T : EventBase Logger.LogInformation($"PublishingAgent publish {@event}"); await PublishAsync(@event); } + + public async Task PublishEventAsync(List events) where T : EventBase + { + if (events.IsNullOrEmpty()) + { + throw new ArgumentNullException(nameof(events)); + } + + await PublishAsync(events); + } } \ No newline at end of file diff --git a/src/AISmart.Application.Grains/GAgentBase.cs b/src/AISmart.Application.Grains/GAgentBase.cs index 20d1802a..78671d9d 100644 --- a/src/AISmart.Application.Grains/GAgentBase.cs +++ b/src/AISmart.Application.Grains/GAgentBase.cs @@ -2,7 +2,6 @@ using AISmart.Dapr; using Microsoft.Extensions.Logging; using Orleans.EventSourcing; -using Orleans.Runtime; using Orleans.Streams; namespace AISmart.Application.Grains; @@ -17,7 +16,7 @@ public abstract class GAgentBase : JournaledGrain> _subscriptions = new(); private readonly Dictionary> _publishers = new(); - private readonly List> _subscriptionHandlers = new(); + private readonly List>, Task>> _subscriptionHandlers = new(); protected GAgentBase(ILogger logger) { @@ -119,15 +118,18 @@ protected Task SubscribeAsync(Func onEvent) where T : EventBase _subscriptionHandlers.Add(OnNextWrapperAsync); return Task.CompletedTask; - Task OnNextWrapperAsync(EventWrapperBase @event, StreamSequenceToken token = null) + Task OnNextWrapperAsync(IList> events) { - Logger.LogInformation("Received message: {@Message}", @event); - if(@event is EventWrapper eventWrapper) + foreach (var @event in events) { - Logger.LogInformation("Received EventWrapper message: {@Message}", eventWrapper); - - onEvent(eventWrapper.Event); - //await DoAckAsync(eventWrapper); + Logger.LogInformation("Received message: {@Message}", @event); + if(@event.Item is EventWrapper eventWrapper) + { + Logger.LogInformation("Received EventWrapper message: {@Message}", eventWrapper); + + onEvent(eventWrapper.Event); + //await DoAckAsync(eventWrapper); + } } return Task.CompletedTask; @@ -152,7 +154,22 @@ protected async Task PublishAsync(T @event) where T : EventBase foreach (var publisher in _publishers.Select(kp => kp.Value)) { - await publisher.OnNextAsync(eventWrapper); + await publisher.OnNextBatchAsync([eventWrapper]); + } + } + + protected async Task PublishAsync(List events) where T : EventBase + { + if(_publishers.Count == 0) + { + return; + } + + var eventWrappers = events.Select(e => new EventWrapper(e, this.GetGrainId())).ToList(); + + foreach (var publisher in _publishers.Select(kp => kp.Value)) + { + await publisher.OnNextBatchAsync(eventWrappers); } } diff --git a/test/AISmart.Agents.Tests/AgentsTests.cs b/test/AISmart.Agents.Tests/AgentsTests.cs index 8539f675..749c59cf 100644 --- a/test/AISmart.Agents.Tests/AgentsTests.cs +++ b/test/AISmart.Agents.Tests/AgentsTests.cs @@ -17,42 +17,78 @@ namespace AISmart.Grains.Tests; -public class AgentsTests : TestKitBase +public class AgentsTests : TestKitBase, IAsyncLifetime { - [Fact] - public async Task GroupTest() + private GroupGAgent _groupAgent; + private PublishingGAgent _publishingAgent; + private XGAgent _xAgent; + private MarketLeaderGAgent _marketLeaderAgent; + private DeveloperGAgent _developerAgent; + private InvestmentGAgent _investmentAgent; + + public async Task InitializeAsync() { - var groupAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); - var publishingAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); - var xAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); - var marketLeaderAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); - var developerAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); - var investmentAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); + _groupAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); + _publishingAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); + _xAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); + _marketLeaderAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); + _developerAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); + _investmentAgent = await Silo.CreateGrainAsync(Guid.NewGuid()); - await groupAgent.Register(xAgent); - await groupAgent.Register(marketLeaderAgent); - await groupAgent.Register(developerAgent); - await groupAgent.Register(investmentAgent); + await _groupAgent.Register(_xAgent); + await _groupAgent.Register(_marketLeaderAgent); + await _groupAgent.Register(_developerAgent); + await _groupAgent.Register(_investmentAgent); - await publishingAgent.PublishTo(groupAgent); + await _publishingAgent.PublishTo(_groupAgent); + } - var xThreadCreatedEvent = new XThreadCreatedEvent + [Fact] + public async Task GroupPublishTest() + { + await _publishingAgent.PublishEventAsync(new XThreadCreatedEvent { Id = "mock_x_thread_id", Content = "BTC REACHED 100k WOOHOOOO!" - }; - - await publishingAgent.PublishEventAsync(xThreadCreatedEvent); + }); - var xAgentState = await xAgent.GetStateAsync(); + var xAgentState = await _xAgent.GetStateAsync(); xAgentState.ThreadIds.Count.ShouldBe(1); - - var investmentAgentState = await investmentAgent.GetStateAsync(); + + var investmentAgentState = await _investmentAgent.GetStateAsync(); investmentAgentState.Content.Count.ShouldBe(1); - - var developerAgentState = await developerAgent.GetStateAsync(); + + var developerAgentState = await _developerAgent.GetStateAsync(); developerAgentState.Content.Count.ShouldBe(1); } + + [Fact] + public async Task GroupBatchPublishTest() + { + var events = new List + { + new() + { + Id = "mock_x_thread_id_1", + Content = "BTC REACHED 100k WOOHOOOO!" + }, + new() + { + Id = "mock_x_thread_id_2", + Content = "ETH REACHED 4k WOOHOOOO!" + } + }; + await _publishingAgent.PublishEventAsync(events); + + var xAgentState = await _xAgent.GetStateAsync(); + xAgentState.ThreadIds.Count.ShouldBe(2); + + var investmentAgentState = await _investmentAgent.GetStateAsync(); + investmentAgentState.Content.Count.ShouldBe(2); + + var developerAgentState = await _developerAgent.GetStateAsync(); + developerAgentState.Content.Count.ShouldBe(2); + } [Fact] public async Task SendTransactionTest() @@ -78,4 +114,8 @@ public async Task SendTransactionTest() var aelfGAgentState = await aelfGAgent.GetAElfAgentDto(); aelfGAgentState.PendingTransactions.Count.ShouldBe(1); } + + public async Task DisposeAsync() + { + } } \ No newline at end of file