feat: event bus configurable drop policy and drop callback#22

Open
gauvainw wants to merge 1 commit into main from feat/event-bus-improvements

@gauvainw

Owner

What changed and why

Lets each EventBus subscriber choose what happens when its channel buffer is full, so slow subscribers are handled predictably instead of always losing the incoming event.

New API

  • DropPolicy type: DropNewest (default) | DropOldest
  • SubscribeWithOpts(SubscribeOpts) — configurable subscription with:
    • BufSize: channel buffer size (defaults to 256)
    • Policy: DropNewest or DropOldest
    • OnDrop: optional callback when events are dropped (enables monitoring)
  • Existing Subscribe(bufSize) preserved unchanged for backward compatibility

Drop policies

| Policy | Behavior when full | Use case |
|---|---|---|
| DropNewest (default) | Discard incoming event | Real-time dashboards, SSE |
| DropOldest | Discard oldest buffered event | Audit logs, catch-up consumers |
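
To make the two rows above concrete, here is a minimal sketch of both policies applied to a plain buffered channel of ints. The `offer` and `drain` helpers and the local `DropPolicy` constants are illustrative stand-ins that only borrow the names from this PR; they are not the code in internal/ecs/event.go.

```go
package main

import "fmt"

// DropPolicy mirrors the two policy names from this PR (local stand-in).
type DropPolicy int

const (
	DropNewest DropPolicy = iota
	DropOldest
)

// offer enqueues v without blocking and returns whatever had to be dropped.
func offer(ch chan int, v int, policy DropPolicy) []int {
	select {
	case ch <- v:
		return nil // there was room, nothing dropped
	default:
	}
	if policy == DropNewest {
		return []int{v} // buffer full: discard the incoming value
	}
	// DropOldest: evict the head of the buffer, then retry the send.
	var dropped []int
	select {
	case old := <-ch:
		dropped = append(dropped, old)
	default:
	}
	select {
	case ch <- v:
	default:
		// Only reachable if another sender refilled the slot in between.
		dropped = append(dropped, v)
	}
	return dropped
}

// drain empties ch without blocking and returns its contents in order.
func drain(ch chan int) []int {
	var out []int
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	newest := make(chan int, 2)
	oldest := make(chan int, 2)
	for i := 0; i < 3; i++ {
		offer(newest, i, DropNewest)
		offer(oldest, i, DropOldest)
	}
	fmt.Println(drain(newest), drain(oldest)) // prints "[0 1] [1 2]"
}
```

With a buffer of 2 and three events, DropNewest keeps 0 and 1 while DropOldest keeps 1 and 2, which matches what TestEventBus_DropNewest and TestEventBus_DropOldest assert.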

Design decisions

  • OnDrop callback instead of Prometheus metric: Keeps the ECS library decoupled from specific telemetry frameworks. Users can increment a counter in the callback.
  • No blocking mode: The game loop must never block on slow subscribers.
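
As a rough illustration of the first design decision, the sketch below counts drops with an atomic counter wired into OnDrop. `Event` and `SubscribeOpts` are trimmed stand-ins: the field names come from this description, while the `func(Event)` callback signature is assumed from the publish code, and `dropCounter` and `send` are hypothetical helpers that exist only for this example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Event and SubscribeOpts are simplified stand-ins for the internal/ecs types.
type Event struct{ ID int }

type SubscribeOpts struct {
	BufSize int
	OnDrop  func(Event) // assumed signature
}

// dropCounter is a hypothetical telemetry shim; a metrics library's counter
// could be incremented here instead.
type dropCounter struct{ n atomic.Int64 }

func (c *dropCounter) OnDrop(Event) { c.n.Add(1) }
func (c *dropCounter) Count() int64 { return c.n.Load() }

// send is a DropNewest-style non-blocking delivery, used only to trigger OnDrop.
func send(ch chan Event, opts SubscribeOpts, e Event) {
	select {
	case ch <- e:
	default:
		if opts.OnDrop != nil {
			opts.OnDrop(e)
		}
	}
}

func main() {
	var drops dropCounter
	opts := SubscribeOpts{BufSize: 1, OnDrop: drops.OnDrop}
	ch := make(chan Event, opts.BufSize)
	for i := 0; i < 4; i++ {
		send(ch, opts, Event{ID: i})
	}
	fmt.Println("dropped:", drops.Count()) // prints "dropped: 3"
}
```

Because the callback only does an atomic add, it stays cheap enough to run on the publishing goroutine, which matters given that no blocking mode is offered.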

Tests

  • TestEventBus_SubscribeDefaultBufSize — verifies that a buffer size of 0 defaults to 256
  • TestEventBus_DropNewest — verifies incoming event dropped with callback
  • TestEventBus_DropOldest — verifies oldest event evicted with callback
  • TestEventBus_DropNewest_NoCallback — no panic without OnDrop
  • TestEventBus_DropOldest_NoCallback — no panic without OnDrop
  • TestEventBus_MixedPolicies — both policies on same bus work correctly

How to test

go test -race -count=1 ./internal/ecs/ ./internal/web/

Related issue

Closes #14

Add DropPolicy (DropNewest, DropOldest) and SubscribeWithOpts for
configurable async event subscriptions:

- DropNewest (default): drops incoming event when channel is full
- DropOldest: discards oldest buffered event to make room for new one
- Optional OnDrop callback fires when events are dropped, enabling
  monitoring without coupling to a specific metrics library

The existing Subscribe(bufSize) API is preserved unchanged for
backward compatibility.

Tests cover both policies with and without callbacks, mixed policies
on the same bus, and default buffer size fallback.

Closes #14

Copilot AI left a comment

Pull request overview

This PR extends the ECS EventBus to support configurable drop behavior for slow async subscribers, including an optional drop callback for monitoring, while keeping the existing Subscribe(bufSize) API for backward compatibility.

Changes:

  • Added DropPolicy (DropNewest, DropOldest) and SubscribeWithOpts(SubscribeOpts) to configure buffer size, drop policy, and an optional OnDrop callback.
  • Updated internal subscription tracking to store per-subscriber policy/callback and applied the policy during publish.
  • Added unit tests covering default buffer size, both drop policies, nil callback behavior, and mixed-policy subscriptions.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

| File | Description |
|---|---|
| internal/ecs/event.go | Introduces drop policies + SubscribeWithOpts, and applies per-subscriber drop handling during publish. |
| internal/ecs/event_test.go | Adds tests validating default buffering and new drop-policy behavior (with/without callback). |

Comment thread internal/ecs/event.go
Comment on lines 112 to 146
  func (b *EventBus) publish(event Event) {
      b.mu.RLock()
      defer b.mu.RUnlock()

      for _, handler := range b.handlers {
          handler(event)
      }
-     for _, ch := range b.subs {
+     for _, sub := range b.subs {
          select {
-         case ch <- event:
+         case sub.ch <- event:
+             continue
          default:
              // Drop event if subscriber is too slow — prevents blocking the game loop.
          }
+
+         if sub.policy == DropOldest {
+             // Discard the oldest event to make room.
+             select {
+             case dropped := <-sub.ch:
+                 if sub.onDrop != nil {
+                     sub.onDrop(dropped)
+                 }
+             default:
+             }
+             // Try to send again (may still fail if another goroutine filled it).
+             select {
+             case sub.ch <- event:
+             default:
+                 if sub.onDrop != nil {
+                     sub.onDrop(event)
+                 }
+             }
+         } else if sub.onDrop != nil {
+             // DropNewest: discard the incoming event.
+             sub.onDrop(event)
+         }

Copilot AI Mar 28, 2026

publish holds b.mu.RLock() while invoking sub.onDrop(...). Because OnDrop is user-supplied, it can block the game loop and can also self-deadlock if it calls back into EventBus methods that need the write lock (e.g., Unsubscribe, SubscribeWithOpts, On). Consider copying handlers/subs under lock and releasing the lock before invoking callbacks, or explicitly documenting/enforcing that OnDrop must be non-blocking and must not call back into the bus.
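
One possible shape for the first suggestion (snapshot under the lock, release, then deliver) is sketched below. `Bus`, `subscriber`, and the method signatures are simplified assumptions, not the actual EventBus; only the DropNewest path is shown to keep it short.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct{ ID int }

type subscriber struct {
	ch     chan Event
	onDrop func(Event)
}

// Bus is a stripped-down stand-in for EventBus.
type Bus struct {
	mu   sync.RWMutex
	subs []*subscriber
}

func (b *Bus) Subscribe(buf int, onDrop func(Event)) *subscriber {
	s := &subscriber{ch: make(chan Event, buf), onDrop: onDrop}
	b.mu.Lock()
	b.subs = append(b.subs, s)
	b.mu.Unlock()
	return s
}

func (b *Bus) Unsubscribe(s *subscriber) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for i, cur := range b.subs {
		if cur == s {
			b.subs = append(b.subs[:i], b.subs[i+1:]...)
			return
		}
	}
}

// Publish copies the subscriber list under the read lock and releases the
// lock before touching channels or running user callbacks.
func (b *Bus) Publish(e Event) {
	b.mu.RLock()
	snapshot := append([]*subscriber(nil), b.subs...)
	b.mu.RUnlock()

	for _, s := range snapshot {
		select {
		case s.ch <- e:
		default:
			if s.onDrop != nil {
				s.onDrop(e) // no bus lock is held here
			}
		}
	}
}

func main() {
	bus := &Bus{}
	var sub *subscriber
	sub = bus.Subscribe(1, func(Event) {
		// Taking the write lock here would deadlock if Publish still held RLock.
		bus.Unsubscribe(sub)
	})
	bus.Publish(Event{ID: 0}) // fills the buffer
	bus.Publish(Event{ID: 1}) // buffer full: onDrop runs and unsubscribes
	fmt.Println("subscribers left:", len(bus.subs)) // prints "subscribers left: 0"
}
```

The trade-off is one slice allocation per publish, and a subscriber removed mid-publish may still receive the event that is in flight. If that cost is unacceptable in the game loop, documenting that OnDrop must be non-blocking and must not re-enter the bus is the cheaper alternative.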

Comment on lines +126 to +131
    // Channel should contain events 0 and 1.
    e1 := <-ch
    e2 := <-ch
    if e1.Entity.Index() != 0 || e2.Entity.Index() != 1 {
        t.Fatalf("expected entities 0,1 in channel, got %d,%d", e1.Entity.Index(), e2.Entity.Index())
    }

Copilot AI Mar 28, 2026

These direct receives (e1 := <-ch, e2 := <-ch) will block forever if a regression causes the expected events not to be present, which can hang the entire test run. Prefer using a select with a timeout (as done in TestEventBus_AsyncSubscribe) or draining with non-blocking receives + assertions so failures are reported instead of stalling.

Comment on lines +160 to +165
    // Channel should contain events 1 and 2.
    e1 := <-ch
    e2 := <-ch
    if e1.Entity.Index() != 1 || e2.Entity.Index() != 2 {
        t.Fatalf("expected entities 1,2 in channel, got %d,%d", e1.Entity.Index(), e2.Entity.Index())
    }

Copilot AI Mar 28, 2026

These direct receives (e1 := <-ch, e2 := <-ch) can block indefinitely on failure and hang the test suite. Use a select with a timeout (similar to TestEventBus_AsyncSubscribe) so the test fails fast if the expected buffered events aren't present.

Comment on lines +182 to +185
    e := <-ch
    if e.Entity.Index() != 0 {
        t.Fatalf("expected entity 0, got %d", e.Entity.Index())
    }

Copilot AI Mar 28, 2026

e := <-ch will block forever if the channel doesn't contain an event due to a regression, potentially hanging the whole test run. Consider using a select with a short timeout so the test reports a failure instead of stalling.

Comment on lines +201 to +204
    e := <-ch
    if e.Entity.Index() != 1 {
        t.Fatalf("expected entity 1 (newest), got %d", e.Entity.Index())
    }

Copilot AI Mar 28, 2026

e := <-ch can block indefinitely if the expected event isn't present, which turns logic bugs into hung tests. Prefer using a timeout-based select (as in TestEventBus_AsyncSubscribe) so CI fails fast.

Comment on lines +233 to +245
    e := <-chNewest
    if e.Entity.Index() != 0 {
        t.Fatalf("DropNewest channel should have entity 0, got %d", e.Entity.Index())
    }

    // DropOldest: channel has entity 1, dropped entity 0.
    if len(droppedOldest) != 1 || droppedOldest[0].Entity.Index() != 0 {
        t.Fatal("DropOldest subscriber should have dropped entity 0")
    }
    e = <-chOldest
    if e.Entity.Index() != 1 {
        t.Fatalf("DropOldest channel should have entity 1, got %d", e.Entity.Index())
    }

Copilot AI Mar 28, 2026

The blocking receives from the subscription channels (e := <-chNewest / <-chOldest) can hang the test run if the expected events aren't available (e.g., due to a regression in drop behavior). Consider switching these reads to select + timeout to ensure the test fails deterministically.


Successfully merging this pull request may close these issues.

feat: event bus improvements (backpressure and drop policy)
