diff --git a/src/AISmart.Application.Contracts/Agents/EventWrapper.cs b/src/AISmart.Application.Contracts/Agents/EventWrapper.cs index 505bd694..e69d5989 100644 --- a/src/AISmart.Application.Contracts/Agents/EventWrapper.cs +++ b/src/AISmart.Application.Contracts/Agents/EventWrapper.cs @@ -10,14 +10,21 @@ public class EventWrapper public StreamId StreamId { get; private set; } public GrainId GrainId { get; private set; } - public int count { get; set; } = 0; + public int Count { get; set; } = 0; + + public int SubscriberCount { get; set; } = 0; + + public Boolean Success { get; set; } // Constructor - public EventWrapper(T @event, StreamId streamId, GrainId grainId) + public EventWrapper(T @event, StreamId streamId, GrainId grainId,int subscriberCount) { Event = @event; StreamId = streamId; GrainId = grainId; + SubscriberCount = subscriberCount; + Success = false; + } // Optionally, you can add methods or other functionality as needed diff --git a/src/AISmart.Application.Grains/GAgent.cs b/src/AISmart.Application.Grains/GAgent.cs index ff7e9c27..f29cebee 100644 --- a/src/AISmart.Application.Grains/GAgent.cs +++ b/src/AISmart.Application.Grains/GAgent.cs @@ -10,13 +10,16 @@ namespace AISmart.Application.Grains; public abstract class GAgent : JournaledGrain, IAgent where TState : class, new() - where TEvent : class + where TEvent : class, new() { protected IStreamProvider? StreamProvider { get; private set; } = null; protected ILogger Logger { get; } private StreamId StreamId { get; set; } private readonly IClusterClient _clusterClient; + + private IDisposable? _timer; + protected GAgent(ILogger logger,IClusterClient clusterClient) @@ -63,9 +66,16 @@ protected async Task PublishAsync(TEvent @event) Logger.LogError("StreamProvider is null"); return; } - EventWrapper eventWrapper = new EventWrapper(@event, StreamId,this.GetGrainId()); + + var subscriptionHandles = stream.GetAllSubscriptionHandles(); + // The count of current subscriptions (consumers). + var subscriberCount = subscriptionHandles.Result.Count; + + EventWrapper eventWrapper = new EventWrapper(@event, StreamId,this.GetGrainId(),subscriberCount); await stream.OnNextAsync(eventWrapper); + + } @@ -85,22 +95,47 @@ public async Task AckAsync(EventWrapper eventWrapper) public async Task DoACKAysnc(EventWrapper eventWrapper) { - eventWrapper.count ++; + using var cts = new CancellationTokenSource(); + + cts.CancelAfter(CommonConstants.TimeOutMilliseconds); + + + eventWrapper.Count ++; RaiseEvent(eventWrapper.Event); await ConfirmEvents(); - StreamId = eventWrapper.StreamId; - var stream = this.GetStreamProvider(CommonConstants.StreamProvider) - .GetStream(StreamId); - var subscriptionHandles = stream.GetAllSubscriptionHandles(); - // The count of current subscriptions (consumers). - var subscriberCount = subscriptionHandles.Result.Count; - - if (eventWrapper.count == subscriberCount) + if (eventWrapper.Count == eventWrapper.SubscriberCount) { await CompleteAsync(eventWrapper.Event); + eventWrapper.Success = true; + } + else + { + var period = TimeSpan.FromSeconds(60); + RegisterTimer( + asyncCallback: async _ => await TimerTick(), + state: null, + dueTime: period, + period: period); } + + } + + + private async Task TimerTick() + { + var historyEvents = this.RetrieveConfirmedEvents(Version-1,Version); + // todo to Retrieve eventWrapper + if (historyEvents.Result[0].Equals("to do ")) + { + Console.WriteLine("eventWrapper has been completed"); + } + else + { + await CompleteAsync(new TEvent()); + } + } diff --git a/src/AISmart.Domain.Shared/Dapr/CommonConstants.cs b/src/AISmart.Domain.Shared/Dapr/CommonConstants.cs index ddfd0f35..8eb5845c 100644 --- a/src/AISmart.Domain.Shared/Dapr/CommonConstants.cs +++ b/src/AISmart.Domain.Shared/Dapr/CommonConstants.cs @@ -16,5 +16,7 @@ public static class CommonConstants public const string StreamProvider = "AISmart"; public static Guid StreamGuid = Guid.NewGuid(); + public const int TimeOutMilliseconds = 60*1000; + } \ No newline at end of file