diff --git a/README.md b/README.md index eed99ab..9af48c1 100644 --- a/README.md +++ b/README.md @@ -760,7 +760,7 @@ transports; only the constructor line changes.** ```rust use std::sync::Arc; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; // Built once — handlers are transport-agnostic. let service = Arc::new(build_service()); @@ -805,7 +805,7 @@ carries a `FailurePolicy` controlling what happens to a **permanent** handler failure — `Retry`, `DeadLetter`, `Park`, `LogAndAck`, or `Stop`: ```rust -use sourced_rust::microsvc::transport::{FailurePolicy, RunOptions}; +use sourced_rust::bus::{FailurePolicy, RunOptions}; bus.listen( service.clone(), diff --git a/docs/async-transports.md b/docs/async-transports.md index 4631f8a..a0447d5 100644 --- a/docs/async-transports.md +++ b/docs/async-transports.md @@ -2,14 +2,14 @@ Distributed (published from the `sourced_rust` crate) keeps the synchronous in-memory bus intact and adds an async transport layer under -`microsvc::transport`. The design line is: +`bus`. The design line is: - **`microsvc`** owns handler registration, guards, typed input decoding, dispatch, and handler metadata; - **transport adapters** own how messages are received, acknowledged, retried, published, and mapped to external topics/subjects/queues/routes. -The shared vocabulary lives in `microsvc::transport` and depends on no concrete +The shared vocabulary lives in `bus` and depends on no concrete broker. The same application code runs over any transport — selecting one is an adapter/wiring change, not a handler change. @@ -43,7 +43,7 @@ Direct transports implement `AsyncMessageSource` (pull a message) and `Service::dispatch_message` and settling only after the handler completes: ```rust,ignore -use sourced_rust::microsvc::transport::{run_source, RunOptions}; +use sourced_rust::bus::{run_source, RunOptions}; run_source(service, source, RunOptions::idempotent()).await?; ``` @@ -108,7 +108,7 @@ The app surface is identical across transports; only the constructor changes: ```rust use std::sync::Arc; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; // Built once — handlers are transport-agnostic. let service = Arc::new(build_service()); diff --git a/src/microsvc/transport/bus.rs b/src/bus/bus.rs similarity index 79% rename from src/microsvc/transport/bus.rs rename to src/bus/bus.rs index ffe9b1a..cc1ddfd 100644 --- a/src/microsvc/transport/bus.rs +++ b/src/bus/bus.rs @@ -20,8 +20,7 @@ use std::future::Future; use std::sync::Arc; -use super::{Message, RunOptions, TransportError}; -use crate::microsvc::Service; +use super::{Message, MessageRouter, RunOptions, TransportError}; /// Produce side of the bus — uniform across every transport. pub trait Bus: Send + Sync { @@ -54,26 +53,25 @@ pub trait Bus: Send + Sync { /// Consume side of the bus — pull transports that run a [`run_source`] loop. /// -/// `listen`/`subscribe` derive the message names from the service's registered -/// handlers ([`Service::command_names`]/[`Service::event_names`]), build the -/// transport's source with the matching topology, and run it. Both run until the -/// source drains/stops. +/// `listen`/`subscribe` derive the message names from the router's registered +/// handlers ([`MessageRouter::subscription_plan`]), build the transport's source +/// with the matching topology, and run it. Both run until the source drains/stops. /// /// [`run_source`]: super::run_source pub trait BusConsumer: Send + Sync { - /// Run `service` as a command listener: consume its command names with + /// Run `router` as a command listener: consume its command names with /// competing-consumer (point-to-point) semantics. - fn listen( + fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> impl Future> + Send; - /// Run `service` as an event subscriber: consume its event names with + /// Run `router` as an event subscriber: consume its event names with /// fan-out semantics. - fn subscribe( + fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> impl Future> + Send; } diff --git a/src/microsvc/transport/capabilities.rs b/src/bus/capabilities.rs similarity index 100% rename from src/microsvc/transport/capabilities.rs rename to src/bus/capabilities.rs diff --git a/src/microsvc/transport/error.rs b/src/bus/error.rs similarity index 54% rename from src/microsvc/transport/error.rs rename to src/bus/error.rs index ca6a9eb..4e509b2 100644 --- a/src/microsvc/transport/error.rs +++ b/src/bus/error.rs @@ -9,8 +9,6 @@ use std::error::Error; use std::fmt; -use crate::microsvc::HandlerError; - /// Whether a [`TransportError`] should be retried. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum TransportErrorKind { @@ -103,40 +101,6 @@ impl TransportError { pub fn message(&self) -> &str { &self.message } - - /// Classify a [`HandlerError`] for transport retry purposes. - /// - /// Transient failures are retryable: repository errors, not-found results, - /// and otherwise-unclassified errors. Not-found is transient in an - /// at-least-once event-driven system because an out-of-order delivery can - /// reference an aggregate or projection that has not been created yet, and a - /// later redelivery resolves the race. (This also keeps a handler-raised - /// [`HandlerError::NotFound`] consistent with a repository-raised - /// [`RepositoryError::NotFound`](crate::RepositoryError), which arrives - /// wrapped in `Repository(_)`.) - /// - /// Deterministic failures — unknown routing, payload decode failures, - /// business rejections, authorization, and guard rejections — are permanent - /// because redelivering the identical message cannot change the outcome. An - /// entity that is genuinely missing forever still drains to the failure - /// policy once the transport's retry ceiling is reached, so defaulting - /// not-found to retryable does not risk an unbounded loop. - /// - /// This is a default. Handlers that know better can return a classification - /// directly, and the runner's [`FailurePolicy`](super::FailurePolicy) can - /// override what happens to a permanent error. - pub fn classify_handler_error(error: &HandlerError) -> TransportErrorKind { - match error { - HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => { - TransportErrorKind::Retryable - } - HandlerError::UnknownCommand(_) - | HandlerError::DecodeFailed(_) - | HandlerError::Rejected(_) - | HandlerError::Unauthorized(_) - | HandlerError::GuardRejected(_) => TransportErrorKind::Permanent, - } - } } impl fmt::Display for TransportError { @@ -157,30 +121,6 @@ impl Error for TransportError { } } -impl From for TransportError { - fn from(error: HandlerError) -> Self { - let kind = Self::classify_handler_error(&error); - Self { - kind, - message: error.to_string(), - source: Some(Box::new(error)), - } - } -} - -impl From for TransportError { - /// Repository/store failures (lock contention, storage hiccups, stale-claim - /// conflicts) are treated as retryable: they are usually transient, and a - /// later redelivery or re-claim resolves them. - fn from(error: crate::RepositoryError) -> Self { - Self { - kind: TransportErrorKind::Retryable, - message: error.to_string(), - source: Some(Box::new(error)), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -217,52 +157,4 @@ mod tests { assert!(err.source().is_some()); assert_eq!(err.message(), "publish timed out"); } - - #[test] - fn transient_handler_errors_are_retryable() { - use crate::repository::RepositoryError; - - for error in [ - HandlerError::Repository(RepositoryError::NotFound { id: "agg-1".into() }), - // A handler-raised not-found is transient too: it is usually an - // out-of-order delivery race, and stays consistent with the - // repository-raised not-found above. - HandlerError::NotFound("agg-1".into()), - HandlerError::Other("boom".into()), - ] { - assert_eq!( - TransportError::classify_handler_error(&error), - TransportErrorKind::Retryable, - "{error} should be retryable" - ); - } - } - - #[test] - fn deterministic_handler_errors_are_permanent() { - for error in [ - HandlerError::UnknownCommand("x".into()), - HandlerError::DecodeFailed("x".into()), - HandlerError::Rejected("x".into()), - HandlerError::Unauthorized("x".into()), - HandlerError::GuardRejected("x".into()), - ] { - assert_eq!( - TransportError::classify_handler_error(&error), - TransportErrorKind::Permanent, - "{error} should be permanent" - ); - } - } - - #[test] - fn from_handler_error_preserves_classification_and_source() { - let err: TransportError = HandlerError::Rejected("invalid".into()).into(); - assert!(err.is_permanent()); - assert!(err.source().is_some()); - - let err: TransportError = - HandlerError::Other(Box::::from("infra")).into(); - assert!(err.is_retryable()); - } } diff --git a/src/microsvc/transport/failure_policy.rs b/src/bus/failure_policy.rs similarity index 100% rename from src/microsvc/transport/failure_policy.rs rename to src/bus/failure_policy.rs diff --git a/src/bus/handlers.rs b/src/bus/handlers.rs new file mode 100644 index 0000000..62e516e --- /dev/null +++ b/src/bus/handlers.rs @@ -0,0 +1,223 @@ +//! [`Handlers`] — a dependency-free inline consumer registry. +//! +//! The standalone, `Service`-free way to consume the bus: register a closure per +//! `(kind, name)` and run it with `bus.listen`/`bus.subscribe`. It is the second +//! implementation of [`MessageRouter`] (the first being `microsvc::Service`), +//! and the analog of the Node `bus.listen('seat.reserved', handler)` ergonomic. +//! +//! ```ignore +//! let handlers = Handlers::new() +//! .on_event("seat.reserved", |msg: &Message| async move { /* ... */ Ok(()) }) +//! .on_command("place.bet", |msg: &Message| async move { Ok(()) }); +//! bus.subscribe(Arc::new(handlers), RunOptions::idempotent()).await?; +//! ``` +//! +//! Deliberately minimal: handlers take `&Message` (no `Context`, dependencies, +//! guards, or sessions) and the registry is idempotent-only. The rich path — +//! typed input, guards, `Session`, dependencies, the inbox hook — stays on +//! `microsvc::Service`. A handler returns `Ok(())` to ack, `Err` to nack +//! (the runner classifies retryable vs permanent via the [`TransportError`]). +//! +//! The win over `Service<()>` is dropping `D`/`Context` and not depending on +//! `microsvc` at all — a `Service<()>` facade is impossible here, because the bus +//! cannot depend up into `microsvc` where `Service` lives. + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use super::{Message, MessageKind, SubscriptionPlan}; +use super::{MessageRouter, TransportError}; + +type HandlerFuture<'a> = Pin> + Send + 'a>>; +type HandlerFn = dyn for<'a> Fn(&'a Message) -> HandlerFuture<'a> + Send + Sync; + +/// Lets an `async fn(msg: &Message) -> Result<(), TransportError>` register +/// directly as a handler. The higher-ranked bound ties the returned future's +/// lifetime to the borrowed [`Message`], which a plain generic future parameter +/// cannot express (the same shape `microsvc::Service` uses for `Context`). +pub trait AsyncMessageHandler<'a>: Send + Sync { + /// The future returned by the handler for a message borrowed for `'a`. + type Future: Future> + Send + 'a; + /// Run the handler against the borrowed message. + fn call(&self, message: &'a Message) -> Self::Future; +} + +impl<'a, F, Fut> AsyncMessageHandler<'a> for F +where + F: Fn(&'a Message) -> Fut + Send + Sync, + Fut: Future> + Send + 'a, +{ + type Future = Fut; + fn call(&self, message: &'a Message) -> Fut { + self(message) + } +} + +fn boxed_handler(handler: F) -> Arc +where + F: for<'a> AsyncMessageHandler<'a> + 'static, +{ + Arc::new(move |message| Box::pin(handler.call(message)) as HandlerFuture<'_>) +} + +/// A dependency-free [`MessageRouter`]: a set of `(kind, name)` → async closure +/// bindings. Build it fluently with [`on_command`](Handlers::on_command) / +/// [`on_event`](Handlers::on_event), then run it with `bus.listen`/`bus.subscribe`. +#[derive(Clone, Default)] +pub struct Handlers { + handlers: HashMap<(MessageKind, String), Arc>, +} + +impl Handlers { + /// An empty registry. + pub fn new() -> Self { + Self::default() + } + + /// Register a command handler (point-to-point / competing-consumer via `listen`). + pub fn on_command(self, name: impl Into, handler: F) -> Self + where + F: for<'a> AsyncMessageHandler<'a> + 'static, + { + self.with(MessageKind::Command, name, handler) + } + + /// Register an event handler (fan-out via `subscribe`). + pub fn on_event(self, name: impl Into, handler: F) -> Self + where + F: for<'a> AsyncMessageHandler<'a> + 'static, + { + self.with(MessageKind::Event, name, handler) + } + + fn with(mut self, kind: MessageKind, name: impl Into, handler: F) -> Self + where + F: for<'a> AsyncMessageHandler<'a> + 'static, + { + self.handlers + .insert((kind, name.into()), boxed_handler(handler)); + self + } +} + +impl MessageRouter for Handlers { + fn handles(&self, kind: MessageKind, name: &str) -> bool { + self.handlers.contains_key(&(kind, name.to_string())) + } + + fn subscription_plan(&self) -> SubscriptionPlan { + let mut plan = SubscriptionPlan::default(); + for (kind, name) in self.handlers.keys() { + let bucket = match kind { + MessageKind::Command => &mut plan.commands, + MessageKind::Event => &mut plan.events, + }; + if !bucket.iter().any(|existing| existing == name) { + bucket.push(name.clone()); + } + } + plan + } + + async fn dispatch(&self, message: &Message) -> Result<(), TransportError> { + // Clone the handler Arc so the map is not borrowed across the awaited + // future (mirrors `Service::invoke`). + let handler = self + .handlers + .get(&(message.kind, message.name().to_string())) + .cloned(); + match handler { + Some(handler) => handler(message).await, + // No binding: the runner already filters via `handles`, so this is a + // benign no-op for any message that slips through. + None => Ok(()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; + use std::future::Future; + use std::sync::atomic::{AtomicUsize, Ordering}; + + fn block_on(future: F) -> F::Output { + use std::ptr; + use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |_| RawWaker::new(ptr::null(), &VTABLE), + |_| {}, + |_| {}, + |_| {}, + ); + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }; + let mut cx = Context::from_waker(&waker); + let mut future = std::pin::pin!(future); + loop { + if let Poll::Ready(output) = future.as_mut().poll(&mut cx) { + return output; + } + } + } + + #[test] + fn subscription_plan_groups_by_kind() { + let handlers = Handlers::new() + .on_command("place.bet", |_: &Message| async { Ok(()) }) + .on_event("seat.reserved", |_: &Message| async { Ok(()) }); + let plan = handlers.subscription_plan(); + assert_eq!(plan.commands, vec!["place.bet".to_string()]); + assert_eq!(plan.events, vec!["seat.reserved".to_string()]); + assert!(handlers.handles(MessageKind::Command, "place.bet")); + assert!(handlers.handles(MessageKind::Event, "seat.reserved")); + assert!(!handlers.handles(MessageKind::Event, "place.bet")); + } + + #[test] + fn fan_out_round_trip_without_service() { + // Full InMemoryBus publish -> subscribe round trip using only Handlers — + // no Service, no microsvc handler machinery. + let bus = InMemoryBus::new(); + for _ in 0..3 { + block_on(bus.publish("seat.reserved", b"{}".to_vec())).unwrap(); + } + let count = Arc::new(AtomicUsize::new(0)); + let seen = count.clone(); + let handlers = Arc::new( + Handlers::new().on_event("seat.reserved", move |msg: &Message| { + let seen = seen.clone(); + // Touch the message to prove the borrow is usable in the handler. + let is_event = matches!(msg.kind, MessageKind::Event); + async move { + if is_event { + seen.fetch_add(1, Ordering::SeqCst); + } + Ok(()) + } + }), + ); + block_on(bus.subscribe(handlers, RunOptions::idempotent())).unwrap(); + assert_eq!(count.load(Ordering::SeqCst), 3); + } + + #[test] + fn point_to_point_round_trip_without_service() { + let bus = InMemoryBus::new(); + block_on(bus.send("place.bet", b"{}".to_vec())).unwrap(); + block_on(bus.send("place.bet", b"{}".to_vec())).unwrap(); + let count = Arc::new(AtomicUsize::new(0)); + let seen = count.clone(); + let handlers = Arc::new(Handlers::new().on_command("place.bet", move |_: &Message| { + let seen = seen.clone(); + async move { + seen.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + })); + block_on(bus.listen(handlers, RunOptions::idempotent())).unwrap(); + assert_eq!(count.load(Ordering::SeqCst), 2); + } +} diff --git a/src/microsvc/transport/in_memory_bus.rs b/src/bus/in_memory_bus.rs similarity index 82% rename from src/microsvc/transport/in_memory_bus.rs rename to src/bus/in_memory_bus.rs index 464f187..d47c3d9 100644 --- a/src/microsvc/transport/in_memory_bus.rs +++ b/src/bus/in_memory_bus.rs @@ -14,8 +14,8 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{run_source, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; +use super::{Message, MessageKind}; type Queues = Arc>>>; type Topics = Arc>>>; @@ -79,39 +79,31 @@ impl Bus for InMemoryBus { } impl BusConsumer for InMemoryBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let names = service - .command_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().commands; let source = QueueSource { queues: self.queues.clone(), names, }; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let names = service - .event_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().events; let source = TopicSource { topics: self.topics.clone(), names, cursors: HashMap::new(), }; - run_source(service, source, options).await + run_source(router, source, options).await } } @@ -184,8 +176,7 @@ impl ReceivedMessage for InMemoryReceived { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::HandlerError; - use serde_json::json; + use crate::bus::Handlers; use std::future::Future; fn block_on(future: F) -> F::Output { @@ -211,32 +202,26 @@ mod tests { Arc::new(Mutex::new(Vec::new())) } - fn command_service(rec: Arc>>) -> Arc> { - Arc::new(Service::new(()).command("work").handle( - move |ctx: &crate::microsvc::Context<()>| { - let rec = rec.clone(); - let name = ctx.message().name().to_string(); - async move { - rec.lock().unwrap().push(name); - Ok(json!({})) - } - }, - )) + fn command_service(rec: Arc>>) -> Arc { + Arc::new(Handlers::new().on_command("work", move |msg: &Message| { + let rec = rec.clone(); + let name = msg.name().to_string(); + async move { + rec.lock().unwrap().push(name); + Ok(()) + } + })) } - fn event_service(rec: Arc>>) -> Arc> { - Arc::new( - Service::new(()) - .event("evt") - .handle(move |ctx: &crate::microsvc::Context<()>| { - let rec = rec.clone(); - let id = ctx.message().id().unwrap_or("?").to_string(); - async move { - rec.lock().unwrap().push(id); - Ok(json!({})) - } - }), - ) + fn event_service(rec: Arc>>) -> Arc { + Arc::new(Handlers::new().on_event("evt", move |msg: &Message| { + let rec = rec.clone(); + let id = msg.id().unwrap_or("?").to_string(); + async move { + rec.lock().unwrap().push(id); + Ok(()) + } + })) } #[test] @@ -330,14 +315,13 @@ mod tests { fn handler_error_does_not_panic_the_loop() { let bus = InMemoryBus::new(); block_on(bus.send("work", b"{}".to_vec())).unwrap(); - let service: Arc> = - Arc::new(Service::new(()).command("work").handle( - |_: &crate::microsvc::Context<()>| async move { - Err(HandlerError::Rejected("no".into())) - }, - )); + let handlers: Arc = Arc::new( + Handlers::new().on_command("work", |_: &Message| async move { + Err(TransportError::permanent("no")) + }), + ); // Default failure policy dead-letters the permanent failure; in-memory // dead_letter is a no-op nack, so the run completes cleanly. - block_on(bus.listen(service, RunOptions::idempotent())).unwrap(); + block_on(bus.listen(handlers, RunOptions::idempotent())).unwrap(); } } diff --git a/src/microsvc/transport/kafka.rs b/src/bus/kafka.rs similarity index 99% rename from src/microsvc/transport/kafka.rs rename to src/bus/kafka.rs index 1c5db8b..345501b 100644 --- a/src/microsvc/transport/kafka.rs +++ b/src/bus/kafka.rs @@ -20,7 +20,7 @@ use rdkafka::{Message as KafkaMessageTrait, Offset, TopicPartitionList}; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{AsyncMessagePublisher, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const MESSAGE_ID_HEADER: &str = "x-sourced-id"; const MESSAGE_KIND_HEADER: &str = "x-sourced-kind"; diff --git a/src/microsvc/transport/kafka_bus.rs b/src/bus/kafka_bus.rs similarity index 86% rename from src/microsvc/transport/kafka_bus.rs rename to src/bus/kafka_bus.rs index ccabb4a..e450fa7 100644 --- a/src/microsvc/transport/kafka_bus.rs +++ b/src/bus/kafka_bus.rs @@ -22,8 +22,10 @@ use std::sync::Arc; use std::time::Duration; use super::kafka::{KafkaPublisher, KafkaSource}; -use super::{run_source, AsyncMessagePublisher, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{ + run_source, AsyncMessagePublisher, Bus, BusConsumer, MessageRouter, RunOptions, TransportError, +}; +use super::{Message, MessageKind}; const DEFAULT_FETCH_TIMEOUT: Duration = Duration::from_secs(8); @@ -71,9 +73,9 @@ impl KafkaBus { format!("{}.evt.", self.namespace) } - async fn run( + async fn run( &self, - service: Arc>, + router: Arc, topics: Vec, group_id: String, strip_prefix: String, @@ -87,7 +89,7 @@ impl KafkaBus { .await? .with_fetch_timeout(self.fetch_timeout) .with_strip_prefix(strip_prefix); - run_source(service, source, options).await + run_source(router, source, options).await } } @@ -115,33 +117,35 @@ impl Bus for KafkaBus { } impl BusConsumer for KafkaBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { let prefix = self.command_prefix(); - let topics: Vec = service - .command_names() + let topics: Vec = router + .subscription_plan() + .commands .iter() .map(|name| format!("{prefix}{name}")) .collect(); let group_id = format!("{}.{}.cmd", self.namespace, self.group); - self.run(service, topics, group_id, prefix, options).await + self.run(router, topics, group_id, prefix, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { let prefix = self.event_prefix(); - let topics: Vec = service - .event_names() + let topics: Vec = router + .subscription_plan() + .events .iter() .map(|name| format!("{prefix}{name}")) .collect(); let group_id = format!("{}.{}.evt", self.namespace, self.group); - self.run(service, topics, group_id, prefix, options).await + self.run(router, topics, group_id, prefix, options).await } } diff --git a/src/bus/knative.rs b/src/bus/knative.rs new file mode 100644 index 0000000..fe929fc --- /dev/null +++ b/src/bus/knative.rs @@ -0,0 +1,168 @@ +//! Knative manifest helpers: render `Trigger` YAML and sanitize Kubernetes names. +//! +//! The CloudEvents HTTP *ingress* (the consume side) is `Service`-coupled and +//! lives on the microsvc side (`microsvc::cloud_events_router`); this module keeps +//! only the produce/manifest helpers, which need just the bus vocabulary +//! ([`SubscriptionPlan`]). Used by [`KnativeBus`](super::KnativeBus). +//! +//! Requires the `http` feature. + +use std::collections::HashSet; + +use super::SubscriptionPlan; + +/// Sanitize a string into an RFC 1123 DNS label usable as a Kubernetes resource +/// name: lowercase, ASCII-alphanumeric or `-`, no leading/trailing `-`, ≤63 +/// chars. Used for generated `Trigger` names, whose CloudEvent-type segment can +/// contain dots/uppercase/other characters that are invalid in k8s names. +pub(super) fn sanitize_k8s_name(name: &str) -> String { + let mapped: String = name + .chars() + .map(|c| { + let c = c.to_ascii_lowercase(); + if c.is_ascii_alphanumeric() { + c + } else { + '-' + } + }) + .collect(); + let capped: String = mapped.trim_matches('-').chars().take(63).collect(); + let trimmed = capped.trim_end_matches('-'); + if trimmed.is_empty() { + "x".to_string() + } else { + trimmed.to_string() + } +} + +/// Sanitize `raw` into a Kubernetes name that is **unique** within `used`. +/// +/// Distinct CloudEvent types can normalize to the same DNS label (e.g. +/// `order.created` and `order-created`, or a command and an event of the same +/// name) — which would otherwise emit two `Trigger`s with the same +/// `metadata.name`, the second silently clobbering the first on apply. On a +/// collision, append a numeric suffix, truncating the base so the result stays +/// within the 63-char label limit. The Trigger's `type:` filter still carries the +/// raw event name, so routing is unaffected; only the resource name is adjusted. +pub(super) fn unique_k8s_name(raw: &str, used: &mut HashSet) -> String { + let base = sanitize_k8s_name(raw); + if used.insert(base.clone()) { + return base; + } + let mut n = 2usize; + loop { + let suffix = format!("-{n}"); + let head = base.len().min(63usize.saturating_sub(suffix.len())); + let candidate = format!("{}{suffix}", base[..head].trim_end_matches('-')); + if used.insert(candidate.clone()) { + return candidate; + } + n += 1; + } +} + +/// Render Knative `Trigger` YAML for each event a service subscribes to, derived +/// from its [`SubscriptionPlan`]. Each Trigger filters on the CloudEvent `type` +/// and routes to `subscriber_service` on `broker`. +pub fn knative_triggers(plan: &SubscriptionPlan, broker: &str, subscriber_service: &str) -> String { + let mut out = String::new(); + let mut used = HashSet::new(); + for event in &plan.events { + let trigger_name = unique_k8s_name(&format!("{subscriber_service}-{event}"), &mut used); + out.push_str(&format!( + "apiVersion: eventing.knative.dev/v1\n\ + kind: Trigger\n\ + metadata:\n\ + \x20 name: {trigger_name}\n\ + spec:\n\ + \x20 broker: {broker}\n\ + \x20 filter:\n\ + \x20 attributes:\n\ + \x20 type: {event}\n\ + \x20 subscriber:\n\ + \x20 ref:\n\ + \x20 apiVersion: serving.knative.dev/v1\n\ + \x20 kind: Service\n\ + \x20 name: {subscriber_service}\n\ + ---\n" + )); + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn triggers_render_from_subscription_plan() { + let plan = SubscriptionPlan { + commands: vec![], + events: vec!["seat.reserved".to_string()], + }; + let yaml = knative_triggers(&plan, "default", "checkout-projection"); + assert!(yaml.contains("kind: Trigger")); + assert!(yaml.contains("type: seat.reserved")); + assert!(yaml.contains("name: checkout-projection-seat-reserved")); + assert!(yaml.contains("broker: default")); + } + + #[test] + fn sanitize_k8s_name_enforces_rfc1123() { + // Valid names pass through unchanged. + assert_eq!( + sanitize_k8s_name("checkout-projection-seat-reserved"), + "checkout-projection-seat-reserved" + ); + // Dots, uppercase, and other characters become '-' and lowercase. + assert_eq!(sanitize_k8s_name("Order.Created!"), "order-created"); + // No leading/trailing dashes, capped at 63 chars. + let long = "a".repeat(80); + let out = sanitize_k8s_name(&format!(".{long}.")); + assert_eq!(out.len(), 63); + assert!(!out.starts_with('-') && !out.ends_with('-')); + // All-invalid degrades to a safe placeholder. + assert_eq!(sanitize_k8s_name("..."), "x"); + } + + #[test] + fn trigger_name_is_sanitized_for_messy_event_types() { + let plan = SubscriptionPlan { + commands: vec![], + events: vec!["Order.Created".to_string()], + }; + let yaml = knative_triggers(&plan, "default", "checkout-projection"); + // The CloudEvent type filter keeps the raw type; the resource name is sanitized. + assert!(yaml.contains("type: Order.Created")); + assert!(yaml.contains("name: checkout-projection-order-created")); + } + + #[test] + fn trigger_names_are_deduped_when_event_types_normalize_alike() { + // `order.created` and `order-created` both sanitize to the same label; + // the second Trigger must get a distinct name so neither clobbers the + // other on apply, while both keep their raw `type:` filters. + let plan = SubscriptionPlan { + commands: vec![], + events: vec!["order.created".to_string(), "order-created".to_string()], + }; + let yaml = knative_triggers(&plan, "default", "svc"); + assert!(yaml.contains("name: svc-order-created\n")); + assert!(yaml.contains("name: svc-order-created-2\n")); + assert!(yaml.contains("type: order.created")); + assert!(yaml.contains("type: order-created")); + } + + #[test] + fn unique_k8s_name_caps_suffixed_names_at_63_chars() { + let mut used = HashSet::new(); + let long = "a".repeat(80); + let first = unique_k8s_name(&long, &mut used); + let second = unique_k8s_name(&long, &mut used); + assert_eq!(first.len(), 63); + assert_ne!(first, second); + assert!(second.len() <= 63); + assert!(second.ends_with("-2")); + } +} diff --git a/src/microsvc/transport/knative_bus.rs b/src/bus/knative_bus.rs similarity index 91% rename from src/microsvc/transport/knative_bus.rs rename to src/bus/knative_bus.rs index 07a3c76..5c8fa7a 100644 --- a/src/microsvc/transport/knative_bus.rs +++ b/src/bus/knative_bus.rs @@ -8,7 +8,7 @@ //! //! - [`KnativeBus::manifests`] — the role-based `Broker` + per-name `Trigger` //! YAML for a service (and a local/kubefwd variant); -//! - mounting [`cloud_events_router`](super::cloud_events_router) so the Triggers' +//! - mounting [`cloud_events_router`](crate::microsvc::cloud_events_router) so the Triggers' //! `/cloudevent/` subscriber URIs reach `Service::dispatch_message`. //! //! Producing POSTs a binary-mode CloudEvent to a broker-ingress URL: @@ -20,9 +20,11 @@ use std::time::Duration; -use super::knative::sanitize_k8s_name; +use std::collections::HashSet; + +use super::knative::unique_k8s_name; use super::{Bus, TransportError}; -use crate::microsvc::{Message, MessageKind, SubscriptionPlan}; +use super::{Message, MessageKind, SubscriptionPlan}; const SEND_TIMEOUT: Duration = Duration::from_secs(10); @@ -148,9 +150,13 @@ impl KnativeBus { /// subscribed event name to the producing service's events broker. Subscriber /// URIs are `/cloudevent/` (matching [`cloud_events_router`]). /// - /// [`cloud_events_router`]: super::cloud_events_router + /// [`cloud_events_router`]: crate::microsvc::cloud_events_router pub fn manifests(&self, plan: &SubscriptionPlan, subscriptions: &[(&str, &str)]) -> String { let mut out = String::new(); + // One dedup set across commands and events: their Trigger names share the + // `{source}-{name}` shape, so a command and an event whose names normalize + // to the same label must not collide on `metadata.name`. + let mut used = HashSet::new(); if !plan.commands.is_empty() { // The broker a service *owns* for commands it handles is // `{source}-commands` — distinct from `commands_broker` (the @@ -158,7 +164,7 @@ impl KnativeBus { let own_commands = format!("{}-commands", self.source); out.push_str(&self.broker_yaml(&own_commands)); for command in &plan.commands { - out.push_str(&self.trigger_yaml(&own_commands, command)); + out.push_str(&self.trigger_yaml(&own_commands, command, &mut used)); } } if self.publishes_events { @@ -170,7 +176,7 @@ impl KnativeBus { .find(|(name, _)| *name == event.as_str()) .map(|(_, broker)| *broker) .unwrap_or("UNMAPPED-events"); - out.push_str(&self.trigger_yaml(broker, event)); + out.push_str(&self.trigger_yaml(broker, event, &mut used)); } out } @@ -187,8 +193,8 @@ impl KnativeBus { ) } - fn trigger_yaml(&self, broker: &str, event: &str) -> String { - let trigger_name = sanitize_k8s_name(&format!("{}-{}", self.source, event)); + fn trigger_yaml(&self, broker: &str, event: &str, used: &mut HashSet) -> String { + let trigger_name = unique_k8s_name(&format!("{}-{}", self.source, event), used); let subscriber = match &self.local { Some(addr) => format!( "\x20 subscriber:\n\ diff --git a/src/bus/message.rs b/src/bus/message.rs new file mode 100644 index 0000000..a13254e --- /dev/null +++ b/src/bus/message.rs @@ -0,0 +1,130 @@ +//! The canonical transport message vocabulary: [`Message`] + [`MessageKind`]. +//! +//! Bus-core types with no dependency on `microsvc`: a `Message` carries an +//! optional durable id, a name, a [`MessageKind`] (command vs event), the raw +//! payload, a content type, and metadata. Payload decoding returns a bus-core +//! [`PayloadDecodeError`] (not `microsvc::HandlerError`), so the bus does not +//! depend up into microsvc; microsvc maps it back via `From`. + +/// The kind of message a handler consumes. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] +pub enum MessageKind { + /// A command addressed to one handler. + Command, + /// A published event that may be consumed by many handlers. + Event, +} + +/// Transport subscription metadata derived from a router's registered handlers. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct SubscriptionPlan { + /// Command names consumed by point-to-point command transports. + pub commands: Vec, + /// Event names consumed by fan-out event transports. + pub events: Vec, +} + +/// Failure decoding a [`Message`] payload. +/// +/// A bus-core error so [`Message`] stays free of `microsvc::HandlerError`. The +/// microsvc side provides `From for HandlerError`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PayloadDecodeError(pub String); + +impl std::fmt::Display for PayloadDecodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for PayloadDecodeError {} + +/// Serializable transport message used by handlers. +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct Message { + pub id: Option, + pub name: String, + pub kind: MessageKind, + pub payload: Vec, + pub content_type: String, + pub metadata: Vec<(String, String)>, +} + +impl Message { + /// Create a transport message. + pub fn new(name: impl Into, kind: MessageKind, payload: Vec) -> Self { + Self { + id: None, + name: name.into(), + kind, + payload, + content_type: "application/json".to_string(), + metadata: Vec::new(), + } + } + + /// Add a durable message id. + pub fn with_id(mut self, id: impl Into) -> Self { + self.id = Some(id.into()); + self + } + + /// Add metadata. + pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.push((key.into(), value.into())); + self + } + + /// Get the durable message id, if this message has one. + pub fn id(&self) -> Option<&str> { + self.id.as_deref() + } + + /// Get the message name. + pub fn name(&self) -> &str { + &self.name + } + + /// Get the raw payload bytes. + pub fn payload(&self) -> &[u8] { + &self.payload + } + + /// Get a metadata value by key. + pub fn metadata(&self, key: &str) -> Option<&str> { + self.metadata + .iter() + .find(|(existing, _)| existing.eq_ignore_ascii_case(key)) + .map(|(_, value)| value.as_str()) + } + + /// Get the correlation id, if present. + pub fn correlation_id(&self) -> Option<&str> { + self.metadata("correlation_id") + } + + /// Get the causation id, if present. + pub fn causation_id(&self) -> Option<&str> { + self.metadata("causation_id") + } + + /// Decode the raw payload as JSON. + pub fn payload_json(&self) -> Result { + serde_json::from_slice(&self.payload).map_err(|e| { + PayloadDecodeError(format!( + "invalid JSON payload for message '{}': {}", + self.name, e + )) + }) + } + + /// Decode the raw payload as bitcode. + pub fn payload_bitcode(&self) -> Result { + bitcode::deserialize(&self.payload).map_err(|e| { + PayloadDecodeError(format!( + "invalid bitcode payload for message '{}': {}", + self.name, e + )) + }) + } +} diff --git a/src/microsvc/transport/mod.rs b/src/bus/mod.rs similarity index 87% rename from src/microsvc/transport/mod.rs rename to src/bus/mod.rs index 320e316..48c66a3 100644 --- a/src/microsvc/transport/mod.rs +++ b/src/bus/mod.rs @@ -10,19 +10,21 @@ //! //! - [`AsyncMessageSource`] / [`ReceivedMessage`] — pull a message from a direct //! transport (Postgres, RabbitMQ, Kafka, NATS, in-memory) and settle it; -//! - [`run_source`] — the runner that dispatches through -//! `Service::dispatch_message`, then acks/nacks/dead-letters per policy. +//! - [`MessageRouter`] — the consume seam a runner dispatches to, implemented by +//! `microsvc::Service` and the dependency-free [`Handlers`] builder; +//! - [`run_source`] — the runner that dispatches each message through the +//! consumer's [`MessageRouter::dispatch`], then acks/nacks/dead-letters per policy. //! //! The producer-side publish path lives here too: //! //! - [`AsyncMessagePublisher`] — the single publish boundary, with each adapter's //! durable publish threshold documented; -//! - [`OutboxDispatcher`] / [`OutboxDispatchOutcome`] — map durable outbox rows to +//! - [`OutboxDispatcher`](crate::OutboxDispatcher) / [`OutboxDispatchOutcome`](crate::OutboxDispatchOutcome) — map durable outbox rows to //! `Message` and dispatch them, sharing one claim → publish → complete path //! between background polling and after-commit immediate dispatch. //! //! Concrete adapters build on these traits: the Postgres outbox-backed source -//! ([`OutboxSource`]) is always available; the NATS JetStream and RabbitMQ +//! ([`OutboxSource`](crate::OutboxSource)) is always available; the NATS JetStream and RabbitMQ //! adapters are behind the `nats` and `rabbitmq` features. The Knative/HTTP //! ingress shape and the Kafka adapter are still separate slices. Everything //! builds on the vocabulary defined here: @@ -79,12 +81,11 @@ //! deduplication, when needed, is the optional consumer inbox, not an outbox or //! publish guarantee. -use crate::microsvc::Message; - mod bus; mod capabilities; mod error; mod failure_policy; +mod handlers; mod in_memory_bus; #[cfg(feature = "kafka")] mod kafka; @@ -94,12 +95,11 @@ mod kafka_bus; mod knative; #[cfg(feature = "http")] mod knative_bus; +mod message; #[cfg(feature = "nats")] mod nats; #[cfg(feature = "nats")] mod nats_bus; -mod outbox_dispatch; -mod outbox_source; #[cfg(feature = "postgres")] mod postgres_bus; mod publisher; @@ -107,6 +107,7 @@ mod publisher; mod rabbit_bus; #[cfg(feature = "rabbitmq")] mod rabbitmq; +mod router; mod run_options; mod runner; mod source; @@ -117,7 +118,7 @@ pub use kafka::{KafkaPublisher, KafkaReceived, KafkaSource}; #[cfg(feature = "kafka")] pub use kafka_bus::KafkaBus; #[cfg(feature = "http")] -pub use knative::{cloud_events_router, knative_triggers}; +pub use knative::knative_triggers; #[cfg(feature = "http")] pub use knative_bus::KnativeBus; #[cfg(feature = "nats")] @@ -133,14 +134,13 @@ pub use bus::{Bus, BusConsumer}; pub use capabilities::{ConsumerAckKind, KnativeIntegrationKind, TransportCapabilities}; pub use error::{TransportError, TransportErrorKind}; pub use failure_policy::{FailureAction, FailurePolicy}; +pub use handlers::{AsyncMessageHandler, Handlers}; pub use in_memory_bus::{InMemoryBus, InMemoryReceived}; -pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; -pub use outbox_source::{ - OutboxSource, ReceivedOutboxMessage, DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, -}; +pub use message::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; #[cfg(feature = "postgres")] pub use postgres_bus::{LogReceived, PostgresBus, QueueReceived}; pub use publisher::AsyncMessagePublisher; +pub use router::MessageRouter; pub use run_options::{ConsumerDeliveryMode, InboxHook, NoInbox, RunOptions}; pub use runner::run_source; pub use source::{AsyncMessageSource, ReceivedMessage}; diff --git a/src/microsvc/transport/nats.rs b/src/bus/nats.rs similarity index 99% rename from src/microsvc/transport/nats.rs rename to src/bus/nats.rs index f981996..a4e9b76 100644 --- a/src/microsvc/transport/nats.rs +++ b/src/bus/nats.rs @@ -20,7 +20,7 @@ use futures::StreamExt; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{AsyncMessagePublisher, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; /// Header carrying the stable message id (and JetStream dedup key). const MESSAGE_ID_HEADER: &str = "Nats-Msg-Id"; diff --git a/src/microsvc/transport/nats_bus.rs b/src/bus/nats_bus.rs similarity index 92% rename from src/microsvc/transport/nats_bus.rs rename to src/bus/nats_bus.rs index 4870083..244e4de 100644 --- a/src/microsvc/transport/nats_bus.rs +++ b/src/bus/nats_bus.rs @@ -26,8 +26,10 @@ use async_nats::jetstream::consumer::pull::Config as PullConfig; use async_nats::jetstream::stream::{Config as StreamConfig, Stream}; use super::nats::{NatsJetStreamSource, NatsPublisher}; -use super::{run_source, AsyncMessagePublisher, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{ + run_source, AsyncMessagePublisher, Bus, BusConsumer, MessageRouter, RunOptions, TransportError, +}; +use super::{Message, MessageKind}; const DEFAULT_FETCH_TIMEOUT: Duration = Duration::from_millis(500); @@ -166,13 +168,14 @@ impl Bus for NatsBus { } impl BusConsumer for NatsBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let subjects: Vec = service - .command_names() + let subjects: Vec = router + .subscription_plan() + .commands .iter() .map(|name| format!("{}.cmd.{name}", self.namespace)) .collect(); @@ -186,16 +189,17 @@ impl BusConsumer for NatsBus { format!("{}.cmd.", self.namespace), ) .await?; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let subjects: Vec = service - .event_names() + let subjects: Vec = router + .subscription_plan() + .events .iter() .map(|name| format!("{}.evt.{name}", self.namespace)) .collect(); @@ -209,6 +213,6 @@ impl BusConsumer for NatsBus { format!("{}.evt.", self.namespace), ) .await?; - run_source(service, source, options).await + run_source(router, source, options).await } } diff --git a/src/microsvc/transport/postgres_bus.rs b/src/bus/postgres_bus.rs similarity index 95% rename from src/microsvc/transport/postgres_bus.rs rename to src/bus/postgres_bus.rs index 09b1812..44ae658 100644 --- a/src/microsvc/transport/postgres_bus.rs +++ b/src/bus/postgres_bus.rs @@ -37,8 +37,8 @@ use std::time::Duration; use sqlx::{PgPool, Row}; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{run_source, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; +use super::{Message, MessageKind}; const DEFAULT_LEASE: Duration = Duration::from_secs(30); @@ -211,17 +211,13 @@ impl Bus for PostgresBus { } impl BusConsumer for PostgresBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { self.ensure_tables().await?; - let names: Vec = service - .command_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().commands; if names.is_empty() { return Ok(()); } @@ -230,20 +226,16 @@ impl BusConsumer for PostgresBus { names, lease_secs: self.lease.as_secs_f64(), }; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { self.ensure_tables().await?; - let names: Vec = service - .event_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().events; if names.is_empty() { return Ok(()); } @@ -252,7 +244,7 @@ impl BusConsumer for PostgresBus { names, consumer: self.group.clone(), }; - run_source(service, source, options).await + run_source(router, source, options).await } } diff --git a/src/microsvc/transport/publisher.rs b/src/bus/publisher.rs similarity index 99% rename from src/microsvc/transport/publisher.rs rename to src/bus/publisher.rs index c4ac29b..19bc259 100644 --- a/src/microsvc/transport/publisher.rs +++ b/src/bus/publisher.rs @@ -57,7 +57,7 @@ pub trait AsyncMessagePublisher: Send + Sync { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::MessageKind; + use crate::bus::MessageKind; use std::future::Future; use std::sync::Mutex; diff --git a/src/microsvc/transport/rabbit_bus.rs b/src/bus/rabbit_bus.rs similarity index 92% rename from src/microsvc/transport/rabbit_bus.rs rename to src/bus/rabbit_bus.rs index f140717..bda4614 100644 --- a/src/microsvc/transport/rabbit_bus.rs +++ b/src/bus/rabbit_bus.rs @@ -31,8 +31,8 @@ use lapin::{Channel, ExchangeKind}; use super::rabbitmq::{connect_channel, message_properties, RabbitReceived}; use super::source::AsyncMessageSource; -use super::{run_source, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; +use super::{Message, MessageKind}; fn retryable(context: &str, err: impl std::fmt::Display) -> TransportError { TransportError::retryable(format!("{context}: {err}")) @@ -145,14 +145,15 @@ impl RabbitBus { /// service's event names — the durable setup `subscribe` needs. Exposed so a /// producer can ensure all subscriber bindings exist *before* publishing /// (RabbitMQ drops events with no matching binding). - pub async fn ensure_subscription( + pub async fn ensure_subscription( &self, - service: &Service, + router: &R, ) -> Result<(), TransportError> { self.declare_events_exchange(&self.channel).await?; let queue = self.group_queue(); self.declare_queue(&self.channel, &queue).await?; - for name in service.event_names() { + let plan = router.subscription_plan(); + for name in &plan.events { self.channel .queue_bind( &queue, @@ -197,14 +198,15 @@ impl Bus for RabbitBus { } impl BusConsumer for RabbitBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { let channel = connect_channel(&self.uri).await?; + let plan = router.subscription_plan(); let mut queues = Vec::new(); - for name in service.command_names() { + for name in &plan.commands { let queue = self.command_queue(name); self.declare_queue(&channel, &queue).await?; queues.push(queue); @@ -217,16 +219,16 @@ impl BusConsumer for RabbitBus { queues, strip_prefix: Some(self.command_prefix()), }; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - self.ensure_subscription(&service).await?; - if service.event_names().is_empty() { + self.ensure_subscription(router.as_ref()).await?; + if router.subscription_plan().events.is_empty() { return Ok(()); } let channel = connect_channel(&self.uri).await?; @@ -236,7 +238,7 @@ impl BusConsumer for RabbitBus { // Events are published with routing key == the bare event name. strip_prefix: None, }; - run_source(service, source, options).await + run_source(router, source, options).await } } diff --git a/src/microsvc/transport/rabbitmq.rs b/src/bus/rabbitmq.rs similarity index 99% rename from src/microsvc/transport/rabbitmq.rs rename to src/bus/rabbitmq.rs index 3218392..f9dbbb4 100644 --- a/src/microsvc/transport/rabbitmq.rs +++ b/src/bus/rabbitmq.rs @@ -20,7 +20,7 @@ use lapin::{BasicProperties, Channel, Connection, ConnectionProperties}; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{AsyncMessagePublisher, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const MESSAGE_KIND_HEADER: &str = "x-sourced-kind"; diff --git a/src/bus/router.rs b/src/bus/router.rs new file mode 100644 index 0000000..4bfec3b --- /dev/null +++ b/src/bus/router.rs @@ -0,0 +1,45 @@ +//! The consume seam: a minimal router the [`run_source`](super::run_source) loop +//! and the [`BusConsumer`](super::BusConsumer) adapters depend on instead of the +//! concrete `microsvc::Service`. +//! +//! Implemented by `microsvc::Service` (the rich, dependency-carrying registry) +//! and — once it lands — the dependency-free `Handlers` builder. Splitting the +//! consume path behind this trait is what lets the bus become a standalone module +//! that does not name `Service`. See `specs/bus-module-decomposition`. + +use std::future::Future; + +use super::TransportError; +use super::{Message, MessageKind, SubscriptionPlan}; + +/// What the receive loop and the consumer adapters need from a message consumer. +/// +/// Three responsibilities, each used by a different part of the consume path: +/// +/// - [`handles`](MessageRouter::handles) — the runner's ack-and-ignore vs dispatch +/// predicate; +/// - [`subscription_plan`](MessageRouter::subscription_plan) — the command/event +/// names the adapters use to build their broker topology *before* the run loop; +/// - [`dispatch`](MessageRouter::dispatch) — route and run one delivered message, +/// returning an already-classified [`TransportError`] so the bus-core runner +/// never sees a `microsvc::HandlerError`. +/// +/// Not dyn-compatible (the RPITIT `dispatch`), exactly like [`Bus`](super::Bus) / +/// [`BusConsumer`](super::BusConsumer): consume it as `Arc` or a generic +/// ``, never `Arc`. +pub trait MessageRouter: Send + Sync { + /// Whether this router has a handler for `(kind, name)`. The runner acks and + /// ignores a delivered message it does not handle rather than dead-lettering it. + fn handles(&self, kind: MessageKind, name: &str) -> bool; + + /// The command and event names a transport should subscribe to, derived from + /// the router's registered handlers. + fn subscription_plan(&self) -> SubscriptionPlan; + + /// Route and run one delivered message. `Ok(())` means the handler (and its + /// effects) succeeded; the error is already classified retryable/permanent. + fn dispatch( + &self, + message: &Message, + ) -> impl Future> + Send; +} diff --git a/src/microsvc/transport/run_options.rs b/src/bus/run_options.rs similarity index 99% rename from src/microsvc/transport/run_options.rs rename to src/bus/run_options.rs index 3ff1533..9047eec 100644 --- a/src/microsvc/transport/run_options.rs +++ b/src/bus/run_options.rs @@ -140,7 +140,7 @@ impl RunOptions { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::MessageKind; + use crate::bus::MessageKind; struct FakeInbox { consumer: &'static str, diff --git a/src/microsvc/transport/runner.rs b/src/bus/runner.rs similarity index 90% rename from src/microsvc/transport/runner.rs rename to src/bus/runner.rs index be6ae6c..f8e4a25 100644 --- a/src/microsvc/transport/runner.rs +++ b/src/bus/runner.rs @@ -10,15 +10,15 @@ use std::sync::Arc; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{FailureAction, RunOptions, TransportError}; -use crate::microsvc::{Message, Service}; +use super::Message; +use super::{FailureAction, MessageRouter, RunOptions, TransportError}; /// Run the receive loop for a direct transport source. /// /// For each message the runner: /// /// 1. enforces the inbox stable-id contract (a no-op in idempotent mode); -/// 2. dispatches through [`Service::dispatch_message`]; +/// 2. dispatches through [`MessageRouter::dispatch`]; /// 3. on success, acknowledges via the adapter; /// 4. on failure, routes through [`RunOptions::failure_policy`] — retryable /// failures are nacked for redelivery, permanent failures take the configured @@ -29,7 +29,7 @@ use crate::microsvc::{Message, Service}; /// acks it and moves on rather than dead-lettering it. Fan-out event transports /// may deliver events this service does not consume, and acking matches /// `microsvc::subscribe`; production transports should use -/// [`Service::subscription_plan`] to avoid delivering unrelated messages at all. +/// [`MessageRouter::subscription_plan`] to avoid delivering unrelated messages at all. /// /// The runner **acks only after handler effects have completed**, never before. /// It stops gracefully when the source returns `Ok(None)`, having fully settled @@ -43,24 +43,24 @@ use crate::microsvc::{Message, Service}; /// /// `I: Send` keeps the returned future `Send` so the runner can be spawned on a /// multi-threaded executor regardless of the inbox hook type. -pub async fn run_source( - service: Arc>, +pub async fn run_source( + router: Arc, mut source: S, options: RunOptions, ) -> Result<(), TransportError> where - D: Send + Sync + 'static, + R: MessageRouter, S: AsyncMessageSource, I: Send, { while let Some(received) = source.recv().await? { // No handler for this message: intentionally ignore (ack) rather than // dead-letter, so unrelated fan-out events don't pile into the DLQ. - if !service.handles_message(received.message().kind, received.message().name()) { + if !router.handles(received.message().kind, received.message().name()) { received.ack().await?; continue; } - match dispatch(&service, &options, received.message()).await { + match dispatch(router.as_ref(), &options, received.message()).await { Ok(()) => received.ack().await?, Err(error) => match options.failure_policy.resolve(&error) { FailureAction::Nack => received.nack(&error.to_string()).await?, @@ -68,7 +68,7 @@ where FailureAction::Park => received.park(&error.to_string()).await?, FailureAction::LogAndAck => { eprintln!( - "[microsvc::transport] dropping message '{}' after permanent failure: {error}", + "[bus::runner] dropping message '{}' after permanent failure: {error}", received.message().name() ); received.ack().await? @@ -85,30 +85,21 @@ where /// Enforces the inbox stable-id contract first (idempotent mode yields no key /// and skips it), then dispatches. A failed stable-id check is a permanent /// failure — redelivery cannot supply a missing or malformed id. -async fn dispatch( - service: &Service, +async fn dispatch( + router: &R, options: &RunOptions, message: &Message, -) -> Result<(), TransportError> -where - D: Send + Sync + 'static, -{ +) -> Result<(), TransportError> { options .validate_message_id(message) .map_err(|err| TransportError::permanent(err.to_string()).with_source(err))?; - service - .dispatch_message(message) - .await - .map(|_| ()) - .map_err(TransportError::from) + router.dispatch(message).await } #[cfg(test)] mod tests { use super::*; - use crate::microsvc::transport::FailurePolicy; - use crate::microsvc::{HandlerError, MessageKind}; - use serde_json::json; + use crate::bus::{FailurePolicy, Handlers, MessageKind}; use std::collections::VecDeque; use std::future::Future; use std::sync::Mutex; @@ -231,37 +222,34 @@ mod tests { message } - fn service(recorder: &Arc) -> Arc> { + fn router(recorder: &Arc) -> Arc { let ok = recorder.clone(); let retryable = recorder.clone(); let permanent = recorder.clone(); Arc::new( - Service::new(()) - .event("ok") - .handle(move |ctx: &crate::microsvc::Context<()>| { + Handlers::new() + .on_event("ok", move |msg: &Message| { let ok = ok.clone(); - let name = ctx.message().name().to_string(); + let name = msg.name().to_string(); async move { ok.push(Event::Handled(name)); - Ok(json!({})) + Ok(()) } }) - .event("retryable") - .handle(move |ctx: &crate::microsvc::Context<()>| { + .on_event("retryable", move |msg: &Message| { let retryable = retryable.clone(); - let name = ctx.message().name().to_string(); + let name = msg.name().to_string(); async move { retryable.push(Event::Handled(name)); - Err(HandlerError::Other("infra".into())) + Err(TransportError::retryable("infra")) } }) - .event("permanent") - .handle(move |ctx: &crate::microsvc::Context<()>| { + .on_event("permanent", move |msg: &Message| { let permanent = permanent.clone(); - let name = ctx.message().name().to_string(); + let name = msg.name().to_string(); async move { permanent.push(Event::Handled(name)); - Err(HandlerError::Rejected("nope".into())) + Err(TransportError::permanent("nope")) } }), ) @@ -283,7 +271,7 @@ mod tests { recv_error: bool, ) -> RunResult { let recorder = Recorder::new(); - let svc = service(&recorder); + let svc = router(&recorder); let source = FakeSource { queue: messages.into_iter().collect(), recorder: recorder.clone(), @@ -502,7 +490,7 @@ mod tests { // (no-inbox) path: the runner future must be Send. fn assert_send(_: &T) {} let recorder = Recorder::new(); - let svc = service(&recorder); + let svc = router(&recorder); let source = FakeSource { queue: VecDeque::new(), recorder, diff --git a/src/microsvc/transport/source.rs b/src/bus/source.rs similarity index 100% rename from src/microsvc/transport/source.rs rename to src/bus/source.rs diff --git a/src/microsvc/transport/stable_id.rs b/src/bus/stable_id.rs similarity index 100% rename from src/microsvc/transport/stable_id.rs rename to src/bus/stable_id.rs diff --git a/src/lib.rs b/src/lib.rs index 546c661..b9b40fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ extern crate self as sourced_rust; pub mod aggregate; +pub mod bus; pub mod entity; pub mod repository; @@ -90,6 +91,10 @@ pub use outbox_worker::{ // LocalEmitterPublisher requires the emitter feature #[cfg(feature = "emitter")] pub use outbox_worker::LocalEmitterPublisher; +pub use outbox_worker::{ + OutboxDispatchOutcome, OutboxDispatcher, OutboxSource, ReceivedOutboxMessage, + DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, SOURCED_METADATA_PREFIX, +}; pub use queued_repo::{ // Async WithOpts + unlock traits (async lock manager variant) diff --git a/src/microsvc/context.rs b/src/microsvc/context.rs index 43f81aa..6a7dbc2 100644 --- a/src/microsvc/context.rs +++ b/src/microsvc/context.rs @@ -9,8 +9,8 @@ use serde_json::Value; use super::dependencies::{HasReadModelStore, HasRepo}; use super::error::HandlerError; -use super::service::Message; use super::session::Session; +use crate::bus::Message; /// The context passed to every handler. /// @@ -58,7 +58,7 @@ impl<'a, D> Context<'a, D> { /// Deserialize the input payload into a typed struct. pub fn input(&self) -> Result { - self.message.payload_json() + self.message.payload_json().map_err(HandlerError::from) } /// Get the raw JSON input. diff --git a/src/microsvc/error.rs b/src/microsvc/error.rs index 56c8c1d..410a418 100644 --- a/src/microsvc/error.rs +++ b/src/microsvc/error.rs @@ -3,6 +3,7 @@ use std::error::Error; use std::fmt; +use crate::bus::{PayloadDecodeError, TransportError, TransportErrorKind}; use crate::{repository::RepositoryError, EventRecordError}; /// Error type for command handler operations. @@ -71,6 +72,12 @@ impl From for HandlerError { } } +impl From for HandlerError { + fn from(err: PayloadDecodeError) -> Self { + HandlerError::DecodeFailed(err.0) + } +} + impl HandlerError { /// Map this error to an HTTP-style status code. pub fn status_code(&self) -> u16 { @@ -85,4 +92,74 @@ impl HandlerError { HandlerError::Other(_) => 500, } } + + /// Classify this error for transport retry purposes (retryable vs permanent). + /// + /// Transient failures (repository errors, not-found, otherwise-unclassified) + /// are retryable: in an at-least-once event-driven system a not-found is + /// usually an out-of-order delivery race a later redelivery resolves. + /// Deterministic failures (unknown routing, decode, rejection, auth, guard) + /// are permanent — redelivering the identical message cannot change them. + pub(crate) fn transport_error_kind(&self) -> TransportErrorKind { + match self { + HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => { + TransportErrorKind::Retryable + } + HandlerError::UnknownCommand(_) + | HandlerError::DecodeFailed(_) + | HandlerError::Rejected(_) + | HandlerError::Unauthorized(_) + | HandlerError::GuardRejected(_) => TransportErrorKind::Permanent, + } + } +} + +/// Classify and convert a handler error into the transport's retryable/permanent +/// vocabulary. This lives on the microsvc side (not the bus) so the bus core does +/// not depend on `HandlerError`. +impl From for TransportError { + fn from(error: HandlerError) -> Self { + let kind = error.transport_error_kind(); + TransportError::new(kind, error.to_string()).with_source(error) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn transient_handler_errors_are_retryable() { + for error in [ + HandlerError::Repository(RepositoryError::NotFound { id: "agg-1".into() }), + HandlerError::NotFound("agg-1".into()), + HandlerError::Other("boom".into()), + ] { + assert_eq!(error.transport_error_kind(), TransportErrorKind::Retryable); + } + } + + #[test] + fn deterministic_handler_errors_are_permanent() { + for error in [ + HandlerError::UnknownCommand("x".into()), + HandlerError::DecodeFailed("x".into()), + HandlerError::Rejected("x".into()), + HandlerError::Unauthorized("x".into()), + HandlerError::GuardRejected("x".into()), + ] { + assert_eq!(error.transport_error_kind(), TransportErrorKind::Permanent); + } + } + + #[test] + fn from_handler_error_preserves_classification_and_source() { + let err: TransportError = HandlerError::Rejected("invalid".into()).into(); + assert!(err.is_permanent()); + assert!(err.source().is_some()); + + let err: TransportError = + HandlerError::Other(Box::::from("infra")).into(); + assert!(err.is_retryable()); + } } diff --git a/src/microsvc/transport/knative.rs b/src/microsvc/knative_ingress.rs similarity index 61% rename from src/microsvc/transport/knative.rs rename to src/microsvc/knative_ingress.rs index ca66e4e..0a58016 100644 --- a/src/microsvc/transport/knative.rs +++ b/src/microsvc/knative_ingress.rs @@ -1,23 +1,25 @@ -//! Knative / CloudEvents HTTP ingress. +//! Knative / CloudEvents HTTP ingress (microsvc side). //! //! Knative is *endpoint-driven*: the platform invokes an HTTP route, so this is -//! a separate ingress shape from the pull-based [`AsyncMessageSource`]. It is -//! NOT modeled as a polling source. The route parses a CloudEvent (binary or -//! structured HTTP mode), maps it to the canonical [`Message`], and calls the -//! same [`Service::dispatch_message`] boundary the direct-transport runner uses. +//! a separate ingress shape from the pull-based bus sources. The route parses a +//! CloudEvent (binary or structured HTTP mode), maps it to the canonical +//! [`Message`], and calls the same [`Service::dispatch_message`] boundary the +//! direct-transport runner uses. +//! +//! This lives on the **microsvc side**, not in the bus crate, because it is +//! intrinsically `Service`-coupled: it serializes the handler's returned `Value` +//! into the HTTP body and classifies a `HandlerError` to pick the status code — +//! neither of which the bus's `MessageRouter::dispatch` (unit-returning) exposes. //! //! Acknowledgement is the HTTP response: //! //! - handler success → `200 OK` (the only ack); //! - retryable failure → `503` so Knative redelivers per its Delivery config; -//! - permanent failure → `422` so Knative stops retrying / dead-letters per its -//! Delivery config. +//! - permanent failure → `422` so Knative stops retrying / dead-letters. //! //! Retry, backoff, and dead-lettering are **platform-managed** by Knative -//! Eventing here — unlike direct transports, where this crate's -//! [`FailurePolicy`](super::FailurePolicy) owns them. -//! -//! Requires the `http` feature. +//! Eventing here — unlike direct transports, where the bus `FailurePolicy` owns +//! them. Requires the `http` feature. use std::sync::Arc; @@ -29,8 +31,8 @@ use axum::{Json, Router}; use base64::Engine; use serde_json::{json, Value}; -use super::TransportError; -use crate::microsvc::{Message, MessageKind, Service, SubscriptionPlan}; +use crate::bus::{Message, MessageKind}; +use crate::microsvc::Service; const STRUCTURED_CONTENT_TYPE: &str = "application/cloudevents+json"; @@ -39,8 +41,7 @@ const STRUCTURED_CONTENT_TYPE: &str = "application/cloudevents+json"; /// /// Both dispatch by the parsed CloudEvent `type` (the path segment is for routing /// alignment only), so a Knative Trigger can target either a single shared `ref` -/// (`/`) or the per-type subscriber URI [`KnativeBus`](super::KnativeBus) emits -/// (`/cloudevent/`). Compose it with other routes or serve it directly. +/// (`/`) or the per-type subscriber URI `KnativeBus` emits (`/cloudevent/`). pub fn cloud_events_router(service: Arc>) -> Router { Router::new() .route("/", axum::routing::post(ingress_handler)) @@ -61,9 +62,9 @@ async fn ingress_handler( match service.dispatch_message(&message).await { Ok(value) => (StatusCode::OK, Json(value)).into_response(), Err(err) => { - // Map our retryable/permanent classification onto HTTP so Knative's + // Map the retryable/permanent classification onto HTTP so Knative's // platform-managed retry/DLQ does the right thing. - let status = if TransportError::classify_handler_error(&err).is_retryable() { + let status = if err.transport_error_kind().is_retryable() { StatusCode::SERVICE_UNAVAILABLE } else { StatusCode::UNPROCESSABLE_ENTITY @@ -188,59 +189,6 @@ fn parse_structured(body: &Bytes) -> Result { }) } -/// Sanitize a string into an RFC 1123 DNS label usable as a Kubernetes resource -/// name: lowercase, ASCII-alphanumeric or `-`, no leading/trailing `-`, ≤63 -/// chars. Used for generated `Trigger` names, whose CloudEvent-type segment can -/// contain dots/uppercase/other characters that are invalid in k8s names. -pub(super) fn sanitize_k8s_name(name: &str) -> String { - let mapped: String = name - .chars() - .map(|c| { - let c = c.to_ascii_lowercase(); - if c.is_ascii_alphanumeric() { - c - } else { - '-' - } - }) - .collect(); - let capped: String = mapped.trim_matches('-').chars().take(63).collect(); - let trimmed = capped.trim_end_matches('-'); - if trimmed.is_empty() { - "x".to_string() - } else { - trimmed.to_string() - } -} - -/// Render Knative `Trigger` YAML for each event a service subscribes to, derived -/// from its [`SubscriptionPlan`]. Each Trigger filters on the CloudEvent `type` -/// and routes to `subscriber_service` on `broker`. -pub fn knative_triggers(plan: &SubscriptionPlan, broker: &str, subscriber_service: &str) -> String { - let mut out = String::new(); - for event in &plan.events { - let trigger_name = sanitize_k8s_name(&format!("{subscriber_service}-{event}")); - out.push_str(&format!( - "apiVersion: eventing.knative.dev/v1\n\ - kind: Trigger\n\ - metadata:\n\ - \x20 name: {trigger_name}\n\ - spec:\n\ - \x20 broker: {broker}\n\ - \x20 filter:\n\ - \x20 attributes:\n\ - \x20 type: {event}\n\ - \x20 subscriber:\n\ - \x20 ref:\n\ - \x20 apiVersion: serving.knative.dev/v1\n\ - \x20 kind: Service\n\ - \x20 name: {subscriber_service}\n\ - ---\n" - )); - } - out -} - #[cfg(test)] mod tests { use super::*; @@ -299,47 +247,4 @@ mod tests { let h = headers(&[("ce-type", "order.created")]); assert!(parse_cloud_event(&h, &Bytes::new()).is_err()); } - - #[test] - fn triggers_render_from_subscription_plan() { - let plan = SubscriptionPlan { - commands: vec![], - events: vec!["seat.reserved".to_string()], - }; - let yaml = knative_triggers(&plan, "default", "checkout-projection"); - assert!(yaml.contains("kind: Trigger")); - assert!(yaml.contains("type: seat.reserved")); - assert!(yaml.contains("name: checkout-projection-seat-reserved")); - assert!(yaml.contains("broker: default")); - } - - #[test] - fn sanitize_k8s_name_enforces_rfc1123() { - // Valid names pass through unchanged. - assert_eq!( - sanitize_k8s_name("checkout-projection-seat-reserved"), - "checkout-projection-seat-reserved" - ); - // Dots, uppercase, and other characters become '-' and lowercase. - assert_eq!(sanitize_k8s_name("Order.Created!"), "order-created"); - // No leading/trailing dashes, capped at 63 chars. - let long = "a".repeat(80); - let out = sanitize_k8s_name(&format!(".{long}.")); - assert_eq!(out.len(), 63); - assert!(!out.starts_with('-') && !out.ends_with('-')); - // All-invalid degrades to a safe placeholder. - assert_eq!(sanitize_k8s_name("..."), "x"); - } - - #[test] - fn trigger_name_is_sanitized_for_messy_event_types() { - let plan = SubscriptionPlan { - commands: vec![], - events: vec!["Order.Created".to_string()], - }; - let yaml = knative_triggers(&plan, "default", "checkout-projection"); - // The CloudEvent type filter keeps the raw type; the resource name is sanitized. - assert!(yaml.contains("type: Order.Created")); - assert!(yaml.contains("name: checkout-projection-order-created")); - } } diff --git a/src/microsvc/message_router.rs b/src/microsvc/message_router.rs new file mode 100644 index 0000000..97378df --- /dev/null +++ b/src/microsvc/message_router.rs @@ -0,0 +1,27 @@ +//! `microsvc::Service` as a transport [`MessageRouter`]. +//! +//! This is the bridge that lets the bus consume side run a service without the +//! bus naming `Service`. Handler-error classification into the transport's +//! retryable/permanent vocabulary happens here, on the microsvc side, so the +//! bus-core runner only ever sees an already-classified `TransportError`. + +use crate::bus::{MessageRouter, TransportError}; +use crate::microsvc::{Message, MessageKind, Service, SubscriptionPlan}; + +impl MessageRouter for Service { + fn handles(&self, kind: MessageKind, name: &str) -> bool { + self.handles_message(kind, name) + } + + fn subscription_plan(&self) -> SubscriptionPlan { + // Call the inherent method, not this trait method (which would recurse). + Service::subscription_plan(self) + } + + async fn dispatch(&self, message: &Message) -> Result<(), TransportError> { + self.dispatch_message(message) + .await + .map(|_| ()) + .map_err(TransportError::from) + } +} diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index d46ee2d..a053c31 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -56,10 +56,11 @@ mod context; mod dependencies; mod error; +mod message_router; mod service; mod session; -pub mod transport; +pub use crate::bus::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; pub use context::Context; pub use dependencies::{ HasReadModelStore, HasRepo, ReadModelStoreDependencies, RepoDependencies, @@ -68,7 +69,7 @@ pub use dependencies::{ pub use error::HandlerError; pub use service::{ CommandRequest, CommandResponse, DeliveryKind, HandlerBuilder, HandlerNames, HandlerSpec, - Message, MessageKind, Service, SubscriptionPlan, + Service, }; pub use session::Session; @@ -78,6 +79,13 @@ mod http; #[cfg(feature = "http")] pub use http::{router, serve}; +// Knative / CloudEvents HTTP ingress (Service-coupled; the bus keeps only the +// produce/manifest helpers). Requires the "http" feature. +#[cfg(feature = "http")] +mod knative_ingress; +#[cfg(feature = "http")] +pub use knative_ingress::cloud_events_router; + // gRPC transport (requires "grpc" feature) #[cfg(feature = "grpc")] pub mod grpc; diff --git a/src/microsvc/service.rs b/src/microsvc/service.rs index dcf300d..1412add 100644 --- a/src/microsvc/service.rs +++ b/src/microsvc/service.rs @@ -30,6 +30,7 @@ use super::context::Context; use super::dependencies::{HasReadModelStore, HasRepo, RepoReadModelDependencies}; use super::error::HandlerError; use super::session::Session; +use crate::bus::{Message, MessageKind, SubscriptionPlan}; type GuardFn = dyn Fn(&Context) -> bool + Send + Sync; type HandlerFuture<'a> = Pin> + Send + 'a>>; @@ -64,15 +65,6 @@ where Arc::new(move |ctx| Box::pin(handler.call(ctx)) as HandlerFuture<'_>) } -/// The kind of message a handler consumes. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] -pub enum MessageKind { - /// A command addressed to one handler. - Command, - /// A published event that may be consumed by many handlers. - Event, -} - /// How a handler expects the transport to deliver matching messages. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum DeliveryKind { @@ -142,105 +134,6 @@ impl HandlerSpec { } } -/// Transport subscription metadata derived from registered handlers. -#[derive(Debug, Clone, Default, PartialEq, Eq)] -pub struct SubscriptionPlan { - /// Command names consumed by point-to-point command transports. - pub commands: Vec, - /// Event names consumed by fan-out event transports. - pub events: Vec, -} - -/// Serializable transport message used by handlers. -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct Message { - pub id: Option, - pub name: String, - pub kind: MessageKind, - pub payload: Vec, - pub content_type: String, - pub metadata: Vec<(String, String)>, -} - -impl Message { - /// Create a transport message. - pub fn new(name: impl Into, kind: MessageKind, payload: Vec) -> Self { - Self { - id: None, - name: name.into(), - kind, - payload, - content_type: "application/json".to_string(), - metadata: Vec::new(), - } - } - - /// Add a durable message id. - pub fn with_id(mut self, id: impl Into) -> Self { - self.id = Some(id.into()); - self - } - - /// Add metadata. - pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { - self.metadata.push((key.into(), value.into())); - self - } - - /// Get the durable message id, if this message has one. - pub fn id(&self) -> Option<&str> { - self.id.as_deref() - } - - /// Get the message name. - pub fn name(&self) -> &str { - &self.name - } - - /// Get the raw payload bytes. - pub fn payload(&self) -> &[u8] { - &self.payload - } - - /// Get a metadata value by key. - pub fn metadata(&self, key: &str) -> Option<&str> { - self.metadata - .iter() - .find(|(existing, _)| existing.eq_ignore_ascii_case(key)) - .map(|(_, value)| value.as_str()) - } - - /// Get the correlation id, if present. - pub fn correlation_id(&self) -> Option<&str> { - self.metadata("correlation_id") - } - - /// Get the causation id, if present. - pub fn causation_id(&self) -> Option<&str> { - self.metadata("causation_id") - } - - /// Decode the raw payload as JSON. - pub fn payload_json(&self) -> Result { - serde_json::from_slice(&self.payload).map_err(|e| { - HandlerError::DecodeFailed(format!( - "invalid JSON payload for message '{}': {}", - self.name, e - )) - }) - } - - /// Decode the raw payload as bitcode. - pub fn payload_bitcode(&self) -> Result { - bitcode::deserialize(&self.payload).map_err(|e| { - HandlerError::DecodeFailed(format!( - "invalid bitcode payload for message '{}': {}", - self.name, e - )) - }) - } -} - /// A registered handler with optional guard. struct RegisteredHandler { guard: Option>>, diff --git a/src/outbox_worker/mod.rs b/src/outbox_worker/mod.rs index bfcd2a1..317ddcf 100644 --- a/src/outbox_worker/mod.rs +++ b/src/outbox_worker/mod.rs @@ -37,6 +37,8 @@ //! } //! ``` +mod outbox_dispatch; +mod outbox_source; mod publisher; mod store; mod worker; @@ -55,3 +57,9 @@ pub use store::{ // Worker pub use worker::{DrainResult, OutboxWorker, ProcessOneResult}; + +// Outbox -> bus bridge (moved out of the bus module; depends up on bus traits). +pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; +pub use outbox_source::{ + OutboxSource, ReceivedOutboxMessage, DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, +}; diff --git a/src/microsvc/transport/outbox_dispatch.rs b/src/outbox_worker/outbox_dispatch.rs similarity index 96% rename from src/microsvc/transport/outbox_dispatch.rs rename to src/outbox_worker/outbox_dispatch.rs index 1c437ce..68694c9 100644 --- a/src/microsvc/transport/outbox_dispatch.rs +++ b/src/outbox_worker/outbox_dispatch.rs @@ -12,13 +12,20 @@ use std::time::Duration; -use super::publisher::AsyncMessagePublisher; -use super::TransportError; -use crate::microsvc::{Message, MessageKind}; +use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction}; +use crate::bus::{AsyncMessagePublisher, Message, MessageKind, TransportError, TransportErrorKind}; use crate::outbox::OutboxMessage; -use crate::outbox_worker::{ - AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction, -}; +use crate::repository::RepositoryError; + +/// Repository/store failures (lock contention, storage hiccups, stale-claim +/// conflicts) are retryable: usually transient, resolved by a later re-claim. This +/// conversion lives with the outbox bridge — which legitimately knows both the +/// store and the bus — so bus core stays free of `RepositoryError`. +impl From for TransportError { + fn from(error: RepositoryError) -> Self { + TransportError::new(TransportErrorKind::Retryable, error.to_string()).with_source(error) + } +} /// Content type for an outbox payload. Outbox payloads are codec-encoded bytes /// (bitcode or raw), so the media type is binary; the exact codec travels in diff --git a/src/microsvc/transport/outbox_source.rs b/src/outbox_worker/outbox_source.rs similarity index 97% rename from src/microsvc/transport/outbox_source.rs rename to src/outbox_worker/outbox_source.rs index de58ebb..03f1d9f 100644 --- a/src/microsvc/transport/outbox_source.rs +++ b/src/outbox_worker/outbox_source.rs @@ -18,11 +18,9 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; -use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::TransportError; -use crate::microsvc::Message; +use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef}; +use crate::bus::{AsyncMessageSource, Message, ReceivedMessage, TransportError}; use crate::outbox::OutboxMessage; -use crate::outbox_worker::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef}; /// Default lease held on a claimed row while it is being dispatched. pub const DEFAULT_OUTBOX_SOURCE_LEASE: Duration = Duration::from_secs(30); @@ -175,7 +173,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::microsvc::transport::{run_source, RunOptions}; + use crate::bus::{run_source, RunOptions}; use crate::microsvc::Service; use crate::{ AsyncCommitBatch, AsyncTransactionalCommit, HashMapRepository, OutboxMessage, diff --git a/tests/distributed_read_model/main.rs b/tests/distributed_read_model/main.rs index 8605307..9db91ce 100644 --- a/tests/distributed_read_model/main.rs +++ b/tests/distributed_read_model/main.rs @@ -466,7 +466,7 @@ where /// complete, so each event is forwarded exactly once. async fn publish_pending_outbox( outbox: &sourced_rust::HashMapOutboxStore, - bus: &sourced_rust::microsvc::transport::InMemoryBus, + bus: &sourced_rust::bus::InMemoryBus, ) { let claimed = outbox .claim_async(sourced_rust::ClaimOutboxMessages::new( @@ -501,7 +501,7 @@ async fn publish_pending_outbox( /// assertions — only the transport changed. #[tokio::test] async fn seat_checkout_saga_reserves_seat_and_projects_user_screen() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let checkout_store = HashMapRepository::new(); let checkout_service = @@ -770,7 +770,7 @@ async fn checkout_commands_can_be_grpc_service() { use std::collections::HashMap as StdHashMap; use std::sync::{Arc as StdArc, Mutex as StdMutex}; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind}; /// The four checkout events in flow (causal) order, by CloudEvent/event type. @@ -957,7 +957,7 @@ fn matrix_ids(tag: &str) -> AsyncFlowIds { /// In-memory persistence × in-memory transport — the always-on matrix cell. #[tokio::test] async fn matrix_in_memory_persistence_over_in_memory_bus() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let (collector, collected) = build_collector(); run_checkout_over_bus( InMemoryBus::new(), @@ -1007,7 +1007,8 @@ where + Sync + 'static, { - use sourced_rust::microsvc::transport::{cloud_events_router, KnativeBus}; + use sourced_rust::bus::KnativeBus; + use sourced_rust::microsvc::cloud_events_router; let (collector, collected) = build_collector(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0") @@ -1058,7 +1059,7 @@ where #[cfg(feature = "sqlite")] #[tokio::test] async fn matrix_sqlite_persistence_over_in_memory_bus() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let (collector, collected) = build_collector(); run_checkout_over_bus( InMemoryBus::new(), @@ -1088,9 +1089,9 @@ fn nats_url() -> Option { } #[cfg(feature = "nats")] -async fn nats_matrix_bus(ns: &str) -> sourced_rust::microsvc::transport::NatsBus { +async fn nats_matrix_bus(ns: &str) -> sourced_rust::bus::NatsBus { let url = nats_url().expect("NATS_URL set"); - let bus = sourced_rust::microsvc::transport::NatsBus::connect(&url, "matrix", ns) + let bus = sourced_rust::bus::NatsBus::connect(&url, "matrix", ns) .await .expect("nats connect") .with_fetch_timeout(Duration::from_millis(800)); @@ -1143,9 +1144,9 @@ fn amqp_url() -> Option { async fn rabbit_matrix_bus( ns: &str, collector: &StdArc>, -) -> sourced_rust::microsvc::transport::RabbitBus { +) -> sourced_rust::bus::RabbitBus { let url = amqp_url().expect("AMQP_URL set"); - let bus = sourced_rust::microsvc::transport::RabbitBus::connect(&url, "matrix", ns) + let bus = sourced_rust::bus::RabbitBus::connect(&url, "matrix", ns) .await .expect("rabbit connect"); // Topic exchange drops events with no bound queue, so bind before publishing. @@ -1199,9 +1200,9 @@ fn kafka_brokers() -> Option { } #[cfg(feature = "kafka")] -async fn kafka_matrix_bus(ns: &str) -> sourced_rust::microsvc::transport::KafkaBus { +async fn kafka_matrix_bus(ns: &str) -> sourced_rust::bus::KafkaBus { let brokers = kafka_brokers().expect("KAFKA_BROKERS set"); - sourced_rust::microsvc::transport::KafkaBus::connect(&brokers, "matrix", ns) + sourced_rust::bus::KafkaBus::connect(&brokers, "matrix", ns) .await .expect("kafka connect") .with_fetch_timeout(Duration::from_secs(10)) @@ -1246,7 +1247,7 @@ async fn matrix_sqlite_persistence_over_kafka_bus() { #[cfg(feature = "postgres")] #[tokio::test] async fn matrix_in_memory_persistence_over_postgres_bus() { - use sourced_rust::microsvc::transport::PostgresBus; + use sourced_rust::bus::PostgresBus; let Some(schema) = postgres::PostgresTestSchema::create_from_env( "matrix_pgbus", "skipping Postgres-bus matrix cell", @@ -1272,7 +1273,7 @@ async fn matrix_in_memory_persistence_over_postgres_bus() { #[cfg(feature = "postgres")] #[tokio::test] async fn matrix_postgres_persistence_over_in_memory_bus() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let Some(schema) = postgres::PostgresTestSchema::create_from_env( "matrix_pg", "skipping Postgres-persistence matrix cell", @@ -1318,8 +1319,8 @@ async fn postgres_matrix_repo() -> Option<( } #[cfg(feature = "postgres")] -async fn postgres_matrix_bus() -> Option { - use sourced_rust::microsvc::transport::PostgresBus; +async fn postgres_matrix_bus() -> Option { + use sourced_rust::bus::PostgresBus; let schema = postgres::PostgresTestSchema::create_from_env( "matrix_pgbus", "skipping Postgres-bus matrix cell", diff --git a/tests/distributed_read_model_board/main.rs b/tests/distributed_read_model_board/main.rs index 6f37abe..349ca66 100644 --- a/tests/distributed_read_model_board/main.rs +++ b/tests/distributed_read_model_board/main.rs @@ -21,7 +21,7 @@ use projections_service::{load_board, service as build_projection}; use query_service::BoardQueryService; use read_models::register_schemas; use serde::Serialize; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service, Session}; use sourced_rust::{ AsyncAggregateBuilder, AsyncOutboxStore, ClaimOutboxMessages, HashMapOutboxStore, diff --git a/tests/kafka_transport/main.rs b/tests/kafka_transport/main.rs index df1304f..79f4b1f 100644 --- a/tests/kafka_transport/main.rs +++ b/tests/kafka_transport/main.rs @@ -10,7 +10,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, KafkaBus, KafkaPublisher, KafkaSource, RunOptions, }; diff --git a/tests/knative_cloudevents/main.rs b/tests/knative_cloudevents/main.rs index 74a1958..3de5417 100644 --- a/tests/knative_cloudevents/main.rs +++ b/tests/knative_cloudevents/main.rs @@ -8,7 +8,8 @@ use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::transport::{cloud_events_router, Bus, KnativeBus}; +use sourced_rust::bus::{Bus, KnativeBus}; +use sourced_rust::microsvc::cloud_events_router; use sourced_rust::microsvc::{ Context, HandlerError, Message, MessageKind, Service, SubscriptionPlan, }; diff --git a/tests/microsvc/transport_listen.rs b/tests/microsvc/transport_listen.rs index 6227eb6..d849da3 100644 --- a/tests/microsvc/transport_listen.rs +++ b/tests/microsvc/transport_listen.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use serde_json::json; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, FailurePolicy, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, FailurePolicy, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service, Session}; use sourced_rust::{AsyncAggregateBuilder, HashMapRepository, Queueable}; diff --git a/tests/microsvc/transport_subscribe.rs b/tests/microsvc/transport_subscribe.rs index 12a47c6..ccad38c 100644 --- a/tests/microsvc/transport_subscribe.rs +++ b/tests/microsvc/transport_subscribe.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service}; use sourced_rust::{AsyncAggregateBuilder, HashMapRepository, Queueable}; diff --git a/tests/nats_transport/main.rs b/tests/nats_transport/main.rs index 316ccfe..4c450d7 100644 --- a/tests/nats_transport/main.rs +++ b/tests/nats_transport/main.rs @@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, NatsBus, NatsJetStreamSource, NatsPublisher, RunOptions, }; diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index 70fb72f..e4a9957 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -12,11 +12,11 @@ mod postgres; use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::transport::{ - run_source, AsyncMessageSource, Bus, BusConsumer, OutboxSource, PostgresBus, ReceivedMessage, - RunOptions, +use sourced_rust::bus::{ + run_source, AsyncMessageSource, Bus, BusConsumer, PostgresBus, ReceivedMessage, RunOptions, }; use sourced_rust::microsvc::{Context, Message, MessageKind, Service}; +use sourced_rust::OutboxSource; use sourced_rust::{ AsyncCommitBatch, AsyncOutboxStore, AsyncTransactionalCommit, OutboxMessage, OutboxMessageStatus, PostgresOutboxStore, PostgresRepository, diff --git a/tests/rabbitmq_transport/main.rs b/tests/rabbitmq_transport/main.rs index 73eed09..6a83d7c 100644 --- a/tests/rabbitmq_transport/main.rs +++ b/tests/rabbitmq_transport/main.rs @@ -8,7 +8,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, RabbitBus, RabbitPublisher, RabbitSource, RunOptions, }; @@ -223,11 +223,11 @@ async fn bus_publish_subscribe_fans_out_across_groups() { let bus_proj = RabbitBus::connect(&url, "projections", &ns).await.unwrap(); let bus_audit = RabbitBus::connect(&url, "audit", &ns).await.unwrap(); bus_proj - .ensure_subscription(&svc_proj) + .ensure_subscription(svc_proj.as_ref()) .await .expect("bind proj"); bus_audit - .ensure_subscription(&svc_audit) + .ensure_subscription(svc_audit.as_ref()) .await .expect("bind audit"); diff --git a/tests/sagas/microsvc_saga.rs b/tests/sagas/microsvc_saga.rs index b2f24cb..98af470 100644 --- a/tests/sagas/microsvc_saga.rs +++ b/tests/sagas/microsvc_saga.rs @@ -16,7 +16,7 @@ use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service, Session}; use sourced_rust::{ AsyncAggregateBuilder, AsyncOutboxStore, ClaimOutboxMessages, HashMapOutboxStore, diff --git a/tests/transport_conformance/mod.rs b/tests/transport_conformance/mod.rs index 5a0a52d..28d7253 100644 --- a/tests/transport_conformance/mod.rs +++ b/tests/transport_conformance/mod.rs @@ -16,11 +16,12 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{ - run_source, AsyncMessagePublisher, AsyncMessageSource, FailurePolicy, OutboxDispatcher, - ReceivedMessage, RunOptions, TransportError, +use sourced_rust::bus::{ + run_source, AsyncMessagePublisher, AsyncMessageSource, FailurePolicy, ReceivedMessage, + RunOptions, TransportError, }; use sourced_rust::microsvc::{Context, HandlerError, Message, MessageKind, Service}; +use sourced_rust::OutboxDispatcher; use sourced_rust::{ AsyncCommitBatch, AsyncTransactionalCommit, HashMapOutboxStore, HashMapRepository, OutboxMessage, OutboxMessageStatus,