Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/AISmart.Application.Contracts/Agents/EventWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@ public class EventWrapper<T>
public StreamId StreamId { get; private set; }
public GrainId GrainId { get; private set; }

public int count { get; set; } = 0;
public int Count { get; set; } = 0;

public int SubscriberCount { get; set; } = 0;

public Boolean Success { get; set; }

// Constructor
public EventWrapper(T @event, StreamId streamId, GrainId grainId)
public EventWrapper(T @event, StreamId streamId, GrainId grainId,int subscriberCount)
{
Event = @event;
StreamId = streamId;
GrainId = grainId;
SubscriberCount = subscriberCount;
Success = false;

}

// Optionally, you can add methods or other functionality as needed
Expand Down
57 changes: 46 additions & 11 deletions src/AISmart.Application.Grains/GAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ namespace AISmart.Application.Grains;

public abstract class GAgent<TState, TEvent> : JournaledGrain<TState, TEvent>, IAgent<TEvent>
where TState : class, new()
where TEvent : class
where TEvent : class, new()
{
protected IStreamProvider? StreamProvider { get; private set; } = null;
protected ILogger Logger { get; }
private StreamId StreamId { get; set; }

private readonly IClusterClient _clusterClient;

private IDisposable? _timer;



protected GAgent(ILogger logger,IClusterClient clusterClient)
Expand Down Expand Up @@ -63,9 +66,16 @@ protected async Task PublishAsync<TEvent>(TEvent @event)
Logger.LogError("StreamProvider is null");
return;
}
EventWrapper<TEvent> eventWrapper = new EventWrapper<TEvent>(@event, StreamId,this.GetGrainId());

var subscriptionHandles = stream.GetAllSubscriptionHandles();
// The count of current subscriptions (consumers).
var subscriberCount = subscriptionHandles.Result.Count;

EventWrapper<TEvent> eventWrapper = new EventWrapper<TEvent>(@event, StreamId,this.GetGrainId(),subscriberCount);

await stream.OnNextAsync(eventWrapper);



}

Expand All @@ -85,22 +95,47 @@ public async Task AckAsync(EventWrapper<TEvent> eventWrapper)

public async Task DoACKAysnc(EventWrapper<TEvent> eventWrapper)
{
eventWrapper.count ++;
using var cts = new CancellationTokenSource();

cts.CancelAfter(CommonConstants.TimeOutMilliseconds);


eventWrapper.Count ++;

RaiseEvent(eventWrapper.Event);
await ConfirmEvents();

StreamId = eventWrapper.StreamId;
var stream = this.GetStreamProvider(CommonConstants.StreamProvider)
.GetStream<TEvent>(StreamId);
var subscriptionHandles = stream.GetAllSubscriptionHandles();
// The count of current subscriptions (consumers).
var subscriberCount = subscriptionHandles.Result.Count;

if (eventWrapper.count == subscriberCount)
if (eventWrapper.Count == eventWrapper.SubscriberCount)
{
await CompleteAsync(eventWrapper.Event);
eventWrapper.Success = true;
}
else
{
var period = TimeSpan.FromSeconds(60);
RegisterTimer(
asyncCallback: async _ => await TimerTick(),
state: null,
dueTime: period,
period: period);
}

}


private async Task TimerTick()
{
var historyEvents = this.RetrieveConfirmedEvents(Version-1,Version);
// todo to Retrieve eventWrapper
if (historyEvents.Result[0].Equals("to do "))
{
Console.WriteLine("eventWrapper has been completed");
}
else
{
await CompleteAsync(new TEvent());
}

}


Expand Down
2 changes: 2 additions & 0 deletions src/AISmart.Domain.Shared/Dapr/CommonConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ public static class CommonConstants
public const string StreamProvider = "AISmart";
public static Guid StreamGuid = Guid.NewGuid();

public const int TimeOutMilliseconds = 60*1000;


}