Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/AISmart.Application.Contracts/Sender/IPublishingAgent.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using AISmart.Agents;
using Orleans;

namespace AISmart.Sender;

public interface IPublishingAgent : IAgent
{
Task PublishEventAsync<T>(T @event) where T : EventBase;
Task PublishEventAsync<T>(List<T> events) where T : EventBase;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ public async Task PublishEventAsync<T>(T @event) where T : EventBase
Logger.LogInformation($"PublishingAgent publish {@event}");
await PublishAsync(@event);
}

public async Task PublishEventAsync<T>(List<T> events) where T : EventBase
{
if (events.IsNullOrEmpty())
{
throw new ArgumentNullException(nameof(events));
}

await PublishAsync(events);
}
}
37 changes: 27 additions & 10 deletions src/AISmart.Application.Grains/GAgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using AISmart.Dapr;
using Microsoft.Extensions.Logging;
using Orleans.EventSourcing;
using Orleans.Runtime;
using Orleans.Streams;

namespace AISmart.Application.Grains;
Expand All @@ -17,7 +16,7 @@ public abstract class GAgentBase<TState, TEvent> : JournaledGrain<TState, TEvent
// need to use persistent storage to store this
private readonly Dictionary<Guid, IAsyncStream<EventWrapperBase>> _subscriptions = new();
private readonly Dictionary<Guid, IAsyncStream<EventWrapperBase>> _publishers = new();
private readonly List<Func<EventWrapperBase, StreamSequenceToken, Task>> _subscriptionHandlers = new();
private readonly List<Func<IList<SequentialItem<EventWrapperBase>>, Task>> _subscriptionHandlers = new();

