diff --git a/AGENTS.md b/AGENTS.md index 9229be6..7789f86 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -31,7 +31,12 @@ - `src/extensions/plugins.rs` for `TeamTalkPlugin` trait, `PluginFlow`, `PluginError`, and `PluginManager` (register, load, dispatch, unload). - `bot/router` is split into `src/bot/router/{mod,builder,dispatch,help,helpers}.rs`. - `bot` currently includes: - - `src/bot/fsm.rs` for dialog/session state, timeout policy, metadata, and flow helpers. + - `src/bot/fsm/` is directory-first: + - `src/bot/fsm/state.rs` for the `DialogState` record and its netstring encoding. + - `src/bot/fsm/status.rs` for `DialogStatus` and `DialogTimeoutPolicy` enums. + - `src/bot/fsm/flow.rs` for the declarative `DialogFlow` linear step sequence. + - `src/bot/fsm/machine.rs` for the `DialogMachine` mutating facade over `StateStore`. + - `src/bot/fsm/encoding.rs` for shared constants, netstring codec, and time helpers. - `src/bot/middleware.rs` for function middleware, guards, and rate limiting. - `src/bot/permissions.rs` for rights-based permission presets on top of TeamTalk account rights. - `src/bot/storage.rs` plus Redis/SQLite adapters for bot state backends; StateStore v2 adds `exists`, `set_with_ttl`, `keys`, `remove_prefix`, `get_many`, `set_many` with TTL support in Memory (Instant+Entry), Redis (SCAN/SETEX), and SQLite (expires_at column). diff --git a/crates/teamtalk/src/bot/fsm.rs b/crates/teamtalk/src/bot/fsm.rs deleted file mode 100644 index eb7fe9a..0000000 --- a/crates/teamtalk/src/bot/fsm.rs +++ /dev/null @@ -1,559 +0,0 @@ -use super::storage::StateStore; -use crate::types::UserId; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -const DIALOG_ENCODING_VERSION: &str = "v2"; -const INTERNAL_SESSION_KEY: &str = "__session"; -const INTERNAL_TIMEOUT_POLICY_KEY: &str = "__timeout_policy"; -static NEXT_DIALOG_SESSION: AtomicU64 = AtomicU64::new(1); - -#[non_exhaustive] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum DialogStatus { - Active, - Paused, -} - -#[non_exhaustive] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum DialogTimeoutPolicy { - Clear, - Pause, -} - -impl DialogTimeoutPolicy { - fn encode(self) -> &'static str { - match self { - Self::Clear => "clear", - Self::Pause => "pause", - } - } - - fn decode(raw: &str) -> Option { - match raw { - "clear" => Some(Self::Clear), - "pause" => Some(Self::Pause), - _ => None, - } - } -} - -impl DialogStatus { - fn encode(self) -> &'static str { - match self { - Self::Active => "active", - Self::Paused => "paused", - } - } - - fn decode(raw: &str) -> Option { - match raw { - "active" => Some(Self::Active), - "paused" => Some(Self::Paused), - _ => None, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -#[non_exhaustive] -pub struct DialogState { - pub dialog: String, - pub step: String, - pub status: DialogStatus, - pub deadline_unix_ms: Option, - pub metadata: Vec<(String, String)>, -} - -impl DialogState { - pub fn new(dialog: impl Into, step: impl Into) -> Self { - Self { - dialog: dialog.into(), - step: step.into(), - status: DialogStatus::Active, - deadline_unix_ms: None, - metadata: Vec::new(), - } - } - - #[must_use] - pub fn with_deadline_unix_ms(mut self, deadline_unix_ms: u64) -> Self { - self.deadline_unix_ms = Some(deadline_unix_ms); - self - } - - #[must_use] - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.deadline_unix_ms = Some(now_unix_ms().saturating_add(duration_to_millis(timeout))); - self - } - - #[must_use] - pub fn with_status(mut self, status: DialogStatus) -> Self { - self.status = status; - self - } - - #[must_use] - pub fn with_timeout_policy(mut self, policy: DialogTimeoutPolicy) -> Self { - self.set_metadata(INTERNAL_TIMEOUT_POLICY_KEY, policy.encode()); - self - } - - #[must_use] - pub fn with_metadata( - mut self, - metadata: impl IntoIterator, impl Into)>, - ) -> Self { - self.metadata = metadata - .into_iter() - .map(|(key, value)| (key.into(), value.into())) - .collect(); - self - } - - #[must_use] - pub fn is_active(&self) -> bool { - matches!(self.status, DialogStatus::Active) - } - - #[must_use] - pub fn is_paused(&self) -> bool { - matches!(self.status, DialogStatus::Paused) - } - - #[must_use] - pub fn is_expired(&self) -> bool { - self.is_expired_at(now_unix_ms()) - } - - #[must_use] - pub fn is_expired_at(&self, now_unix_ms: u64) -> bool { - self.deadline_unix_ms - .is_some_and(|deadline| deadline <= now_unix_ms) - } - - #[allow(clippy::must_use_candidate)] - pub fn metadata(&self, key: &str) -> Option<&str> { - self.metadata - .iter() - .find_map(|(existing, value)| (existing == key).then_some(value.as_str())) - } - - #[allow(clippy::must_use_candidate)] - pub fn session_id(&self) -> Option<&str> { - self.metadata(INTERNAL_SESSION_KEY) - } - - #[must_use] - pub fn timeout_policy(&self) -> DialogTimeoutPolicy { - self.metadata(INTERNAL_TIMEOUT_POLICY_KEY) - .and_then(DialogTimeoutPolicy::decode) - .unwrap_or(DialogTimeoutPolicy::Clear) - } - - pub fn set_metadata(&mut self, key: impl Into, value: impl Into) { - let key = key.into(); - let value = value.into(); - if let Some((_, existing)) = self - .metadata - .iter_mut() - .find(|(existing, _)| *existing == key) - { - *existing = value; - return; - } - self.metadata.push((key, value)); - } - - pub fn remove_metadata(&mut self, key: &str) -> Option { - let index = self - .metadata - .iter() - .position(|(existing, _)| existing == key)?; - Some(self.metadata.remove(index).1) - } - - #[must_use] - pub fn encode(&self) -> String { - let mut fields = Vec::with_capacity(6 + self.metadata.len() * 2); - fields.push(netstring(DIALOG_ENCODING_VERSION)); - fields.push(netstring(self.status.encode())); - fields.push(netstring( - &self - .deadline_unix_ms - .map(|value| value.to_string()) - .unwrap_or_default(), - )); - fields.push(netstring(&self.dialog)); - fields.push(netstring(&self.step)); - fields.push(netstring(&self.metadata.len().to_string())); - for (key, value) in &self.metadata { - fields.push(netstring(key)); - fields.push(netstring(value)); - } - fields.concat() - } - - #[allow(clippy::must_use_candidate)] - pub fn decode(raw: &str) -> Option { - if !raw.contains(':') { - let (dialog, step) = raw.split_once('|')?; - if dialog.is_empty() || step.is_empty() { - return None; - } - return Some(Self::new(dialog, step)); - } - - let mut remainder = raw; - let Some(version) = parse_netstring(&mut remainder) else { - let (dialog, step) = raw.split_once('|')?; - if dialog.is_empty() || step.is_empty() { - return None; - } - return Some(Self::new(dialog, step)); - }; - if version != DIALOG_ENCODING_VERSION { - let (dialog, step) = raw.split_once('|')?; - if dialog.is_empty() || step.is_empty() { - return None; - } - return Some(Self::new(dialog, step)); - } - - let status = DialogStatus::decode(parse_netstring(&mut remainder)?)?; - let deadline_unix_ms = { - let value = parse_netstring(&mut remainder)?; - if value.is_empty() { - None - } else { - Some(value.parse::().ok()?) - } - }; - let dialog = parse_netstring(&mut remainder)?.to_owned(); - let step = parse_netstring(&mut remainder)?.to_owned(); - if dialog.is_empty() || step.is_empty() { - return None; - } - let metadata_len = parse_netstring(&mut remainder)?.parse::().ok()?; - let mut metadata = Vec::with_capacity(metadata_len); - for _ in 0..metadata_len { - let key = parse_netstring(&mut remainder)?.to_owned(); - let value = parse_netstring(&mut remainder)?.to_owned(); - metadata.push((key, value)); - } - if !remainder.is_empty() { - return None; - } - - Some(Self { - dialog, - step, - status, - deadline_unix_ms, - metadata, - }) - } -} - -pub struct DialogMachine<'a> { - store: &'a mut dyn StateStore, - prefix: String, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DialogFlow { - name: String, - start_step: String, - steps: Vec, -} - -impl DialogFlow { - pub fn new(name: impl Into, start_step: impl Into) -> Self { - Self { - name: name.into(), - start_step: start_step.into(), - steps: Vec::new(), - } - } - - #[must_use] - pub fn step(mut self, step: impl Into) -> Self { - self.steps.push(step.into()); - self - } - - #[must_use] - pub fn name(&self) -> &str { - &self.name - } - - #[must_use] - pub fn start_step(&self) -> &str { - &self.start_step - } - - #[must_use] - pub fn steps(&self) -> &[String] { - &self.steps - } - - #[must_use] - pub fn contains_step(&self, step: &str) -> bool { - step == self.start_step || self.steps.iter().any(|item| item == step) - } - - pub fn next_step(&self, step: &str) -> Option<&str> { - if step == self.start_step { - return self.steps.first().map(String::as_str); - } - - self.steps - .windows(2) - .find_map(|window| (window[0] == step).then_some(window[1].as_str())) - } - - #[allow(clippy::must_use_candidate)] - pub fn previous_step(&self, step: &str) -> Option<&str> { - if let Some(first) = self.steps.first() - && first == step - { - return Some(&self.start_step); - } - - self.steps - .windows(2) - .find_map(|window| (window[1] == step).then_some(window[0].as_str())) - } - - #[must_use] - pub fn is_start_step(&self, step: &str) -> bool { - step == self.start_step - } - - #[must_use] - pub fn is_terminal_step(&self, step: &str) -> bool { - self.steps.last().is_some_and(|last| last == step) - } -} - -impl<'a> DialogMachine<'a> { - pub fn new(store: &'a mut dyn StateStore) -> Self { - Self { - store, - prefix: "bot:dialog".to_owned(), - } - } - - #[must_use] - pub fn with_prefix(store: &'a mut dyn StateStore, prefix: impl Into) -> Self { - Self { - store, - prefix: prefix.into(), - } - } - - fn key(&self, source_id: UserId) -> String { - format!("{}:{}", self.prefix, source_id.raw()) - } - - pub fn start(&mut self, source_id: UserId, dialog: impl Into, step: impl Into) { - self.start_state(source_id, DialogState::new(dialog, step)); - } - - pub fn start_state(&mut self, source_id: UserId, state: DialogState) { - self.store - .set(self.key(source_id), Self::prepare_state(state).encode()); - } - - #[allow(clippy::must_use_candidate)] - pub fn current(&self, source_id: UserId) -> Option { - self.store - .get(&self.key(source_id)) - .and_then(|raw| DialogState::decode(&raw)) - } - - pub fn current_live(&mut self, source_id: UserId) -> Option { - let mut state = self.current(source_id)?; - if state.is_expired() { - match state.timeout_policy() { - DialogTimeoutPolicy::Clear => { - let _ = self.stop(source_id); - return None; - } - DialogTimeoutPolicy::Pause => { - state.deadline_unix_ms = None; - state.status = DialogStatus::Paused; - self.store.set(self.key(source_id), state.encode()); - } - } - } - Some(state) - } - - pub fn current_active(&mut self, source_id: UserId) -> Option { - let state = self.current_live(source_id)?; - state.is_active().then_some(state) - } - - #[must_use] - pub fn is_in(&mut self, source_id: UserId, dialog: &str, step: &str) -> bool { - self.current_active(source_id) - .is_some_and(|state| state.dialog == dialog && state.step == step) - } - - pub fn advance( - &mut self, - source_id: UserId, - next_step: impl Into, - ) -> Option { - let mut state = self.current_live(source_id)?; - state.step = next_step.into(); - state.status = DialogStatus::Active; - self.store.set(self.key(source_id), state.encode()); - Some(state) - } - - pub fn pause(&mut self, source_id: UserId) -> Option { - self.update(source_id, |state| state.status = DialogStatus::Paused) - } - - pub fn resume(&mut self, source_id: UserId) -> Option { - self.update(source_id, |state| state.status = DialogStatus::Active) - } - - pub fn set_timeout(&mut self, source_id: UserId, timeout: Duration) -> Option { - self.update(source_id, |state| { - state.deadline_unix_ms = - Some(now_unix_ms().saturating_add(duration_to_millis(timeout))); - }) - } - - pub fn clear_timeout(&mut self, source_id: UserId) -> Option { - self.update(source_id, |state| state.deadline_unix_ms = None) - } - - pub fn set_timeout_policy( - &mut self, - source_id: UserId, - policy: DialogTimeoutPolicy, - ) -> Option { - self.update(source_id, |state| { - state.set_metadata(INTERNAL_TIMEOUT_POLICY_KEY, policy.encode()); - }) - } - - pub fn timeout_policy(&mut self, source_id: UserId) -> Option { - self.current(source_id).map(|state| state.timeout_policy()) - } - - pub fn metadata(&mut self, source_id: UserId, key: &str) -> Option { - self.current_live(source_id)? - .metadata(key) - .map(ToOwned::to_owned) - } - - pub fn set_metadata( - &mut self, - source_id: UserId, - key: impl Into, - value: impl Into, - ) -> Option { - let key = key.into(); - let value = value.into(); - self.update(source_id, move |state| { - state.set_metadata(key.clone(), value.clone()); - }) - } - - pub fn remove_metadata( - &mut self, - source_id: UserId, - key: &str, - ) -> Option<(DialogState, Option)> { - let mut removed = None; - let state = self.update(source_id, |state| { - removed = state.remove_metadata(key); - })?; - Some((state, removed)) - } - - pub fn stop(&mut self, source_id: UserId) -> Option { - self.store - .remove(&self.key(source_id)) - .and_then(|raw| DialogState::decode(&raw)) - } - - #[must_use] - pub fn restart_flow(&mut self, source_id: UserId, flow: &DialogFlow) -> DialogState { - let state = DialogState::new(flow.name(), flow.start_step()); - self.start_state(source_id, state.clone()); - self.current(source_id).unwrap_or(state) - } - - pub fn advance_flow(&mut self, source_id: UserId, flow: &DialogFlow) -> Option { - let current = self.current_live(source_id)?; - if !current.dialog.eq_ignore_ascii_case(flow.name()) { - return None; - } - let next = flow.next_step(¤t.step)?; - self.advance(source_id, next) - } - - fn update(&mut self, source_id: UserId, mut update: F) -> Option - where - F: FnMut(&mut DialogState), - { - let mut state = self.current_live(source_id)?; - update(&mut state); - self.store.set(self.key(source_id), state.encode()); - Some(state) - } - - fn prepare_state(mut state: DialogState) -> DialogState { - if state.session_id().is_none() { - state.set_metadata(INTERNAL_SESSION_KEY, generate_session_id()); - } - state - } -} - -fn netstring(value: &str) -> String { - format!("{}:{value},", value.len()) -} - -fn parse_netstring<'a>(input: &mut &'a str) -> Option<&'a str> { - let colon = input.find(':')?; - let len = input[..colon].parse::().ok()?; - let start = colon + 1; - let end = start.checked_add(len)?; - let trailing = end.checked_add(1)?; - if input.len() < trailing || input.as_bytes().get(end).copied()? != b',' { - return None; - } - let value = &input[start..end]; - *input = &input[trailing..]; - Some(value) -} - -fn now_unix_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() - .try_into() - .unwrap_or(u64::MAX) -} - -fn duration_to_millis(duration: Duration) -> u64 { - duration.as_millis().try_into().unwrap_or(u64::MAX) -} - -fn generate_session_id() -> String { - format!( - "{}-{}", - now_unix_ms(), - NEXT_DIALOG_SESSION.fetch_add(1, Ordering::Relaxed) - ) -} diff --git a/crates/teamtalk/src/bot/fsm/encoding.rs b/crates/teamtalk/src/bot/fsm/encoding.rs new file mode 100644 index 0000000..3e4671c --- /dev/null +++ b/crates/teamtalk/src/bot/fsm/encoding.rs @@ -0,0 +1,71 @@ +//! Shared constants, time helpers, and the netstring wire format used +//! by [`crate::bot::DialogState::encode`] / [`crate::bot::DialogState::decode`]. +//! +//! The dialog encoding is a sequence of [netstrings][dj-netstring] of the +//! form `len:value,`. This makes the format self-delimiting, forward- +//! compatible (old decoders can be replaced by version-aware ones by +//! bumping [`DIALOG_ENCODING_VERSION`]), and trivially safe to embed +//! inside the arbitrary `String` values accepted by [`StateStore`][1] +//! backends. +//! +//! [dj-netstring]: https://cr.yp.to/proto/netstrings.txt +//! [1]: crate::bot::StateStore + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// Current dialog wire-format version; bump on any breaking encoding +/// change so [`super::state::DialogState::decode`] can fall back to the +/// legacy `dialog|step` path for values stored before netstrings were +/// introduced. +pub(super) const DIALOG_ENCODING_VERSION: &str = "v2"; + +/// Metadata key under which [`super::state::DialogState`] stores the +/// per-session id. Internal; never exposed to user metadata. +pub(super) const INTERNAL_SESSION_KEY: &str = "__session"; + +/// Metadata key under which [`super::state::DialogState`] stores the +/// encoded [`super::status::DialogTimeoutPolicy`]. Internal; never +/// exposed to user metadata. +pub(super) const INTERNAL_TIMEOUT_POLICY_KEY: &str = "__timeout_policy"; + +static NEXT_DIALOG_SESSION: AtomicU64 = AtomicU64::new(1); + +pub(super) fn netstring(value: &str) -> String { + format!("{}:{value},", value.len()) +} + +pub(super) fn parse_netstring<'a>(input: &mut &'a str) -> Option<&'a str> { + let colon = input.find(':')?; + let len = input[..colon].parse::().ok()?; + let start = colon + 1; + let end = start.checked_add(len)?; + let trailing = end.checked_add(1)?; + if input.len() < trailing || input.as_bytes().get(end).copied()? != b',' { + return None; + } + let value = &input[start..end]; + *input = &input[trailing..]; + Some(value) +} + +pub(super) fn now_unix_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(u64::MAX) +} + +pub(super) fn duration_to_millis(duration: Duration) -> u64 { + duration.as_millis().try_into().unwrap_or(u64::MAX) +} + +pub(super) fn generate_session_id() -> String { + format!( + "{}-{}", + now_unix_ms(), + NEXT_DIALOG_SESSION.fetch_add(1, Ordering::Relaxed) + ) +} diff --git a/crates/teamtalk/src/bot/fsm/flow.rs b/crates/teamtalk/src/bot/fsm/flow.rs new file mode 100644 index 0000000..dd6f76f --- /dev/null +++ b/crates/teamtalk/src/bot/fsm/flow.rs @@ -0,0 +1,99 @@ +//! Declarative linear dialog flow used by +//! [`super::machine::DialogMachine::restart_flow`] and +//! [`super::machine::DialogMachine::advance_flow`]. + +/// A named, ordered sequence of dialog steps. +/// +/// A `DialogFlow` is a lightweight description of a linear conversation: +/// a `start_step`, then zero or more subsequent `steps` visited in +/// order. It does not own any state itself - step navigation is always +/// performed against an external [`super::state::DialogState`] via +/// [`super::machine::DialogMachine`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DialogFlow { + name: String, + start_step: String, + steps: Vec, +} + +impl DialogFlow { + /// Creates a new flow with the given dialog name and start step. + pub fn new(name: impl Into, start_step: impl Into) -> Self { + Self { + name: name.into(), + start_step: start_step.into(), + steps: Vec::new(), + } + } + + /// Appends `step` to the end of the flow. + #[must_use] + pub fn step(mut self, step: impl Into) -> Self { + self.steps.push(step.into()); + self + } + + /// Returns the dialog name associated with this flow. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + + /// Returns the start step label. + #[must_use] + pub fn start_step(&self) -> &str { + &self.start_step + } + + /// Returns the ordered list of non-start steps. + #[must_use] + pub fn steps(&self) -> &[String] { + &self.steps + } + + /// Returns `true` if `step` is the start step or appears in the + /// step list. + #[must_use] + pub fn contains_step(&self, step: &str) -> bool { + step == self.start_step || self.steps.iter().any(|item| item == step) + } + + /// Returns the step that follows `step`, or `None` if `step` is + /// unknown or terminal. + pub fn next_step(&self, step: &str) -> Option<&str> { + if step == self.start_step { + return self.steps.first().map(String::as_str); + } + + self.steps + .windows(2) + .find_map(|window| (window[0] == step).then_some(window[1].as_str())) + } + + /// Returns the step that precedes `step`, or `None` if `step` is + /// the start step or unknown. + #[allow(clippy::must_use_candidate)] + pub fn previous_step(&self, step: &str) -> Option<&str> { + if let Some(first) = self.steps.first() + && first == step + { + return Some(&self.start_step); + } + + self.steps + .windows(2) + .find_map(|window| (window[1] == step).then_some(window[0].as_str())) + } + + /// Returns `true` when `step` is the start step. + #[must_use] + pub fn is_start_step(&self, step: &str) -> bool { + step == self.start_step + } + + /// Returns `true` when `step` is the last step in the flow. + #[must_use] + pub fn is_terminal_step(&self, step: &str) -> bool { + self.steps.last().is_some_and(|last| last == step) + } +} diff --git a/crates/teamtalk/src/bot/fsm/machine.rs b/crates/teamtalk/src/bot/fsm/machine.rs new file mode 100644 index 0000000..3ef530a --- /dev/null +++ b/crates/teamtalk/src/bot/fsm/machine.rs @@ -0,0 +1,236 @@ +//! Mutating dialog façade over a [`StateStore`]. +//! +//! [`DialogMachine`] borrows a `&mut dyn StateStore` and treats it as +//! a keyed bag of [`DialogState`] values, one per [`UserId`]. All +//! dialog lifecycle operations (start, advance, pause/resume, expire, +//! timeout-policy enforcement) live here. + +use super::encoding::{ + INTERNAL_SESSION_KEY, INTERNAL_TIMEOUT_POLICY_KEY, duration_to_millis, generate_session_id, + now_unix_ms, +}; +use super::flow::DialogFlow; +use super::state::DialogState; +use super::status::{DialogStatus, DialogTimeoutPolicy}; +use crate::bot::storage::StateStore; +use crate::types::UserId; +use std::time::Duration; + +/// Mutating façade that stores dialog state per-user in a +/// [`StateStore`]. +pub struct DialogMachine<'a> { + store: &'a mut dyn StateStore, + prefix: String, +} + +impl<'a> DialogMachine<'a> { + /// Creates a machine that stores entries under the default + /// `bot:dialog` key prefix. + pub fn new(store: &'a mut dyn StateStore) -> Self { + Self { + store, + prefix: "bot:dialog".to_owned(), + } + } + + /// Creates a machine with a custom key prefix. Useful when + /// multiple independent dialog machines share the same store. + #[must_use] + pub fn with_prefix(store: &'a mut dyn StateStore, prefix: impl Into) -> Self { + Self { + store, + prefix: prefix.into(), + } + } + + fn key(&self, source_id: UserId) -> String { + format!("{}:{}", self.prefix, source_id.raw()) + } + + /// Starts a fresh dialog for `source_id` at the given step. + pub fn start(&mut self, source_id: UserId, dialog: impl Into, step: impl Into) { + self.start_state(source_id, DialogState::new(dialog, step)); + } + + /// Starts a dialog at the given pre-built [`DialogState`]. + pub fn start_state(&mut self, source_id: UserId, state: DialogState) { + self.store + .set(self.key(source_id), Self::prepare_state(state).encode()); + } + + /// Returns the stored state for `source_id` without applying any + /// expiration or timeout-policy side effects. + #[allow(clippy::must_use_candidate)] + pub fn current(&self, source_id: UserId) -> Option { + self.store + .get(&self.key(source_id)) + .and_then(|raw| DialogState::decode(&raw)) + } + + /// Returns the stored state for `source_id`, applying the + /// [`DialogTimeoutPolicy`] if its deadline has expired. + pub fn current_live(&mut self, source_id: UserId) -> Option { + let mut state = self.current(source_id)?; + if state.is_expired() { + match state.timeout_policy() { + DialogTimeoutPolicy::Clear => { + let _ = self.stop(source_id); + return None; + } + DialogTimeoutPolicy::Pause => { + state.deadline_unix_ms = None; + state.status = DialogStatus::Paused; + self.store.set(self.key(source_id), state.encode()); + } + } + } + Some(state) + } + + /// Same as [`Self::current_live`] but only returns `Some` when the + /// resulting state is [`DialogStatus::Active`]. + pub fn current_active(&mut self, source_id: UserId) -> Option { + let state = self.current_live(source_id)?; + state.is_active().then_some(state) + } + + /// Returns `true` when the current active state has the given + /// dialog/step pair. + #[must_use] + pub fn is_in(&mut self, source_id: UserId, dialog: &str, step: &str) -> bool { + self.current_active(source_id) + .is_some_and(|state| state.dialog == dialog && state.step == step) + } + + /// Advances to the next step, marking the state as + /// [`DialogStatus::Active`]. + pub fn advance( + &mut self, + source_id: UserId, + next_step: impl Into, + ) -> Option { + let mut state = self.current_live(source_id)?; + state.step = next_step.into(); + state.status = DialogStatus::Active; + self.store.set(self.key(source_id), state.encode()); + Some(state) + } + + /// Marks the state as [`DialogStatus::Paused`]. + pub fn pause(&mut self, source_id: UserId) -> Option { + self.update(source_id, |state| state.status = DialogStatus::Paused) + } + + /// Marks the state as [`DialogStatus::Active`]. + pub fn resume(&mut self, source_id: UserId) -> Option { + self.update(source_id, |state| state.status = DialogStatus::Active) + } + + /// Sets a new relative deadline for the stored dialog. + pub fn set_timeout(&mut self, source_id: UserId, timeout: Duration) -> Option { + self.update(source_id, |state| { + state.deadline_unix_ms = + Some(now_unix_ms().saturating_add(duration_to_millis(timeout))); + }) + } + + /// Clears any deadline on the stored dialog. + pub fn clear_timeout(&mut self, source_id: UserId) -> Option { + self.update(source_id, |state| state.deadline_unix_ms = None) + } + + /// Sets the [`DialogTimeoutPolicy`] on the stored dialog. + pub fn set_timeout_policy( + &mut self, + source_id: UserId, + policy: DialogTimeoutPolicy, + ) -> Option { + self.update(source_id, |state| { + state.set_metadata(INTERNAL_TIMEOUT_POLICY_KEY, policy.encode()); + }) + } + + /// Returns the current [`DialogTimeoutPolicy`] for `source_id`. + pub fn timeout_policy(&mut self, source_id: UserId) -> Option { + self.current(source_id).map(|state| state.timeout_policy()) + } + + /// Returns the metadata value at `key` for the current live state. + pub fn metadata(&mut self, source_id: UserId, key: &str) -> Option { + self.current_live(source_id)? + .metadata(key) + .map(ToOwned::to_owned) + } + + /// Sets metadata on the stored dialog. + pub fn set_metadata( + &mut self, + source_id: UserId, + key: impl Into, + value: impl Into, + ) -> Option { + let key = key.into(); + let value = value.into(); + self.update(source_id, move |state| { + state.set_metadata(key.clone(), value.clone()); + }) + } + + /// Removes metadata at `key` from the stored dialog. + pub fn remove_metadata( + &mut self, + source_id: UserId, + key: &str, + ) -> Option<(DialogState, Option)> { + let mut removed = None; + let state = self.update(source_id, |state| { + removed = state.remove_metadata(key); + })?; + Some((state, removed)) + } + + /// Removes the dialog for `source_id` and returns its last state. + pub fn stop(&mut self, source_id: UserId) -> Option { + self.store + .remove(&self.key(source_id)) + .and_then(|raw| DialogState::decode(&raw)) + } + + /// Resets the dialog to `flow.start_step()`, returning the new + /// stored state. + #[must_use] + pub fn restart_flow(&mut self, source_id: UserId, flow: &DialogFlow) -> DialogState { + let state = DialogState::new(flow.name(), flow.start_step()); + self.start_state(source_id, state.clone()); + self.current(source_id).unwrap_or(state) + } + + /// Advances the dialog to the next step of `flow`, or returns + /// `None` if the current dialog name does not match or the step + /// has no successor. + pub fn advance_flow(&mut self, source_id: UserId, flow: &DialogFlow) -> Option { + let current = self.current_live(source_id)?; + if !current.dialog.eq_ignore_ascii_case(flow.name()) { + return None; + } + let next = flow.next_step(¤t.step)?; + self.advance(source_id, next) + } + + fn update(&mut self, source_id: UserId, mut update: F) -> Option + where + F: FnMut(&mut DialogState), + { + let mut state = self.current_live(source_id)?; + update(&mut state); + self.store.set(self.key(source_id), state.encode()); + Some(state) + } + + fn prepare_state(mut state: DialogState) -> DialogState { + if state.session_id().is_none() { + state.set_metadata(INTERNAL_SESSION_KEY, generate_session_id()); + } + state + } +} diff --git a/crates/teamtalk/src/bot/fsm/mod.rs b/crates/teamtalk/src/bot/fsm/mod.rs new file mode 100644 index 0000000..cde6da0 --- /dev/null +++ b/crates/teamtalk/src/bot/fsm/mod.rs @@ -0,0 +1,35 @@ +//! Finite-state-machine layer for per-user bot dialogs. +//! +//! Public surface: +//! +//! * [`DialogStatus`] / [`DialogTimeoutPolicy`] - lifecycle enums. +//! * [`DialogState`] - per-user persistent state record with +//! netstring-based encoding. +//! * [`DialogFlow`] - declarative linear step sequence. +//! * [`DialogMachine`] - mutating façade over a [`crate::bot::StateStore`]. +//! +//! The module is split so that each concern lives in its own file: +//! +//! * `status.rs` - [`DialogStatus`] and [`DialogTimeoutPolicy`] enums +//! with their wire encode/decode helpers. +//! * `state.rs` - [`DialogState`] + [`Self::encode`][1] / +//! [`Self::decode`][2]. +//! * `flow.rs` - [`DialogFlow`] (self-contained). +//! * `machine.rs` - [`DialogMachine`] lifecycle/timeout/metadata +//! operations. +//! * `encoding.rs` - shared constants, netstring codec, and time +//! helpers used by `state.rs` and `machine.rs`. +//! +//! [1]: DialogState::encode +//! [2]: DialogState::decode + +mod encoding; +mod flow; +mod machine; +mod state; +mod status; + +pub use flow::DialogFlow; +pub use machine::DialogMachine; +pub use state::DialogState; +pub use status::{DialogStatus, DialogTimeoutPolicy}; diff --git a/crates/teamtalk/src/bot/fsm/state.rs b/crates/teamtalk/src/bot/fsm/state.rs new file mode 100644 index 0000000..b22b36a --- /dev/null +++ b/crates/teamtalk/src/bot/fsm/state.rs @@ -0,0 +1,249 @@ +//! Per-user dialog state record plus its netstring wire encoding. + +use super::encoding::{ + DIALOG_ENCODING_VERSION, INTERNAL_SESSION_KEY, INTERNAL_TIMEOUT_POLICY_KEY, duration_to_millis, + netstring, now_unix_ms, parse_netstring, +}; +use super::status::{DialogStatus, DialogTimeoutPolicy}; +use std::time::Duration; + +/// Persistent dialog state for a single user. +/// +/// Stored as the encoded value in a [`crate::bot::StateStore`] keyed +/// by `:`. The encoding is a sequence of netstrings +/// so it round-trips cleanly through any string-based backend and can +/// be extended with additional metadata without breaking older +/// readers (see [`DIALOG_ENCODING_VERSION`]). +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub struct DialogState { + /// Dialog name (first axis of the state). + pub dialog: String, + /// Current step within the dialog. + pub step: String, + /// Current lifecycle status. + pub status: DialogStatus, + /// Optional absolute deadline in milliseconds since the Unix epoch. + pub deadline_unix_ms: Option, + /// Arbitrary user metadata plus a few internal keys (session id, + /// timeout policy). Kept as `Vec<(String, String)>` rather than a + /// map to preserve insertion order across encode/decode. + pub metadata: Vec<(String, String)>, +} + +impl DialogState { + /// Builds a fresh [`DialogStatus::Active`] state with no deadline + /// and no metadata. + pub fn new(dialog: impl Into, step: impl Into) -> Self { + Self { + dialog: dialog.into(), + step: step.into(), + status: DialogStatus::Active, + deadline_unix_ms: None, + metadata: Vec::new(), + } + } + + /// Sets an absolute deadline in milliseconds since the Unix epoch. + #[must_use] + pub fn with_deadline_unix_ms(mut self, deadline_unix_ms: u64) -> Self { + self.deadline_unix_ms = Some(deadline_unix_ms); + self + } + + /// Sets a relative deadline by adding `timeout` to the current + /// wall-clock time. + #[must_use] + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.deadline_unix_ms = Some(now_unix_ms().saturating_add(duration_to_millis(timeout))); + self + } + + /// Sets the initial [`DialogStatus`]. + #[must_use] + pub fn with_status(mut self, status: DialogStatus) -> Self { + self.status = status; + self + } + + /// Sets the [`DialogTimeoutPolicy`] on the state. + #[must_use] + pub fn with_timeout_policy(mut self, policy: DialogTimeoutPolicy) -> Self { + self.set_metadata(INTERNAL_TIMEOUT_POLICY_KEY, policy.encode()); + self + } + + /// Replaces the metadata vector with the given `(key, value)` + /// pairs. + #[must_use] + pub fn with_metadata( + mut self, + metadata: impl IntoIterator, impl Into)>, + ) -> Self { + self.metadata = metadata + .into_iter() + .map(|(key, value)| (key.into(), value.into())) + .collect(); + self + } + + /// Returns `true` if the state is [`DialogStatus::Active`]. + #[must_use] + pub fn is_active(&self) -> bool { + matches!(self.status, DialogStatus::Active) + } + + /// Returns `true` if the state is [`DialogStatus::Paused`]. + #[must_use] + pub fn is_paused(&self) -> bool { + matches!(self.status, DialogStatus::Paused) + } + + /// Returns `true` if the state has a deadline that is in the past + /// at the current wall-clock time. + #[must_use] + pub fn is_expired(&self) -> bool { + self.is_expired_at(now_unix_ms()) + } + + /// Returns `true` if the state has a deadline that is less than or + /// equal to `now_unix_ms`. + #[must_use] + pub fn is_expired_at(&self, now_unix_ms: u64) -> bool { + self.deadline_unix_ms + .is_some_and(|deadline| deadline <= now_unix_ms) + } + + /// Returns the metadata value stored at `key`, if any. + #[allow(clippy::must_use_candidate)] + pub fn metadata(&self, key: &str) -> Option<&str> { + self.metadata + .iter() + .find_map(|(existing, value)| (existing == key).then_some(value.as_str())) + } + + /// Returns the internal per-state session id if set. + #[allow(clippy::must_use_candidate)] + pub fn session_id(&self) -> Option<&str> { + self.metadata(INTERNAL_SESSION_KEY) + } + + /// Returns the effective [`DialogTimeoutPolicy`] + /// (defaulting to [`DialogTimeoutPolicy::Clear`]). + #[must_use] + pub fn timeout_policy(&self) -> DialogTimeoutPolicy { + self.metadata(INTERNAL_TIMEOUT_POLICY_KEY) + .and_then(DialogTimeoutPolicy::decode) + .unwrap_or(DialogTimeoutPolicy::Clear) + } + + /// Inserts or overwrites the metadata value at `key`. + pub fn set_metadata(&mut self, key: impl Into, value: impl Into) { + let key = key.into(); + let value = value.into(); + if let Some((_, existing)) = self + .metadata + .iter_mut() + .find(|(existing, _)| *existing == key) + { + *existing = value; + return; + } + self.metadata.push((key, value)); + } + + /// Removes the metadata entry at `key`, returning its value. + pub fn remove_metadata(&mut self, key: &str) -> Option { + let index = self + .metadata + .iter() + .position(|(existing, _)| existing == key)?; + Some(self.metadata.remove(index).1) + } + + /// Encodes the state as a sequence of netstrings suitable for + /// storing in a [`crate::bot::StateStore`]. + #[must_use] + pub fn encode(&self) -> String { + let mut fields = Vec::with_capacity(6 + self.metadata.len() * 2); + fields.push(netstring(DIALOG_ENCODING_VERSION)); + fields.push(netstring(self.status.encode())); + fields.push(netstring( + &self + .deadline_unix_ms + .map(|value| value.to_string()) + .unwrap_or_default(), + )); + fields.push(netstring(&self.dialog)); + fields.push(netstring(&self.step)); + fields.push(netstring(&self.metadata.len().to_string())); + for (key, value) in &self.metadata { + fields.push(netstring(key)); + fields.push(netstring(value)); + } + fields.concat() + } + + /// Decodes a state previously produced by [`Self::encode`], with + /// a best-effort fallback to the legacy `dialog|step` pipe + /// encoding used before [`DIALOG_ENCODING_VERSION`]. + #[allow(clippy::must_use_candidate)] + pub fn decode(raw: &str) -> Option { + if !raw.contains(':') { + let (dialog, step) = raw.split_once('|')?; + if dialog.is_empty() || step.is_empty() { + return None; + } + return Some(Self::new(dialog, step)); + } + + let mut remainder = raw; + let Some(version) = parse_netstring(&mut remainder) else { + let (dialog, step) = raw.split_once('|')?; + if dialog.is_empty() || step.is_empty() { + return None; + } + return Some(Self::new(dialog, step)); + }; + if version != DIALOG_ENCODING_VERSION { + let (dialog, step) = raw.split_once('|')?; + if dialog.is_empty() || step.is_empty() { + return None; + } + return Some(Self::new(dialog, step)); + } + + let status = DialogStatus::decode(parse_netstring(&mut remainder)?)?; + let deadline_unix_ms = { + let value = parse_netstring(&mut remainder)?; + if value.is_empty() { + None + } else { + Some(value.parse::().ok()?) + } + }; + let dialog = parse_netstring(&mut remainder)?.to_owned(); + let step = parse_netstring(&mut remainder)?.to_owned(); + if dialog.is_empty() || step.is_empty() { + return None; + } + let metadata_len = parse_netstring(&mut remainder)?.parse::().ok()?; + let mut metadata = Vec::with_capacity(metadata_len); + for _ in 0..metadata_len { + let key = parse_netstring(&mut remainder)?.to_owned(); + let value = parse_netstring(&mut remainder)?.to_owned(); + metadata.push((key, value)); + } + if !remainder.is_empty() { + return None; + } + + Some(Self { + dialog, + step, + status, + deadline_unix_ms, + metadata, + }) + } +} diff --git a/crates/teamtalk/src/bot/fsm/status.rs b/crates/teamtalk/src/bot/fsm/status.rs new file mode 100644 index 0000000..3fb4e6d --- /dev/null +++ b/crates/teamtalk/src/bot/fsm/status.rs @@ -0,0 +1,56 @@ +//! Dialog status and timeout-policy enums plus their wire encodings. + +/// Current lifecycle of a [`super::state::DialogState`]. +#[non_exhaustive] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DialogStatus { + /// Dialog is active and advancing on each matching event. + Active, + /// Dialog is paused and ignores events until explicitly resumed. + Paused, +} + +/// What [`super::machine::DialogMachine::current_live`] should do when +/// a dialog's deadline has elapsed. +#[non_exhaustive] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DialogTimeoutPolicy { + /// Drop the state entirely (default). + Clear, + /// Move to [`DialogStatus::Paused`] and clear the deadline. + Pause, +} + +impl DialogTimeoutPolicy { + pub(super) fn encode(self) -> &'static str { + match self { + Self::Clear => "clear", + Self::Pause => "pause", + } + } + + pub(super) fn decode(raw: &str) -> Option { + match raw { + "clear" => Some(Self::Clear), + "pause" => Some(Self::Pause), + _ => None, + } + } +} + +impl DialogStatus { + pub(super) fn encode(self) -> &'static str { + match self { + Self::Active => "active", + Self::Paused => "paused", + } + } + + pub(super) fn decode(raw: &str) -> Option { + match raw { + "active" => Some(Self::Active), + "paused" => Some(Self::Paused), + _ => None, + } + } +}