From 13919bc47a163a272b4b1d01d587425a44c4dfe6 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sat, 30 May 2026 23:02:22 -0500 Subject: [PATCH 01/14] refactor(bus): introduce MessageRouter consume seam (Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decouple the consume path from the concrete microsvc::Service: - Add `MessageRouter` { handles, subscription_plan, dispatch } in transport/router.rs — the trait run_source and the BusConsumer adapters depend on instead of Service. - impl MessageRouter for Service (microsvc/message_router.rs); the HandlerError -> TransportError classification happens on the microsvc side so the runner only sees an already-classified error. - run_source: drop the Service generic, keep the inbox-hook generic (RunOptions::inbox still works). - BusConsumer::listen/subscribe take Arc; rewrite all five adapters (in_memory/nats/rabbitmq/kafka/postgres) + rabbit::ensure_subscription to derive topology from subscription_plan() instead of command_names()/ event_names(). No behavior change. 226 lib + 414 integration + 12 in-memory conformance tests green; cargo check --all-features clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microsvc/message_router.rs | 27 +++++++++++++++ src/microsvc/mod.rs | 1 + src/microsvc/transport/bus.rs | 22 ++++++------ src/microsvc/transport/in_memory_bus.rs | 30 ++++++----------- src/microsvc/transport/kafka_bus.rs | 34 ++++++++++--------- src/microsvc/transport/mod.rs | 2 ++ src/microsvc/transport/nats_bus.rs | 28 ++++++++------- src/microsvc/transport/postgres_bus.rs | 28 ++++++--------- src/microsvc/transport/rabbit_bus.rs | 30 +++++++++-------- src/microsvc/transport/router.rs | 45 +++++++++++++++++++++++++ src/microsvc/transport/runner.rs | 35 ++++++++----------- 11 files changed, 171 insertions(+), 111 deletions(-) create mode 100644 src/microsvc/message_router.rs create mode 100644 src/microsvc/transport/router.rs diff --git a/src/microsvc/message_router.rs b/src/microsvc/message_router.rs new file mode 100644 index 0000000..5d85acc --- /dev/null +++ b/src/microsvc/message_router.rs @@ -0,0 +1,27 @@ +//! `microsvc::Service` as a transport [`MessageRouter`]. +//! +//! This is the bridge that lets the bus consume side run a service without the +//! bus naming `Service`. Handler-error classification into the transport's +//! retryable/permanent vocabulary happens here, on the microsvc side, so the +//! bus-core runner only ever sees an already-classified `TransportError`. + +use crate::microsvc::transport::{MessageRouter, TransportError}; +use crate::microsvc::{Message, MessageKind, Service, SubscriptionPlan}; + +impl MessageRouter for Service { + fn handles(&self, kind: MessageKind, name: &str) -> bool { + self.handles_message(kind, name) + } + + fn subscription_plan(&self) -> SubscriptionPlan { + // Call the inherent method, not this trait method (which would recurse). + Service::subscription_plan(self) + } + + async fn dispatch(&self, message: &Message) -> Result<(), TransportError> { + self.dispatch_message(message) + .await + .map(|_| ()) + .map_err(TransportError::from) + } +} diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index d46ee2d..0e8b1a3 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -56,6 +56,7 @@ mod context; mod dependencies; mod error; +mod message_router; mod service; mod session; pub mod transport; diff --git a/src/microsvc/transport/bus.rs b/src/microsvc/transport/bus.rs index ffe9b1a..cc1ddfd 100644 --- a/src/microsvc/transport/bus.rs +++ b/src/microsvc/transport/bus.rs @@ -20,8 +20,7 @@ use std::future::Future; use std::sync::Arc; -use super::{Message, RunOptions, TransportError}; -use crate::microsvc::Service; +use super::{Message, MessageRouter, RunOptions, TransportError}; /// Produce side of the bus — uniform across every transport. pub trait Bus: Send + Sync { @@ -54,26 +53,25 @@ pub trait Bus: Send + Sync { /// Consume side of the bus — pull transports that run a [`run_source`] loop. /// -/// `listen`/`subscribe` derive the message names from the service's registered -/// handlers ([`Service::command_names`]/[`Service::event_names`]), build the -/// transport's source with the matching topology, and run it. Both run until the -/// source drains/stops. +/// `listen`/`subscribe` derive the message names from the router's registered +/// handlers ([`MessageRouter::subscription_plan`]), build the transport's source +/// with the matching topology, and run it. Both run until the source drains/stops. /// /// [`run_source`]: super::run_source pub trait BusConsumer: Send + Sync { - /// Run `service` as a command listener: consume its command names with + /// Run `router` as a command listener: consume its command names with /// competing-consumer (point-to-point) semantics. - fn listen( + fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> impl Future> + Send; - /// Run `service` as an event subscriber: consume its event names with + /// Run `router` as an event subscriber: consume its event names with /// fan-out semantics. - fn subscribe( + fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> impl Future> + Send; } diff --git a/src/microsvc/transport/in_memory_bus.rs b/src/microsvc/transport/in_memory_bus.rs index 464f187..19d6863 100644 --- a/src/microsvc/transport/in_memory_bus.rs +++ b/src/microsvc/transport/in_memory_bus.rs @@ -14,8 +14,8 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{run_source, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; +use crate::microsvc::{Message, MessageKind}; type Queues = Arc>>>; type Topics = Arc>>>; @@ -79,39 +79,31 @@ impl Bus for InMemoryBus { } impl BusConsumer for InMemoryBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let names = service - .command_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().commands; let source = QueueSource { queues: self.queues.clone(), names, }; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let names = service - .event_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().events; let source = TopicSource { topics: self.topics.clone(), names, cursors: HashMap::new(), }; - run_source(service, source, options).await + run_source(router, source, options).await } } @@ -184,7 +176,7 @@ impl ReceivedMessage for InMemoryReceived { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::HandlerError; + use crate::microsvc::{HandlerError, Service}; use serde_json::json; use std::future::Future; diff --git a/src/microsvc/transport/kafka_bus.rs b/src/microsvc/transport/kafka_bus.rs index ccabb4a..ea198d0 100644 --- a/src/microsvc/transport/kafka_bus.rs +++ b/src/microsvc/transport/kafka_bus.rs @@ -22,8 +22,10 @@ use std::sync::Arc; use std::time::Duration; use super::kafka::{KafkaPublisher, KafkaSource}; -use super::{run_source, AsyncMessagePublisher, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{ + run_source, AsyncMessagePublisher, Bus, BusConsumer, MessageRouter, RunOptions, TransportError, +}; +use crate::microsvc::{Message, MessageKind}; const DEFAULT_FETCH_TIMEOUT: Duration = Duration::from_secs(8); @@ -71,9 +73,9 @@ impl KafkaBus { format!("{}.evt.", self.namespace) } - async fn run( + async fn run( &self, - service: Arc>, + router: Arc, topics: Vec, group_id: String, strip_prefix: String, @@ -87,7 +89,7 @@ impl KafkaBus { .await? .with_fetch_timeout(self.fetch_timeout) .with_strip_prefix(strip_prefix); - run_source(service, source, options).await + run_source(router, source, options).await } } @@ -115,33 +117,35 @@ impl Bus for KafkaBus { } impl BusConsumer for KafkaBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { let prefix = self.command_prefix(); - let topics: Vec = service - .command_names() + let topics: Vec = router + .subscription_plan() + .commands .iter() .map(|name| format!("{prefix}{name}")) .collect(); let group_id = format!("{}.{}.cmd", self.namespace, self.group); - self.run(service, topics, group_id, prefix, options).await + self.run(router, topics, group_id, prefix, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { let prefix = self.event_prefix(); - let topics: Vec = service - .event_names() + let topics: Vec = router + .subscription_plan() + .events .iter() .map(|name| format!("{prefix}{name}")) .collect(); let group_id = format!("{}.{}.evt", self.namespace, self.group); - self.run(service, topics, group_id, prefix, options).await + self.run(router, topics, group_id, prefix, options).await } } diff --git a/src/microsvc/transport/mod.rs b/src/microsvc/transport/mod.rs index 320e316..bab48b6 100644 --- a/src/microsvc/transport/mod.rs +++ b/src/microsvc/transport/mod.rs @@ -107,6 +107,7 @@ mod publisher; mod rabbit_bus; #[cfg(feature = "rabbitmq")] mod rabbitmq; +mod router; mod run_options; mod runner; mod source; @@ -130,6 +131,7 @@ pub use rabbit_bus::RabbitBus; pub use rabbitmq::{RabbitPublisher, RabbitReceived, RabbitSource}; pub use bus::{Bus, BusConsumer}; +pub use router::MessageRouter; pub use capabilities::{ConsumerAckKind, KnativeIntegrationKind, TransportCapabilities}; pub use error::{TransportError, TransportErrorKind}; pub use failure_policy::{FailureAction, FailurePolicy}; diff --git a/src/microsvc/transport/nats_bus.rs b/src/microsvc/transport/nats_bus.rs index 4870083..34de81a 100644 --- a/src/microsvc/transport/nats_bus.rs +++ b/src/microsvc/transport/nats_bus.rs @@ -26,8 +26,10 @@ use async_nats::jetstream::consumer::pull::Config as PullConfig; use async_nats::jetstream::stream::{Config as StreamConfig, Stream}; use super::nats::{NatsJetStreamSource, NatsPublisher}; -use super::{run_source, AsyncMessagePublisher, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{ + run_source, AsyncMessagePublisher, Bus, BusConsumer, MessageRouter, RunOptions, TransportError, +}; +use crate::microsvc::{Message, MessageKind}; const DEFAULT_FETCH_TIMEOUT: Duration = Duration::from_millis(500); @@ -166,13 +168,14 @@ impl Bus for NatsBus { } impl BusConsumer for NatsBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let subjects: Vec = service - .command_names() + let subjects: Vec = router + .subscription_plan() + .commands .iter() .map(|name| format!("{}.cmd.{name}", self.namespace)) .collect(); @@ -186,16 +189,17 @@ impl BusConsumer for NatsBus { format!("{}.cmd.", self.namespace), ) .await?; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - let subjects: Vec = service - .event_names() + let subjects: Vec = router + .subscription_plan() + .events .iter() .map(|name| format!("{}.evt.{name}", self.namespace)) .collect(); @@ -209,6 +213,6 @@ impl BusConsumer for NatsBus { format!("{}.evt.", self.namespace), ) .await?; - run_source(service, source, options).await + run_source(router, source, options).await } } diff --git a/src/microsvc/transport/postgres_bus.rs b/src/microsvc/transport/postgres_bus.rs index 09b1812..cc5bf5f 100644 --- a/src/microsvc/transport/postgres_bus.rs +++ b/src/microsvc/transport/postgres_bus.rs @@ -37,8 +37,8 @@ use std::time::Duration; use sqlx::{PgPool, Row}; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{run_source, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; +use crate::microsvc::{Message, MessageKind}; const DEFAULT_LEASE: Duration = Duration::from_secs(30); @@ -211,17 +211,13 @@ impl Bus for PostgresBus { } impl BusConsumer for PostgresBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { self.ensure_tables().await?; - let names: Vec = service - .command_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().commands; if names.is_empty() { return Ok(()); } @@ -230,20 +226,16 @@ impl BusConsumer for PostgresBus { names, lease_secs: self.lease.as_secs_f64(), }; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { self.ensure_tables().await?; - let names: Vec = service - .event_names() - .iter() - .map(|n| n.to_string()) - .collect(); + let names = router.subscription_plan().events; if names.is_empty() { return Ok(()); } @@ -252,7 +244,7 @@ impl BusConsumer for PostgresBus { names, consumer: self.group.clone(), }; - run_source(service, source, options).await + run_source(router, source, options).await } } diff --git a/src/microsvc/transport/rabbit_bus.rs b/src/microsvc/transport/rabbit_bus.rs index f140717..8c29e8d 100644 --- a/src/microsvc/transport/rabbit_bus.rs +++ b/src/microsvc/transport/rabbit_bus.rs @@ -31,8 +31,8 @@ use lapin::{Channel, ExchangeKind}; use super::rabbitmq::{connect_channel, message_properties, RabbitReceived}; use super::source::AsyncMessageSource; -use super::{run_source, Bus, BusConsumer, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind, Service}; +use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; +use crate::microsvc::{Message, MessageKind}; fn retryable(context: &str, err: impl std::fmt::Display) -> TransportError { TransportError::retryable(format!("{context}: {err}")) @@ -145,14 +145,15 @@ impl RabbitBus { /// service's event names — the durable setup `subscribe` needs. Exposed so a /// producer can ensure all subscriber bindings exist *before* publishing /// (RabbitMQ drops events with no matching binding). - pub async fn ensure_subscription( + pub async fn ensure_subscription( &self, - service: &Service, + router: &R, ) -> Result<(), TransportError> { self.declare_events_exchange(&self.channel).await?; let queue = self.group_queue(); self.declare_queue(&self.channel, &queue).await?; - for name in service.event_names() { + let plan = router.subscription_plan(); + for name in &plan.events { self.channel .queue_bind( &queue, @@ -197,14 +198,15 @@ impl Bus for RabbitBus { } impl BusConsumer for RabbitBus { - async fn listen( + async fn listen( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { let channel = connect_channel(&self.uri).await?; + let plan = router.subscription_plan(); let mut queues = Vec::new(); - for name in service.command_names() { + for name in &plan.commands { let queue = self.command_queue(name); self.declare_queue(&channel, &queue).await?; queues.push(queue); @@ -217,16 +219,16 @@ impl BusConsumer for RabbitBus { queues, strip_prefix: Some(self.command_prefix()), }; - run_source(service, source, options).await + run_source(router, source, options).await } - async fn subscribe( + async fn subscribe( &self, - service: Arc>, + router: Arc, options: RunOptions, ) -> Result<(), TransportError> { - self.ensure_subscription(&service).await?; - if service.event_names().is_empty() { + self.ensure_subscription(router.as_ref()).await?; + if router.subscription_plan().events.is_empty() { return Ok(()); } let channel = connect_channel(&self.uri).await?; @@ -236,7 +238,7 @@ impl BusConsumer for RabbitBus { // Events are published with routing key == the bare event name. strip_prefix: None, }; - run_source(service, source, options).await + run_source(router, source, options).await } } diff --git a/src/microsvc/transport/router.rs b/src/microsvc/transport/router.rs new file mode 100644 index 0000000..8323ed9 --- /dev/null +++ b/src/microsvc/transport/router.rs @@ -0,0 +1,45 @@ +//! The consume seam: a minimal router the [`run_source`](super::run_source) loop +//! and the [`BusConsumer`](super::BusConsumer) adapters depend on instead of the +//! concrete `microsvc::Service`. +//! +//! Implemented by `microsvc::Service` (the rich, dependency-carrying registry) +//! and — once it lands — the dependency-free `Handlers` builder. Splitting the +//! consume path behind this trait is what lets the bus become a standalone module +//! that does not name `Service`. See `specs/bus-module-decomposition`. + +use std::future::Future; + +use super::TransportError; +use crate::microsvc::{Message, MessageKind, SubscriptionPlan}; + +/// What the receive loop and the consumer adapters need from a message consumer. +/// +/// Three responsibilities, each used by a different part of the consume path: +/// +/// - [`handles`](MessageRouter::handles) — the runner's ack-and-ignore vs dispatch +/// predicate; +/// - [`subscription_plan`](MessageRouter::subscription_plan) — the command/event +/// names the adapters use to build their broker topology *before* the run loop; +/// - [`dispatch`](MessageRouter::dispatch) — route and run one delivered message, +/// returning an already-classified [`TransportError`] so the bus-core runner +/// never sees a `microsvc::HandlerError`. +/// +/// Not dyn-compatible (the RPITIT `dispatch`), exactly like [`Bus`](super::Bus) / +/// [`BusConsumer`](super::BusConsumer): consume it as `Arc` or a generic +/// ``, never `Arc`. +pub trait MessageRouter: Send + Sync { + /// Whether this router has a handler for `(kind, name)`. The runner acks and + /// ignores a delivered message it does not handle rather than dead-lettering it. + fn handles(&self, kind: MessageKind, name: &str) -> bool; + + /// The command and event names a transport should subscribe to, derived from + /// the router's registered handlers. + fn subscription_plan(&self) -> SubscriptionPlan; + + /// Route and run one delivered message. `Ok(())` means the handler (and its + /// effects) succeeded; the error is already classified retryable/permanent. + fn dispatch( + &self, + message: &Message, + ) -> impl Future> + Send; +} diff --git a/src/microsvc/transport/runner.rs b/src/microsvc/transport/runner.rs index be6ae6c..5e70711 100644 --- a/src/microsvc/transport/runner.rs +++ b/src/microsvc/transport/runner.rs @@ -10,15 +10,15 @@ use std::sync::Arc; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{FailureAction, RunOptions, TransportError}; -use crate::microsvc::{Message, Service}; +use super::{FailureAction, MessageRouter, RunOptions, TransportError}; +use crate::microsvc::Message; /// Run the receive loop for a direct transport source. /// /// For each message the runner: /// /// 1. enforces the inbox stable-id contract (a no-op in idempotent mode); -/// 2. dispatches through [`Service::dispatch_message`]; +/// 2. dispatches through [`MessageRouter::dispatch`]; /// 3. on success, acknowledges via the adapter; /// 4. on failure, routes through [`RunOptions::failure_policy`] — retryable /// failures are nacked for redelivery, permanent failures take the configured @@ -29,7 +29,7 @@ use crate::microsvc::{Message, Service}; /// acks it and moves on rather than dead-lettering it. Fan-out event transports /// may deliver events this service does not consume, and acking matches /// `microsvc::subscribe`; production transports should use -/// [`Service::subscription_plan`] to avoid delivering unrelated messages at all. +/// [`MessageRouter::subscription_plan`] to avoid delivering unrelated messages at all. /// /// The runner **acks only after handler effects have completed**, never before. /// It stops gracefully when the source returns `Ok(None)`, having fully settled @@ -43,24 +43,24 @@ use crate::microsvc::{Message, Service}; /// /// `I: Send` keeps the returned future `Send` so the runner can be spawned on a /// multi-threaded executor regardless of the inbox hook type. -pub async fn run_source( - service: Arc>, +pub async fn run_source( + router: Arc, mut source: S, options: RunOptions, ) -> Result<(), TransportError> where - D: Send + Sync + 'static, + R: MessageRouter, S: AsyncMessageSource, I: Send, { while let Some(received) = source.recv().await? { // No handler for this message: intentionally ignore (ack) rather than // dead-letter, so unrelated fan-out events don't pile into the DLQ. - if !service.handles_message(received.message().kind, received.message().name()) { + if !router.handles(received.message().kind, received.message().name()) { received.ack().await?; continue; } - match dispatch(&service, &options, received.message()).await { + match dispatch(router.as_ref(), &options, received.message()).await { Ok(()) => received.ack().await?, Err(error) => match options.failure_policy.resolve(&error) { FailureAction::Nack => received.nack(&error.to_string()).await?, @@ -85,29 +85,22 @@ where /// Enforces the inbox stable-id contract first (idempotent mode yields no key /// and skips it), then dispatches. A failed stable-id check is a permanent /// failure — redelivery cannot supply a missing or malformed id. -async fn dispatch( - service: &Service, +async fn dispatch( + router: &R, options: &RunOptions, message: &Message, -) -> Result<(), TransportError> -where - D: Send + Sync + 'static, -{ +) -> Result<(), TransportError> { options .validate_message_id(message) .map_err(|err| TransportError::permanent(err.to_string()).with_source(err))?; - service - .dispatch_message(message) - .await - .map(|_| ()) - .map_err(TransportError::from) + router.dispatch(message).await } #[cfg(test)] mod tests { use super::*; use crate::microsvc::transport::FailurePolicy; - use crate::microsvc::{HandlerError, MessageKind}; + use crate::microsvc::{HandlerError, MessageKind, Service}; use serde_json::json; use std::collections::VecDeque; use std::future::Future; From 2cadc9d295434a77ac30cbb17e6681aa9223f400 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sat, 30 May 2026 23:04:54 -0500 Subject: [PATCH 02/14] feat(bus): add dependency-free Handlers inline registry (Phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Handlers is the second MessageRouter impl and the standalone, Service-free way to consume the bus: register a closure per (kind, name) and run it with bus.listen/bus.subscribe — the Rust analog of Node's bus.listen('x', fn). - Reuses the AsyncMessageHandler HRTB boxed-future pattern (over &Message, no Context/deps/guards); idempotent-only by design. - A Service<()> facade is impossible (bus must not depend on microsvc), so Handlers is its own engine. - Tests prove full InMemoryBus publish->subscribe and send->listen round trips with no Service. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microsvc/transport/handlers.rs | 221 +++++++++++++++++++++++++++++ src/microsvc/transport/mod.rs | 2 + 2 files changed, 223 insertions(+) create mode 100644 src/microsvc/transport/handlers.rs diff --git a/src/microsvc/transport/handlers.rs b/src/microsvc/transport/handlers.rs new file mode 100644 index 0000000..4b19cc4 --- /dev/null +++ b/src/microsvc/transport/handlers.rs @@ -0,0 +1,221 @@ +//! [`Handlers`] — a dependency-free inline consumer registry. +//! +//! The standalone, `Service`-free way to consume the bus: register a closure per +//! `(kind, name)` and run it with `bus.listen`/`bus.subscribe`. It is the second +//! implementation of [`MessageRouter`] (the first being `microsvc::Service`), +//! and the analog of the Node `bus.listen('seat.reserved', handler)` ergonomic. +//! +//! ```ignore +//! let handlers = Handlers::new() +//! .on_event("seat.reserved", |msg: &Message| async move { /* ... */ Ok(()) }) +//! .on_command("place.bet", |msg: &Message| async move { Ok(()) }); +//! bus.subscribe(Arc::new(handlers), RunOptions::idempotent()).await?; +//! ``` +//! +//! Deliberately minimal: handlers take `&Message` (no `Context`, dependencies, +//! guards, or sessions) and the registry is idempotent-only. The rich path — +//! typed input, guards, `Session`, dependencies, the inbox hook — stays on +//! `microsvc::Service`. A handler returns `Ok(())` to ack, `Err` to nack +//! (the runner classifies retryable vs permanent via the [`TransportError`]). +//! +//! The win over `Service<()>` is dropping `D`/`Context` and not depending on +//! `microsvc` at all — a `Service<()>` facade is impossible here, because the bus +//! cannot depend up into `microsvc` where `Service` lives. + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use super::{MessageRouter, TransportError}; +use crate::microsvc::{Message, MessageKind, SubscriptionPlan}; + +type HandlerFuture<'a> = Pin> + Send + 'a>>; +type HandlerFn = dyn for<'a> Fn(&'a Message) -> HandlerFuture<'a> + Send + Sync; + +/// Lets an `async fn(msg: &Message) -> Result<(), TransportError>` register +/// directly as a handler. The higher-ranked bound ties the returned future's +/// lifetime to the borrowed [`Message`], which a plain generic future parameter +/// cannot express (the same shape `microsvc::Service` uses for `Context`). +pub trait AsyncMessageHandler<'a>: Send + Sync { + /// The future returned by the handler for a message borrowed for `'a`. + type Future: Future> + Send + 'a; + /// Run the handler against the borrowed message. + fn call(&self, message: &'a Message) -> Self::Future; +} + +impl<'a, F, Fut> AsyncMessageHandler<'a> for F +where + F: Fn(&'a Message) -> Fut + Send + Sync, + Fut: Future> + Send + 'a, +{ + type Future = Fut; + fn call(&self, message: &'a Message) -> Fut { + self(message) + } +} + +fn boxed_handler(handler: F) -> Arc +where + F: for<'a> AsyncMessageHandler<'a> + 'static, +{ + Arc::new(move |message| Box::pin(handler.call(message)) as HandlerFuture<'_>) +} + +/// A dependency-free [`MessageRouter`]: a set of `(kind, name)` → async closure +/// bindings. Build it fluently with [`on_command`](Handlers::on_command) / +/// [`on_event`](Handlers::on_event), then run it with `bus.listen`/`bus.subscribe`. +#[derive(Clone, Default)] +pub struct Handlers { + handlers: HashMap<(MessageKind, String), Arc>, +} + +impl Handlers { + /// An empty registry. + pub fn new() -> Self { + Self::default() + } + + /// Register a command handler (point-to-point / competing-consumer via `listen`). + pub fn on_command(self, name: impl Into, handler: F) -> Self + where + F: for<'a> AsyncMessageHandler<'a> + 'static, + { + self.with(MessageKind::Command, name, handler) + } + + /// Register an event handler (fan-out via `subscribe`). + pub fn on_event(self, name: impl Into, handler: F) -> Self + where + F: for<'a> AsyncMessageHandler<'a> + 'static, + { + self.with(MessageKind::Event, name, handler) + } + + fn with(mut self, kind: MessageKind, name: impl Into, handler: F) -> Self + where + F: for<'a> AsyncMessageHandler<'a> + 'static, + { + self.handlers + .insert((kind, name.into()), boxed_handler(handler)); + self + } +} + +impl MessageRouter for Handlers { + fn handles(&self, kind: MessageKind, name: &str) -> bool { + self.handlers.contains_key(&(kind, name.to_string())) + } + + fn subscription_plan(&self) -> SubscriptionPlan { + let mut plan = SubscriptionPlan::default(); + for (kind, name) in self.handlers.keys() { + let bucket = match kind { + MessageKind::Command => &mut plan.commands, + MessageKind::Event => &mut plan.events, + }; + if !bucket.iter().any(|existing| existing == name) { + bucket.push(name.clone()); + } + } + plan + } + + async fn dispatch(&self, message: &Message) -> Result<(), TransportError> { + // Clone the handler Arc so the map is not borrowed across the awaited + // future (mirrors `Service::invoke`). + let handler = self + .handlers + .get(&(message.kind, message.name().to_string())) + .cloned(); + match handler { + Some(handler) => handler(message).await, + // No binding: the runner already filters via `handles`, so this is a + // benign no-op for any message that slips through. + None => Ok(()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; + use std::future::Future; + use std::sync::atomic::{AtomicUsize, Ordering}; + + fn block_on(future: F) -> F::Output { + use std::ptr; + use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |_| RawWaker::new(ptr::null(), &VTABLE), + |_| {}, + |_| {}, + |_| {}, + ); + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }; + let mut cx = Context::from_waker(&waker); + let mut future = std::pin::pin!(future); + loop { + if let Poll::Ready(output) = future.as_mut().poll(&mut cx) { + return output; + } + } + } + + #[test] + fn subscription_plan_groups_by_kind() { + let handlers = Handlers::new() + .on_command("place.bet", |_: &Message| async { Ok(()) }) + .on_event("seat.reserved", |_: &Message| async { Ok(()) }); + let plan = handlers.subscription_plan(); + assert_eq!(plan.commands, vec!["place.bet".to_string()]); + assert_eq!(plan.events, vec!["seat.reserved".to_string()]); + assert!(handlers.handles(MessageKind::Command, "place.bet")); + assert!(handlers.handles(MessageKind::Event, "seat.reserved")); + assert!(!handlers.handles(MessageKind::Event, "place.bet")); + } + + #[test] + fn fan_out_round_trip_without_service() { + // Full InMemoryBus publish -> subscribe round trip using only Handlers — + // no Service, no microsvc handler machinery. + let bus = InMemoryBus::new(); + for _ in 0..3 { + block_on(bus.publish("seat.reserved", b"{}".to_vec())).unwrap(); + } + let count = Arc::new(AtomicUsize::new(0)); + let seen = count.clone(); + let handlers = Arc::new(Handlers::new().on_event("seat.reserved", move |msg: &Message| { + let seen = seen.clone(); + // Touch the message to prove the borrow is usable in the handler. + let is_event = matches!(msg.kind, MessageKind::Event); + async move { + if is_event { + seen.fetch_add(1, Ordering::SeqCst); + } + Ok(()) + } + })); + block_on(bus.subscribe(handlers, RunOptions::idempotent())).unwrap(); + assert_eq!(count.load(Ordering::SeqCst), 3); + } + + #[test] + fn point_to_point_round_trip_without_service() { + let bus = InMemoryBus::new(); + block_on(bus.send("place.bet", b"{}".to_vec())).unwrap(); + block_on(bus.send("place.bet", b"{}".to_vec())).unwrap(); + let count = Arc::new(AtomicUsize::new(0)); + let seen = count.clone(); + let handlers = Arc::new(Handlers::new().on_command("place.bet", move |_: &Message| { + let seen = seen.clone(); + async move { + seen.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + })); + block_on(bus.listen(handlers, RunOptions::idempotent())).unwrap(); + assert_eq!(count.load(Ordering::SeqCst), 2); + } +} diff --git a/src/microsvc/transport/mod.rs b/src/microsvc/transport/mod.rs index bab48b6..9783399 100644 --- a/src/microsvc/transport/mod.rs +++ b/src/microsvc/transport/mod.rs @@ -85,6 +85,7 @@ mod bus; mod capabilities; mod error; mod failure_policy; +mod handlers; mod in_memory_bus; #[cfg(feature = "kafka")] mod kafka; @@ -135,6 +136,7 @@ pub use router::MessageRouter; pub use capabilities::{ConsumerAckKind, KnativeIntegrationKind, TransportCapabilities}; pub use error::{TransportError, TransportErrorKind}; pub use failure_policy::{FailureAction, FailurePolicy}; +pub use handlers::{AsyncMessageHandler, Handlers}; pub use in_memory_bus::{InMemoryBus, InMemoryReceived}; pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; pub use outbox_source::{ From a4110be8cf90cc80754a23269fc028a6d8397536 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sat, 30 May 2026 23:20:09 -0500 Subject: [PATCH 03/14] refactor(bus): relocate Message/MessageKind into bus-core message module (Phase 3) Move the canonical transport vocabulary out of microsvc::service into a dependency-free transport::message module, so the bus stops returning a microsvc error type from payload decoding: - New transport/message.rs: Message, MessageKind, and a bus-core PayloadDecodeError. payload_json/payload_bitcode now return PayloadDecodeError instead of microsvc::HandlerError. - microsvc::error gains From for HandlerError; Context::input maps back, so handler signatures are unchanged. - Re-export Message/MessageKind/PayloadDecodeError from both transport and microsvc, so every `crate::microsvc::Message` consumer is unaffected. No behavior change. 229 lib + full default integration suite + 12 conformance green; cargo check --all-features clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microsvc/context.rs | 4 +- src/microsvc/error.rs | 7 ++ src/microsvc/mod.rs | 3 +- src/microsvc/service.rs | 100 +----------------------- src/microsvc/transport/message.rs | 121 ++++++++++++++++++++++++++++++ src/microsvc/transport/mod.rs | 4 +- 6 files changed, 135 insertions(+), 104 deletions(-) create mode 100644 src/microsvc/transport/message.rs diff --git a/src/microsvc/context.rs b/src/microsvc/context.rs index 43f81aa..df27ade 100644 --- a/src/microsvc/context.rs +++ b/src/microsvc/context.rs @@ -9,7 +9,7 @@ use serde_json::Value; use super::dependencies::{HasReadModelStore, HasRepo}; use super::error::HandlerError; -use super::service::Message; +use super::transport::Message; use super::session::Session; /// The context passed to every handler. @@ -58,7 +58,7 @@ impl<'a, D> Context<'a, D> { /// Deserialize the input payload into a typed struct. pub fn input(&self) -> Result { - self.message.payload_json() + self.message.payload_json().map_err(HandlerError::from) } /// Get the raw JSON input. diff --git a/src/microsvc/error.rs b/src/microsvc/error.rs index 56c8c1d..e769ebb 100644 --- a/src/microsvc/error.rs +++ b/src/microsvc/error.rs @@ -3,6 +3,7 @@ use std::error::Error; use std::fmt; +use crate::microsvc::transport::PayloadDecodeError; use crate::{repository::RepositoryError, EventRecordError}; /// Error type for command handler operations. @@ -71,6 +72,12 @@ impl From for HandlerError { } } +impl From for HandlerError { + fn from(err: PayloadDecodeError) -> Self { + HandlerError::DecodeFailed(err.0) + } +} + impl HandlerError { /// Map this error to an HTTP-style status code. pub fn status_code(&self) -> u16 { diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 0e8b1a3..27117c8 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -69,8 +69,9 @@ pub use dependencies::{ pub use error::HandlerError; pub use service::{ CommandRequest, CommandResponse, DeliveryKind, HandlerBuilder, HandlerNames, HandlerSpec, - Message, MessageKind, Service, SubscriptionPlan, + Service, SubscriptionPlan, }; +pub use transport::{Message, MessageKind, PayloadDecodeError}; pub use session::Session; // HTTP transport (requires "http" feature) diff --git a/src/microsvc/service.rs b/src/microsvc/service.rs index dcf300d..0017b72 100644 --- a/src/microsvc/service.rs +++ b/src/microsvc/service.rs @@ -30,6 +30,7 @@ use super::context::Context; use super::dependencies::{HasReadModelStore, HasRepo, RepoReadModelDependencies}; use super::error::HandlerError; use super::session::Session; +use super::transport::{Message, MessageKind}; type GuardFn = dyn Fn(&Context) -> bool + Send + Sync; type HandlerFuture<'a> = Pin> + Send + 'a>>; @@ -64,15 +65,6 @@ where Arc::new(move |ctx| Box::pin(handler.call(ctx)) as HandlerFuture<'_>) } -/// The kind of message a handler consumes. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] -pub enum MessageKind { - /// A command addressed to one handler. - Command, - /// A published event that may be consumed by many handlers. - Event, -} - /// How a handler expects the transport to deliver matching messages. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum DeliveryKind { @@ -151,96 +143,6 @@ pub struct SubscriptionPlan { pub events: Vec, } -/// Serializable transport message used by handlers. -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct Message { - pub id: Option, - pub name: String, - pub kind: MessageKind, - pub payload: Vec, - pub content_type: String, - pub metadata: Vec<(String, String)>, -} - -impl Message { - /// Create a transport message. - pub fn new(name: impl Into, kind: MessageKind, payload: Vec) -> Self { - Self { - id: None, - name: name.into(), - kind, - payload, - content_type: "application/json".to_string(), - metadata: Vec::new(), - } - } - - /// Add a durable message id. - pub fn with_id(mut self, id: impl Into) -> Self { - self.id = Some(id.into()); - self - } - - /// Add metadata. - pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { - self.metadata.push((key.into(), value.into())); - self - } - - /// Get the durable message id, if this message has one. - pub fn id(&self) -> Option<&str> { - self.id.as_deref() - } - - /// Get the message name. - pub fn name(&self) -> &str { - &self.name - } - - /// Get the raw payload bytes. - pub fn payload(&self) -> &[u8] { - &self.payload - } - - /// Get a metadata value by key. - pub fn metadata(&self, key: &str) -> Option<&str> { - self.metadata - .iter() - .find(|(existing, _)| existing.eq_ignore_ascii_case(key)) - .map(|(_, value)| value.as_str()) - } - - /// Get the correlation id, if present. - pub fn correlation_id(&self) -> Option<&str> { - self.metadata("correlation_id") - } - - /// Get the causation id, if present. - pub fn causation_id(&self) -> Option<&str> { - self.metadata("causation_id") - } - - /// Decode the raw payload as JSON. - pub fn payload_json(&self) -> Result { - serde_json::from_slice(&self.payload).map_err(|e| { - HandlerError::DecodeFailed(format!( - "invalid JSON payload for message '{}': {}", - self.name, e - )) - }) - } - - /// Decode the raw payload as bitcode. - pub fn payload_bitcode(&self) -> Result { - bitcode::deserialize(&self.payload).map_err(|e| { - HandlerError::DecodeFailed(format!( - "invalid bitcode payload for message '{}': {}", - self.name, e - )) - }) - } -} - /// A registered handler with optional guard. struct RegisteredHandler { guard: Option>>, diff --git a/src/microsvc/transport/message.rs b/src/microsvc/transport/message.rs new file mode 100644 index 0000000..4fa6ebd --- /dev/null +++ b/src/microsvc/transport/message.rs @@ -0,0 +1,121 @@ +//! The canonical transport message vocabulary: [`Message`] + [`MessageKind`]. +//! +//! Bus-core types with no dependency on `microsvc`: a `Message` carries an +//! optional durable id, a name, a [`MessageKind`] (command vs event), the raw +//! payload, a content type, and metadata. Payload decoding returns a bus-core +//! [`PayloadDecodeError`] (not `microsvc::HandlerError`), so the bus does not +//! depend up into microsvc; microsvc maps it back via `From`. + +/// The kind of message a handler consumes. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] +pub enum MessageKind { + /// A command addressed to one handler. + Command, + /// A published event that may be consumed by many handlers. + Event, +} + +/// Failure decoding a [`Message`] payload. +/// +/// A bus-core error so [`Message`] stays free of `microsvc::HandlerError`. The +/// microsvc side provides `From for HandlerError`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PayloadDecodeError(pub String); + +impl std::fmt::Display for PayloadDecodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for PayloadDecodeError {} + +/// Serializable transport message used by handlers. +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct Message { + pub id: Option, + pub name: String, + pub kind: MessageKind, + pub payload: Vec, + pub content_type: String, + pub metadata: Vec<(String, String)>, +} + +impl Message { + /// Create a transport message. + pub fn new(name: impl Into, kind: MessageKind, payload: Vec) -> Self { + Self { + id: None, + name: name.into(), + kind, + payload, + content_type: "application/json".to_string(), + metadata: Vec::new(), + } + } + + /// Add a durable message id. + pub fn with_id(mut self, id: impl Into) -> Self { + self.id = Some(id.into()); + self + } + + /// Add metadata. + pub fn with_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.push((key.into(), value.into())); + self + } + + /// Get the durable message id, if this message has one. + pub fn id(&self) -> Option<&str> { + self.id.as_deref() + } + + /// Get the message name. + pub fn name(&self) -> &str { + &self.name + } + + /// Get the raw payload bytes. + pub fn payload(&self) -> &[u8] { + &self.payload + } + + /// Get a metadata value by key. + pub fn metadata(&self, key: &str) -> Option<&str> { + self.metadata + .iter() + .find(|(existing, _)| existing.eq_ignore_ascii_case(key)) + .map(|(_, value)| value.as_str()) + } + + /// Get the correlation id, if present. + pub fn correlation_id(&self) -> Option<&str> { + self.metadata("correlation_id") + } + + /// Get the causation id, if present. + pub fn causation_id(&self) -> Option<&str> { + self.metadata("causation_id") + } + + /// Decode the raw payload as JSON. + pub fn payload_json(&self) -> Result { + serde_json::from_slice(&self.payload).map_err(|e| { + PayloadDecodeError(format!( + "invalid JSON payload for message '{}': {}", + self.name, e + )) + }) + } + + /// Decode the raw payload as bitcode. + pub fn payload_bitcode(&self) -> Result { + bitcode::deserialize(&self.payload).map_err(|e| { + PayloadDecodeError(format!( + "invalid bitcode payload for message '{}': {}", + self.name, e + )) + }) + } +} diff --git a/src/microsvc/transport/mod.rs b/src/microsvc/transport/mod.rs index 9783399..d5875f5 100644 --- a/src/microsvc/transport/mod.rs +++ b/src/microsvc/transport/mod.rs @@ -79,14 +79,13 @@ //! deduplication, when needed, is the optional consumer inbox, not an outbox or //! publish guarantee. -use crate::microsvc::Message; - mod bus; mod capabilities; mod error; mod failure_policy; mod handlers; mod in_memory_bus; +mod message; #[cfg(feature = "kafka")] mod kafka; #[cfg(feature = "kafka")] @@ -138,6 +137,7 @@ pub use error::{TransportError, TransportErrorKind}; pub use failure_policy::{FailureAction, FailurePolicy}; pub use handlers::{AsyncMessageHandler, Handlers}; pub use in_memory_bus::{InMemoryBus, InMemoryReceived}; +pub use message::{Message, MessageKind, PayloadDecodeError}; pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; pub use outbox_source::{ OutboxSource, ReceivedOutboxMessage, DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, From a682bef5b3485399401c3218ef9a6b0a6367cd95 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sat, 30 May 2026 23:25:19 -0500 Subject: [PATCH 04/14] refactor(bus)!: lift transport/ to top-level src/bus module (Phase 4a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bus is no longer embedded under microsvc: `src/microsvc/transport/` is now `src/bus/` (crate::bus). microsvc keeps a transitional `pub use crate::bus as transport;` alias so existing `microsvc::transport::…` paths (incl. tests) keep resolving; call sites and the bus's own upward imports are cleaned in P4b. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/{microsvc/transport => bus}/bus.rs | 0 src/{microsvc/transport => bus}/capabilities.rs | 0 src/{microsvc/transport => bus}/error.rs | 0 src/{microsvc/transport => bus}/failure_policy.rs | 0 src/{microsvc/transport => bus}/handlers.rs | 0 src/{microsvc/transport => bus}/in_memory_bus.rs | 0 src/{microsvc/transport => bus}/kafka.rs | 0 src/{microsvc/transport => bus}/kafka_bus.rs | 0 src/{microsvc/transport => bus}/knative.rs | 0 src/{microsvc/transport => bus}/knative_bus.rs | 0 src/{microsvc/transport => bus}/message.rs | 0 src/{microsvc/transport => bus}/mod.rs | 0 src/{microsvc/transport => bus}/nats.rs | 0 src/{microsvc/transport => bus}/nats_bus.rs | 0 src/{microsvc/transport => bus}/outbox_dispatch.rs | 0 src/{microsvc/transport => bus}/outbox_source.rs | 0 src/{microsvc/transport => bus}/postgres_bus.rs | 0 src/{microsvc/transport => bus}/publisher.rs | 0 src/{microsvc/transport => bus}/rabbit_bus.rs | 0 src/{microsvc/transport => bus}/rabbitmq.rs | 0 src/{microsvc/transport => bus}/router.rs | 0 src/{microsvc/transport => bus}/run_options.rs | 0 src/{microsvc/transport => bus}/runner.rs | 0 src/{microsvc/transport => bus}/source.rs | 0 src/{microsvc/transport => bus}/stable_id.rs | 0 src/lib.rs | 1 + src/microsvc/mod.rs | 6 ++++-- 27 files changed, 5 insertions(+), 2 deletions(-) rename src/{microsvc/transport => bus}/bus.rs (100%) rename src/{microsvc/transport => bus}/capabilities.rs (100%) rename src/{microsvc/transport => bus}/error.rs (100%) rename src/{microsvc/transport => bus}/failure_policy.rs (100%) rename src/{microsvc/transport => bus}/handlers.rs (100%) rename src/{microsvc/transport => bus}/in_memory_bus.rs (100%) rename src/{microsvc/transport => bus}/kafka.rs (100%) rename src/{microsvc/transport => bus}/kafka_bus.rs (100%) rename src/{microsvc/transport => bus}/knative.rs (100%) rename src/{microsvc/transport => bus}/knative_bus.rs (100%) rename src/{microsvc/transport => bus}/message.rs (100%) rename src/{microsvc/transport => bus}/mod.rs (100%) rename src/{microsvc/transport => bus}/nats.rs (100%) rename src/{microsvc/transport => bus}/nats_bus.rs (100%) rename src/{microsvc/transport => bus}/outbox_dispatch.rs (100%) rename src/{microsvc/transport => bus}/outbox_source.rs (100%) rename src/{microsvc/transport => bus}/postgres_bus.rs (100%) rename src/{microsvc/transport => bus}/publisher.rs (100%) rename src/{microsvc/transport => bus}/rabbit_bus.rs (100%) rename src/{microsvc/transport => bus}/rabbitmq.rs (100%) rename src/{microsvc/transport => bus}/router.rs (100%) rename src/{microsvc/transport => bus}/run_options.rs (100%) rename src/{microsvc/transport => bus}/runner.rs (100%) rename src/{microsvc/transport => bus}/source.rs (100%) rename src/{microsvc/transport => bus}/stable_id.rs (100%) diff --git a/src/microsvc/transport/bus.rs b/src/bus/bus.rs similarity index 100% rename from src/microsvc/transport/bus.rs rename to src/bus/bus.rs diff --git a/src/microsvc/transport/capabilities.rs b/src/bus/capabilities.rs similarity index 100% rename from src/microsvc/transport/capabilities.rs rename to src/bus/capabilities.rs diff --git a/src/microsvc/transport/error.rs b/src/bus/error.rs similarity index 100% rename from src/microsvc/transport/error.rs rename to src/bus/error.rs diff --git a/src/microsvc/transport/failure_policy.rs b/src/bus/failure_policy.rs similarity index 100% rename from src/microsvc/transport/failure_policy.rs rename to src/bus/failure_policy.rs diff --git a/src/microsvc/transport/handlers.rs b/src/bus/handlers.rs similarity index 100% rename from src/microsvc/transport/handlers.rs rename to src/bus/handlers.rs diff --git a/src/microsvc/transport/in_memory_bus.rs b/src/bus/in_memory_bus.rs similarity index 100% rename from src/microsvc/transport/in_memory_bus.rs rename to src/bus/in_memory_bus.rs diff --git a/src/microsvc/transport/kafka.rs b/src/bus/kafka.rs similarity index 100% rename from src/microsvc/transport/kafka.rs rename to src/bus/kafka.rs diff --git a/src/microsvc/transport/kafka_bus.rs b/src/bus/kafka_bus.rs similarity index 100% rename from src/microsvc/transport/kafka_bus.rs rename to src/bus/kafka_bus.rs diff --git a/src/microsvc/transport/knative.rs b/src/bus/knative.rs similarity index 100% rename from src/microsvc/transport/knative.rs rename to src/bus/knative.rs diff --git a/src/microsvc/transport/knative_bus.rs b/src/bus/knative_bus.rs similarity index 100% rename from src/microsvc/transport/knative_bus.rs rename to src/bus/knative_bus.rs diff --git a/src/microsvc/transport/message.rs b/src/bus/message.rs similarity index 100% rename from src/microsvc/transport/message.rs rename to src/bus/message.rs diff --git a/src/microsvc/transport/mod.rs b/src/bus/mod.rs similarity index 100% rename from src/microsvc/transport/mod.rs rename to src/bus/mod.rs diff --git a/src/microsvc/transport/nats.rs b/src/bus/nats.rs similarity index 100% rename from src/microsvc/transport/nats.rs rename to src/bus/nats.rs diff --git a/src/microsvc/transport/nats_bus.rs b/src/bus/nats_bus.rs similarity index 100% rename from src/microsvc/transport/nats_bus.rs rename to src/bus/nats_bus.rs diff --git a/src/microsvc/transport/outbox_dispatch.rs b/src/bus/outbox_dispatch.rs similarity index 100% rename from src/microsvc/transport/outbox_dispatch.rs rename to src/bus/outbox_dispatch.rs diff --git a/src/microsvc/transport/outbox_source.rs b/src/bus/outbox_source.rs similarity index 100% rename from src/microsvc/transport/outbox_source.rs rename to src/bus/outbox_source.rs diff --git a/src/microsvc/transport/postgres_bus.rs b/src/bus/postgres_bus.rs similarity index 100% rename from src/microsvc/transport/postgres_bus.rs rename to src/bus/postgres_bus.rs diff --git a/src/microsvc/transport/publisher.rs b/src/bus/publisher.rs similarity index 100% rename from src/microsvc/transport/publisher.rs rename to src/bus/publisher.rs diff --git a/src/microsvc/transport/rabbit_bus.rs b/src/bus/rabbit_bus.rs similarity index 100% rename from src/microsvc/transport/rabbit_bus.rs rename to src/bus/rabbit_bus.rs diff --git a/src/microsvc/transport/rabbitmq.rs b/src/bus/rabbitmq.rs similarity index 100% rename from src/microsvc/transport/rabbitmq.rs rename to src/bus/rabbitmq.rs diff --git a/src/microsvc/transport/router.rs b/src/bus/router.rs similarity index 100% rename from src/microsvc/transport/router.rs rename to src/bus/router.rs diff --git a/src/microsvc/transport/run_options.rs b/src/bus/run_options.rs similarity index 100% rename from src/microsvc/transport/run_options.rs rename to src/bus/run_options.rs diff --git a/src/microsvc/transport/runner.rs b/src/bus/runner.rs similarity index 100% rename from src/microsvc/transport/runner.rs rename to src/bus/runner.rs diff --git a/src/microsvc/transport/source.rs b/src/bus/source.rs similarity index 100% rename from src/microsvc/transport/source.rs rename to src/bus/source.rs diff --git a/src/microsvc/transport/stable_id.rs b/src/bus/stable_id.rs similarity index 100% rename from src/microsvc/transport/stable_id.rs rename to src/bus/stable_id.rs diff --git a/src/lib.rs b/src/lib.rs index 546c661..4c5d98e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ extern crate self as sourced_rust; pub mod aggregate; +pub mod bus; pub mod entity; pub mod repository; diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 27117c8..02080e0 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -59,7 +59,9 @@ mod error; mod message_router; mod service; mod session; -pub mod transport; +// The bus is now the top-level `crate::bus` module. This transitional alias keeps +// `microsvc::transport::…` paths working; call sites move to `crate::bus` in P4b. +pub use crate::bus as transport; pub use context::Context; pub use dependencies::{ @@ -71,7 +73,7 @@ pub use service::{ CommandRequest, CommandResponse, DeliveryKind, HandlerBuilder, HandlerNames, HandlerSpec, Service, SubscriptionPlan, }; -pub use transport::{Message, MessageKind, PayloadDecodeError}; +pub use crate::bus::{Message, MessageKind, PayloadDecodeError}; pub use session::Session; // HTTP transport (requires "http" feature) From 1d90b9fe04e2cc138b460b5aa2d2279e834c0f78 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 00:23:29 -0500 Subject: [PATCH 05/14] refactor(bus): move SubscriptionPlan into bus-core message module (P4b/1) SubscriptionPlan is bus vocabulary (consumed by MessageRouter + adapters); move it out of microsvc::service into bus::message. Re-exported from microsvc for source compatibility. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/message.rs | 9 +++++++++ src/bus/mod.rs | 2 +- src/microsvc/mod.rs | 4 ++-- src/microsvc/service.rs | 11 +---------- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/bus/message.rs b/src/bus/message.rs index 4fa6ebd..a13254e 100644 --- a/src/bus/message.rs +++ b/src/bus/message.rs @@ -15,6 +15,15 @@ pub enum MessageKind { Event, } +/// Transport subscription metadata derived from a router's registered handlers. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct SubscriptionPlan { + /// Command names consumed by point-to-point command transports. + pub commands: Vec, + /// Event names consumed by fan-out event transports. + pub events: Vec, +} + /// Failure decoding a [`Message`] payload. /// /// A bus-core error so [`Message`] stays free of `microsvc::HandlerError`. The diff --git a/src/bus/mod.rs b/src/bus/mod.rs index d5875f5..4949b6b 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -137,7 +137,7 @@ pub use error::{TransportError, TransportErrorKind}; pub use failure_policy::{FailureAction, FailurePolicy}; pub use handlers::{AsyncMessageHandler, Handlers}; pub use in_memory_bus::{InMemoryBus, InMemoryReceived}; -pub use message::{Message, MessageKind, PayloadDecodeError}; +pub use message::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; pub use outbox_source::{ OutboxSource, ReceivedOutboxMessage, DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 02080e0..143d834 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -71,9 +71,9 @@ pub use dependencies::{ pub use error::HandlerError; pub use service::{ CommandRequest, CommandResponse, DeliveryKind, HandlerBuilder, HandlerNames, HandlerSpec, - Service, SubscriptionPlan, + Service, }; -pub use crate::bus::{Message, MessageKind, PayloadDecodeError}; +pub use crate::bus::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; pub use session::Session; // HTTP transport (requires "http" feature) diff --git a/src/microsvc/service.rs b/src/microsvc/service.rs index 0017b72..1412add 100644 --- a/src/microsvc/service.rs +++ b/src/microsvc/service.rs @@ -30,7 +30,7 @@ use super::context::Context; use super::dependencies::{HasReadModelStore, HasRepo, RepoReadModelDependencies}; use super::error::HandlerError; use super::session::Session; -use super::transport::{Message, MessageKind}; +use crate::bus::{Message, MessageKind, SubscriptionPlan}; type GuardFn = dyn Fn(&Context) -> bool + Send + Sync; type HandlerFuture<'a> = Pin> + Send + 'a>>; @@ -134,15 +134,6 @@ impl HandlerSpec { } } -/// Transport subscription metadata derived from registered handlers. -#[derive(Debug, Clone, Default, PartialEq, Eq)] -pub struct SubscriptionPlan { - /// Command names consumed by point-to-point command transports. - pub commands: Vec, - /// Event names consumed by fan-out event transports. - pub events: Vec, -} - /// A registered handler with optional guard. struct RegisteredHandler { guard: Option>>, From c582117c102d40eab32b65348c227dcfe423be98 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 00:27:35 -0500 Subject: [PATCH 06/14] refactor(bus): move outbox->bus bridge into outbox_worker (P4b/2) OutboxDispatcher/OutboxSource depend on crate::outbox + crate::outbox_worker, so they belong with the worker (which depends up on the bus's publisher/source traits), not in bus core. Re-exported at crate root; the two test crates that used them via microsvc::transport now import from the crate root. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/mod.rs | 6 ------ src/lib.rs | 4 ++++ src/outbox_worker/mod.rs | 8 ++++++++ src/{bus => outbox_worker}/outbox_dispatch.rs | 8 ++------ src/{bus => outbox_worker}/outbox_source.rs | 6 ++---- tests/postgres_transport/main.rs | 4 ++-- tests/transport_conformance/mod.rs | 5 +++-- 7 files changed, 21 insertions(+), 20 deletions(-) rename src/{bus => outbox_worker}/outbox_dispatch.rs (98%) rename src/{bus => outbox_worker}/outbox_source.rs (98%) diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 4949b6b..54f579e 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -98,8 +98,6 @@ mod knative_bus; mod nats; #[cfg(feature = "nats")] mod nats_bus; -mod outbox_dispatch; -mod outbox_source; #[cfg(feature = "postgres")] mod postgres_bus; mod publisher; @@ -138,10 +136,6 @@ pub use failure_policy::{FailureAction, FailurePolicy}; pub use handlers::{AsyncMessageHandler, Handlers}; pub use in_memory_bus::{InMemoryBus, InMemoryReceived}; pub use message::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; -pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; -pub use outbox_source::{ - OutboxSource, ReceivedOutboxMessage, DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, -}; #[cfg(feature = "postgres")] pub use postgres_bus::{LogReceived, PostgresBus, QueueReceived}; pub use publisher::AsyncMessagePublisher; diff --git a/src/lib.rs b/src/lib.rs index 4c5d98e..b9b40fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,6 +91,10 @@ pub use outbox_worker::{ // LocalEmitterPublisher requires the emitter feature #[cfg(feature = "emitter")] pub use outbox_worker::LocalEmitterPublisher; +pub use outbox_worker::{ + OutboxDispatchOutcome, OutboxDispatcher, OutboxSource, ReceivedOutboxMessage, + DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, SOURCED_METADATA_PREFIX, +}; pub use queued_repo::{ // Async WithOpts + unlock traits (async lock manager variant) diff --git a/src/outbox_worker/mod.rs b/src/outbox_worker/mod.rs index bfcd2a1..317ddcf 100644 --- a/src/outbox_worker/mod.rs +++ b/src/outbox_worker/mod.rs @@ -37,6 +37,8 @@ //! } //! ``` +mod outbox_dispatch; +mod outbox_source; mod publisher; mod store; mod worker; @@ -55,3 +57,9 @@ pub use store::{ // Worker pub use worker::{DrainResult, OutboxWorker, ProcessOneResult}; + +// Outbox -> bus bridge (moved out of the bus module; depends up on bus traits). +pub use outbox_dispatch::{OutboxDispatchOutcome, OutboxDispatcher, SOURCED_METADATA_PREFIX}; +pub use outbox_source::{ + OutboxSource, ReceivedOutboxMessage, DEFAULT_OUTBOX_SOURCE_BATCH, DEFAULT_OUTBOX_SOURCE_LEASE, +}; diff --git a/src/bus/outbox_dispatch.rs b/src/outbox_worker/outbox_dispatch.rs similarity index 98% rename from src/bus/outbox_dispatch.rs rename to src/outbox_worker/outbox_dispatch.rs index 1c437ce..132a97e 100644 --- a/src/bus/outbox_dispatch.rs +++ b/src/outbox_worker/outbox_dispatch.rs @@ -12,13 +12,9 @@ use std::time::Duration; -use super::publisher::AsyncMessagePublisher; -use super::TransportError; -use crate::microsvc::{Message, MessageKind}; +use crate::bus::{AsyncMessagePublisher, Message, MessageKind, TransportError}; use crate::outbox::OutboxMessage; -use crate::outbox_worker::{ - AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction, -}; +use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction}; /// Content type for an outbox payload. Outbox payloads are codec-encoded bytes /// (bitcode or raw), so the media type is binary; the exact codec travels in diff --git a/src/bus/outbox_source.rs b/src/outbox_worker/outbox_source.rs similarity index 98% rename from src/bus/outbox_source.rs rename to src/outbox_worker/outbox_source.rs index de58ebb..e1c2d9e 100644 --- a/src/bus/outbox_source.rs +++ b/src/outbox_worker/outbox_source.rs @@ -18,11 +18,9 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; -use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::TransportError; -use crate::microsvc::Message; +use crate::bus::{AsyncMessageSource, Message, ReceivedMessage, TransportError}; use crate::outbox::OutboxMessage; -use crate::outbox_worker::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef}; +use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef}; /// Default lease held on a claimed row while it is being dispatched. pub const DEFAULT_OUTBOX_SOURCE_LEASE: Duration = Duration::from_secs(30); diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index 70fb72f..f389e0e 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -13,9 +13,9 @@ use std::sync::{Arc, Mutex}; use serde_json::json; use sourced_rust::microsvc::transport::{ - run_source, AsyncMessageSource, Bus, BusConsumer, OutboxSource, PostgresBus, ReceivedMessage, - RunOptions, + run_source, AsyncMessageSource, Bus, BusConsumer, PostgresBus, ReceivedMessage, RunOptions, }; +use sourced_rust::OutboxSource; use sourced_rust::microsvc::{Context, Message, MessageKind, Service}; use sourced_rust::{ AsyncCommitBatch, AsyncOutboxStore, AsyncTransactionalCommit, OutboxMessage, diff --git a/tests/transport_conformance/mod.rs b/tests/transport_conformance/mod.rs index 5a0a52d..a4f0513 100644 --- a/tests/transport_conformance/mod.rs +++ b/tests/transport_conformance/mod.rs @@ -17,9 +17,10 @@ use std::time::Duration; use serde_json::json; use sourced_rust::microsvc::transport::{ - run_source, AsyncMessagePublisher, AsyncMessageSource, FailurePolicy, OutboxDispatcher, - ReceivedMessage, RunOptions, TransportError, + run_source, AsyncMessagePublisher, AsyncMessageSource, FailurePolicy, ReceivedMessage, + RunOptions, TransportError, }; +use sourced_rust::OutboxDispatcher; use sourced_rust::microsvc::{Context, HandlerError, Message, MessageKind, Service}; use sourced_rust::{ AsyncCommitBatch, AsyncTransactionalCommit, HashMapOutboxStore, HashMapRepository, From dc5fdd62b9e26db99e4f43e9df1e955e148acddd Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 00:33:09 -0500 Subject: [PATCH 07/14] refactor(bus): flip bus files' Message/MessageKind imports to super (P4b/3) Mechanical: bus production files now reference the canonical types via super:: (crate::bus) instead of crate::microsvc::. Remaining crate::microsvc references in bus are error.rs (B4), knative ingress (B5), and Service-based test modules (B6). Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/handlers.rs | 4 ++-- src/bus/in_memory_bus.rs | 2 +- src/bus/kafka.rs | 2 +- src/bus/kafka_bus.rs | 2 +- src/bus/knative_bus.rs | 2 +- src/bus/nats.rs | 2 +- src/bus/nats_bus.rs | 2 +- src/bus/postgres_bus.rs | 2 +- src/bus/publisher.rs | 2 +- src/bus/rabbit_bus.rs | 2 +- src/bus/rabbitmq.rs | 2 +- src/bus/router.rs | 2 +- src/bus/run_options.rs | 2 +- src/bus/runner.rs | 4 ++-- 14 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/bus/handlers.rs b/src/bus/handlers.rs index 4b19cc4..8b287a3 100644 --- a/src/bus/handlers.rs +++ b/src/bus/handlers.rs @@ -28,7 +28,7 @@ use std::pin::Pin; use std::sync::Arc; use super::{MessageRouter, TransportError}; -use crate::microsvc::{Message, MessageKind, SubscriptionPlan}; +use super::{Message, MessageKind, SubscriptionPlan}; type HandlerFuture<'a> = Pin> + Send + 'a>>; type HandlerFn = dyn for<'a> Fn(&'a Message) -> HandlerFuture<'a> + Send + Sync; @@ -140,7 +140,7 @@ impl MessageRouter for Handlers { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; + use crate::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; diff --git a/src/bus/in_memory_bus.rs b/src/bus/in_memory_bus.rs index 19d6863..7f28474 100644 --- a/src/bus/in_memory_bus.rs +++ b/src/bus/in_memory_bus.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex}; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; type Queues = Arc>>>; type Topics = Arc>>>; diff --git a/src/bus/kafka.rs b/src/bus/kafka.rs index 1c5db8b..345501b 100644 --- a/src/bus/kafka.rs +++ b/src/bus/kafka.rs @@ -20,7 +20,7 @@ use rdkafka::{Message as KafkaMessageTrait, Offset, TopicPartitionList}; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{AsyncMessagePublisher, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const MESSAGE_ID_HEADER: &str = "x-sourced-id"; const MESSAGE_KIND_HEADER: &str = "x-sourced-kind"; diff --git a/src/bus/kafka_bus.rs b/src/bus/kafka_bus.rs index ea198d0..e450fa7 100644 --- a/src/bus/kafka_bus.rs +++ b/src/bus/kafka_bus.rs @@ -25,7 +25,7 @@ use super::kafka::{KafkaPublisher, KafkaSource}; use super::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, MessageRouter, RunOptions, TransportError, }; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const DEFAULT_FETCH_TIMEOUT: Duration = Duration::from_secs(8); diff --git a/src/bus/knative_bus.rs b/src/bus/knative_bus.rs index 07a3c76..a3e31e7 100644 --- a/src/bus/knative_bus.rs +++ b/src/bus/knative_bus.rs @@ -22,7 +22,7 @@ use std::time::Duration; use super::knative::sanitize_k8s_name; use super::{Bus, TransportError}; -use crate::microsvc::{Message, MessageKind, SubscriptionPlan}; +use super::{Message, MessageKind, SubscriptionPlan}; const SEND_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/src/bus/nats.rs b/src/bus/nats.rs index f981996..a4e9b76 100644 --- a/src/bus/nats.rs +++ b/src/bus/nats.rs @@ -20,7 +20,7 @@ use futures::StreamExt; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{AsyncMessagePublisher, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; /// Header carrying the stable message id (and JetStream dedup key). const MESSAGE_ID_HEADER: &str = "Nats-Msg-Id"; diff --git a/src/bus/nats_bus.rs b/src/bus/nats_bus.rs index 34de81a..244e4de 100644 --- a/src/bus/nats_bus.rs +++ b/src/bus/nats_bus.rs @@ -29,7 +29,7 @@ use super::nats::{NatsJetStreamSource, NatsPublisher}; use super::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, MessageRouter, RunOptions, TransportError, }; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const DEFAULT_FETCH_TIMEOUT: Duration = Duration::from_millis(500); diff --git a/src/bus/postgres_bus.rs b/src/bus/postgres_bus.rs index cc5bf5f..44ae658 100644 --- a/src/bus/postgres_bus.rs +++ b/src/bus/postgres_bus.rs @@ -38,7 +38,7 @@ use sqlx::{PgPool, Row}; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const DEFAULT_LEASE: Duration = Duration::from_secs(30); diff --git a/src/bus/publisher.rs b/src/bus/publisher.rs index c4ac29b..19bc259 100644 --- a/src/bus/publisher.rs +++ b/src/bus/publisher.rs @@ -57,7 +57,7 @@ pub trait AsyncMessagePublisher: Send + Sync { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::MessageKind; + use crate::bus::MessageKind; use std::future::Future; use std::sync::Mutex; diff --git a/src/bus/rabbit_bus.rs b/src/bus/rabbit_bus.rs index 8c29e8d..bda4614 100644 --- a/src/bus/rabbit_bus.rs +++ b/src/bus/rabbit_bus.rs @@ -32,7 +32,7 @@ use lapin::{Channel, ExchangeKind}; use super::rabbitmq::{connect_channel, message_properties, RabbitReceived}; use super::source::AsyncMessageSource; use super::{run_source, Bus, BusConsumer, MessageRouter, RunOptions, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; fn retryable(context: &str, err: impl std::fmt::Display) -> TransportError { TransportError::retryable(format!("{context}: {err}")) diff --git a/src/bus/rabbitmq.rs b/src/bus/rabbitmq.rs index 3218392..f9dbbb4 100644 --- a/src/bus/rabbitmq.rs +++ b/src/bus/rabbitmq.rs @@ -20,7 +20,7 @@ use lapin::{BasicProperties, Channel, Connection, ConnectionProperties}; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{AsyncMessagePublisher, TransportError}; -use crate::microsvc::{Message, MessageKind}; +use super::{Message, MessageKind}; const MESSAGE_KIND_HEADER: &str = "x-sourced-kind"; diff --git a/src/bus/router.rs b/src/bus/router.rs index 8323ed9..4bfec3b 100644 --- a/src/bus/router.rs +++ b/src/bus/router.rs @@ -10,7 +10,7 @@ use std::future::Future; use super::TransportError; -use crate::microsvc::{Message, MessageKind, SubscriptionPlan}; +use super::{Message, MessageKind, SubscriptionPlan}; /// What the receive loop and the consumer adapters need from a message consumer. /// diff --git a/src/bus/run_options.rs b/src/bus/run_options.rs index 3ff1533..9047eec 100644 --- a/src/bus/run_options.rs +++ b/src/bus/run_options.rs @@ -140,7 +140,7 @@ impl RunOptions { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::MessageKind; + use crate::bus::MessageKind; struct FakeInbox { consumer: &'static str, diff --git a/src/bus/runner.rs b/src/bus/runner.rs index 5e70711..cd0cde5 100644 --- a/src/bus/runner.rs +++ b/src/bus/runner.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use super::source::{AsyncMessageSource, ReceivedMessage}; use super::{FailureAction, MessageRouter, RunOptions, TransportError}; -use crate::microsvc::Message; +use super::Message; /// Run the receive loop for a direct transport source. /// @@ -99,7 +99,7 @@ async fn dispatch( #[cfg(test)] mod tests { use super::*; - use crate::microsvc::transport::FailurePolicy; + use crate::bus::FailurePolicy; use crate::microsvc::{HandlerError, MessageKind, Service}; use serde_json::json; use std::collections::VecDeque; From 29609a73876e91de665398e1c66930c9442abab5 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 00:40:51 -0500 Subject: [PATCH 08/14] refactor(bus): invert error coupling + split Knative ingress (P4b/4) - bus/error.rs no longer names HandlerError: the From conversion and classification move to microsvc (HandlerError::transport_error_kind); From for TransportError moves to the outbox bridge (which knows both types). Bus core's error.rs is now microsvc-free. - Knative split: the Service-coupled HTTP ingress (cloud_events_router / ingress_handler / CloudEvent parsing) moves to microsvc::knative_ingress; bus/knative.rs keeps only the Message/SubscriptionPlan-only manifest helpers (knative_triggers, sanitize_k8s_name). Two test crates updated. 229 lib tests green; cargo check --all-features clean. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/error.rs | 107 ------------ src/bus/knative.rs | 246 +------------------------- src/bus/mod.rs | 2 +- src/microsvc/error.rs | 72 +++++++- src/microsvc/knative_ingress.rs | 250 +++++++++++++++++++++++++++ src/microsvc/mod.rs | 7 + src/outbox_worker/outbox_dispatch.rs | 13 +- tests/distributed_read_model/main.rs | 3 +- tests/knative_cloudevents/main.rs | 3 +- 9 files changed, 351 insertions(+), 352 deletions(-) create mode 100644 src/microsvc/knative_ingress.rs diff --git a/src/bus/error.rs b/src/bus/error.rs index ca6a9eb..f1bdb0c 100644 --- a/src/bus/error.rs +++ b/src/bus/error.rs @@ -9,8 +9,6 @@ use std::error::Error; use std::fmt; -use crate::microsvc::HandlerError; - /// Whether a [`TransportError`] should be retried. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum TransportErrorKind { @@ -103,40 +101,6 @@ impl TransportError { pub fn message(&self) -> &str { &self.message } - - /// Classify a [`HandlerError`] for transport retry purposes. - /// - /// Transient failures are retryable: repository errors, not-found results, - /// and otherwise-unclassified errors. Not-found is transient in an - /// at-least-once event-driven system because an out-of-order delivery can - /// reference an aggregate or projection that has not been created yet, and a - /// later redelivery resolves the race. (This also keeps a handler-raised - /// [`HandlerError::NotFound`] consistent with a repository-raised - /// [`RepositoryError::NotFound`](crate::RepositoryError), which arrives - /// wrapped in `Repository(_)`.) - /// - /// Deterministic failures — unknown routing, payload decode failures, - /// business rejections, authorization, and guard rejections — are permanent - /// because redelivering the identical message cannot change the outcome. An - /// entity that is genuinely missing forever still drains to the failure - /// policy once the transport's retry ceiling is reached, so defaulting - /// not-found to retryable does not risk an unbounded loop. - /// - /// This is a default. Handlers that know better can return a classification - /// directly, and the runner's [`FailurePolicy`](super::FailurePolicy) can - /// override what happens to a permanent error. - pub fn classify_handler_error(error: &HandlerError) -> TransportErrorKind { - match error { - HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => { - TransportErrorKind::Retryable - } - HandlerError::UnknownCommand(_) - | HandlerError::DecodeFailed(_) - | HandlerError::Rejected(_) - | HandlerError::Unauthorized(_) - | HandlerError::GuardRejected(_) => TransportErrorKind::Permanent, - } - } } impl fmt::Display for TransportError { @@ -157,30 +121,6 @@ impl Error for TransportError { } } -impl From for TransportError { - fn from(error: HandlerError) -> Self { - let kind = Self::classify_handler_error(&error); - Self { - kind, - message: error.to_string(), - source: Some(Box::new(error)), - } - } -} - -impl From for TransportError { - /// Repository/store failures (lock contention, storage hiccups, stale-claim - /// conflicts) are treated as retryable: they are usually transient, and a - /// later redelivery or re-claim resolves them. - fn from(error: crate::RepositoryError) -> Self { - Self { - kind: TransportErrorKind::Retryable, - message: error.to_string(), - source: Some(Box::new(error)), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -218,51 +158,4 @@ mod tests { assert_eq!(err.message(), "publish timed out"); } - #[test] - fn transient_handler_errors_are_retryable() { - use crate::repository::RepositoryError; - - for error in [ - HandlerError::Repository(RepositoryError::NotFound { id: "agg-1".into() }), - // A handler-raised not-found is transient too: it is usually an - // out-of-order delivery race, and stays consistent with the - // repository-raised not-found above. - HandlerError::NotFound("agg-1".into()), - HandlerError::Other("boom".into()), - ] { - assert_eq!( - TransportError::classify_handler_error(&error), - TransportErrorKind::Retryable, - "{error} should be retryable" - ); - } - } - - #[test] - fn deterministic_handler_errors_are_permanent() { - for error in [ - HandlerError::UnknownCommand("x".into()), - HandlerError::DecodeFailed("x".into()), - HandlerError::Rejected("x".into()), - HandlerError::Unauthorized("x".into()), - HandlerError::GuardRejected("x".into()), - ] { - assert_eq!( - TransportError::classify_handler_error(&error), - TransportErrorKind::Permanent, - "{error} should be permanent" - ); - } - } - - #[test] - fn from_handler_error_preserves_classification_and_source() { - let err: TransportError = HandlerError::Rejected("invalid".into()).into(); - assert!(err.is_permanent()); - assert!(err.source().is_some()); - - let err: TransportError = - HandlerError::Other(Box::::from("infra")).into(); - assert!(err.is_retryable()); - } } diff --git a/src/bus/knative.rs b/src/bus/knative.rs index ca66e4e..37e6355 100644 --- a/src/bus/knative.rs +++ b/src/bus/knative.rs @@ -1,192 +1,13 @@ -//! Knative / CloudEvents HTTP ingress. +//! Knative manifest helpers: render `Trigger` YAML and sanitize Kubernetes names. //! -//! Knative is *endpoint-driven*: the platform invokes an HTTP route, so this is -//! a separate ingress shape from the pull-based [`AsyncMessageSource`]. It is -//! NOT modeled as a polling source. The route parses a CloudEvent (binary or -//! structured HTTP mode), maps it to the canonical [`Message`], and calls the -//! same [`Service::dispatch_message`] boundary the direct-transport runner uses. -//! -//! Acknowledgement is the HTTP response: -//! -//! - handler success → `200 OK` (the only ack); -//! - retryable failure → `503` so Knative redelivers per its Delivery config; -//! - permanent failure → `422` so Knative stops retrying / dead-letters per its -//! Delivery config. -//! -//! Retry, backoff, and dead-lettering are **platform-managed** by Knative -//! Eventing here — unlike direct transports, where this crate's -//! [`FailurePolicy`](super::FailurePolicy) owns them. +//! The CloudEvents HTTP *ingress* (the consume side) is `Service`-coupled and +//! lives on the microsvc side (`microsvc::cloud_events_router`); this module keeps +//! only the produce/manifest helpers, which need just the bus vocabulary +//! ([`SubscriptionPlan`]). Used by [`KnativeBus`](super::KnativeBus). //! //! Requires the `http` feature. -use std::sync::Arc; - -use axum::body::Bytes; -use axum::extract::State; -use axum::http::{HeaderMap, StatusCode}; -use axum::response::{IntoResponse, Response}; -use axum::{Json, Router}; -use base64::Engine; -use serde_json::{json, Value}; - -use super::TransportError; -use crate::microsvc::{Message, MessageKind, Service, SubscriptionPlan}; - -const STRUCTURED_CONTENT_TYPE: &str = "application/cloudevents+json"; - -/// Build an axum router exposing a CloudEvents ingress at `POST /` and the -/// per-type route `POST /cloudevent/:type`. -/// -/// Both dispatch by the parsed CloudEvent `type` (the path segment is for routing -/// alignment only), so a Knative Trigger can target either a single shared `ref` -/// (`/`) or the per-type subscriber URI [`KnativeBus`](super::KnativeBus) emits -/// (`/cloudevent/`). Compose it with other routes or serve it directly. -pub fn cloud_events_router(service: Arc>) -> Router { - Router::new() - .route("/", axum::routing::post(ingress_handler)) - .route("/cloudevent/:type", axum::routing::post(ingress_handler)) - .with_state(service) -} - -async fn ingress_handler( - State(service): State>>, - headers: HeaderMap, - body: Bytes, -) -> Response { - let message = match parse_cloud_event(&headers, &body) { - Ok(message) => message, - Err(reason) => return (StatusCode::BAD_REQUEST, reason).into_response(), - }; - - match service.dispatch_message(&message).await { - Ok(value) => (StatusCode::OK, Json(value)).into_response(), - Err(err) => { - // Map our retryable/permanent classification onto HTTP so Knative's - // platform-managed retry/DLQ does the right thing. - let status = if TransportError::classify_handler_error(&err).is_retryable() { - StatusCode::SERVICE_UNAVAILABLE - } else { - StatusCode::UNPROCESSABLE_ENTITY - }; - (status, Json(json!({ "error": err.to_string() }))).into_response() - } - } -} - -fn header<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> { - headers.get(name).and_then(|value| value.to_str().ok()) -} - -/// Parse a CloudEvent in binary or structured HTTP mode into a [`Message`]. -fn parse_cloud_event(headers: &HeaderMap, body: &Bytes) -> Result { - let content_type = header(headers, "content-type").unwrap_or(""); - if content_type.starts_with(STRUCTURED_CONTENT_TYPE) { - parse_structured(body) - } else { - parse_binary(headers, body) - } -} - -/// Binary mode: attributes are `ce-*` headers, the body is the data. -fn parse_binary(headers: &HeaderMap, body: &Bytes) -> Result { - let id = header(headers, "ce-id").ok_or("missing ce-id header")?; - let name = header(headers, "ce-type").ok_or("missing ce-type header")?; - let content_type = header(headers, "content-type") - .unwrap_or("application/json") - .to_string(); - - let mut metadata = Vec::new(); - for (key, value) in headers.iter() { - let key = key.as_str(); - if let Some(attr) = key.strip_prefix("ce-") { - if attr == "id" || attr == "type" { - continue; - } - if let Ok(value) = value.to_str() { - metadata.push((attr.to_string(), value.to_string())); - } - } - } - - Ok(Message { - id: Some(id.to_string()), - name: name.to_string(), - kind: MessageKind::Event, - payload: body.to_vec(), - content_type, - metadata, - }) -} - -/// Structured mode: a single `application/cloudevents+json` document. -fn parse_structured(body: &Bytes) -> Result { - let event: Value = - serde_json::from_slice(body).map_err(|e| format!("invalid cloudevents+json: {e}"))?; - let object = event - .as_object() - .ok_or("cloudevent must be a JSON object")?; - - let id = object - .get("id") - .and_then(Value::as_str) - .ok_or("missing cloudevent id")? - .to_string(); - let name = object - .get("type") - .and_then(Value::as_str) - .ok_or("missing cloudevent type")? - .to_string(); - let content_type = object - .get("datacontenttype") - .and_then(Value::as_str) - .unwrap_or("application/json") - .to_string(); - - // Data: `data` (any JSON; objects/arrays re-serialized to bytes) or - // `data_base64` (binary). - let payload = if let Some(data) = object.get("data") { - match data { - Value::String(s) => s.clone().into_bytes(), - other => serde_json::to_vec(other).map_err(|e| format!("invalid data: {e}"))?, - } - } else if let Some(Value::String(b64)) = object.get("data_base64") { - base64::engine::general_purpose::STANDARD - .decode(b64) - .map_err(|e| format!("invalid data_base64: {e}"))? - } else { - Vec::new() - }; - - // Remaining attributes (source, subject, extensions) become metadata. - let reserved = [ - "specversion", - "id", - "type", - "datacontenttype", - "data", - "data_base64", - ]; - let mut metadata = Vec::new(); - for (key, value) in object { - if reserved.contains(&key.as_str()) { - continue; - } - let value = match value { - Value::String(s) => s.clone(), - other => other.to_string(), - }; - metadata.push((key.clone(), value)); - } - - Ok(Message { - id: Some(id), - name, - kind: MessageKind::Event, - payload, - content_type, - metadata, - }) -} +use super::SubscriptionPlan; /// Sanitize a string into an RFC 1123 DNS label usable as a Kubernetes resource /// name: lowercase, ASCII-alphanumeric or `-`, no leading/trailing `-`, ≤63 @@ -244,61 +65,6 @@ pub fn knative_triggers(plan: &SubscriptionPlan, broker: &str, subscriber_servic #[cfg(test)] mod tests { use super::*; - use axum::http::HeaderValue; - - fn headers(pairs: &[(&str, &str)]) -> HeaderMap { - let mut map = HeaderMap::new(); - for (k, v) in pairs { - map.insert( - axum::http::HeaderName::from_bytes(k.as_bytes()).unwrap(), - HeaderValue::from_str(v).unwrap(), - ); - } - map - } - - #[test] - fn parses_binary_cloud_event() { - let h = headers(&[ - ("ce-id", "evt-1"), - ("ce-type", "order.created"), - ("ce-source", "/orders"), - ("content-type", "application/json"), - ]); - let body = Bytes::from_static(br#"{"order":"o1"}"#); - let message = parse_cloud_event(&h, &body).unwrap(); - assert_eq!(message.id(), Some("evt-1")); - assert_eq!(message.name(), "order.created"); - assert_eq!(message.payload(), br#"{"order":"o1"}"#); - assert_eq!(message.metadata("source"), Some("/orders")); - } - - #[test] - fn parses_structured_cloud_event() { - let h = headers(&[("content-type", "application/cloudevents+json")]); - let body = Bytes::from( - json!({ - "specversion": "1.0", - "id": "evt-2", - "type": "order.created", - "source": "/orders", - "datacontenttype": "application/json", - "data": {"order": "o2"}, - }) - .to_string(), - ); - let message = parse_cloud_event(&h, &body).unwrap(); - assert_eq!(message.id(), Some("evt-2")); - assert_eq!(message.name(), "order.created"); - assert_eq!(message.payload(), br#"{"order":"o2"}"#); - assert_eq!(message.metadata("source"), Some("/orders")); - } - - #[test] - fn missing_id_is_rejected() { - let h = headers(&[("ce-type", "order.created")]); - assert!(parse_cloud_event(&h, &Bytes::new()).is_err()); - } #[test] fn triggers_render_from_subscription_plan() { diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 54f579e..dd2fe96 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -116,7 +116,7 @@ pub use kafka::{KafkaPublisher, KafkaReceived, KafkaSource}; #[cfg(feature = "kafka")] pub use kafka_bus::KafkaBus; #[cfg(feature = "http")] -pub use knative::{cloud_events_router, knative_triggers}; +pub use knative::knative_triggers; #[cfg(feature = "http")] pub use knative_bus::KnativeBus; #[cfg(feature = "nats")] diff --git a/src/microsvc/error.rs b/src/microsvc/error.rs index e769ebb..410a418 100644 --- a/src/microsvc/error.rs +++ b/src/microsvc/error.rs @@ -3,7 +3,7 @@ use std::error::Error; use std::fmt; -use crate::microsvc::transport::PayloadDecodeError; +use crate::bus::{PayloadDecodeError, TransportError, TransportErrorKind}; use crate::{repository::RepositoryError, EventRecordError}; /// Error type for command handler operations. @@ -92,4 +92,74 @@ impl HandlerError { HandlerError::Other(_) => 500, } } + + /// Classify this error for transport retry purposes (retryable vs permanent). + /// + /// Transient failures (repository errors, not-found, otherwise-unclassified) + /// are retryable: in an at-least-once event-driven system a not-found is + /// usually an out-of-order delivery race a later redelivery resolves. + /// Deterministic failures (unknown routing, decode, rejection, auth, guard) + /// are permanent — redelivering the identical message cannot change them. + pub(crate) fn transport_error_kind(&self) -> TransportErrorKind { + match self { + HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => { + TransportErrorKind::Retryable + } + HandlerError::UnknownCommand(_) + | HandlerError::DecodeFailed(_) + | HandlerError::Rejected(_) + | HandlerError::Unauthorized(_) + | HandlerError::GuardRejected(_) => TransportErrorKind::Permanent, + } + } +} + +/// Classify and convert a handler error into the transport's retryable/permanent +/// vocabulary. This lives on the microsvc side (not the bus) so the bus core does +/// not depend on `HandlerError`. +impl From for TransportError { + fn from(error: HandlerError) -> Self { + let kind = error.transport_error_kind(); + TransportError::new(kind, error.to_string()).with_source(error) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn transient_handler_errors_are_retryable() { + for error in [ + HandlerError::Repository(RepositoryError::NotFound { id: "agg-1".into() }), + HandlerError::NotFound("agg-1".into()), + HandlerError::Other("boom".into()), + ] { + assert_eq!(error.transport_error_kind(), TransportErrorKind::Retryable); + } + } + + #[test] + fn deterministic_handler_errors_are_permanent() { + for error in [ + HandlerError::UnknownCommand("x".into()), + HandlerError::DecodeFailed("x".into()), + HandlerError::Rejected("x".into()), + HandlerError::Unauthorized("x".into()), + HandlerError::GuardRejected("x".into()), + ] { + assert_eq!(error.transport_error_kind(), TransportErrorKind::Permanent); + } + } + + #[test] + fn from_handler_error_preserves_classification_and_source() { + let err: TransportError = HandlerError::Rejected("invalid".into()).into(); + assert!(err.is_permanent()); + assert!(err.source().is_some()); + + let err: TransportError = + HandlerError::Other(Box::::from("infra")).into(); + assert!(err.is_retryable()); + } } diff --git a/src/microsvc/knative_ingress.rs b/src/microsvc/knative_ingress.rs new file mode 100644 index 0000000..0a58016 --- /dev/null +++ b/src/microsvc/knative_ingress.rs @@ -0,0 +1,250 @@ +//! Knative / CloudEvents HTTP ingress (microsvc side). +//! +//! Knative is *endpoint-driven*: the platform invokes an HTTP route, so this is +//! a separate ingress shape from the pull-based bus sources. The route parses a +//! CloudEvent (binary or structured HTTP mode), maps it to the canonical +//! [`Message`], and calls the same [`Service::dispatch_message`] boundary the +//! direct-transport runner uses. +//! +//! This lives on the **microsvc side**, not in the bus crate, because it is +//! intrinsically `Service`-coupled: it serializes the handler's returned `Value` +//! into the HTTP body and classifies a `HandlerError` to pick the status code — +//! neither of which the bus's `MessageRouter::dispatch` (unit-returning) exposes. +//! +//! Acknowledgement is the HTTP response: +//! +//! - handler success → `200 OK` (the only ack); +//! - retryable failure → `503` so Knative redelivers per its Delivery config; +//! - permanent failure → `422` so Knative stops retrying / dead-letters. +//! +//! Retry, backoff, and dead-lettering are **platform-managed** by Knative +//! Eventing here — unlike direct transports, where the bus `FailurePolicy` owns +//! them. Requires the `http` feature. + +use std::sync::Arc; + +use axum::body::Bytes; +use axum::extract::State; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; +use axum::{Json, Router}; +use base64::Engine; +use serde_json::{json, Value}; + +use crate::bus::{Message, MessageKind}; +use crate::microsvc::Service; + +const STRUCTURED_CONTENT_TYPE: &str = "application/cloudevents+json"; + +/// Build an axum router exposing a CloudEvents ingress at `POST /` and the +/// per-type route `POST /cloudevent/:type`. +/// +/// Both dispatch by the parsed CloudEvent `type` (the path segment is for routing +/// alignment only), so a Knative Trigger can target either a single shared `ref` +/// (`/`) or the per-type subscriber URI `KnativeBus` emits (`/cloudevent/`). +pub fn cloud_events_router(service: Arc>) -> Router { + Router::new() + .route("/", axum::routing::post(ingress_handler)) + .route("/cloudevent/:type", axum::routing::post(ingress_handler)) + .with_state(service) +} + +async fn ingress_handler( + State(service): State>>, + headers: HeaderMap, + body: Bytes, +) -> Response { + let message = match parse_cloud_event(&headers, &body) { + Ok(message) => message, + Err(reason) => return (StatusCode::BAD_REQUEST, reason).into_response(), + }; + + match service.dispatch_message(&message).await { + Ok(value) => (StatusCode::OK, Json(value)).into_response(), + Err(err) => { + // Map the retryable/permanent classification onto HTTP so Knative's + // platform-managed retry/DLQ does the right thing. + let status = if err.transport_error_kind().is_retryable() { + StatusCode::SERVICE_UNAVAILABLE + } else { + StatusCode::UNPROCESSABLE_ENTITY + }; + (status, Json(json!({ "error": err.to_string() }))).into_response() + } + } +} + +fn header<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> { + headers.get(name).and_then(|value| value.to_str().ok()) +} + +/// Parse a CloudEvent in binary or structured HTTP mode into a [`Message`]. +fn parse_cloud_event(headers: &HeaderMap, body: &Bytes) -> Result { + let content_type = header(headers, "content-type").unwrap_or(""); + if content_type.starts_with(STRUCTURED_CONTENT_TYPE) { + parse_structured(body) + } else { + parse_binary(headers, body) + } +} + +/// Binary mode: attributes are `ce-*` headers, the body is the data. +fn parse_binary(headers: &HeaderMap, body: &Bytes) -> Result { + let id = header(headers, "ce-id").ok_or("missing ce-id header")?; + let name = header(headers, "ce-type").ok_or("missing ce-type header")?; + let content_type = header(headers, "content-type") + .unwrap_or("application/json") + .to_string(); + + let mut metadata = Vec::new(); + for (key, value) in headers.iter() { + let key = key.as_str(); + if let Some(attr) = key.strip_prefix("ce-") { + if attr == "id" || attr == "type" { + continue; + } + if let Ok(value) = value.to_str() { + metadata.push((attr.to_string(), value.to_string())); + } + } + } + + Ok(Message { + id: Some(id.to_string()), + name: name.to_string(), + kind: MessageKind::Event, + payload: body.to_vec(), + content_type, + metadata, + }) +} + +/// Structured mode: a single `application/cloudevents+json` document. +fn parse_structured(body: &Bytes) -> Result { + let event: Value = + serde_json::from_slice(body).map_err(|e| format!("invalid cloudevents+json: {e}"))?; + let object = event + .as_object() + .ok_or("cloudevent must be a JSON object")?; + + let id = object + .get("id") + .and_then(Value::as_str) + .ok_or("missing cloudevent id")? + .to_string(); + let name = object + .get("type") + .and_then(Value::as_str) + .ok_or("missing cloudevent type")? + .to_string(); + let content_type = object + .get("datacontenttype") + .and_then(Value::as_str) + .unwrap_or("application/json") + .to_string(); + + // Data: `data` (any JSON; objects/arrays re-serialized to bytes) or + // `data_base64` (binary). + let payload = if let Some(data) = object.get("data") { + match data { + Value::String(s) => s.clone().into_bytes(), + other => serde_json::to_vec(other).map_err(|e| format!("invalid data: {e}"))?, + } + } else if let Some(Value::String(b64)) = object.get("data_base64") { + base64::engine::general_purpose::STANDARD + .decode(b64) + .map_err(|e| format!("invalid data_base64: {e}"))? + } else { + Vec::new() + }; + + // Remaining attributes (source, subject, extensions) become metadata. + let reserved = [ + "specversion", + "id", + "type", + "datacontenttype", + "data", + "data_base64", + ]; + let mut metadata = Vec::new(); + for (key, value) in object { + if reserved.contains(&key.as_str()) { + continue; + } + let value = match value { + Value::String(s) => s.clone(), + other => other.to_string(), + }; + metadata.push((key.clone(), value)); + } + + Ok(Message { + id: Some(id), + name, + kind: MessageKind::Event, + payload, + content_type, + metadata, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::http::HeaderValue; + + fn headers(pairs: &[(&str, &str)]) -> HeaderMap { + let mut map = HeaderMap::new(); + for (k, v) in pairs { + map.insert( + axum::http::HeaderName::from_bytes(k.as_bytes()).unwrap(), + HeaderValue::from_str(v).unwrap(), + ); + } + map + } + + #[test] + fn parses_binary_cloud_event() { + let h = headers(&[ + ("ce-id", "evt-1"), + ("ce-type", "order.created"), + ("ce-source", "/orders"), + ("content-type", "application/json"), + ]); + let body = Bytes::from_static(br#"{"order":"o1"}"#); + let message = parse_cloud_event(&h, &body).unwrap(); + assert_eq!(message.id(), Some("evt-1")); + assert_eq!(message.name(), "order.created"); + assert_eq!(message.payload(), br#"{"order":"o1"}"#); + assert_eq!(message.metadata("source"), Some("/orders")); + } + + #[test] + fn parses_structured_cloud_event() { + let h = headers(&[("content-type", "application/cloudevents+json")]); + let body = Bytes::from( + json!({ + "specversion": "1.0", + "id": "evt-2", + "type": "order.created", + "source": "/orders", + "datacontenttype": "application/json", + "data": {"order": "o2"}, + }) + .to_string(), + ); + let message = parse_cloud_event(&h, &body).unwrap(); + assert_eq!(message.id(), Some("evt-2")); + assert_eq!(message.name(), "order.created"); + assert_eq!(message.payload(), br#"{"order":"o2"}"#); + assert_eq!(message.metadata("source"), Some("/orders")); + } + + #[test] + fn missing_id_is_rejected() { + let h = headers(&[("ce-type", "order.created")]); + assert!(parse_cloud_event(&h, &Bytes::new()).is_err()); + } +} diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 143d834..15ce293 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -82,6 +82,13 @@ mod http; #[cfg(feature = "http")] pub use http::{router, serve}; +// Knative / CloudEvents HTTP ingress (Service-coupled; the bus keeps only the +// produce/manifest helpers). Requires the "http" feature. +#[cfg(feature = "http")] +mod knative_ingress; +#[cfg(feature = "http")] +pub use knative_ingress::cloud_events_router; + // gRPC transport (requires "grpc" feature) #[cfg(feature = "grpc")] pub mod grpc; diff --git a/src/outbox_worker/outbox_dispatch.rs b/src/outbox_worker/outbox_dispatch.rs index 132a97e..acb0048 100644 --- a/src/outbox_worker/outbox_dispatch.rs +++ b/src/outbox_worker/outbox_dispatch.rs @@ -12,10 +12,21 @@ use std::time::Duration; -use crate::bus::{AsyncMessagePublisher, Message, MessageKind, TransportError}; +use crate::bus::{AsyncMessagePublisher, Message, MessageKind, TransportError, TransportErrorKind}; use crate::outbox::OutboxMessage; +use crate::repository::RepositoryError; use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction}; +/// Repository/store failures (lock contention, storage hiccups, stale-claim +/// conflicts) are retryable: usually transient, resolved by a later re-claim. This +/// conversion lives with the outbox bridge — which legitimately knows both the +/// store and the bus — so bus core stays free of `RepositoryError`. +impl From for TransportError { + fn from(error: RepositoryError) -> Self { + TransportError::new(TransportErrorKind::Retryable, error.to_string()).with_source(error) + } +} + /// Content type for an outbox payload. Outbox payloads are codec-encoded bytes /// (bitcode or raw), so the media type is binary; the exact codec travels in /// metadata under [`SOURCED_METADATA_PREFIX`] for the consumer. diff --git a/tests/distributed_read_model/main.rs b/tests/distributed_read_model/main.rs index 8605307..e394cf1 100644 --- a/tests/distributed_read_model/main.rs +++ b/tests/distributed_read_model/main.rs @@ -1007,7 +1007,8 @@ where + Sync + 'static, { - use sourced_rust::microsvc::transport::{cloud_events_router, KnativeBus}; + use sourced_rust::microsvc::cloud_events_router; + use sourced_rust::microsvc::transport::KnativeBus; let (collector, collected) = build_collector(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0") diff --git a/tests/knative_cloudevents/main.rs b/tests/knative_cloudevents/main.rs index 74a1958..3b2d01a 100644 --- a/tests/knative_cloudevents/main.rs +++ b/tests/knative_cloudevents/main.rs @@ -8,7 +8,8 @@ use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::transport::{cloud_events_router, Bus, KnativeBus}; +use sourced_rust::microsvc::cloud_events_router; +use sourced_rust::microsvc::transport::{Bus, KnativeBus}; use sourced_rust::microsvc::{ Context, HandlerError, Message, MessageKind, Service, SubscriptionPlan, }; From b1608535f762ef3744ffc97375f7430d31a9d9d4 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 12:47:29 -0500 Subject: [PATCH 09/14] refactor(bus): make bus tests Service-free; bus is now microsvc-free (P4b/5) - Rewrite the runner and in_memory_bus unit tests to register handlers via the dependency-free Handlers builder instead of microsvc::Service, so the bus has zero crate::microsvc references (production AND tests). - Fix the rabbitmq_transport test to pass &Service (ensure_subscription now takes &impl MessageRouter, which doesn't deref-coerce &Arc). - cargo fmt. src/bus/ now imports nothing from microsvc. 229 lib + 493 default integration tests green; cargo test --all-features --no-run clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/error.rs | 1 - src/bus/handlers.rs | 24 ++++++------ src/bus/in_memory_bus.rs | 58 ++++++++++++---------------- src/bus/mod.rs | 4 +- src/bus/runner.rs | 37 ++++++++---------- src/microsvc/context.rs | 2 +- src/microsvc/mod.rs | 2 +- src/outbox_worker/outbox_dispatch.rs | 2 +- src/outbox_worker/outbox_source.rs | 2 +- tests/postgres_transport/main.rs | 2 +- tests/rabbitmq_transport/main.rs | 4 +- tests/transport_conformance/mod.rs | 2 +- 12 files changed, 64 insertions(+), 76 deletions(-) diff --git a/src/bus/error.rs b/src/bus/error.rs index f1bdb0c..4e509b2 100644 --- a/src/bus/error.rs +++ b/src/bus/error.rs @@ -157,5 +157,4 @@ mod tests { assert!(err.source().is_some()); assert_eq!(err.message(), "publish timed out"); } - } diff --git a/src/bus/handlers.rs b/src/bus/handlers.rs index 8b287a3..62e516e 100644 --- a/src/bus/handlers.rs +++ b/src/bus/handlers.rs @@ -27,8 +27,8 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use super::{MessageRouter, TransportError}; use super::{Message, MessageKind, SubscriptionPlan}; +use super::{MessageRouter, TransportError}; type HandlerFuture<'a> = Pin> + Send + 'a>>; type HandlerFn = dyn for<'a> Fn(&'a Message) -> HandlerFuture<'a> + Send + Sync; @@ -186,17 +186,19 @@ mod tests { } let count = Arc::new(AtomicUsize::new(0)); let seen = count.clone(); - let handlers = Arc::new(Handlers::new().on_event("seat.reserved", move |msg: &Message| { - let seen = seen.clone(); - // Touch the message to prove the borrow is usable in the handler. - let is_event = matches!(msg.kind, MessageKind::Event); - async move { - if is_event { - seen.fetch_add(1, Ordering::SeqCst); + let handlers = Arc::new( + Handlers::new().on_event("seat.reserved", move |msg: &Message| { + let seen = seen.clone(); + // Touch the message to prove the borrow is usable in the handler. + let is_event = matches!(msg.kind, MessageKind::Event); + async move { + if is_event { + seen.fetch_add(1, Ordering::SeqCst); + } + Ok(()) } - Ok(()) - } - })); + }), + ); block_on(bus.subscribe(handlers, RunOptions::idempotent())).unwrap(); assert_eq!(count.load(Ordering::SeqCst), 3); } diff --git a/src/bus/in_memory_bus.rs b/src/bus/in_memory_bus.rs index 7f28474..d47c3d9 100644 --- a/src/bus/in_memory_bus.rs +++ b/src/bus/in_memory_bus.rs @@ -176,8 +176,7 @@ impl ReceivedMessage for InMemoryReceived { #[cfg(test)] mod tests { use super::*; - use crate::microsvc::{HandlerError, Service}; - use serde_json::json; + use crate::bus::Handlers; use std::future::Future; fn block_on(future: F) -> F::Output { @@ -203,32 +202,26 @@ mod tests { Arc::new(Mutex::new(Vec::new())) } - fn command_service(rec: Arc>>) -> Arc> { - Arc::new(Service::new(()).command("work").handle( - move |ctx: &crate::microsvc::Context<()>| { - let rec = rec.clone(); - let name = ctx.message().name().to_string(); - async move { - rec.lock().unwrap().push(name); - Ok(json!({})) - } - }, - )) + fn command_service(rec: Arc>>) -> Arc { + Arc::new(Handlers::new().on_command("work", move |msg: &Message| { + let rec = rec.clone(); + let name = msg.name().to_string(); + async move { + rec.lock().unwrap().push(name); + Ok(()) + } + })) } - fn event_service(rec: Arc>>) -> Arc> { - Arc::new( - Service::new(()) - .event("evt") - .handle(move |ctx: &crate::microsvc::Context<()>| { - let rec = rec.clone(); - let id = ctx.message().id().unwrap_or("?").to_string(); - async move { - rec.lock().unwrap().push(id); - Ok(json!({})) - } - }), - ) + fn event_service(rec: Arc>>) -> Arc { + Arc::new(Handlers::new().on_event("evt", move |msg: &Message| { + let rec = rec.clone(); + let id = msg.id().unwrap_or("?").to_string(); + async move { + rec.lock().unwrap().push(id); + Ok(()) + } + })) } #[test] @@ -322,14 +315,13 @@ mod tests { fn handler_error_does_not_panic_the_loop() { let bus = InMemoryBus::new(); block_on(bus.send("work", b"{}".to_vec())).unwrap(); - let service: Arc> = - Arc::new(Service::new(()).command("work").handle( - |_: &crate::microsvc::Context<()>| async move { - Err(HandlerError::Rejected("no".into())) - }, - )); + let handlers: Arc = Arc::new( + Handlers::new().on_command("work", |_: &Message| async move { + Err(TransportError::permanent("no")) + }), + ); // Default failure policy dead-letters the permanent failure; in-memory // dead_letter is a no-op nack, so the run completes cleanly. - block_on(bus.listen(service, RunOptions::idempotent())).unwrap(); + block_on(bus.listen(handlers, RunOptions::idempotent())).unwrap(); } } diff --git a/src/bus/mod.rs b/src/bus/mod.rs index dd2fe96..f42c5b6 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -85,7 +85,6 @@ mod error; mod failure_policy; mod handlers; mod in_memory_bus; -mod message; #[cfg(feature = "kafka")] mod kafka; #[cfg(feature = "kafka")] @@ -94,6 +93,7 @@ mod kafka_bus; mod knative; #[cfg(feature = "http")] mod knative_bus; +mod message; #[cfg(feature = "nats")] mod nats; #[cfg(feature = "nats")] @@ -129,7 +129,6 @@ pub use rabbit_bus::RabbitBus; pub use rabbitmq::{RabbitPublisher, RabbitReceived, RabbitSource}; pub use bus::{Bus, BusConsumer}; -pub use router::MessageRouter; pub use capabilities::{ConsumerAckKind, KnativeIntegrationKind, TransportCapabilities}; pub use error::{TransportError, TransportErrorKind}; pub use failure_policy::{FailureAction, FailurePolicy}; @@ -139,6 +138,7 @@ pub use message::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; #[cfg(feature = "postgres")] pub use postgres_bus::{LogReceived, PostgresBus, QueueReceived}; pub use publisher::AsyncMessagePublisher; +pub use router::MessageRouter; pub use run_options::{ConsumerDeliveryMode, InboxHook, NoInbox, RunOptions}; pub use runner::run_source; pub use source::{AsyncMessageSource, ReceivedMessage}; diff --git a/src/bus/runner.rs b/src/bus/runner.rs index cd0cde5..f8e4a25 100644 --- a/src/bus/runner.rs +++ b/src/bus/runner.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use super::source::{AsyncMessageSource, ReceivedMessage}; -use super::{FailureAction, MessageRouter, RunOptions, TransportError}; use super::Message; +use super::{FailureAction, MessageRouter, RunOptions, TransportError}; /// Run the receive loop for a direct transport source. /// @@ -68,7 +68,7 @@ where FailureAction::Park => received.park(&error.to_string()).await?, FailureAction::LogAndAck => { eprintln!( - "[microsvc::transport] dropping message '{}' after permanent failure: {error}", + "[bus::runner] dropping message '{}' after permanent failure: {error}", received.message().name() ); received.ack().await? @@ -99,9 +99,7 @@ async fn dispatch( #[cfg(test)] mod tests { use super::*; - use crate::bus::FailurePolicy; - use crate::microsvc::{HandlerError, MessageKind, Service}; - use serde_json::json; + use crate::bus::{FailurePolicy, Handlers, MessageKind}; use std::collections::VecDeque; use std::future::Future; use std::sync::Mutex; @@ -224,37 +222,34 @@ mod tests { message } - fn service(recorder: &Arc) -> Arc> { + fn router(recorder: &Arc) -> Arc { let ok = recorder.clone(); let retryable = recorder.clone(); let permanent = recorder.clone(); Arc::new( - Service::new(()) - .event("ok") - .handle(move |ctx: &crate::microsvc::Context<()>| { + Handlers::new() + .on_event("ok", move |msg: &Message| { let ok = ok.clone(); - let name = ctx.message().name().to_string(); + let name = msg.name().to_string(); async move { ok.push(Event::Handled(name)); - Ok(json!({})) + Ok(()) } }) - .event("retryable") - .handle(move |ctx: &crate::microsvc::Context<()>| { + .on_event("retryable", move |msg: &Message| { let retryable = retryable.clone(); - let name = ctx.message().name().to_string(); + let name = msg.name().to_string(); async move { retryable.push(Event::Handled(name)); - Err(HandlerError::Other("infra".into())) + Err(TransportError::retryable("infra")) } }) - .event("permanent") - .handle(move |ctx: &crate::microsvc::Context<()>| { + .on_event("permanent", move |msg: &Message| { let permanent = permanent.clone(); - let name = ctx.message().name().to_string(); + let name = msg.name().to_string(); async move { permanent.push(Event::Handled(name)); - Err(HandlerError::Rejected("nope".into())) + Err(TransportError::permanent("nope")) } }), ) @@ -276,7 +271,7 @@ mod tests { recv_error: bool, ) -> RunResult { let recorder = Recorder::new(); - let svc = service(&recorder); + let svc = router(&recorder); let source = FakeSource { queue: messages.into_iter().collect(), recorder: recorder.clone(), @@ -495,7 +490,7 @@ mod tests { // (no-inbox) path: the runner future must be Send. fn assert_send(_: &T) {} let recorder = Recorder::new(); - let svc = service(&recorder); + let svc = router(&recorder); let source = FakeSource { queue: VecDeque::new(), recorder, diff --git a/src/microsvc/context.rs b/src/microsvc/context.rs index df27ade..a482b1d 100644 --- a/src/microsvc/context.rs +++ b/src/microsvc/context.rs @@ -9,8 +9,8 @@ use serde_json::Value; use super::dependencies::{HasReadModelStore, HasRepo}; use super::error::HandlerError; -use super::transport::Message; use super::session::Session; +use super::transport::Message; /// The context passed to every handler. /// diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 15ce293..370254a 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -63,6 +63,7 @@ mod session; // `microsvc::transport::…` paths working; call sites move to `crate::bus` in P4b. pub use crate::bus as transport; +pub use crate::bus::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; pub use context::Context; pub use dependencies::{ HasReadModelStore, HasRepo, ReadModelStoreDependencies, RepoDependencies, @@ -73,7 +74,6 @@ pub use service::{ CommandRequest, CommandResponse, DeliveryKind, HandlerBuilder, HandlerNames, HandlerSpec, Service, }; -pub use crate::bus::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; pub use session::Session; // HTTP transport (requires "http" feature) diff --git a/src/outbox_worker/outbox_dispatch.rs b/src/outbox_worker/outbox_dispatch.rs index acb0048..68694c9 100644 --- a/src/outbox_worker/outbox_dispatch.rs +++ b/src/outbox_worker/outbox_dispatch.rs @@ -12,10 +12,10 @@ use std::time::Duration; +use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction}; use crate::bus::{AsyncMessagePublisher, Message, MessageKind, TransportError, TransportErrorKind}; use crate::outbox::OutboxMessage; use crate::repository::RepositoryError; -use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef, OutboxPublishFailureAction}; /// Repository/store failures (lock contention, storage hiccups, stale-claim /// conflicts) are retryable: usually transient, resolved by a later re-claim. This diff --git a/src/outbox_worker/outbox_source.rs b/src/outbox_worker/outbox_source.rs index e1c2d9e..fae4763 100644 --- a/src/outbox_worker/outbox_source.rs +++ b/src/outbox_worker/outbox_source.rs @@ -18,9 +18,9 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; +use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef}; use crate::bus::{AsyncMessageSource, Message, ReceivedMessage, TransportError}; use crate::outbox::OutboxMessage; -use super::{AsyncOutboxStore, ClaimOutboxMessages, OutboxClaimRef}; /// Default lease held on a claimed row while it is being dispatched. pub const DEFAULT_OUTBOX_SOURCE_LEASE: Duration = Duration::from_secs(30); diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index f389e0e..d54eca1 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -15,8 +15,8 @@ use serde_json::json; use sourced_rust::microsvc::transport::{ run_source, AsyncMessageSource, Bus, BusConsumer, PostgresBus, ReceivedMessage, RunOptions, }; -use sourced_rust::OutboxSource; use sourced_rust::microsvc::{Context, Message, MessageKind, Service}; +use sourced_rust::OutboxSource; use sourced_rust::{ AsyncCommitBatch, AsyncOutboxStore, AsyncTransactionalCommit, OutboxMessage, OutboxMessageStatus, PostgresOutboxStore, PostgresRepository, diff --git a/tests/rabbitmq_transport/main.rs b/tests/rabbitmq_transport/main.rs index 73eed09..c141c5e 100644 --- a/tests/rabbitmq_transport/main.rs +++ b/tests/rabbitmq_transport/main.rs @@ -223,11 +223,11 @@ async fn bus_publish_subscribe_fans_out_across_groups() { let bus_proj = RabbitBus::connect(&url, "projections", &ns).await.unwrap(); let bus_audit = RabbitBus::connect(&url, "audit", &ns).await.unwrap(); bus_proj - .ensure_subscription(&svc_proj) + .ensure_subscription(svc_proj.as_ref()) .await .expect("bind proj"); bus_audit - .ensure_subscription(&svc_audit) + .ensure_subscription(svc_audit.as_ref()) .await .expect("bind audit"); diff --git a/tests/transport_conformance/mod.rs b/tests/transport_conformance/mod.rs index a4f0513..e8817d6 100644 --- a/tests/transport_conformance/mod.rs +++ b/tests/transport_conformance/mod.rs @@ -20,8 +20,8 @@ use sourced_rust::microsvc::transport::{ run_source, AsyncMessagePublisher, AsyncMessageSource, FailurePolicy, ReceivedMessage, RunOptions, TransportError, }; -use sourced_rust::OutboxDispatcher; use sourced_rust::microsvc::{Context, HandlerError, Message, MessageKind, Service}; +use sourced_rust::OutboxDispatcher; use sourced_rust::{ AsyncCommitBatch, AsyncTransactionalCommit, HashMapOutboxStore, HashMapRepository, OutboxMessage, OutboxMessageStatus, From 55e6c52fbe7e53ecd093a6795e5386a9405ff3c7 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 13:05:24 -0500 Subject: [PATCH 10/14] =?UTF-8?q?refactor(bus)!:=20drop=20the=20microsvc::?= =?UTF-8?q?transport=20alias=20=E2=80=94=20clean=20break=20to=20crate::bus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the transitional pub use crate::bus as transport; alias and move every remaining reference to crate::bus / sourced_rust::bus: the message_router and context internal imports, the outbox_source test, all 11 test crates, and the README + async-transports docs. microsvc::transport no longer exists anywhere. 493 default integration tests green; cargo test --all-features --no-run clean. Implements [[tasks/bus-decomposition-phase1]] / [[specs/bus-module-decomposition]] Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 4 +-- docs/async-transports.md | 8 +++--- src/microsvc/context.rs | 2 +- src/microsvc/message_router.rs | 2 +- src/microsvc/mod.rs | 3 -- src/outbox_worker/outbox_source.rs | 2 +- tests/distributed_read_model/main.rs | 32 +++++++++++----------- tests/distributed_read_model_board/main.rs | 2 +- tests/kafka_transport/main.rs | 2 +- tests/knative_cloudevents/main.rs | 2 +- tests/microsvc/transport_listen.rs | 2 +- tests/microsvc/transport_subscribe.rs | 2 +- tests/nats_transport/main.rs | 2 +- tests/postgres_transport/main.rs | 2 +- tests/rabbitmq_transport/main.rs | 2 +- tests/sagas/microsvc_saga.rs | 2 +- tests/transport_conformance/mod.rs | 2 +- 17 files changed, 35 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index eed99ab..9af48c1 100644 --- a/README.md +++ b/README.md @@ -760,7 +760,7 @@ transports; only the constructor line changes.** ```rust use std::sync::Arc; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; // Built once — handlers are transport-agnostic. let service = Arc::new(build_service()); @@ -805,7 +805,7 @@ carries a `FailurePolicy` controlling what happens to a **permanent** handler failure — `Retry`, `DeadLetter`, `Park`, `LogAndAck`, or `Stop`: ```rust -use sourced_rust::microsvc::transport::{FailurePolicy, RunOptions}; +use sourced_rust::bus::{FailurePolicy, RunOptions}; bus.listen( service.clone(), diff --git a/docs/async-transports.md b/docs/async-transports.md index 4631f8a..a0447d5 100644 --- a/docs/async-transports.md +++ b/docs/async-transports.md @@ -2,14 +2,14 @@ Distributed (published from the `sourced_rust` crate) keeps the synchronous in-memory bus intact and adds an async transport layer under -`microsvc::transport`. The design line is: +`bus`. The design line is: - **`microsvc`** owns handler registration, guards, typed input decoding, dispatch, and handler metadata; - **transport adapters** own how messages are received, acknowledged, retried, published, and mapped to external topics/subjects/queues/routes. -The shared vocabulary lives in `microsvc::transport` and depends on no concrete +The shared vocabulary lives in `bus` and depends on no concrete broker. The same application code runs over any transport — selecting one is an adapter/wiring change, not a handler change. @@ -43,7 +43,7 @@ Direct transports implement `AsyncMessageSource` (pull a message) and `Service::dispatch_message` and settling only after the handler completes: ```rust,ignore -use sourced_rust::microsvc::transport::{run_source, RunOptions}; +use sourced_rust::bus::{run_source, RunOptions}; run_source(service, source, RunOptions::idempotent()).await?; ``` @@ -108,7 +108,7 @@ The app surface is identical across transports; only the constructor changes: ```rust use std::sync::Arc; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; // Built once — handlers are transport-agnostic. let service = Arc::new(build_service()); diff --git a/src/microsvc/context.rs b/src/microsvc/context.rs index a482b1d..6a7dbc2 100644 --- a/src/microsvc/context.rs +++ b/src/microsvc/context.rs @@ -10,7 +10,7 @@ use serde_json::Value; use super::dependencies::{HasReadModelStore, HasRepo}; use super::error::HandlerError; use super::session::Session; -use super::transport::Message; +use crate::bus::Message; /// The context passed to every handler. /// diff --git a/src/microsvc/message_router.rs b/src/microsvc/message_router.rs index 5d85acc..97378df 100644 --- a/src/microsvc/message_router.rs +++ b/src/microsvc/message_router.rs @@ -5,7 +5,7 @@ //! retryable/permanent vocabulary happens here, on the microsvc side, so the //! bus-core runner only ever sees an already-classified `TransportError`. -use crate::microsvc::transport::{MessageRouter, TransportError}; +use crate::bus::{MessageRouter, TransportError}; use crate::microsvc::{Message, MessageKind, Service, SubscriptionPlan}; impl MessageRouter for Service { diff --git a/src/microsvc/mod.rs b/src/microsvc/mod.rs index 370254a..a053c31 100644 --- a/src/microsvc/mod.rs +++ b/src/microsvc/mod.rs @@ -59,9 +59,6 @@ mod error; mod message_router; mod service; mod session; -// The bus is now the top-level `crate::bus` module. This transitional alias keeps -// `microsvc::transport::…` paths working; call sites move to `crate::bus` in P4b. -pub use crate::bus as transport; pub use crate::bus::{Message, MessageKind, PayloadDecodeError, SubscriptionPlan}; pub use context::Context; diff --git a/src/outbox_worker/outbox_source.rs b/src/outbox_worker/outbox_source.rs index fae4763..03f1d9f 100644 --- a/src/outbox_worker/outbox_source.rs +++ b/src/outbox_worker/outbox_source.rs @@ -173,7 +173,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::microsvc::transport::{run_source, RunOptions}; + use crate::bus::{run_source, RunOptions}; use crate::microsvc::Service; use crate::{ AsyncCommitBatch, AsyncTransactionalCommit, HashMapRepository, OutboxMessage, diff --git a/tests/distributed_read_model/main.rs b/tests/distributed_read_model/main.rs index e394cf1..ce13f42 100644 --- a/tests/distributed_read_model/main.rs +++ b/tests/distributed_read_model/main.rs @@ -466,7 +466,7 @@ where /// complete, so each event is forwarded exactly once. async fn publish_pending_outbox( outbox: &sourced_rust::HashMapOutboxStore, - bus: &sourced_rust::microsvc::transport::InMemoryBus, + bus: &sourced_rust::bus::InMemoryBus, ) { let claimed = outbox .claim_async(sourced_rust::ClaimOutboxMessages::new( @@ -501,7 +501,7 @@ async fn publish_pending_outbox( /// assertions — only the transport changed. #[tokio::test] async fn seat_checkout_saga_reserves_seat_and_projects_user_screen() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let checkout_store = HashMapRepository::new(); let checkout_service = @@ -770,7 +770,7 @@ async fn checkout_commands_can_be_grpc_service() { use std::collections::HashMap as StdHashMap; use std::sync::{Arc as StdArc, Mutex as StdMutex}; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind}; /// The four checkout events in flow (causal) order, by CloudEvent/event type. @@ -957,7 +957,7 @@ fn matrix_ids(tag: &str) -> AsyncFlowIds { /// In-memory persistence × in-memory transport — the always-on matrix cell. #[tokio::test] async fn matrix_in_memory_persistence_over_in_memory_bus() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let (collector, collected) = build_collector(); run_checkout_over_bus( InMemoryBus::new(), @@ -1008,7 +1008,7 @@ where + 'static, { use sourced_rust::microsvc::cloud_events_router; - use sourced_rust::microsvc::transport::KnativeBus; + use sourced_rust::bus::KnativeBus; let (collector, collected) = build_collector(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0") @@ -1059,7 +1059,7 @@ where #[cfg(feature = "sqlite")] #[tokio::test] async fn matrix_sqlite_persistence_over_in_memory_bus() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let (collector, collected) = build_collector(); run_checkout_over_bus( InMemoryBus::new(), @@ -1089,9 +1089,9 @@ fn nats_url() -> Option { } #[cfg(feature = "nats")] -async fn nats_matrix_bus(ns: &str) -> sourced_rust::microsvc::transport::NatsBus { +async fn nats_matrix_bus(ns: &str) -> sourced_rust::bus::NatsBus { let url = nats_url().expect("NATS_URL set"); - let bus = sourced_rust::microsvc::transport::NatsBus::connect(&url, "matrix", ns) + let bus = sourced_rust::bus::NatsBus::connect(&url, "matrix", ns) .await .expect("nats connect") .with_fetch_timeout(Duration::from_millis(800)); @@ -1144,9 +1144,9 @@ fn amqp_url() -> Option { async fn rabbit_matrix_bus( ns: &str, collector: &StdArc>, -) -> sourced_rust::microsvc::transport::RabbitBus { +) -> sourced_rust::bus::RabbitBus { let url = amqp_url().expect("AMQP_URL set"); - let bus = sourced_rust::microsvc::transport::RabbitBus::connect(&url, "matrix", ns) + let bus = sourced_rust::bus::RabbitBus::connect(&url, "matrix", ns) .await .expect("rabbit connect"); // Topic exchange drops events with no bound queue, so bind before publishing. @@ -1200,9 +1200,9 @@ fn kafka_brokers() -> Option { } #[cfg(feature = "kafka")] -async fn kafka_matrix_bus(ns: &str) -> sourced_rust::microsvc::transport::KafkaBus { +async fn kafka_matrix_bus(ns: &str) -> sourced_rust::bus::KafkaBus { let brokers = kafka_brokers().expect("KAFKA_BROKERS set"); - sourced_rust::microsvc::transport::KafkaBus::connect(&brokers, "matrix", ns) + sourced_rust::bus::KafkaBus::connect(&brokers, "matrix", ns) .await .expect("kafka connect") .with_fetch_timeout(Duration::from_secs(10)) @@ -1247,7 +1247,7 @@ async fn matrix_sqlite_persistence_over_kafka_bus() { #[cfg(feature = "postgres")] #[tokio::test] async fn matrix_in_memory_persistence_over_postgres_bus() { - use sourced_rust::microsvc::transport::PostgresBus; + use sourced_rust::bus::PostgresBus; let Some(schema) = postgres::PostgresTestSchema::create_from_env( "matrix_pgbus", "skipping Postgres-bus matrix cell", @@ -1273,7 +1273,7 @@ async fn matrix_in_memory_persistence_over_postgres_bus() { #[cfg(feature = "postgres")] #[tokio::test] async fn matrix_postgres_persistence_over_in_memory_bus() { - use sourced_rust::microsvc::transport::InMemoryBus; + use sourced_rust::bus::InMemoryBus; let Some(schema) = postgres::PostgresTestSchema::create_from_env( "matrix_pg", "skipping Postgres-persistence matrix cell", @@ -1319,8 +1319,8 @@ async fn postgres_matrix_repo() -> Option<( } #[cfg(feature = "postgres")] -async fn postgres_matrix_bus() -> Option { - use sourced_rust::microsvc::transport::PostgresBus; +async fn postgres_matrix_bus() -> Option { + use sourced_rust::bus::PostgresBus; let schema = postgres::PostgresTestSchema::create_from_env( "matrix_pgbus", "skipping Postgres-bus matrix cell", diff --git a/tests/distributed_read_model_board/main.rs b/tests/distributed_read_model_board/main.rs index 6f37abe..349ca66 100644 --- a/tests/distributed_read_model_board/main.rs +++ b/tests/distributed_read_model_board/main.rs @@ -21,7 +21,7 @@ use projections_service::{load_board, service as build_projection}; use query_service::BoardQueryService; use read_models::register_schemas; use serde::Serialize; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service, Session}; use sourced_rust::{ AsyncAggregateBuilder, AsyncOutboxStore, ClaimOutboxMessages, HashMapOutboxStore, diff --git a/tests/kafka_transport/main.rs b/tests/kafka_transport/main.rs index df1304f..79f4b1f 100644 --- a/tests/kafka_transport/main.rs +++ b/tests/kafka_transport/main.rs @@ -10,7 +10,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, KafkaBus, KafkaPublisher, KafkaSource, RunOptions, }; diff --git a/tests/knative_cloudevents/main.rs b/tests/knative_cloudevents/main.rs index 3b2d01a..a753ff5 100644 --- a/tests/knative_cloudevents/main.rs +++ b/tests/knative_cloudevents/main.rs @@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex}; use serde_json::json; use sourced_rust::microsvc::cloud_events_router; -use sourced_rust::microsvc::transport::{Bus, KnativeBus}; +use sourced_rust::bus::{Bus, KnativeBus}; use sourced_rust::microsvc::{ Context, HandlerError, Message, MessageKind, Service, SubscriptionPlan, }; diff --git a/tests/microsvc/transport_listen.rs b/tests/microsvc/transport_listen.rs index 6227eb6..d849da3 100644 --- a/tests/microsvc/transport_listen.rs +++ b/tests/microsvc/transport_listen.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use serde_json::json; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, FailurePolicy, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, FailurePolicy, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service, Session}; use sourced_rust::{AsyncAggregateBuilder, HashMapRepository, Queueable}; diff --git a/tests/microsvc/transport_subscribe.rs b/tests/microsvc/transport_subscribe.rs index 12a47c6..ccad38c 100644 --- a/tests/microsvc/transport_subscribe.rs +++ b/tests/microsvc/transport_subscribe.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service}; use sourced_rust::{AsyncAggregateBuilder, HashMapRepository, Queueable}; diff --git a/tests/nats_transport/main.rs b/tests/nats_transport/main.rs index 316ccfe..4c450d7 100644 --- a/tests/nats_transport/main.rs +++ b/tests/nats_transport/main.rs @@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, NatsBus, NatsJetStreamSource, NatsPublisher, RunOptions, }; diff --git a/tests/postgres_transport/main.rs b/tests/postgres_transport/main.rs index d54eca1..e4a9957 100644 --- a/tests/postgres_transport/main.rs +++ b/tests/postgres_transport/main.rs @@ -12,7 +12,7 @@ mod postgres; use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessageSource, Bus, BusConsumer, PostgresBus, ReceivedMessage, RunOptions, }; use sourced_rust::microsvc::{Context, Message, MessageKind, Service}; diff --git a/tests/rabbitmq_transport/main.rs b/tests/rabbitmq_transport/main.rs index c141c5e..6a83d7c 100644 --- a/tests/rabbitmq_transport/main.rs +++ b/tests/rabbitmq_transport/main.rs @@ -8,7 +8,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, Bus, BusConsumer, RabbitBus, RabbitPublisher, RabbitSource, RunOptions, }; diff --git a/tests/sagas/microsvc_saga.rs b/tests/sagas/microsvc_saga.rs index b2f24cb..98af470 100644 --- a/tests/sagas/microsvc_saga.rs +++ b/tests/sagas/microsvc_saga.rs @@ -16,7 +16,7 @@ use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions}; +use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions}; use sourced_rust::microsvc::{Message, MessageKind, Service, Session}; use sourced_rust::{ AsyncAggregateBuilder, AsyncOutboxStore, ClaimOutboxMessages, HashMapOutboxStore, diff --git a/tests/transport_conformance/mod.rs b/tests/transport_conformance/mod.rs index e8817d6..28d7253 100644 --- a/tests/transport_conformance/mod.rs +++ b/tests/transport_conformance/mod.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use serde_json::json; -use sourced_rust::microsvc::transport::{ +use sourced_rust::bus::{ run_source, AsyncMessagePublisher, AsyncMessageSource, FailurePolicy, ReceivedMessage, RunOptions, TransportError, }; From 10a75d801da445b4674a8c9cb897e5e0178a9350 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 13:18:48 -0500 Subject: [PATCH 11/14] docs(bus): fix intra-doc links to relocated bus types Point the bus module-doc links at the types' new homes after the extraction: OutboxDispatcher/OutboxDispatchOutcome/OutboxSource -> crate root re-exports, cloud_events_router -> crate::microsvc. Brings cargo doc back to parity with main (4 pre-existing warnings, zero new). Found by the branch-wide review. Implements [[tasks/bus-decomposition-phase1]] Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/knative_bus.rs | 4 ++-- src/bus/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/bus/knative_bus.rs b/src/bus/knative_bus.rs index a3e31e7..99ad97a 100644 --- a/src/bus/knative_bus.rs +++ b/src/bus/knative_bus.rs @@ -8,7 +8,7 @@ //! //! - [`KnativeBus::manifests`] — the role-based `Broker` + per-name `Trigger` //! YAML for a service (and a local/kubefwd variant); -//! - mounting [`cloud_events_router`](super::cloud_events_router) so the Triggers' +//! - mounting [`cloud_events_router`](crate::microsvc::cloud_events_router) so the Triggers' //! `/cloudevent/` subscriber URIs reach `Service::dispatch_message`. //! //! Producing POSTs a binary-mode CloudEvent to a broker-ingress URL: @@ -148,7 +148,7 @@ impl KnativeBus { /// subscribed event name to the producing service's events broker. Subscriber /// URIs are `/cloudevent/` (matching [`cloud_events_router`]). /// - /// [`cloud_events_router`]: super::cloud_events_router + /// [`cloud_events_router`]: crate::microsvc::cloud_events_router pub fn manifests(&self, plan: &SubscriptionPlan, subscriptions: &[(&str, &str)]) -> String { let mut out = String::new(); if !plan.commands.is_empty() { diff --git a/src/bus/mod.rs b/src/bus/mod.rs index f42c5b6..cf24cb5 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -17,12 +17,12 @@ //! //! - [`AsyncMessagePublisher`] — the single publish boundary, with each adapter's //! durable publish threshold documented; -//! - [`OutboxDispatcher`] / [`OutboxDispatchOutcome`] — map durable outbox rows to +//! - [`OutboxDispatcher`](crate::OutboxDispatcher) / [`OutboxDispatchOutcome`](crate::OutboxDispatchOutcome) — map durable outbox rows to //! `Message` and dispatch them, sharing one claim → publish → complete path //! between background polling and after-commit immediate dispatch. //! //! Concrete adapters build on these traits: the Postgres outbox-backed source -//! ([`OutboxSource`]) is always available; the NATS JetStream and RabbitMQ +//! ([`OutboxSource`](crate::OutboxSource)) is always available; the NATS JetStream and RabbitMQ //! adapters are behind the `nats` and `rabbitmq` features. The Knative/HTTP //! ingress shape and the Kafka adapter are still separate slices. Everything //! builds on the vocabulary defined here: From 19de09506b880e41f6e57ecf388dd77b5fc36778 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 13:46:36 -0500 Subject: [PATCH 12/14] fix(bus): dedupe Knative Trigger names to avoid manifest collisions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Distinct CloudEvent types can normalize to the same RFC-1123 label (e.g. `order.created` and `order-created`, or a command and an event of the same name), so the generated `Trigger` `metadata.name`s could collide and the second would clobber the first on apply — silently dropping a subscription. Add `unique_k8s_name`, which sanitizes and then de-duplicates against the names already emitted (numeric suffix, capped at 63 chars; the `type:` filter still carries the raw event name, so routing is unchanged). Thread one dedup set through both generators: `knative_triggers` and `KnativeBus::manifests` (commands + events share the set). Addresses CodeRabbit review on PR #51. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/knative.rs | 59 +++++++++++++++++++++++++++++++++++++++++- src/bus/knative_bus.rs | 16 ++++++++---- 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/bus/knative.rs b/src/bus/knative.rs index 37e6355..fe929fc 100644 --- a/src/bus/knative.rs +++ b/src/bus/knative.rs @@ -7,6 +7,8 @@ //! //! Requires the `http` feature. +use std::collections::HashSet; + use super::SubscriptionPlan; /// Sanitize a string into an RFC 1123 DNS label usable as a Kubernetes resource @@ -34,13 +36,40 @@ pub(super) fn sanitize_k8s_name(name: &str) -> String { } } +/// Sanitize `raw` into a Kubernetes name that is **unique** within `used`. +/// +/// Distinct CloudEvent types can normalize to the same DNS label (e.g. +/// `order.created` and `order-created`, or a command and an event of the same +/// name) — which would otherwise emit two `Trigger`s with the same +/// `metadata.name`, the second silently clobbering the first on apply. On a +/// collision, append a numeric suffix, truncating the base so the result stays +/// within the 63-char label limit. The Trigger's `type:` filter still carries the +/// raw event name, so routing is unaffected; only the resource name is adjusted. +pub(super) fn unique_k8s_name(raw: &str, used: &mut HashSet) -> String { + let base = sanitize_k8s_name(raw); + if used.insert(base.clone()) { + return base; + } + let mut n = 2usize; + loop { + let suffix = format!("-{n}"); + let head = base.len().min(63usize.saturating_sub(suffix.len())); + let candidate = format!("{}{suffix}", base[..head].trim_end_matches('-')); + if used.insert(candidate.clone()) { + return candidate; + } + n += 1; + } +} + /// Render Knative `Trigger` YAML for each event a service subscribes to, derived /// from its [`SubscriptionPlan`]. Each Trigger filters on the CloudEvent `type` /// and routes to `subscriber_service` on `broker`. pub fn knative_triggers(plan: &SubscriptionPlan, broker: &str, subscriber_service: &str) -> String { let mut out = String::new(); + let mut used = HashSet::new(); for event in &plan.events { - let trigger_name = sanitize_k8s_name(&format!("{subscriber_service}-{event}")); + let trigger_name = unique_k8s_name(&format!("{subscriber_service}-{event}"), &mut used); out.push_str(&format!( "apiVersion: eventing.knative.dev/v1\n\ kind: Trigger\n\ @@ -108,4 +137,32 @@ mod tests { assert!(yaml.contains("type: Order.Created")); assert!(yaml.contains("name: checkout-projection-order-created")); } + + #[test] + fn trigger_names_are_deduped_when_event_types_normalize_alike() { + // `order.created` and `order-created` both sanitize to the same label; + // the second Trigger must get a distinct name so neither clobbers the + // other on apply, while both keep their raw `type:` filters. + let plan = SubscriptionPlan { + commands: vec![], + events: vec!["order.created".to_string(), "order-created".to_string()], + }; + let yaml = knative_triggers(&plan, "default", "svc"); + assert!(yaml.contains("name: svc-order-created\n")); + assert!(yaml.contains("name: svc-order-created-2\n")); + assert!(yaml.contains("type: order.created")); + assert!(yaml.contains("type: order-created")); + } + + #[test] + fn unique_k8s_name_caps_suffixed_names_at_63_chars() { + let mut used = HashSet::new(); + let long = "a".repeat(80); + let first = unique_k8s_name(&long, &mut used); + let second = unique_k8s_name(&long, &mut used); + assert_eq!(first.len(), 63); + assert_ne!(first, second); + assert!(second.len() <= 63); + assert!(second.ends_with("-2")); + } } diff --git a/src/bus/knative_bus.rs b/src/bus/knative_bus.rs index 99ad97a..5c8fa7a 100644 --- a/src/bus/knative_bus.rs +++ b/src/bus/knative_bus.rs @@ -20,7 +20,9 @@ use std::time::Duration; -use super::knative::sanitize_k8s_name; +use std::collections::HashSet; + +use super::knative::unique_k8s_name; use super::{Bus, TransportError}; use super::{Message, MessageKind, SubscriptionPlan}; @@ -151,6 +153,10 @@ impl KnativeBus { /// [`cloud_events_router`]: crate::microsvc::cloud_events_router pub fn manifests(&self, plan: &SubscriptionPlan, subscriptions: &[(&str, &str)]) -> String { let mut out = String::new(); + // One dedup set across commands and events: their Trigger names share the + // `{source}-{name}` shape, so a command and an event whose names normalize + // to the same label must not collide on `metadata.name`. + let mut used = HashSet::new(); if !plan.commands.is_empty() { // The broker a service *owns* for commands it handles is // `{source}-commands` — distinct from `commands_broker` (the @@ -158,7 +164,7 @@ impl KnativeBus { let own_commands = format!("{}-commands", self.source); out.push_str(&self.broker_yaml(&own_commands)); for command in &plan.commands { - out.push_str(&self.trigger_yaml(&own_commands, command)); + out.push_str(&self.trigger_yaml(&own_commands, command, &mut used)); } } if self.publishes_events { @@ -170,7 +176,7 @@ impl KnativeBus { .find(|(name, _)| *name == event.as_str()) .map(|(_, broker)| *broker) .unwrap_or("UNMAPPED-events"); - out.push_str(&self.trigger_yaml(broker, event)); + out.push_str(&self.trigger_yaml(broker, event, &mut used)); } out } @@ -187,8 +193,8 @@ impl KnativeBus { ) } - fn trigger_yaml(&self, broker: &str, event: &str) -> String { - let trigger_name = sanitize_k8s_name(&format!("{}-{}", self.source, event)); + fn trigger_yaml(&self, broker: &str, event: &str, used: &mut HashSet) -> String { + let trigger_name = unique_k8s_name(&format!("{}-{}", self.source, event), used); let subscriber = match &self.local { Some(addr) => format!( "\x20 subscriber:\n\ From 456b8ba3251659e0c564d1934169036764c17778 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 13:47:05 -0500 Subject: [PATCH 13/14] style: rustfmt import ordering in two test crates Leftover formatting from the clean-break import sweep (rustfmt sorts the bus import before the microsvc one). No behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/distributed_read_model/main.rs | 2 +- tests/knative_cloudevents/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/distributed_read_model/main.rs b/tests/distributed_read_model/main.rs index ce13f42..9db91ce 100644 --- a/tests/distributed_read_model/main.rs +++ b/tests/distributed_read_model/main.rs @@ -1007,8 +1007,8 @@ where + Sync + 'static, { - use sourced_rust::microsvc::cloud_events_router; use sourced_rust::bus::KnativeBus; + use sourced_rust::microsvc::cloud_events_router; let (collector, collected) = build_collector(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0") diff --git a/tests/knative_cloudevents/main.rs b/tests/knative_cloudevents/main.rs index a753ff5..3de5417 100644 --- a/tests/knative_cloudevents/main.rs +++ b/tests/knative_cloudevents/main.rs @@ -8,8 +8,8 @@ use std::sync::{Arc, Mutex}; use serde_json::json; -use sourced_rust::microsvc::cloud_events_router; use sourced_rust::bus::{Bus, KnativeBus}; +use sourced_rust::microsvc::cloud_events_router; use sourced_rust::microsvc::{ Context, HandlerError, Message, MessageKind, Service, SubscriptionPlan, }; From 31de327b12ed6aceacda6be344f703e00ecde835 Mon Sep 17 00:00:00 2001 From: Patrick Lee Scott Date: Sun, 31 May 2026 13:47:48 -0500 Subject: [PATCH 14/14] docs(bus): describe router-based dispatch in the module overview After the MessageRouter seam landed, the module header still said run_source dispatches through `Service::dispatch_message`. Update it to reference the `MessageRouter` consume seam (implemented by microsvc::Service and the dependency-free Handlers builder) and `MessageRouter::dispatch`, so rustdoc matches the new public surface. Addresses CodeRabbit review on PR #51. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/bus/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/bus/mod.rs b/src/bus/mod.rs index cf24cb5..48c66a3 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -10,8 +10,10 @@ //! //! - [`AsyncMessageSource`] / [`ReceivedMessage`] — pull a message from a direct //! transport (Postgres, RabbitMQ, Kafka, NATS, in-memory) and settle it; -//! - [`run_source`] — the runner that dispatches through -//! `Service::dispatch_message`, then acks/nacks/dead-letters per policy. +//! - [`MessageRouter`] — the consume seam a runner dispatches to, implemented by +//! `microsvc::Service` and the dependency-free [`Handlers`] builder; +//! - [`run_source`] — the runner that dispatches each message through the +//! consumer's [`MessageRouter::dispatch`], then acks/nacks/dead-letters per policy. //! //! The producer-side publish path lives here too: //!