protected GAgentBase(ILogger logger)
{
Expand Down Expand Up @@ -119,15 +118,18 @@ protected Task SubscribeAsync<T>(Func<T, Task> onEvent) where T : EventBase
_subscriptionHandlers.Add(OnNextWrapperAsync);
return Task.CompletedTask;

Task OnNextWrapperAsync(EventWrapperBase @event, StreamSequenceToken token = null)
Task OnNextWrapperAsync(IList<SequentialItem<EventWrapperBase>> events)
{
Logger.LogInformation("Received message: {@Message}", @event);
if(@event is EventWrapper<T> eventWrapper)
foreach (var @event in events)
{
Logger.LogInformation("Received EventWrapper message: {@Message}", eventWrapper);

onEvent(eventWrapper.Event);
//await DoAckAsync(eventWrapper);
Logger.LogInformation("Received message: {@Message}", @event);
if(@event.Item is EventWrapper<T> eventWrapper)
{
Logger.LogInformation("Received EventWrapper message: {@Message}", eventWrapper);

onEvent(eventWrapper.Event);
//await DoAckAsync(eventWrapper);
}
}

return Task.CompletedTask;
Expand All @@ -152,7 +154,22 @@ protected async Task PublishAsync<T>(T @event) where T : EventBase

foreach (var publisher in _publishers.Select(kp => kp.Value))
{
await publisher.OnNextAsync(eventWrapper);
await publisher.OnNextBatchAsync([eventWrapper]);
}
}

protected async Task PublishAsync<T>(List<T> events) where T : EventBase
{
if(_publishers.Count == 0)
{
return;
}

var eventWrappers = events.Select(e => new EventWrapper<T>(e, this.GetGrainId())).ToList();

foreach (var publisher in _publishers.Select(kp => kp.Value))
{
await publisher.OnNextBatchAsync(eventWrappers);
}
}

Expand Down
86 changes: 63 additions & 23 deletions test/AISmart.Agents.Tests/AgentsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,78 @@

namespace AISmart.Grains.Tests;

public class AgentsTests : TestKitBase
public class AgentsTests : TestKitBase, IAsyncLifetime
{
[Fact]
public async Task GroupTest()
private GroupGAgent _groupAgent;
private PublishingGAgent _publishingAgent;
private XGAgent _xAgent;
private MarketLeaderGAgent _marketLeaderAgent;
private DeveloperGAgent _developerAgent;
private InvestmentGAgent _investmentAgent;

public async Task InitializeAsync()
{
var groupAgent = await Silo.CreateGrainAsync<GroupGAgent>(Guid.NewGuid());
var publishingAgent = await Silo.CreateGrainAsync<PublishingGAgent>(Guid.NewGuid());
var xAgent = await Silo.CreateGrainAsync<XGAgent>(Guid.NewGuid());
var marketLeaderAgent = await Silo.CreateGrainAsync<MarketLeaderGAgent>(Guid.NewGuid());
var developerAgent = await Silo.CreateGrainAsync<DeveloperGAgent>(Guid.NewGuid());
var investmentAgent = await Silo.CreateGrainAsync<InvestmentGAgent>(Guid.NewGuid());
_groupAgent = await Silo.CreateGrainAsync<GroupGAgent>(Guid.NewGuid());
_publishingAgent = await Silo.CreateGrainAsync<PublishingGAgent>(Guid.NewGuid());
_xAgent = await Silo.CreateGrainAsync<XGAgent>(Guid.NewGuid());
_marketLeaderAgent = await Silo.CreateGrainAsync<MarketLeaderGAgent>(Guid.NewGuid());
_developerAgent = await Silo.CreateGrainAsync<DeveloperGAgent>(Guid.NewGuid());
_investmentAgent = await Silo.CreateGrainAsync<InvestmentGAgent>(Guid.NewGuid());

await groupAgent.Register(xAgent);
await groupAgent.Register(marketLeaderAgent);
await groupAgent.Register(developerAgent);
await groupAgent.Register(investmentAgent);
await _groupAgent.Register(_xAgent);
await _groupAgent.Register(_marketLeaderAgent);
await _groupAgent.Register(_developerAgent);
await _groupAgent.Register(_investmentAgent);

await publishingAgent.PublishTo(groupAgent);
await _publishingAgent.PublishTo(_groupAgent);
}

var xThreadCreatedEvent = new XThreadCreatedEvent
[Fact]
public async Task GroupPublishTest()
{
await _publishingAgent.PublishEventAsync(new XThreadCreatedEvent
{
Id = "mock_x_thread_id",
Content = "BTC REACHED 100k WOOHOOOO!"
};

await publishingAgent.PublishEventAsync(xThreadCreatedEvent);
});

var xAgentState = await xAgent.GetStateAsync();
var xAgentState = await _xAgent.GetStateAsync();
xAgentState.ThreadIds.Count.ShouldBe(1);
var investmentAgentState = await investmentAgent.GetStateAsync();

var investmentAgentState = await _investmentAgent.GetStateAsync();
investmentAgentState.Content.Count.ShouldBe(1);
var developerAgentState = await developerAgent.GetStateAsync();

var developerAgentState = await _developerAgent.GetStateAsync();
developerAgentState.Content.Count.ShouldBe(1);
}

[Fact]
public async Task GroupBatchPublishTest()
{
var events = new List<XThreadCreatedEvent>
{
new()
{
Id = "mock_x_thread_id_1",
Content = "BTC REACHED 100k WOOHOOOO!"
},
new()
{
Id = "mock_x_thread_id_2",
Content = "ETH REACHED 4k WOOHOOOO!"
}
};
await _publishingAgent.PublishEventAsync(events);

var xAgentState = await _xAgent.GetStateAsync();
xAgentState.ThreadIds.Count.ShouldBe(2);

var investmentAgentState = await _investmentAgent.GetStateAsync();
investmentAgentState.Content.Count.ShouldBe(2);

var developerAgentState = await _developerAgent.GetStateAsync();
developerAgentState.Content.Count.ShouldBe(2);
}

[Fact]
public async Task SendTransactionTest()
Expand All @@ -78,4 +114,8 @@ public async Task SendTransactionTest()
var aelfGAgentState = await aelfGAgent.GetAElfAgentDto();
aelfGAgentState.PendingTransactions.Count.ShouldBe(1);
}

public async Task DisposeAsync()
{
}
}