diff --git a/crates/teamtalk/src/client/bus/subscription.rs b/crates/teamtalk/src/client/bus/subscription.rs index 4c49c66..50131ee 100644 --- a/crates/teamtalk/src/client/bus/subscription.rs +++ b/crates/teamtalk/src/client/bus/subscription.rs @@ -4,12 +4,20 @@ //! per-subscription record [`Subscription`]) together with the public //! [`SubscriptionBuilder`] that callers use to register handlers through //! [`crate::client::Client::on_event`] / [`crate::client::Client::on_any`]. +//! +//! Dispatch is indexed by `mem::Discriminant` so firing an event +//! only touches subscriptions that can possibly match it (event-specific +//! subscribers for this discriminant plus wildcard subscribers), instead +//! of scanning every registered subscription. Insertion order is +//! preserved across the specific-vs-wildcard partition by merging the +//! two index lists on the fly. use super::context::EventContext; use crate::client::{Client, Message}; use crate::events::Event; use crate::types::{ChannelId, UserId}; -use std::mem; +use std::collections::HashMap; +use std::mem::{self, Discriminant}; use std::sync::atomic::Ordering; use teamtalk_sys as ffi; @@ -35,6 +43,13 @@ impl EventSubscriptionGroup { pub(crate) struct EventBus { next_id: u64, subscriptions: Vec, + /// Indices into `subscriptions` keyed by the `Event` discriminant + /// the subscription is filtering on. Each inner `Vec` is + /// kept in ascending (insertion) order. + by_event: HashMap, Vec>, + /// Indices of subscriptions registered without an `event` filter + /// (wildcard). Kept in ascending (insertion) order. + any_indices: Vec, } #[derive(Default)] @@ -57,9 +72,11 @@ impl EventBus { ) -> EventSubscriptionId { self.next_id += 1; let id = EventSubscriptionId(self.next_id); + let event_filter = config.event.as_ref().map(mem::discriminant); + let index = self.subscriptions.len(); self.subscriptions.push(Subscription { id, - event: config.event, + event_filter, user_id: config.user_id, channel_id: config.channel_id, username: config.username, @@ -69,24 +86,38 @@ impl EventBus { predicate: config.predicate, handler, }); + match event_filter { + Some(d) => self.by_event.entry(d).or_default().push(index), + None => self.any_indices.push(index), + } id } pub(crate) fn unsubscribe(&mut self, id: EventSubscriptionId) -> bool { let before = self.subscriptions.len(); self.subscriptions.retain(|sub| sub.id != id); - before != self.subscriptions.len() + if self.subscriptions.len() == before { + return false; + } + self.rebuild_indices(); + true } pub(crate) fn clear(&mut self) { self.subscriptions.clear(); + self.by_event.clear(); + self.any_indices.clear(); } pub(crate) fn unsubscribe_group(&mut self, group: &EventSubscriptionGroup) -> usize { let before = self.subscriptions.len(); self.subscriptions .retain(|sub| sub.group.as_ref() != Some(group)); - before.saturating_sub(self.subscriptions.len()) + let removed = before.saturating_sub(self.subscriptions.len()); + if removed > 0 { + self.rebuild_indices(); + } + removed } pub(crate) fn len(&self) -> usize { @@ -94,7 +125,36 @@ impl EventBus { } pub(crate) fn dispatch(&mut self, client: &Client, event: Event, message: &Message) { - for sub in &mut self.subscriptions { + let d = mem::discriminant(&event); + let empty: Vec = Vec::new(); + let specific: &[usize] = self.by_event.get(&d).unwrap_or(&empty); + let any: &[usize] = &self.any_indices; + let subs = &mut self.subscriptions; + + let mut si = 0; + let mut ai = 0; + while si < specific.len() || ai < any.len() { + let pick = match (specific.get(si), any.get(ai)) { + (Some(&s), Some(&a)) => { + if s <= a { + si += 1; + s + } else { + ai += 1; + a + } + } + (Some(&s), None) => { + si += 1; + s + } + (None, Some(&a)) => { + ai += 1; + a + } + (None, None) => break, + }; + let sub = &mut subs[pick]; if !sub.matches(client, event, message) { continue; } @@ -106,11 +166,22 @@ impl EventBus { (sub.handler)(ctx); } } + + fn rebuild_indices(&mut self) { + self.by_event.clear(); + self.any_indices.clear(); + for (i, sub) in self.subscriptions.iter().enumerate() { + match sub.event_filter { + Some(d) => self.by_event.entry(d).or_default().push(i), + None => self.any_indices.push(i), + } + } + } } struct Subscription { id: EventSubscriptionId, - event: Option, + event_filter: Option>, user_id: Option, channel_id: Option, username: Option, @@ -126,11 +197,6 @@ impl Subscription { let user = message.user(); let text = message.text(); - if let Some(filter) = self.event - && mem::discriminant(&filter) != mem::discriminant(&event) - { - return false; - } if let Some(user_id) = self.user_id { let match_user = user .as_ref() diff --git a/crates/teamtalk/src/client/core/mod.rs b/crates/teamtalk/src/client/core/mod.rs index c350415..59afddc 100644 --- a/crates/teamtalk/src/client/core/mod.rs +++ b/crates/teamtalk/src/client/core/mod.rs @@ -185,6 +185,18 @@ impl Client { self.update_state_for_event(event, &message); } + /// Synchronously dispatches `event` through the subscription bus + /// (used by integration tests to exercise + /// [`Client::on_event`] / [`Client::on_any`] without running the + /// full poll loop). + #[cfg(feature = "mock")] + pub fn mock_dispatch_bus_for_tests(&self, event: Event, source: i32) { + let mut raw = unsafe { std::mem::zeroed::() }; + raw.nSource = source; + let message = Message::from_raw(event, raw); + self.dispatch_bus(event, &message); + } + /// Sets the client name used for login. #[must_use] pub fn with_name(mut self, name: &str) -> Self { diff --git a/crates/teamtalk/src/dispatch/dispatcher.rs b/crates/teamtalk/src/dispatch/dispatcher.rs index b8b4bd2..6277db0 100644 --- a/crates/teamtalk/src/dispatch/dispatcher.rs +++ b/crates/teamtalk/src/dispatch/dispatcher.rs @@ -1,9 +1,18 @@ use super::source::{EventSource, HandlerEntry, ReconnectState}; use super::{ClientConfig, DispatchFlow, Event, EventContext, Message}; +use std::collections::HashMap; +use std::mem::{self, Discriminant}; pub struct Dispatcher { source: S, + /// Registered handlers in insertion order. Index is stable for + /// the lifetime of the handler. handlers: Vec, + /// Indices of event-specific handlers keyed by discriminant. + by_event: HashMap, Vec>, + /// Indices of wildcard (`event == None`) handlers in insertion + /// order. + any_indices: Vec, poll_timeout_ms: i32, reconnect: Option, stop: bool, @@ -21,6 +30,8 @@ impl Dispatcher { Self { source, handlers: Vec::new(), + by_event: HashMap::new(), + any_indices: Vec::new(), poll_timeout_ms: config.poll_timeout_ms, reconnect, stop: false, @@ -42,10 +53,12 @@ impl Dispatcher { where F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static, { + let d = mem::discriminant(&event); + let index = self.handlers.len(); self.handlers.push(HandlerEntry { - event: Some(event), handler: Box::new(handler), }); + self.by_event.entry(d).or_default().push(index); } /// Adds a handler which receives all events. @@ -53,10 +66,11 @@ impl Dispatcher { where F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static, { + let index = self.handlers.len(); self.handlers.push(HandlerEntry { - event: None, handler: Box::new(handler), }); + self.any_indices.push(index); } /// Adds a handler and returns the dispatcher for chaining. @@ -184,8 +198,37 @@ impl Dispatcher { client, }; let mut flow = DispatchFlow::Continue; - for handler in &mut self.handlers { - if handler.matches(&event) && (handler.handler)(ctx) == DispatchFlow::Stop { + + let d = mem::discriminant(&event); + let empty: Vec = Vec::new(); + let specific: &[usize] = self.by_event.get(&d).unwrap_or(&empty); + let any: &[usize] = &self.any_indices; + let handlers = &mut self.handlers; + + let mut si = 0; + let mut ai = 0; + while si < specific.len() || ai < any.len() { + let idx = match (specific.get(si), any.get(ai)) { + (Some(&s), Some(&a)) => { + if s <= a { + si += 1; + s + } else { + ai += 1; + a + } + } + (Some(&s), None) => { + si += 1; + s + } + (None, Some(&a)) => { + ai += 1; + a + } + (None, None) => break, + }; + if (handlers[idx].handler)(ctx) == DispatchFlow::Stop { flow = DispatchFlow::Stop; } } diff --git a/crates/teamtalk/src/dispatch/mod.rs b/crates/teamtalk/src/dispatch/mod.rs index 211896e..0bbb8c2 100644 --- a/crates/teamtalk/src/dispatch/mod.rs +++ b/crates/teamtalk/src/dispatch/mod.rs @@ -1,7 +1,6 @@ //! Event dispatcher built on top of `Client::poll`. use crate::client::{Client, ConnectParams, Message, ReconnectConfig, ReconnectHandler}; use crate::events::Event; -use std::mem; mod dispatcher; mod source; diff --git a/crates/teamtalk/src/dispatch/source.rs b/crates/teamtalk/src/dispatch/source.rs index 7eee6bf..158bc76 100644 --- a/crates/teamtalk/src/dispatch/source.rs +++ b/crates/teamtalk/src/dispatch/source.rs @@ -30,19 +30,9 @@ impl EventSource for &Client { pub(super) type HandlerFn = Box FnMut(EventContext<'a>) -> DispatchFlow + Send>; pub(super) struct HandlerEntry { - pub(super) event: Option, pub(super) handler: HandlerFn, } -impl HandlerEntry { - pub(super) fn matches(&self, event: &Event) -> bool { - match &self.event { - Some(e) => mem::discriminant(e) == mem::discriminant(event), - None => true, - } - } -} - pub(super) struct ReconnectState { params: ConnectParamsOwned, handler: ReconnectHandler, diff --git a/crates/teamtalk/tests/indexed_dispatch_tests.rs b/crates/teamtalk/tests/indexed_dispatch_tests.rs new file mode 100644 index 0000000..3c95800 --- /dev/null +++ b/crates/teamtalk/tests/indexed_dispatch_tests.rs @@ -0,0 +1,291 @@ +#![cfg(feature = "mock")] +//! Integration tests for the indexed event-dispatch path. +//! +//! Both the high-level `Dispatcher` (`crate::dispatch::Dispatcher`) and +//! the `Client` subscription bus (`Client::on_event` / `Client::on_any`) +//! are backed by the same per-discriminant index: registered handlers +//! are bucketed into a `HashMap, Vec>` plus +//! a wildcard `Vec`, and dispatch merges the two index lists in +//! insertion order. + +use std::sync::Arc; +use std::sync::{Mutex, MutexGuard}; + +use teamtalk::client::backend::MockBackend; +use teamtalk::client::ffi; +use teamtalk::dispatch::{DispatchFlow, Dispatcher}; +use teamtalk::mock::{MockClient, MockMessage, MockUserBuilder}; +use teamtalk::types::{ChannelId, UserId}; +use teamtalk::{Client, Event}; + +fn push_user_joined(mock: &mut MockClient, id: i32) { + let user = MockUserBuilder::new(UserId(id)) + .username(&format!("u{id}")) + .nickname(&format!("n{id}")); + mock.push_user_joined(user); +} + +fn push_text(mock: &mut MockClient, from: i32, to: i32, channel: i32, text: &str) { + let from_user = format!("u{from}"); + let msg = MockMessage::text( + ffi::TextMsgType::MSGTYPE_USER, + UserId(from), + UserId(to), + ChannelId(channel), + &from_user, + text, + ); + mock.push_text_message(msg); +} + +fn record(buf: &Arc>>, tag: &'static str) -> DispatchFlow { + buf.lock().unwrap().push(tag); + DispatchFlow::Continue +} + +fn lock_log<'a>(buf: &'a Arc>>) -> MutexGuard<'a, Vec<&'static str>> { + buf.lock().unwrap() +} + +fn mock_client() -> Client { + let backend = Arc::new(MockBackend::new()); + Client::with_backend(backend).expect("mock client") +} + +#[test] +fn specific_handler_only_fires_for_matching_discriminant() { + let mut mock = MockClient::new(); + push_user_joined(&mut mock, 1); + push_text(&mut mock, 1, 2, 3, "hi"); + mock.push_event(Event::ConnectSuccess); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + + let mut dispatcher = Dispatcher::new(mock) + .on_user_joined(move |_| record(&b1, "join")) + .on_text_message(move |_| record(&b2, "text")); + + dispatcher.step(0); + dispatcher.step(0); + dispatcher.step(0); + + assert_eq!(*lock_log(&log_buf), vec!["join", "text"]); +} + +#[test] +fn wildcard_handler_fires_for_every_event() { + let mut mock = MockClient::new(); + push_user_joined(&mut mock, 1); + mock.push_event(Event::ConnectSuccess); + mock.push_event(Event::ConnectionLost); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b1 = Arc::clone(&log_buf); + let mut dispatcher = Dispatcher::new(mock).on_any(move |_| record(&b1, "any")); + + for _ in 0..3 { + dispatcher.step(0); + } + + assert_eq!(*lock_log(&log_buf), vec!["any", "any", "any"]); +} + +#[test] +fn insertion_order_is_preserved_across_specific_and_wildcard() { + // Register handlers in an interleaved order: + // #0 text (specific) + // #1 any + // #2 text (specific) + // #3 any + // #4 user (specific, unrelated for this event) + // On a TextMessage the expected firing order — by insertion — is: + // text0, any1, text2, any3. Specific handlers that don't match the + // event must never fire. + let mut mock = MockClient::new(); + push_text(&mut mock, 1, 2, 3, "hi"); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + let b4 = Arc::clone(&log_buf); + + let mut dispatcher = Dispatcher::new(mock) + .on_text_message(move |_| record(&b0, "text0")) + .on_any(move |_| record(&b1, "any1")) + .on_text_message(move |_| record(&b2, "text2")) + .on_any(move |_| record(&b3, "any3")) + .on_user_joined(move |_| record(&b4, "user4")); + + dispatcher.step(0); + + assert_eq!(*lock_log(&log_buf), vec!["text0", "any1", "text2", "any3"]); +} + +#[test] +fn unmatched_event_fires_only_wildcards() { + let mut mock = MockClient::new(); + mock.push_event(Event::ConnectionLost); + + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + + let mut dispatcher = Dispatcher::new(mock) + .on_user_joined(move |_| record(&b1, "user")) + .on_any(move |_| record(&b2, "any")) + .on_text_message(move |_| record(&b3, "text")); + + dispatcher.step(0); + + assert_eq!(*lock_log(&log_buf), vec!["any"]); +} + +#[test] +fn stop_flow_propagates_from_any_slot() { + let mut mock = MockClient::new(); + push_user_joined(&mut mock, 1); + + let mut dispatcher = Dispatcher::new(mock) + .on_user_joined(|_| DispatchFlow::Continue) + .on_any(|_| DispatchFlow::Stop) + .on_user_joined(|_| DispatchFlow::Continue); + + assert!(matches!(dispatcher.step(0), DispatchFlow::Stop)); +} + +// --- Client subscription bus ------------------------------------------------ + +#[test] +fn client_bus_specific_and_wildcard_dispatch_in_insertion_order() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + + let _id0 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b0.lock().unwrap().push("join0")); + let _id1 = client + .on_any() + .subscribe(move |_| b1.lock().unwrap().push("any1")); + let _id2 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b2.lock().unwrap().push("join2")); + let _id3 = client + .on_event(Event::TextMessage) + .subscribe(move |_| b3.lock().unwrap().push("text3")); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + assert_eq!( + *lock_log(&log_buf), + vec!["join0", "any1", "join2"], + "UserJoined must fire join0 (#0), any1 (#1), join2 (#2) in insertion order, skipping text3" + ); + + lock_log(&log_buf).clear(); + client.mock_dispatch_bus_for_tests(Event::TextMessage, 0); + assert_eq!(*lock_log(&log_buf), vec!["any1", "text3"]); + + lock_log(&log_buf).clear(); + client.mock_dispatch_bus_for_tests(Event::ConnectSuccess, 0); + assert_eq!(*lock_log(&log_buf), vec!["any1"]); +} + +#[test] +fn client_bus_unsubscribe_rebuilds_indexes() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + + let id0 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b0.lock().unwrap().push("join0")); + let _id1 = client + .on_any() + .subscribe(move |_| b1.lock().unwrap().push("any1")); + let _id2 = client + .on_event(Event::UserJoined) + .subscribe(move |_| b2.lock().unwrap().push("join2")); + assert_eq!(client.event_subscription_count(), 3); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + assert_eq!(*lock_log(&log_buf), vec!["join0", "any1", "join2"]); + + assert!(client.unsubscribe_event(id0)); + assert_eq!(client.event_subscription_count(), 2); + + lock_log(&log_buf).clear(); + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + assert_eq!(*lock_log(&log_buf), vec!["any1", "join2"]); +} + +#[test] +fn client_bus_unsubscribe_group_removes_from_all_buckets() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + let b2 = Arc::clone(&log_buf); + let b3 = Arc::clone(&log_buf); + + let _ = client + .on_event(Event::UserJoined) + .group("feature-x") + .subscribe(move |_| b0.lock().unwrap().push("join0")); + let _ = client + .on_any() + .group("feature-x") + .subscribe(move |_| b1.lock().unwrap().push("any1")); + let _ = client + .on_event(Event::TextMessage) + .group("feature-x") + .subscribe(move |_| b2.lock().unwrap().push("text2")); + let _ = client + .on_event(Event::UserJoined) + .subscribe(move |_| b3.lock().unwrap().push("join3-other")); + + assert_eq!(client.event_subscription_count(), 4); + let removed = client.unsubscribe_event_group("feature-x"); + assert_eq!(removed, 3); + assert_eq!(client.event_subscription_count(), 1); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + client.mock_dispatch_bus_for_tests(Event::TextMessage, 0); + client.mock_dispatch_bus_for_tests(Event::ConnectSuccess, 0); + assert_eq!(*lock_log(&log_buf), vec!["join3-other"]); +} + +#[test] +fn client_bus_clear_subscriptions_resets_both_buckets() { + let client = mock_client(); + let log_buf: Arc>> = Arc::new(Mutex::new(Vec::new())); + let b0 = Arc::clone(&log_buf); + let b1 = Arc::clone(&log_buf); + + let _ = client + .on_event(Event::UserJoined) + .subscribe(move |_| b0.lock().unwrap().push("join")); + let _ = client + .on_any() + .subscribe(move |_| b1.lock().unwrap().push("any")); + assert_eq!(client.event_subscription_count(), 2); + + client.clear_event_subscriptions(); + assert_eq!(client.event_subscription_count(), 0); + + client.mock_dispatch_bus_for_tests(Event::UserJoined, 0); + client.mock_dispatch_bus_for_tests(Event::TextMessage, 0); + assert!(lock_log(&log_buf).is_empty()); +}