Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ transports; only the constructor line changes.**

```rust
use std::sync::Arc;
use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions};
use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions};

// Built once — handlers are transport-agnostic.
let service = Arc::new(build_service());
Expand Down Expand Up @@ -805,7 +805,7 @@ carries a `FailurePolicy` controlling what happens to a **permanent** handler
failure — `Retry`, `DeadLetter`, `Park`, `LogAndAck`, or `Stop`:

```rust
use sourced_rust::microsvc::transport::{FailurePolicy, RunOptions};
use sourced_rust::bus::{FailurePolicy, RunOptions};

bus.listen(
service.clone(),
Expand Down
8 changes: 4 additions & 4 deletions docs/async-transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

Distributed (published from the `sourced_rust` crate) keeps the synchronous
in-memory bus intact and adds an async transport layer under
`microsvc::transport`. The design line is:
`bus`. The design line is:

- **`microsvc`** owns handler registration, guards, typed input decoding,
dispatch, and handler metadata;
- **transport adapters** own how messages are received, acknowledged, retried,
published, and mapped to external topics/subjects/queues/routes.

The shared vocabulary lives in `microsvc::transport` and depends on no concrete
The shared vocabulary lives in `bus` and depends on no concrete
broker. The same application code runs over any transport — selecting one is an
adapter/wiring change, not a handler change.

Expand Down Expand Up @@ -43,7 +43,7 @@ Direct transports implement `AsyncMessageSource` (pull a message) and
`Service::dispatch_message` and settling only after the handler completes:

```rust,ignore
use sourced_rust::microsvc::transport::{run_source, RunOptions};
use sourced_rust::bus::{run_source, RunOptions};

run_source(service, source, RunOptions::idempotent()).await?;
```
Expand Down Expand Up @@ -108,7 +108,7 @@ The app surface is identical across transports; only the constructor changes:

```rust
use std::sync::Arc;
use sourced_rust::microsvc::transport::{Bus, BusConsumer, InMemoryBus, RunOptions};
use sourced_rust::bus::{Bus, BusConsumer, InMemoryBus, RunOptions};

// Built once — handlers are transport-agnostic.
let service = Arc::new(build_service());
Expand Down
22 changes: 10 additions & 12 deletions src/microsvc/transport/bus.rs → src/bus/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
use std::future::Future;
use std::sync::Arc;

use super::{Message, RunOptions, TransportError};
use crate::microsvc::Service;
use super::{Message, MessageRouter, RunOptions, TransportError};

/// Produce side of the bus — uniform across every transport.
pub trait Bus: Send + Sync {
Expand Down Expand Up @@ -54,26 +53,25 @@ pub trait Bus: Send + Sync {

/// Consume side of the bus — pull transports that run a [`run_source`] loop.
///
/// `listen`/`subscribe` derive the message names from the service's registered
/// handlers ([`Service::command_names`]/[`Service::event_names`]), build the
/// transport's source with the matching topology, and run it. Both run until the
/// source drains/stops.
/// `listen`/`subscribe` derive the message names from the router's registered
/// handlers ([`MessageRouter::subscription_plan`]), build the transport's source
/// with the matching topology, and run it. Both run until the source drains/stops.
///
/// [`run_source`]: super::run_source
pub trait BusConsumer: Send + Sync {
/// Run `service` as a command listener: consume its command names with
/// Run `router` as a command listener: consume its command names with
/// competing-consumer (point-to-point) semantics.
fn listen<D: Send + Sync + 'static>(
fn listen<R: MessageRouter>(
&self,
service: Arc<Service<D>>,
router: Arc<R>,
options: RunOptions,
) -> impl Future<Output = Result<(), TransportError>> + Send;

/// Run `service` as an event subscriber: consume its event names with
/// Run `router` as an event subscriber: consume its event names with
/// fan-out semantics.
fn subscribe<D: Send + Sync + 'static>(
fn subscribe<R: MessageRouter>(
&self,
service: Arc<Service<D>>,
router: Arc<R>,
options: RunOptions,
) -> impl Future<Output = Result<(), TransportError>> + Send;
}
File renamed without changes.
108 changes: 0 additions & 108 deletions src/microsvc/transport/error.rs → src/bus/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
use std::error::Error;
use std::fmt;

use crate::microsvc::HandlerError;

/// Whether a [`TransportError`] should be retried.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum TransportErrorKind {
Expand Down Expand Up @@ -103,40 +101,6 @@ impl TransportError {
pub fn message(&self) -> &str {
&self.message
}

/// Classify a [`HandlerError`] for transport retry purposes.
///
/// Transient failures are retryable: repository errors, not-found results,
/// and otherwise-unclassified errors. Not-found is transient in an
/// at-least-once event-driven system because an out-of-order delivery can
/// reference an aggregate or projection that has not been created yet, and a
/// later redelivery resolves the race. (This also keeps a handler-raised
/// [`HandlerError::NotFound`] consistent with a repository-raised
/// [`RepositoryError::NotFound`](crate::RepositoryError), which arrives
/// wrapped in `Repository(_)`.)
///
/// Deterministic failures — unknown routing, payload decode failures,
/// business rejections, authorization, and guard rejections — are permanent
/// because redelivering the identical message cannot change the outcome. An
/// entity that is genuinely missing forever still drains to the failure
/// policy once the transport's retry ceiling is reached, so defaulting
/// not-found to retryable does not risk an unbounded loop.
///
/// This is a default. Handlers that know better can return a classification
/// directly, and the runner's [`FailurePolicy`](super::FailurePolicy) can
/// override what happens to a permanent error.
pub fn classify_handler_error(error: &HandlerError) -> TransportErrorKind {
match error {
HandlerError::Repository(_) | HandlerError::NotFound(_) | HandlerError::Other(_) => {
TransportErrorKind::Retryable
}
HandlerError::UnknownCommand(_)
| HandlerError::DecodeFailed(_)
| HandlerError::Rejected(_)
| HandlerError::Unauthorized(_)
| HandlerError::GuardRejected(_) => TransportErrorKind::Permanent,
}
}
}

impl fmt::Display for TransportError {
Expand All @@ -157,30 +121,6 @@ impl Error for TransportError {
}
}

impl From<HandlerError> for TransportError {
fn from(error: HandlerError) -> Self {
let kind = Self::classify_handler_error(&error);
Self {
kind,
message: error.to_string(),
source: Some(Box::new(error)),
}
}
}

impl From<crate::RepositoryError> for TransportError {
/// Repository/store failures (lock contention, storage hiccups, stale-claim
/// conflicts) are treated as retryable: they are usually transient, and a
/// later redelivery or re-claim resolves them.
fn from(error: crate::RepositoryError) -> Self {
Self {
kind: TransportErrorKind::Retryable,
message: error.to_string(),
source: Some(Box::new(error)),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -217,52 +157,4 @@ mod tests {
assert!(err.source().is_some());
assert_eq!(err.message(), "publish timed out");
}

#[test]
fn transient_handler_errors_are_retryable() {
use crate::repository::RepositoryError;

for error in [
HandlerError::Repository(RepositoryError::NotFound { id: "agg-1".into() }),
// A handler-raised not-found is transient too: it is usually an
// out-of-order delivery race, and stays consistent with the
// repository-raised not-found above.
HandlerError::NotFound("agg-1".into()),
HandlerError::Other("boom".into()),
] {
assert_eq!(
TransportError::classify_handler_error(&error),
TransportErrorKind::Retryable,
"{error} should be retryable"
);
}
}

#[test]
fn deterministic_handler_errors_are_permanent() {
for error in [
HandlerError::UnknownCommand("x".into()),
HandlerError::DecodeFailed("x".into()),
HandlerError::Rejected("x".into()),
HandlerError::Unauthorized("x".into()),
HandlerError::GuardRejected("x".into()),
] {
assert_eq!(
TransportError::classify_handler_error(&error),
TransportErrorKind::Permanent,
"{error} should be permanent"
);
}
}

#[test]
fn from_handler_error_preserves_classification_and_source() {
let err: TransportError = HandlerError::Rejected("invalid".into()).into();
assert!(err.is_permanent());
assert!(err.source().is_some());

let err: TransportError =
HandlerError::Other(Box::<dyn Error + Send + Sync>::from("infra")).into();
assert!(err.is_retryable());
}
}
File renamed without changes.
Loading
Loading