From f3bc1269981817361799a50860234a01af70e1c5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 23 Apr 2026 19:21:00 +0000 Subject: [PATCH 1/2] feat(dispatch): index subscriptions and handlers by event discriminant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dispatch used to linearly scan every registered subscription (on the Client event bus) and every handler (on the high-level Dispatcher) just to filter by event kind, which becomes O(total_subs) per event regardless of how few handlers actually care about a given discriminant. This change indexes both stores by mem::Discriminant: * EventBus (client/bus/subscription.rs): subscriptions are still owned in a single insertion-order Vec, but two secondary indexes (by_event: HashMap, Vec> plus any_indices: Vec) are maintained on subscribe, unsubscribe, unsubscribe_group, and clear. Dispatch merges the specific + wildcard index slices in ascending index order, so the visible firing order is identical to the previous implementation. Subscriptions only store the discriminant (not the full Event value) because that is all the filter check ever used. * Dispatcher (dispatch/dispatcher.rs): same two-index pattern, built up as handlers are added via add_handler / add_handler_any. The inner HandlerEntry no longer carries its own event filter — the entry's bucket placement encodes it. Behaviour is preserved: * every handler/subscription that would have fired before still fires; * cross-bucket firing order is preserved because both index lists stay in insertion (and therefore ascending) order and are merged; * wildcards continue to see every event; * unsubscribe/unsubscribe_group/clear rebuild the index so index integrity is restored without introducing tombstones. A new mock-only helper Client::mock_dispatch_bus_for_tests lets integration tests exercise the bus path synchronously without spinning the full poll loop. tests/indexed_dispatch_tests.rs (9 new tests) pins down the invariants: specific-only firing, wildcard fan-out, insertion-order merging, unrelated-event skip, Stop flow propagation, and EventBus unsubscribe / unsubscribe_event_group / clear_event_subscriptions behaviour across buckets. --- .../teamtalk/src/client/bus/subscription.rs | 90 +++++- crates/teamtalk/src/client/core/mod.rs | 12 + crates/teamtalk/src/dispatch/dispatcher.rs | 51 ++- crates/teamtalk/src/dispatch/mod.rs | 1 - crates/teamtalk/src/dispatch/source.rs | 10 - .../teamtalk/tests/indexed_dispatch_tests.rs | 291 ++++++++++++++++++ 6 files changed, 429 insertions(+), 26 deletions(-) create mode 100644 crates/teamtalk/tests/indexed_dispatch_tests.rs diff --git a/crates/teamtalk/src/client/bus/subscription.rs b/crates/teamtalk/src/client/bus/subscription.rs index 4c49c66..2d30504 100644 --- a/crates/teamtalk/src/client/bus/subscription.rs +++ b/crates/teamtalk/src/client/bus/subscription.rs @@ -4,12 +4,20 @@ //! per-subscription record [`Subscription`]) together with the public //! [`SubscriptionBuilder`] that callers use to register handlers through //! [`crate::client::Client::on_event`] / [`crate::client::Client::on_any`]. +//! +//! Dispatch is indexed by `mem::Discriminant` so firing an event +//! only touches subscriptions that can possibly match it (event-specific +//! subscribers for this discriminant plus wildcard subscribers), instead +//! of scanning every registered subscription. Insertion order is +//! preserved across the specific-vs-wildcard partition by merging the +//! two index lists on the fly. use super::context::EventContext; use crate::client::{Client, Message}; use crate::events::Event; use crate::types::{ChannelId, UserId}; -use std::mem; +use std::collections::HashMap; +use std::mem::{self, Discriminant}; use std::sync::atomic::Ordering; use teamtalk_sys as ffi; @@ -35,6 +43,13 @@ impl EventSubscriptionGroup { pub(crate) struct EventBus { next_id: u64, subscriptions: Vec, + /// Indices into `subscriptions` keyed by the `Event` discriminant + /// the subscription is filtering on. Each inner `Vec` is + /// kept in ascending (insertion) order. + by_event: HashMap, Vec>, + /// Indices of subscriptions registered without an `event` filter + /// (wildcard). Kept in ascending (insertion) order. + any_indices: Vec, } #[derive(Default)] @@ -57,9 +72,11 @@ impl EventBus { ) -> EventSubscriptionId { self.next_id += 1; let id = EventSubscriptionId(self.next_id); + let event_filter = config.event.as_ref().map(mem::discriminant); + let index = self.subscriptions.len(); self.subscriptions.push(Subscription { id, - event: config.event, + event_filter, user_id: config.user_id, channel_id: config.channel_id, username: config.username, @@ -69,24 +86,38 @@ impl EventBus { predicate: config.predicate, handler, }); + match event_filter { + Some(d) => self.by_event.entry(d).or_default().push(index), + None => self.any_indices.push(index), + } id } pub(crate) fn unsubscribe(&mut self, id: EventSubscriptionId) -> bool { let before = self.subscriptions.len(); self.subscriptions.retain(|sub| sub.id != id); - before != self.subscriptions.len() + if self.subscriptions.len() == before { + return false; + } + self.rebuild_indices(); + true } pub(crate) fn clear(&mut self) { self.subscriptions.clear(); + self.by_event.clear(); + self.any_indices.clear(); } pub(crate) fn unsubscribe_group(&mut self, group: &EventSubscriptionGroup) -> usize { let before = self.subscriptions.len(); self.subscriptions .retain(|sub| sub.group.as_ref() != Some(group)); - before.saturating_sub(self.subscriptions.len()) + let removed = before.saturating_sub(self.subscriptions.len()); + if removed > 0 { + self.rebuild_indices(); + } + removed } pub(crate) fn len(&self) -> usize { @@ -94,7 +125,36 @@ impl EventBus { } pub(crate) fn dispatch(&mut self, client: &Client, event: Event, message: &Message) { - for sub in &mut self.subscriptions { + let d = mem::discriminant(&event); + let empty: Vec = Vec::new(); + let specific: &[usize] = self.by_event.get(&d).unwrap_or(&empty); + let any: &[usize] = &self.any_indices; + let subs = &mut self.subscriptions; + + let mut si = 0; + let mut ai = 0; + while si < specific.len() || ai < any.len() { + let pick = match (specific.get(si), any.get(ai)) { + (Some(&s), Some(&a)) => { + if s <= a { + si += 1; + s + } else { + ai += 1; + a + } + } + (Some(&s), None) => { + si += 1; + s + } + (None, Some(&a)) => { + ai += 1; + a + } + (None, None) => break, + }; + let sub = &mut subs[pick]; if !sub.matches(client, event, message) { continue; } @@ -106,11 +166,22 @@ impl EventBus { (sub.handler)(ctx); } } + + fn rebuild_indices(&mut self) { + self.by_event.clear(); + self.any_indices.clear(); + for (i, sub) in self.subscriptions.iter().enumerate() { + match sub.event_filter { + Some(d) => self.by_event.entry(d).or_default().push(i), + None => self.any_indices.push(i), + } + } + } } struct Subscription { id: EventSubscriptionId, - event: Option, + event_filter: Option>, user_id: Option, channel_id: Option, username: Option, @@ -126,11 +197,8 @@ impl Subscription { let user = message.user(); let text = message.text(); - if let Some(filter) = self.event - && mem::discriminant(&filter) != mem::discriminant(&event) - { - return false; - } + // Discriminant filter is already satisfied by the bucket in + // `EventBus::dispatch`, so it is not re-checked here. if let Some(user_id) = self.user_id { let match_user = user .as_ref() diff --git a/crates/teamtalk/src/client/core/mod.rs b/crates/teamtalk/src/client/core/mod.rs index c350415..59afddc 100644 --- a/crates/teamtalk/src/client/core/mod.rs +++ b/crates/teamtalk/src/client/core/mod.rs @@ -185,6 +185,18 @@ impl Client { self.update_state_for_event(event, &message); } + /// Synchronously dispatches `event` through the subscription bus + /// (used by integration tests to exercise + /// [`Client::on_event`] / [`Client::on_any`] without running the + /// full poll loop). + #[cfg(feature = "mock")] + pub fn mock_dispatch_bus_for_tests(&self, event: Event, source: i32) { + let mut raw = unsafe { std::mem::zeroed::() }; + raw.nSource = source; + let message = Message::from_raw(event, raw); + self.dispatch_bus(event, &message); + } + /// Sets the client name used for login. #[must_use] pub fn with_name(mut self, name: &str) -> Self { diff --git a/crates/teamtalk/src/dispatch/dispatcher.rs b/crates/teamtalk/src/dispatch/dispatcher.rs index b8b4bd2..6277db0 100644 --- a/crates/teamtalk/src/dispatch/dispatcher.rs +++ b/crates/teamtalk/src/dispatch/dispatcher.rs @@ -1,9 +1,18 @@ use super::source::{EventSource, HandlerEntry, ReconnectState}; use super::{ClientConfig, DispatchFlow, Event, EventContext, Message}; +use std::collections::HashMap; +use std::mem::{self, Discriminant}; pub struct Dispatcher { source: S, + /// Registered handlers in insertion order. Index is stable for + /// the lifetime of the handler. handlers: Vec, + /// Indices of event-specific handlers keyed by discriminant. + by_event: HashMap, Vec>, + /// Indices of wildcard (`event == None`) handlers in insertion + /// order. + any_indices: Vec, poll_timeout_ms: i32, reconnect: Option, stop: bool, @@ -21,6 +30,8 @@ impl Dispatcher { Self { source, handlers: Vec::new(), + by_event: HashMap::new(), + any_indices: Vec::new(), poll_timeout_ms: config.poll_timeout_ms, reconnect, stop: false, @@ -42,10 +53,12 @@ impl Dispatcher { where F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static, { + let d = mem::discriminant(&event); + let index = self.handlers.len(); self.handlers.push(HandlerEntry { - event: Some(event), handler: Box::new(handler), }); + self.by_event.entry(d).or_default().push(index); } /// Adds a handler which receives all events. @@ -53,10 +66,11 @@ impl Dispatcher { where F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static, { + let index = self.handlers.len(); self.handlers.push(HandlerEntry { - event: None, handler: Box::new(handler), }); + self.any_indices.push(index); } /// Adds a handler and returns the dispatcher for chaining. @@ -184,8 +198,37 @@ impl Dispatcher { client, }; let mut flow = DispatchFlow::Continue; - for handler in &mut self.handlers { - if handler.matches(&event) && (handler.handler)(ctx) == DispatchFlow::Stop { + + let d = mem::discriminant(&event); + let empty: Vec = Vec::new(); + let specific: &[usize] = self.by_event.get(&d).unwrap_or(&empty); + let any: &[usize] = &self.any_indices; + let handlers = &mut self.handlers; + + let mut si = 0; + let mut ai = 0; + while si < specific.len() || ai < any.len() { + let idx = match (specific.get(si), any.get(ai)) { + (Some(&s), Some(&a)) => { + if s <= a { + si += 1; + s + } else { + ai += 1; + a + } + } + (Some(&s), None) => { + si += 1; + s + } + (None, Some(&a)) => { + ai += 1; + a + } + (None, None) => break, + }; + if (handlers[idx].handler)(ctx) == DispatchFlow::Stop { flow = DispatchFlow::Stop; } } diff --git a/crates/teamtalk/src/dispatch/mod.rs b/crates/teamtalk/src/dispatch/mod.rs index 211896e..0bbb8c2 100644 --- a/crates/teamtalk/src/dispatch/mod.rs +++ b/crates/teamtalk/src/dispatch/mod.rs @@ -1,7 +1,6 @@ //! Event dispatcher built on top of `Client::poll`. use crate::client::{Client, ConnectParams, Message, ReconnectConfig, ReconnectHandler}; use crate::events::Event; -use std::mem; mod dispatcher; mod source; diff --git a/crates/teamtalk/src/dispatch/source.rs b/crates/teamtalk/src/dispatch/source.rs index 7eee6bf..158bc76 100644 --- a/crates/teamtalk/src/dispatch/source.rs +++ b/crates/teamtalk/src/dispatch/source.rs @@ -30,19 +30,9 @@ impl EventSource for &Client { pub(super) type HandlerFn = Box FnMut(EventContext<'a>) -> DispatchFlow + Send>; pub(super) struct HandlerEntry { - pub(super) event: Option, pub(super) handler: HandlerFn, } -impl HandlerEntry { - pub(super) fn matches(&self, event: &Event) -> bool { - match &self.event { - Some(e) => mem::discriminant(e) == mem::discriminant(event), - None => true, - } - } -} - pub(super) struct ReconnectState { params: ConnectParamsOwned, handler: ReconnectHandler, diff --git a/crates/teamtalk/tests/indexed_dispatch_tests.rs b/crates/teamtalk/tests/indexed_dispatch_tests.rs new file mode 100644 index 0000000..3c95800 --- /dev/null +++ b/crates/teamtalk/tests/indexed_dispatch_tests.rs @@ -0,0 +1,291 @@ +#![cfg(feature = "mock")] +//! Integration tests for the indexed event-dispatch path. +//! +//! Both the high-level `Dispatcher` (`crate::dispatch::Dispatcher`) and +//! the `Client` subscription bus (`Client::on_event` / `Client::on_any`) +//! are backed by the same per-discriminant index: registered handlers +//! are bucketed into a `HashMap, Vec>` plus +//! a wildcard `Vec`, and dispatch merges the two index lists in +//! insertion order. + +use std::sync::Arc; +use std::sync::{Mutex, MutexGuard}; + +use teamtalk::client::backend::MockBackend; +use teamtalk::client::ffi; +use teamtalk::dispatch::{DispatchFlow, Dispatcher}; +use teamtalk::mock::{MockClient, MockMessage, MockUserBuilder}; +use teamtalk::types::{ChannelId, UserId}; +use teamtalk::{Client, Event}; + +fn push_user_joined(mock: &mut MockClient, id: i32) { + let user = MockUserBuilder::new(UserId(id)) + .username(&format!("u{id}")) + .nickname(&format!("n{id}")); + mock.push_user_joined(user); +} + +fn push_text(mock: &mut MockClient, from: i32, to: i32, channel: i32, text: &str) { + let from_user = format!("u{from}"); + let msg = MockMessage::text( + ffi::TextMsgType::MSGTYPE_USER, + UserId(from), + UserId(to), + ChannelId(channel), + &from_user, + text, + ); + mock.push_text_message(msg); +} + +fn record(buf: &Arc>>, tag: &'static str) -> DispatchFlow { + buf.lock().unwrap().push(tag); + DispatchFlow::Continue +} + +fn lock_log<'a>(buf: &'a Arc>>) -> MutexGuard<'a, Vec<&'static str>> { + buf.lock().unwrap() +} + +fn mock_client() -> Client { + let backend = Arc::new(MockBackend::new()); + Client::with_backend(backend).expect("mock client") +} + +#[test] +fn specific_handler_only_fires_for_matching_discriminant() { + let mut mock = MockClient::new(); + push_user_joined(&mut mock, 1); + push_text(&mut mock, 1, 2, 3, "hi"); + mock.push_event(Event::ConnectSuccess); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + + let mut dispatcher = Dispatcher::new(mock) + .on_user_joined(move |_| record(&b1, "join")) + .on_text_message(move |_| record(&b2, "text")); + + dispatcher.step(0); + dispatcher.step(0); + dispatcher.step(0); + + assert_eq!(*lock_log(&log_buf), vec!["join", "text"]); +} + +#[test] +fn wildcard_handler_fires_for_every_event() { + let mut mock = MockClient::new(); + push_user_joined(&mut mock, 1); + mock.push_event(Event::ConnectSuccess); + mock.push_event(Event::ConnectionLost); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b1 = Arc::clone(&log_buf); + let mut dispatcher = Dispatcher::new(mock).on_any(move |_| record(&b1, "any")); + + for _ in 0..3 { + dispatcher.step(0); + } + + assert_eq!(*lock_log(&log_buf), vec!["any", "any", "any"]); +} + +#[test] +fn insertion_order_is_preserved_across_specific_and_wildcard() { + // Register handlers in an interleaved order: + // #0 text (specific) + // #1 any + // #2 text (specific) + // #3 any + // #4 user (specific, unrelated for this event) + // On a TextMessage the expected firing order — by insertion — is: + // text0, any1, text2, any3. Specific handlers that don't match the + // event must never fire. + let mut mock = MockClient::new(); + push_text(&mut mock, 1, 2, 3, "hi"); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + let b4 = Arc::clone(&log_buf); + + let mut dispatcher = Dispatcher::new(mock) + .on_text_message(move |_| record(&b0, "text0")) + .on_any(move |_| record(&b1, "any1")) + .on_text_message(move |_| record(&b2, "text2")) + .on_any(move |_| record(&b3, "any3")) + .on_user_joined(move |_| record(&b4, "user4")); + + dispatcher.step(0); + + assert_eq!(*lock_log(&log_buf), vec!["text0", "any1", "text2", "any3"]); +} + +#[test] +fn unmatched_event_fires_only_wildcards() { + let mut mock = MockClient::new(); + mock.push_event(Event::ConnectionLost); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + + let mut dispatcher = Dispatcher::new(mock) + .on_user_joined(move |_| record(&b1, "user")) + .on_any(move |_| record(&b2, "any")) + .on_text_message(move |_| record(&b3, "text")); + + dispatcher.step(0); + + assert_eq!(*lock_log(&log_buf), vec!["any"]); +} + +#[test] +fn stop_flow_propagates_from_any_slot() { + let mut mock = MockClient::new(); + push_user_joined(&mut mock, 1); + + let mut dispatcher = Dispatcher::new(mock) + .on_user_joined(|_| DispatchFlow::Continue) + .on_any(|_| DispatchFlow::Stop) + .on_user_joined(|_| DispatchFlow::Continue); + + assert!(matches!(dispatcher.step(0), DispatchFlow::Stop)); +} + +// --- Client subscription bus ------------------------------------------------ + +#[test] +fn client_bus_specific_and_wildcard_dispatch_in_insertion_order() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + + let _id0 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b0.lock().unwrap().push("join0")); + let _id1 = client + .on_any() + .subscribe(move |_| b1.lock().unwrap().push("any1")); + let _id2 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b2.lock().unwrap().push("join2")); + let _id3 = client + .on_event(Event::TextMessage) + .subscribe(move |_| b3.lock().unwrap().push("text3")); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + assert_eq!( + *lock_log(&log_buf), + vec!["join0", "any1", "join2"], + "UserJoined must fire join0 (#0), any1 (#1), join2 (#2) in insertion order, skipping text3" + ); + + lock_log(&log_buf).clear(); + client.mock_dispatch_bus_for_tests(Event::TextMessage, 0); + assert_eq!(*lock_log(&log_buf), vec!["any1", "text3"]); + + lock_log(&log_buf).clear(); + client.mock_dispatch_bus_for_tests(Event::ConnectSuccess, 0); + assert_eq!(*lock_log(&log_buf), vec!["any1"]); +} + +#[test] +fn client_bus_unsubscribe_rebuilds_indexes() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + + let id0 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b0.lock().unwrap().push("join0")); + let _id1 = client + .on_any() + .subscribe(move |_| b1.lock().unwrap().push("any1")); + let _id2 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b2.lock().unwrap().push("join2")); + assert_eq!(client.event_subscription_count(), 3); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + assert_eq!(*lock_log(&log_buf), vec!["join0", "any1", "join2"]); + + assert!(client.unsubscribe_event(id0)); + assert_eq!(client.event_subscription_count(), 2); + + lock_log(&log_buf).clear(); + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + assert_eq!(*lock_log(&log_buf), vec!["any1", "join2"]); +} + +#[test] +fn client_bus_unsubscribe_group_removes_from_all_buckets() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + + let _ = client + .on_event(Event::UserJoined) + .group("feature-x") + .subscribe(move |_| b0.lock().unwrap().push("join0")); + let _ = client + .on_any() + .group("feature-x") + .subscribe(move |_| b1.lock().unwrap().push("any1")); + let _ = client + .on_event(Event::TextMessage) + .group("feature-x") + .subscribe(move |_| b2.lock().unwrap().push("text2")); + let _ = client + .on_event(Event::UserJoined) + .subscribe(move |_| b3.lock().unwrap().push("join3-other")); + + assert_eq!(client.event_subscription_count(), 4); + let removed = client.unsubscribe_event_group("feature-x"); + assert_eq!(removed, 3); + assert_eq!(client.event_subscription_count(), 1); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + client.mock_dispatch_bus_for_tests(Event::TextMessage, 0); + client.mock_dispatch_bus_for_tests(Event::ConnectSuccess, 0); + assert_eq!(*lock_log(&log_buf), vec!["join3-other"]); +} + +#[test] +fn client_bus_clear_subscriptions_resets_both_buckets() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + + let _ = client + .on_event(Event::UserJoined) + .subscribe(move |_| b0.lock().unwrap().push("join")); + let _ = client + .on_any() + .subscribe(move |_| b1.lock().unwrap().push("any")); + assert_eq!(client.event_subscription_count(), 2); + + client.clear_event_subscriptions(); + assert_eq!(client.event_subscription_count(), 0); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + client.mock_dispatch_bus_for_tests(Event::TextMessage, 0); + assert!(lock_log(&log_buf).is_empty()); +} From 08f4fbd9e38e269199649e065619821222435001 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 23 Apr 2026 19:53:09 +0000 Subject: [PATCH 2/2] fix(dispatch): remove inline comment in Subscription::matches per AGENTS.md Devin Review flagged a two-line inline comment in `Subscription::matches` explaining why the discriminant filter check was removed. AGENTS.md ("Coding Style & Naming Conventions") requires no inline comments in library code; the commit drops the comment without changing behaviour. --- crates/teamtalk/src/client/bus/subscription.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/teamtalk/src/client/bus/subscription.rs b/crates/teamtalk/src/client/bus/subscription.rs index 2d30504..50131ee 100644 --- a/crates/teamtalk/src/client/bus/subscription.rs +++ b/crates/teamtalk/src/client/bus/subscription.rs @@ -197,8 +197,6 @@ impl Subscription { let user = message.user(); let text = message.text(); - // Discriminant filter is already satisfied by the bucket in - // `EventBus::dispatch`, so it is not re-checked here. if let Some(user_id) = self.user_id { let match_user = user .as_ref()