Feature Description
Introduce automatic waiting for StreamingEventProcessor catch-up at each phase transition in AxonTestFixture, so that tests with streaming processors no longer need explicit await() calls for basic processor-driven assertions.
A new EventProcessorCatchUpWaiter (or different name - similar of what it's here) utility captures the event store's head TrackingToken once (or in the loop - consider pros and cons), then polls with Awaitility until every registered StreamingEventProcessor tracking token covers that head. This is invoked automatically at three transition points:
given() → when() — ensures Given-phase events are fully processed before the When command executes (critical when processors build read models consumed by command handlers).
given() → then() — ensures Given-phase processor side effects (e.g., dispatched commands) are visible before assertions.
when.xxx() → then() — ensures When-phase side effects (command-handler-published events processed by streaming processors) are visible before assertions. Applies to when().command(), when().event(), and when().nothing() paths.
The waiting uses a snapshot-outside strategy: the head token is captured once before the Awaitility loop starts. This guarantees deterministic convergence (fixed target) and avoids infinite loops from cascading automation chains. Tests with deep cascade chains (event → processor → command → handler → event → ...) can still use await() for those secondary effects.
Waiting is ON by default and configurable via Customization (it's an idea, you can design different API for configuration):
disableProcessorWait() — disables auto-waiting entirely
processorWaitTimeout(Duration) — adjusts the per-transition timeout (default: 5 seconds)
If no EventStore is available (EventBus-only setup) or no StreamingEventProcessor instances are registered, the waiter returns immediately with no overhead.
Current Behaviour
Streaming event processors run asynchronously, so processor-driven side effects (commands dispatched by event handlers, read models built from events) are not guaranteed to be visible at phase transitions. Tests must use await() for any assertion that depends on processor output:
// Must use await() — processor may not have dispatched the command yet
fixture.given()
.events(new StudentNameChangedEvent("s1", "Alice", 1))
.then()
.await(r -> r.commands(new SendNotificationCommand("s1", "Name changed")));
// Flaky — read model built by processor may not be ready when When command executes
fixture.given()
.events(new DwellingBuiltEvent("d1", "angel"))
.when()
.command(new ProclaimWeekSymbolCommand("angel", 3))
.then()
.commands(new IncreaseAvailableCreaturesCommand("d1", "angel", 3)); // race condition
Wanted Behaviour
Phase transitions automatically wait for processors to catch up, making await() unnecessary for basic processor-driven assertions:
// No await() needed — processors are caught up before then() asserts
fixture.given()
.events(new StudentNameChangedEvent("s1", "Alice", 1))
.then()
.commands(new SendNotificationCommand("s1", "Name changed"));
// No race condition — read model is guaranteed up-to-date before When command executes
fixture.given()
.events(new DwellingBuiltEvent("d1", "angel"))
.when()
.command(new ProclaimWeekSymbolCommand("angel", 3))
.then()
.commands(new IncreaseAvailableCreaturesCommand("d1", "angel", 3));
Users can opt out or adjust timeout via Customization:
// Disable auto-wait
var fixture = AxonTestFixture.with(config, c -> c.disableProcessorWait()); // I'm not sure if it should be general or configurable per phase
// Custom timeout
var fixture = AxonTestFixture.with(config, c -> c.processorWaitTimeout(Duration.ofSeconds(10)));
Possible Workarounds
Use explicit await() calls at every assertion that depends on processor output. This works but is verbose, easy to forget, and produces flaky tests when omitted.
Feature Description
Introduce automatic waiting for
StreamingEventProcessorcatch-up at each phase transition inAxonTestFixture, so that tests with streaming processors no longer need explicitawait()calls for basic processor-driven assertions.A new
EventProcessorCatchUpWaiter(or different name - similar of what it's here) utility captures the event store's headTrackingTokenonce (or in the loop - consider pros and cons), then polls with Awaitility until every registeredStreamingEventProcessortracking token covers that head. This is invoked automatically at three transition points:given() → when()— ensures Given-phase events are fully processed before the When command executes (critical when processors build read models consumed by command handlers).given() → then()— ensures Given-phase processor side effects (e.g., dispatched commands) are visible before assertions.when.xxx() → then()— ensures When-phase side effects (command-handler-published events processed by streaming processors) are visible before assertions. Applies towhen().command(),when().event(), andwhen().nothing()paths.The waiting uses a snapshot-outside strategy: the head token is captured once before the Awaitility loop starts. This guarantees deterministic convergence (fixed target) and avoids infinite loops from cascading automation chains. Tests with deep cascade chains (event → processor → command → handler → event → ...) can still use
await()for those secondary effects.Waiting is ON by default and configurable via
Customization(it's an idea, you can design different API for configuration):disableProcessorWait()— disables auto-waiting entirelyprocessorWaitTimeout(Duration)— adjusts the per-transition timeout (default: 5 seconds)If no
EventStoreis available (EventBus-only setup) or noStreamingEventProcessorinstances are registered, the waiter returns immediately with no overhead.Current Behaviour
Streaming event processors run asynchronously, so processor-driven side effects (commands dispatched by event handlers, read models built from events) are not guaranteed to be visible at phase transitions. Tests must use
await()for any assertion that depends on processor output:Wanted Behaviour
Phase transitions automatically wait for processors to catch up, making
await()unnecessary for basic processor-driven assertions:Users can opt out or adjust timeout via
Customization:Possible Workarounds
Use explicit
await()calls at every assertion that depends on processor output. This works but is verbose, easy to forget, and produces flaky tests when omitted.