Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 77 additions & 11 deletions crates/teamtalk/src/client/bus/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
//! per-subscription record [`Subscription`]) together with the public
//! [`SubscriptionBuilder`] that callers use to register handlers through
//! [`crate::client::Client::on_event`] / [`crate::client::Client::on_any`].
//!
//! Dispatch is indexed by `mem::Discriminant<Event>` so firing an event
//! only touches subscriptions that can possibly match it (event-specific
//! subscribers for this discriminant plus wildcard subscribers), instead
//! of scanning every registered subscription. Insertion order is
//! preserved across the specific-vs-wildcard partition by merging the
//! two index lists on the fly.

use super::context::EventContext;
use crate::client::{Client, Message};
use crate::events::Event;
use crate::types::{ChannelId, UserId};
use std::mem;
use std::collections::HashMap;
use std::mem::{self, Discriminant};
use std::sync::atomic::Ordering;
use teamtalk_sys as ffi;

Expand All @@ -35,6 +43,13 @@ impl EventSubscriptionGroup {
pub(crate) struct EventBus {
next_id: u64,
subscriptions: Vec<Subscription>,
/// Indices into `subscriptions` keyed by the `Event` discriminant
/// the subscription is filtering on. Each inner `Vec<usize>` is
/// kept in ascending (insertion) order.
by_event: HashMap<Discriminant<Event>, Vec<usize>>,
/// Indices of subscriptions registered without an `event` filter
/// (wildcard). Kept in ascending (insertion) order.
any_indices: Vec<usize>,
}

#[derive(Default)]
Expand All @@ -57,9 +72,11 @@ impl EventBus {
) -> EventSubscriptionId {
self.next_id += 1;
let id = EventSubscriptionId(self.next_id);
let event_filter = config.event.as_ref().map(mem::discriminant);
let index = self.subscriptions.len();
self.subscriptions.push(Subscription {
id,
event: config.event,
event_filter,
user_id: config.user_id,
channel_id: config.channel_id,
username: config.username,
Expand All @@ -69,32 +86,75 @@ impl EventBus {
predicate: config.predicate,
handler,
});
match event_filter {
Some(d) => self.by_event.entry(d).or_default().push(index),
None => self.any_indices.push(index),
}
id
}

pub(crate) fn unsubscribe(&mut self, id: EventSubscriptionId) -> bool {
let before = self.subscriptions.len();
self.subscriptions.retain(|sub| sub.id != id);
before != self.subscriptions.len()
if self.subscriptions.len() == before {
return false;
}
self.rebuild_indices();
true
}

pub(crate) fn clear(&mut self) {
self.subscriptions.clear();
self.by_event.clear();
self.any_indices.clear();
}

pub(crate) fn unsubscribe_group(&mut self, group: &EventSubscriptionGroup) -> usize {
let before = self.subscriptions.len();
self.subscriptions
.retain(|sub| sub.group.as_ref() != Some(group));
before.saturating_sub(self.subscriptions.len())
let removed = before.saturating_sub(self.subscriptions.len());
if removed > 0 {
self.rebuild_indices();
}
removed
}

pub(crate) fn len(&self) -> usize {
self.subscriptions.len()
}

pub(crate) fn dispatch(&mut self, client: &Client, event: Event, message: &Message) {
for sub in &mut self.subscriptions {
let d = mem::discriminant(&event);
let empty: Vec<usize> = Vec::new();
let specific: &[usize] = self.by_event.get(&d).unwrap_or(&empty);
let any: &[usize] = &self.any_indices;
let subs = &mut self.subscriptions;

let mut si = 0;
let mut ai = 0;
while si < specific.len() || ai < any.len() {
let pick = match (specific.get(si), any.get(ai)) {
(Some(&s), Some(&a)) => {
if s <= a {
si += 1;
s
} else {
ai += 1;
a
}
}
(Some(&s), None) => {
si += 1;
s
}
(None, Some(&a)) => {
ai += 1;
a
}
(None, None) => break,
};
let sub = &mut subs[pick];
if !sub.matches(client, event, message) {
continue;
}
Expand All @@ -106,11 +166,22 @@ impl EventBus {
(sub.handler)(ctx);
}
}

fn rebuild_indices(&mut self) {
self.by_event.clear();
self.any_indices.clear();
for (i, sub) in self.subscriptions.iter().enumerate() {
match sub.event_filter {
Some(d) => self.by_event.entry(d).or_default().push(i),
None => self.any_indices.push(i),
}
}
}
}

struct Subscription {
id: EventSubscriptionId,
event: Option<Event>,
event_filter: Option<Discriminant<Event>>,
user_id: Option<UserId>,
channel_id: Option<ChannelId>,
username: Option<String>,
Expand All @@ -126,11 +197,6 @@ impl Subscription {
let user = message.user();
let text = message.text();

if let Some(filter) = self.event
&& mem::discriminant(&filter) != mem::discriminant(&event)
{
return false;
}
if let Some(user_id) = self.user_id {
let match_user = user
.as_ref()
Expand Down
12 changes: 12 additions & 0 deletions crates/teamtalk/src/client/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,18 @@ impl Client {
self.update_state_for_event(event, &message);
}

/// Synchronously dispatches `event` through the subscription bus
/// (used by integration tests to exercise
/// [`Client::on_event`] / [`Client::on_any`] without running the
/// full poll loop).
#[cfg(feature = "mock")]
pub fn mock_dispatch_bus_for_tests(&self, event: Event, source: i32) {
let mut raw = unsafe { std::mem::zeroed::<ffi::TTMessage>() };
raw.nSource = source;
let message = Message::from_raw(event, raw);
self.dispatch_bus(event, &message);
}

/// Sets the client name used for login.
#[must_use]
pub fn with_name(mut self, name: &str) -> Self {
Expand Down
51 changes: 47 additions & 4 deletions crates/teamtalk/src/dispatch/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
use super::source::{EventSource, HandlerEntry, ReconnectState};
use super::{ClientConfig, DispatchFlow, Event, EventContext, Message};
use std::collections::HashMap;
use std::mem::{self, Discriminant};

pub struct Dispatcher<S: EventSource> {
source: S,
/// Registered handlers in insertion order. Index is stable for
/// the lifetime of the handler.
handlers: Vec<HandlerEntry>,
/// Indices of event-specific handlers keyed by discriminant.
by_event: HashMap<Discriminant<Event>, Vec<usize>>,
/// Indices of wildcard (`event == None`) handlers in insertion
/// order.
any_indices: Vec<usize>,
poll_timeout_ms: i32,
reconnect: Option<ReconnectState>,
stop: bool,
Expand All @@ -21,6 +30,8 @@ impl<S: EventSource> Dispatcher<S> {
Self {
source,
handlers: Vec::new(),
by_event: HashMap::new(),
any_indices: Vec::new(),
poll_timeout_ms: config.poll_timeout_ms,
reconnect,
stop: false,
Expand All @@ -42,21 +53,24 @@ impl<S: EventSource> Dispatcher<S> {
where
F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
{
let d = mem::discriminant(&event);
let index = self.handlers.len();
self.handlers.push(HandlerEntry {
event: Some(event),
handler: Box::new(handler),
});
self.by_event.entry(d).or_default().push(index);
}

/// Adds a handler which receives all events.
pub fn add_handler_any<F>(&mut self, handler: F)
where
F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
{
let index = self.handlers.len();
self.handlers.push(HandlerEntry {
event: None,
handler: Box::new(handler),
});
self.any_indices.push(index);
}

/// Adds a handler and returns the dispatcher for chaining.
Expand Down Expand Up @@ -184,8 +198,37 @@ impl<S: EventSource> Dispatcher<S> {
client,
};
let mut flow = DispatchFlow::Continue;
for handler in &mut self.handlers {
if handler.matches(&event) && (handler.handler)(ctx) == DispatchFlow::Stop {

let d = mem::discriminant(&event);
let empty: Vec<usize> = Vec::new();
let specific: &[usize] = self.by_event.get(&d).unwrap_or(&empty);
let any: &[usize] = &self.any_indices;
let handlers = &mut self.handlers;

let mut si = 0;
let mut ai = 0;
while si < specific.len() || ai < any.len() {
let idx = match (specific.get(si), any.get(ai)) {
(Some(&s), Some(&a)) => {
if s <= a {
si += 1;
s
} else {
ai += 1;
a
}
}
(Some(&s), None) => {
si += 1;
s
}
(None, Some(&a)) => {
ai += 1;
a
}
(None, None) => break,
};
if (handlers[idx].handler)(ctx) == DispatchFlow::Stop {
flow = DispatchFlow::Stop;
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/teamtalk/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Event dispatcher built on top of `Client::poll`.
use crate::client::{Client, ConnectParams, Message, ReconnectConfig, ReconnectHandler};
use crate::events::Event;
use std::mem;

mod dispatcher;
mod source;
Expand Down
10 changes: 0 additions & 10 deletions crates/teamtalk/src/dispatch/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,9 @@ impl EventSource for &Client {
pub(super) type HandlerFn = Box<dyn for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send>;

pub(super) struct HandlerEntry {
pub(super) event: Option<Event>,
pub(super) handler: HandlerFn,
}

impl HandlerEntry {
pub(super) fn matches(&self, event: &Event) -> bool {
match &self.event {
Some(e) => mem::discriminant(e) == mem::discriminant(event),
None => true,
}
}
}

pub(super) struct ReconnectState {
params: ConnectParamsOwned,
handler: ReconnectHandler,
Expand Down
Loading
Loading