Traceable, extendable, and minimalist event bus implementation for Elixir, with an ETS-backed event store and built-in subscriber lifecycle tracking.
This repository is a maintained and modernized fork of otobus/event_bus, which is no longer actively maintained. It features an improved architecture, concurrent dispatch, subscriber priorities and guards, cancellation, and limited subscriptions.
- Highlights
- Installation
- Quick start
- Usage
- Debug mode
- Storage model
- Documentation and ecosystem
- License
- Asynchronous dispatch via EventBus.notify/1, backed by Task.Supervisor
- Synchronous dispatch via EventBus.notify_sync/1 when you want to stay in the calling process
- Minimal contention under concurrent load
- Priority-based dispatch, guard filters, and event cancellation
- Limited subscriptions (subscribe_once/2, subscribe_n/3) with safe concurrent delivery
- Event shadow delivery ({topic, id}) instead of copying full event payloads to every subscriber
- In-memory event store with automatic cleanup after all subscribers complete
- Optional traceability fields such as transaction_id, initialized_at, occurred_at, and ttl
Install this fork from GitHub:
def deps do
  [
    {:event_bus, github: "santiment/event_bus"}
  ]
end

The application starts automatically as an OTP dependency.
Register a topic, define a subscriber, subscribe it, and publish an event:
config :event_bus,
  topics: [:order_created]

defmodule OrderSubscriber do
  use EventBus.Subscriber

  @impl true
  def process({topic, id}) do
    event = EventBus.fetch_event({topic, id})
    IO.inspect(event.data, label: "received #{topic}")
    EventBus.mark_as_completed({__MODULE__, {topic, id}})
  end
end

EventBus.subscribe({OrderSubscriber, ["order_created"]})

event = %EventBus.Model.Event{
  id: "evt-1",
  topic: :order_created,
  data: %{order_id: 42, amount: 1500}
}

EventBus.notify(event)

For more patterns, such as GenServer-based processing, configured subscribers, or persistence, see the examples/ directory.
Configure topics up front:
config :event_bus,
  topics: [:message_received, :another_event_occurred]

Or register and unregister them at runtime:

EventBus.register_topic(:webhook_received)
> :ok

EventBus.unregister_topic(:webhook_received)
> :ok

Unregistering a topic also removes its related runtime state from the internal ETS tables.
Subscribe a module to all topics:
EventBus.subscribe({MyEventSubscriber, [".*"]})
> :ok

Subscribe to a subset of topics by regex:

EventBus.subscribe(
  {MyEventSubscriber, ["purchase_", "booking_confirmed$", "flight_passed$"]}
)
> :ok

Subscribe a configured subscriber:

config = %{region: "eu"}
subscriber = {MyConfiguredSubscriber, config}
EventBus.subscribe({subscriber, ["order_.*"]})
> :ok

Configured subscribers receive {config, topic, id} in process/1.
Plain subscribers receive {topic, id}.
When you inspect subscribers via EventBus.subscribers/0 or EventBus.subscribers/1, plain subscribers are returned as {Module, nil}.
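To see how regex topic patterns such as "purchase_" or "booking_confirmed$" select topics, here is a minimal standalone sketch. It assumes each pattern is compiled as a regular expression and tested against the topic name as a string; the TopicMatchSketch module and its matches?/2 function are hypothetical and not part of EventBus, whose internal matching may differ.

```elixir
defmodule TopicMatchSketch do
  # Hypothetical helper (not an EventBus API): returns true when at least one
  # of the regex patterns matches the topic name.
  def matches?(topic, patterns) when is_atom(topic) and is_list(patterns) do
    name = Atom.to_string(topic)

    Enum.any?(patterns, fn pattern ->
      Regex.match?(Regex.compile!(pattern), name)
    end)
  end
end

patterns = ["purchase_", "booking_confirmed$", "flight_passed$"]

# "purchase_" is unanchored, so any topic containing it matches.
IO.inspect(TopicMatchSketch.matches?(:purchase_completed, patterns))

# The trailing "$" requires the topic name to end with the pattern.
IO.inspect(TopicMatchSketch.matches?(:booking_confirmed, patterns))
IO.inspect(TopicMatchSketch.matches?(:booking_confirmed_late, patterns))
```

Under this assumption, an unanchored pattern also matches topics that merely contain it, so adding "^" and "$" is the way to keep a subscription narrow.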
You can set :priority and :guard per subscriber.
Higher priority runs first:
EventBus.subscribe({AuthValidator, ["order_.*"]}, priority: 100)
EventBus.subscribe({OrderProcessor, ["order_.*"]}, priority: 0)
EventBus.subscribe({AuditLogger, ["order_.*"]}, priority: -10)

Guard functions receive the full %EventBus.Model.Event{} and decide whether the event should be delivered to the subscriber:

EventBus.subscribe(
  {BigOrderHandler, ["order_.*"]},
  guard: fn event -> event.data.amount > 1000 end,
  priority: 50
)

Guard behavior:
- A truthy result dispatches the subscriber.
- A falsy result marks the subscriber as skipped.
- A raised exception is logged and also marks the subscriber as skipped.
Re-subscribing with EventBus.subscribe/1 clears previously configured guard and priority options for that subscriber.
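The priority and guard rules can be modelled in a few lines of plain Elixir. This is only a sketch under stated assumptions: subscribers are represented as plain maps, and DispatchPlanSketch with its plan/2 function is a hypothetical name, not how EventBus is implemented.

```elixir
defmodule DispatchPlanSketch do
  # Hypothetical model: each subscriber is a map with :name, :priority,
  # and an optional :guard function of arity 1.
  def plan(subscribers, event) do
    subscribers
    # Higher priority first, mirroring the documented ordering.
    |> Enum.sort_by(& &1.priority, :desc)
    |> Enum.map(fn sub -> {sub.name, decide(sub, event)} end)
  end

  defp decide(%{guard: guard}, event) when is_function(guard, 1) do
    try do
      # Truthy result dispatches, falsy result skips.
      if guard.(event), do: :dispatch, else: :skipped
    rescue
      error ->
        # A raising guard is reported and treated as a skip.
        IO.puts(:stderr, "guard raised: " <> Exception.message(error))
        :skipped
    end
  end

  # Subscribers without a guard are always dispatched.
  defp decide(_subscriber, _event), do: :dispatch
end

subscribers = [
  %{name: :order_processor, priority: 0},
  %{name: :big_order_handler, priority: 50, guard: fn e -> e.data.amount > 1000 end},
  %{name: :auth_validator, priority: 100}
]

IO.inspect(DispatchPlanSketch.plan(subscribers, %{data: %{amount: 500}}))
```

With an amount of 500, the guarded handler is marked as skipped while the other two are dispatched in priority order.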
Subscribers can automatically unsubscribe after processing a fixed number of events:
EventBus.subscribe_once({MySubscriber, ["order_created"]})
EventBus.subscribe_n({MySubscriber, ["order_created"]}, 5)

Important details:
- The counter is decremented when the subscriber reaches a terminal state via mark_as_completed/1 or mark_as_skipped/1.
- Crashes consume a count because EventBus marks the subscriber as skipped.
- In-flight tracking prevents concurrent notify/1 calls from overdelivering limited subscriptions.
- Re-subscribing creates a new subscription generation, so old async completions do not consume the limit of a fresh subscription.
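One way to picture the counting rules for limited subscriptions is a small state model that tracks the remaining limit alongside in-flight deliveries. The sketch below is an illustration only: LimitedSubscriptionSketch and its functions are hypothetical, and the actual bookkeeping inside EventBus may be organized differently.

```elixir
defmodule LimitedSubscriptionSketch do
  # Hypothetical state: how many terminal states are still allowed, and how
  # many deliveries have been dispatched but not yet finished.
  def new(limit) when is_integer(limit) and limit > 0 do
    %{remaining: limit, in_flight: 0}
  end

  # A delivery is only reserved while in-flight work cannot exceed the limit.
  def reserve(%{remaining: remaining, in_flight: in_flight} = state)
      when in_flight < remaining do
    {:ok, %{state | in_flight: in_flight + 1}}
  end

  def reserve(state), do: {:full, state}

  # Reaching a terminal state (completed or skipped) consumes one count.
  def finish(%{remaining: remaining, in_flight: in_flight} = state)
      when in_flight > 0 do
    state = %{state | remaining: remaining - 1, in_flight: in_flight - 1}
    if state.remaining == 0, do: {:unsubscribe, state}, else: {:active, state}
  end
end

state = LimitedSubscriptionSketch.new(1)
{:ok, state} = LimitedSubscriptionSketch.reserve(state)

# A second concurrent delivery is refused while the first is in flight.
IO.inspect(LimitedSubscriptionSketch.reserve(state))
IO.inspect(LimitedSubscriptionSketch.finish(state))
```

In this model, a subscription with a limit of 1 never receives a second event even if two notifications race, which is the property the in-flight tracking is meant to guarantee.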
EventBus.unsubscribe(MyEventSubscriber)
> :ok

config = %{region: "eu"}
EventBus.unsubscribe({MyConfiguredSubscriber, config})
> :ok

EventBus.subscribers()
> [{{MyEventSubscriber, nil}, [".*"]}, {{AnotherSubscriber, %{}}, [".*"]}]

EventBus.subscribers(:hello_received)
> [{MyEventSubscriber, nil}, {AnotherSubscriber, %{}}]

EventBus.Model.Event is the core payload shape:
%EventBus.Model.Event{
  id: String.t() | integer(),
  transaction_id: String.t() | integer(),
  topic: atom(),
  data: any(),
  initialized_at: integer(),
  occurred_at: integer(),
  source: String.t(),
  ttl: integer()
}

At minimum, provide:
- id
- topic
- data

Optional fields:
- transaction_id helps correlate related events across a workflow.
- initialized_at captures when event creation started.
- occurred_at captures when the underlying business event happened.
- ttl can be used by your own consumers for expiration or retention logic.
For end-to-end tracing, populate transaction_id for related events and use initialized_at / occurred_at consistently. EventSource.build/2 and EventSource.notify/2 can fill these automatically.
Example:
alias EventBus.Model.Event

event = %Event{
  id: "123",
  transaction_id: "tx-1",
  topic: :hello_received,
  data: %{message: "Hello"}
}

Dispatch asynchronously in a supervised task:

EventBus.notify(event)
> :ok

Dispatch synchronously in the current process:

EventBus.notify_sync(event)
> :ok

notify_sync/1 runs subscriber dispatch in the caller. If a subscriber hands work off to another process and marks completion later, that later completion is still asynchronous.
topic = :bye_received
id = "124"

EventBus.fetch_event({topic, id})
> %EventBus.Model.Event{...}

EventBus.fetch_event_data({topic, id})
> [user_id: 1, goal: "exit"]

Subscribers are responsible for reaching a terminal state.
Mark as completed:

EventBus.mark_as_completed({MyEventSubscriber, {:bye_received, id}})
> :ok

For configured subscribers:

subscriber = {MyConfiguredSubscriber, config}
EventBus.mark_as_completed({subscriber, {:bye_received, id}})
> :ok

Mark as skipped:

EventBus.mark_as_skipped({MyEventSubscriber, {:bye_received, id}})
> :ok

A subscriber can stop propagation to remaining lower-priority subscribers during process/1.
Return {:cancel, reason}:
def process({topic, id}) do
  event = EventBus.fetch_event({topic, id})

  if authorized?(event.data) do
    EventBus.mark_as_completed({__MODULE__, {topic, id}})
  else
    {:cancel, "unauthorized"}
  end
end

Or raise EventBus.CancelEvent:

def process({_topic, _id}) do
  raise EventBus.CancelEvent, reason: "validation failed"
end

Cancellation behavior:
- Remaining lower-priority subscribers are marked as skipped.
- Returning {:cancel, reason} marks the cancelling subscriber as completed.
- Raising EventBus.CancelEvent marks the cancelling subscriber as skipped.
- Regular exceptions do not cancel propagation.
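The four cancellation rules can be traced with a small standalone model. It is a sketch under assumptions: handlers are plain functions already sorted by priority, a normal return is treated as completion (real subscribers mark completion themselves), and CancelSketch and CancellationSketch are hypothetical stand-ins rather than EventBus modules.

```elixir
defmodule CancelSketch do
  # Hypothetical stand-in for a cancellation exception.
  defexception message: "cancelled", reason: nil
end

defmodule CancellationSketch do
  # handlers: list of {name, fun}, highest priority first.
  # Returns the terminal status each handler would end up with.
  def run(handlers, event) do
    {statuses, _cancelled?} =
      Enum.map_reduce(handlers, false, fn
        # Once cancelled, every remaining handler is skipped without running.
        {name, _fun}, true ->
          {{name, :skipped}, true}

        {name, fun}, false ->
          case safe_call(fun, event) do
            {:cancel, _reason} -> {{name, :completed}, true}
            :cancel_raised -> {{name, :skipped}, true}
            :crashed -> {{name, :skipped}, false}
            _other -> {{name, :completed}, false}
          end
      end)

    statuses
  end

  defp safe_call(fun, event) do
    fun.(event)
  rescue
    CancelSketch -> :cancel_raised
    _error -> :crashed
  end
end

handlers = [
  {:auth_validator, fn _event -> {:cancel, "unauthorized"} end},
  {:order_processor, fn _event -> :ok end}
]

IO.inspect(CancellationSketch.run(handlers, %{}))
```

Here the validator ends up completed and the processor skipped, while a handler that raises an ordinary exception would be skipped without stopping the handlers after it.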
EventBus.EventSource helps create event structs with sensible defaults.
use EventBus.EventSource

params = %{topic: :user_created}

EventSource.build(params) do
  %{email: "jd@example.com", name: "John Doe"}
end

With config like this:

config :event_bus,
  topics: [],
  ttl: 30_000_000,
  time_unit: :microsecond,
  id_generator: EventBus.Util.Base62

EventSource.build/2 can auto-fill values such as:
- id
- transaction_id (defaults to id)
- source
- ttl
- initialized_at
- occurred_at
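To make the auto-filled fields concrete, the following standalone sketch builds an event-like struct around a function that produces the data. It is not the EventSource implementation: TraceSketch, its field handling, and the use of System.unique_integer/1 as an id generator are assumptions chosen for illustration.

```elixir
defmodule TraceSketch do
  # Hypothetical struct mirroring the documented event fields.
  defstruct [:id, :transaction_id, :topic, :data,
             :initialized_at, :occurred_at, :source, :ttl]

  def build(params, fun) when is_map(params) and is_function(fun, 0) do
    # Captured before the data is produced.
    initialized_at = System.os_time(:microsecond)
    data = fun.()

    # Assumed id generator; a real setup would use the configured one.
    id = Map.get(params, :id) || Integer.to_string(System.unique_integer([:positive]))

    %__MODULE__{
      id: id,
      # Falls back to the event id when no transaction id is given.
      transaction_id: Map.get(params, :transaction_id, id),
      topic: Map.fetch!(params, :topic),
      data: data,
      initialized_at: initialized_at,
      # Captured after the data is produced.
      occurred_at: System.os_time(:microsecond),
      source: Map.get(params, :source),
      ttl: Map.get(params, :ttl)
    }
  end
end

event =
  TraceSketch.build(%{topic: :user_created, ttl: 30_000_000}, fn ->
    %{email: "jd@example.com", name: "John Doe"}
  end)

IO.inspect(event.transaction_id == event.id)
```

Taking initialized_at before the data function runs and occurred_at after it lets a consumer estimate how long event construction took.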
EventSource.notify/2 builds and dispatches in one step:
use EventBus.EventSource

params = %{topic: :user_created, error_topic: :user_create_erred}

EventSource.notify(params) do
  %{email: "mrsjd@example.com", name: "Mrs Jane Doe"}
end

If the block returns {:error, reason}, the event is emitted under :error_topic when provided.
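The topic selection rule for error results can be written as a small pure function. This sketch assumes that the regular topic is used as a fallback when no :error_topic is given; ErrorTopicSketch and resolve_topic/2 are hypothetical names, not EventBus functions.

```elixir
defmodule ErrorTopicSketch do
  # Hypothetical helper: choose the topic based on the block's result.
  def resolve_topic(params, {:error, _reason}) do
    # Assumed fallback: use the regular topic when no error topic is set.
    Map.get(params, :error_topic) || Map.fetch!(params, :topic)
  end

  def resolve_topic(params, _result), do: Map.fetch!(params, :topic)
end

params = %{topic: :user_created, error_topic: :user_create_erred}

IO.inspect(ErrorTopicSketch.resolve_topic(params, %{email: "mrsjd@example.com"}))
IO.inspect(ErrorTopicSketch.resolve_topic(params, {:error, :email_taken}))
```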
See examples/ for ready-to-use patterns:
- examples/simple_subscriber.ex - minimal synchronous subscriber
- examples/genserver_subscriber.ex - asynchronous processing via GenServer
- examples/configured_subscriber.ex - configured subscribers with {Module, config}
- examples/persistent_store_subscriber.ex - persisting all events to a data store
Enable debug logging:
config :event_bus, debug: true

Or toggle it at runtime:

EventBus.toggle_debug(true)
EventBus.toggle_debug(false)

When enabled, EventBus logs lifecycle events such as:
- notify
- dispatch
- completed
- skipped
- cleaned
- subscribe
- unsubscribe
- register_topic
Subscriber durations are measured from dispatch until the subscriber reaches a terminal state, so they reflect real processing time even for async subscribers.
EventBus uses ETS tables shared across all topics:
| Table | Purpose |
|---|---|
| :eb_event_store | Stores event structs keyed by {topic, id} |
| :eb_event_watchers | Tracks subscriber lists and remaining count per event |
| :eb_event_watcher_status | Tracks per-subscriber terminal state |
| :eb_event_subscription_generations | Stores the subscription generation snapshot per event |
| :eb_topics | Stores registered topic names |
| :eb_subscribers | Stores subscriber-to-pattern mappings |
| :eb_topic_subscribers | Stores the precomputed topic-to-subscriber index |
| :eb_subscription_opts | Stores priority, guard, and generation per subscriber |
When every subscriber for an event reaches a terminal state, the event store entry and observation state are cleaned up automatically.
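The cleanup rule can be illustrated with two throwaway ETS tables and an atomic counter. This is a simplified model, not the EventBus storage code: the :sketch_ table names and the StoreSketch module are hypothetical, and the real tables hold more state than shown here.

```elixir
defmodule StoreSketch do
  # Hypothetical table names, chosen to avoid clashing with the real ones.
  @store :sketch_event_store
  @watchers :sketch_event_watchers

  # Creates the tables once; safe to call repeatedly.
  def setup do
    for table <- [@store, @watchers], :ets.whereis(table) == :undefined do
      :ets.new(table, [:set, :public, :named_table])
    end

    :ok
  end

  # Stores the event and records how many subscribers are still pending.
  def save(topic, id, event, subscribers) do
    :ets.insert(@store, {{topic, id}, event})
    :ets.insert(@watchers, {{topic, id}, subscribers, length(subscribers)})
    :ok
  end

  # Called when one subscriber reaches a terminal state.
  def finish(topic, id) do
    # Atomically decrement the pending count (third tuple element).
    case :ets.update_counter(@watchers, {topic, id}, {3, -1}) do
      0 ->
        :ets.delete(@store, {topic, id})
        :ets.delete(@watchers, {topic, id})
        :cleaned

      pending ->
        {:pending, pending}
    end
  end
end

StoreSketch.setup()
StoreSketch.save(:order_created, "evt-1", %{amount: 1500}, [:billing, :audit])
IO.inspect(StoreSketch.finish(:order_created, "evt-1"))
IO.inspect(StoreSketch.finish(:order_created, "evt-1"))
```

Using :ets.update_counter/3 keeps the decrement atomic, so two subscribers finishing at the same time cannot both observe a non-zero count and leave the event behind.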
To inspect in-flight events manually:
:ets.tab2list(:eb_event_watchers)
> [{{topic, id}, subscribers, pending_count}, ...]

:ets.lookup(:eb_event_watcher_status, {topic, id, subscriber})
> [{{topic, id, subscriber}, :pending}]

ETS is not durable storage. If you need persistence, subscribe a consumer to [".*"] and write the fetched events to your database or message archive.
- Original wiki (some pages may be outdated relative to this fork)
- Contributing
- Code of Conduct
Sample ecosystem projects:
| Addon | Description | Link | Docs |
|---|---|---|---|
| event_bus_postgres | Persists event_bus events to Postgres using GenStage | GitHub | HexDocs |
| event_bus_logger | Simple log subscriber implementation | GitHub | HexDocs |
| event_bus_metrics | Metrics UI and metrics API endpoints for EventBus | Hex | HexDocs |
These addons were built for the original otobus/event_bus and may need updates to work with this fork.
EventBus is released under the MIT License.