feat: event bus configurable drop policy and drop callback #22
Add DropPolicy (DropNewest, DropOldest) and SubscribeWithOpts for configurable async event subscriptions:

- DropNewest (default): drops the incoming event when the channel is full
- DropOldest: discards the oldest buffered event to make room for the new one
- Optional OnDrop callback fires when events are dropped, enabling monitoring without coupling to a specific metrics library

The existing Subscribe(bufSize) API is preserved unchanged for backward compatibility.

Tests cover both policies with and without callbacks, mixed policies on the same bus, and default buffer size fallback.

Closes #14
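The two policies described above can be sketched in isolation. This is a minimal illustration, not the code from this PR: `Event`, `newChan`, `deliver`, and `notify` are hypothetical names, the field names follow the PR description, and treating any non-positive `BufSize` as "use the default" is an assumption (the description only says that 0 falls back to 256).

```go
package main

// Event is a hypothetical stand-in for the bus's event type.
type Event struct{ ID int }

type DropPolicy int

const (
	DropNewest DropPolicy = iota // zero value, so it acts as the default
	DropOldest
)

// SubscribeOpts mirrors the fields named in the PR description.
type SubscribeOpts struct {
	BufSize int
	Policy  DropPolicy
	OnDrop  func(Event)
}

const defaultBufSize = 256

// newChan creates the subscriber channel, falling back to the default size.
func newChan(opts SubscribeOpts) chan Event {
	size := opts.BufSize
	if size <= 0 { // assumption: non-positive means "use the default"
		size = defaultBufSize
	}
	return make(chan Event, size)
}

// notify calls OnDrop only when one was supplied.
func notify(opts SubscribeOpts, e Event) {
	if opts.OnDrop != nil {
		opts.OnDrop(e)
	}
}

// deliver tries a non-blocking send and applies the policy when ch is full.
func deliver(ch chan Event, opts SubscribeOpts, e Event) {
	select {
	case ch <- e:
		return
	default:
	}
	if opts.Policy == DropOldest {
		// Evict the oldest buffered event to make room.
		select {
		case old := <-ch:
			notify(opts, old)
		default:
		}
		// Retry; a concurrent sender may have refilled the buffer.
		select {
		case ch <- e:
		default:
			notify(opts, e)
		}
		return
	}
	// DropNewest: the incoming event is discarded.
	notify(opts, e)
}
```

With a buffer of 2 and events 0, 1, 2 delivered in order, DropNewest leaves 0 and 1 buffered and reports 2 as dropped, while DropOldest leaves 1 and 2 buffered and reports 0.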
Pull request overview
This PR extends the ECS EventBus to support configurable drop behavior for slow async subscribers, including an optional drop callback for monitoring, while keeping the existing Subscribe(bufSize) API for backward compatibility.
Changes:
- Added `DropPolicy` (`DropNewest`, `DropOldest`) and `SubscribeWithOpts(SubscribeOpts)` to configure buffer size, drop policy, and an optional `OnDrop` callback.
- Updated internal subscription tracking to store per-subscriber policy/callback and applied the policy during `publish`.
- Added unit tests covering default buffer size, both drop policies, nil callback behavior, and mixed-policy subscriptions.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| internal/ecs/event.go | Introduces drop policies + SubscribeWithOpts, and applies per-subscriber drop handling during publish. |
| internal/ecs/event_test.go | Adds tests validating default buffering and new drop-policy behavior (with/without callback). |
     func (b *EventBus) publish(event Event) {
         b.mu.RLock()
         defer b.mu.RUnlock()

         for _, handler := range b.handlers {
             handler(event)
         }
    -    for _, ch := range b.subs {
    +    for _, sub := range b.subs {
             select {
    -        case ch <- event:
    +        case sub.ch <- event:
    +            continue
             default:
                 // Drop event if subscriber is too slow — prevents blocking the game loop.
             }
    +
    +        if sub.policy == DropOldest {
    +            // Discard the oldest event to make room.
    +            select {
    +            case dropped := <-sub.ch:
    +                if sub.onDrop != nil {
    +                    sub.onDrop(dropped)
    +                }
    +            default:
    +            }
    +            // Try to send again (may still fail if another goroutine filled it).
    +            select {
    +            case sub.ch <- event:
    +            default:
    +                if sub.onDrop != nil {
    +                    sub.onDrop(event)
    +                }
    +            }
    +        } else if sub.onDrop != nil {
    +            // DropNewest: discard the incoming event.
    +            sub.onDrop(event)
    +        }

publish holds b.mu.RLock() while invoking sub.onDrop(...). Because OnDrop is user-supplied, it can block the game loop and can also self-deadlock if it calls back into EventBus methods that need the write lock (e.g., Unsubscribe, SubscribeWithOpts, On). Consider copying handlers/subs under lock and releasing the lock before invoking callbacks, or explicitly documenting/enforcing that OnDrop must be non-blocking and must not call back into the bus.
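The first option (copy under the lock, release, then invoke callbacks) might look roughly like the sketch below. It is an illustration under stated assumptions rather than a patch for this PR: `Event`, `subscriber`, and this reduced `EventBus` are hypothetical stand-ins for the real types, and only the DropNewest path is shown to keep it short.

```go
package main

import "sync"

// Event is a hypothetical stand-in for the bus's event type.
type Event struct{ ID int }

// subscriber is a reduced version of the per-subscriber state.
type subscriber struct {
	ch     chan Event
	onDrop func(Event)
}

type EventBus struct {
	mu   sync.RWMutex
	subs []*subscriber
}

// Subscribe takes the write lock, as the bus's mutating methods would.
func (b *EventBus) Subscribe(bufSize int, onDrop func(Event)) <-chan Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	s := &subscriber{ch: make(chan Event, bufSize), onDrop: onDrop}
	b.subs = append(b.subs, s)
	return s.ch
}

// publish snapshots the subscriber list under the read lock, releases the
// lock, and only then sends events and runs user callbacks.
func (b *EventBus) publish(event Event) {
	b.mu.RLock()
	subs := make([]*subscriber, len(b.subs))
	copy(subs, b.subs)
	b.mu.RUnlock()

	for _, sub := range subs {
		select {
		case sub.ch <- event:
		default:
			// Buffer full: drop the incoming event (DropNewest only here).
			if sub.onDrop != nil {
				sub.onDrop(event) // no bus lock is held at this point
			}
		}
	}
}
```

Because the lock is released before `onDrop` runs, a callback that calls back into the bus no longer deadlocks. The trade-off is that a subscriber removed after the snapshot was taken can still receive one more send, so an `Unsubscribe` that closes the channel would need extra care.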
    // Channel should contain events 0 and 1.
    e1 := <-ch
    e2 := <-ch
    if e1.Entity.Index() != 0 || e2.Entity.Index() != 1 {
        t.Fatalf("expected entities 0,1 in channel, got %d,%d", e1.Entity.Index(), e2.Entity.Index())
    }

These direct receives (e1 := <-ch, e2 := <-ch) will block forever if a regression causes the expected events not to be present, which can hang the entire test run. Prefer using a select with a timeout (as done in TestEventBus_AsyncSubscribe) or draining with non-blocking receives + assertions so failures are reported instead of stalling.
    // Channel should contain events 1 and 2.
    e1 := <-ch
    e2 := <-ch
    if e1.Entity.Index() != 1 || e2.Entity.Index() != 2 {
        t.Fatalf("expected entities 1,2 in channel, got %d,%d", e1.Entity.Index(), e2.Entity.Index())
    }

These direct receives (e1 := <-ch, e2 := <-ch) can block indefinitely on failure and hang the test suite. Use a select with a timeout (similar to TestEventBus_AsyncSubscribe) so the test fails fast if the expected buffered events aren't present.
    e := <-ch
    if e.Entity.Index() != 0 {
        t.Fatalf("expected entity 0, got %d", e.Entity.Index())
    }

e := <-ch will block forever if the channel doesn't contain an event due to a regression, potentially hanging the whole test run. Consider using a select with a short timeout so the test reports a failure instead of stalling.
    e := <-ch
    if e.Entity.Index() != 1 {
        t.Fatalf("expected entity 1 (newest), got %d", e.Entity.Index())
    }

e := <-ch can block indefinitely if the expected event isn't present, which turns logic bugs into hung tests. Prefer using a timeout-based select (as in TestEventBus_AsyncSubscribe) so CI fails fast.
    e := <-chNewest
    if e.Entity.Index() != 0 {
        t.Fatalf("DropNewest channel should have entity 0, got %d", e.Entity.Index())
    }

    // DropOldest: channel has entity 1, dropped entity 0.
    if len(droppedOldest) != 1 || droppedOldest[0].Entity.Index() != 0 {
        t.Fatal("DropOldest subscriber should have dropped entity 0")
    }
    e = <-chOldest
    if e.Entity.Index() != 1 {
        t.Fatalf("DropOldest channel should have entity 1, got %d", e.Entity.Index())
    }

The blocking receives from the subscription channels (e := <-chNewest / <-chOldest) can hang the test run if the expected events aren't available (e.g., due to a regression in drop behavior). Consider switching these reads to select + timeout to ensure the test fails deterministically.
What changed and why
Improves the EventBus to handle slow subscribers gracefully with configurable behavior.
New API
- `DropPolicy` type: `DropNewest` (default) | `DropOldest`
- `SubscribeWithOpts(SubscribeOpts)`: configurable subscription with:
  - `BufSize`: channel buffer size (defaults to 256)
  - `Policy`: `DropNewest` or `DropOldest`
  - `OnDrop`: optional callback when events are dropped (enables monitoring)
- `Subscribe(bufSize)` preserved unchanged for backward compatibility

Drop policies
Design decisions
Tests
- `TestEventBus_SubscribeDefaultBufSize`: verifies 0 defaults to 256
- `TestEventBus_DropNewest`: verifies incoming event dropped with callback
- `TestEventBus_DropOldest`: verifies oldest event evicted with callback
- `TestEventBus_DropNewest_NoCallback`: no panic without OnDrop
- `TestEventBus_DropOldest_NoCallback`: no panic without OnDrop
- `TestEventBus_MixedPolicies`: both policies on same bus work correctly

How to test

    go test -race -count=1 ./internal/ecs/ ./internal/web/

Related issue
Closes #14