Skip to content

feat: durable enqueue outbox dispatch — publish-in-commit via attached bus#54

Open
patrickleet wants to merge 9 commits into
codex/hops-service-create-microsvc-scaffoldfrom
feat/durable-enqueue-outbox-dispatch
Open

feat: durable enqueue outbox dispatch — publish-in-commit via attached bus#54
patrickleet wants to merge 9 commits into
codex/hops-service-create-microsvc-scaffoldfrom
feat/durable-enqueue-outbox-dispatch

Conversation

@patrickleet
Copy link
Copy Markdown
Collaborator

Stacked on #53 (base = codex/hops-service-create-microsvc-scaffold); rebases onto main once #53 merges. Implements [[specs/durable-enqueue-outbox-dispatch]].

What this does

Connects the producing side of the bus to a service so one bus config drives both consume and produce, with publish-in-commit as the primary path:

let service = distributed::register_handlers!(
    Service::with_repo(repo.queued().aggregate::<Todo>()),
    command handlers::todo_create,
    event   handlers::todo_projection,
);

service
    .with_bus(InMemoryBus::new())   // transport config on the service
    .run(RunOptions::idempotent())  // auto listen (commands) + subscribe (events)
    .await?;
// In a handler:
ctx.commit_outbox(&mut todo, message).await?;
//  bus attached → claim the outbox row in the commit transaction (InFlight,
//                 short lease, attempts=1) → publish immediately via the bus →
//                 complete; publish failure releases for the poller and never
//                 rolls back the committed aggregate.
//  no bus       → commit pending; the polling worker publishes.

The claim-in-transaction model means the after-commit publish needs no separate claim and cannot race the poller; the lease is the crash-handoff to the worker.

Commits

  1. CommitReceipt from OutboxCommit::commit — the inserted-ids seam (source-compatible across all 52 callers).
  2. BusPublisherBus → AsyncMessagePublisher adapter routing Command/Event to send_message/publish_message (the missing piece that lets the dispatcher publish through any *Bus).
  3. HasOutboxStore — capability resolving the outbox store through the AggregateRepository → QueuedRepository → leaf wrapper chain.
  4. Service::with_bus + Microservice + dispatcher() — transport config on the service (produce side).
  5. OutboxCommit::commit_claimed — claim-in-transaction (row inserts InFlight/leased, not poller-claimable).
  6. Context::commit_outbox + DynPublisher — publish-in-commit end to end. DynPublisher is an object-safe (boxed-future) shim around AsyncMessagePublisher so Service<D> keeps its type rather than becoming Service<D, P>.
  7. Microservice::run — derives listen/subscribe from the registered handlers, executor-agnostic poll-join (no tokio pulled into core). Clone derived for RunOptions.

Tests

cargo test --lib236 passed, 0 failed. All integration test targets compile. New coverage: commit_receipt, bus_publisher routing, has_outbox_store wrapper resolution, commit_claimed (in-flight/leased/not-poller-claimable), commit_outbox immediate publish e2e, run() consuming a queued command.

Not in this PR (follow-ups, tracked in tasks/durable-enqueue-outbox-dispatch-impl)

  • Docs: README "durable enqueue" framing + closing the Quick Start produce loop.
  • Continuous backstop poll loop inside run(): needs an async timer (tokio), which isn't in core. dispatcher() is exposed to drive it; the immediate path covers the happy case, so the loop is the crash backstop only. Open: gate behind a runtime feature vs. a separate worker entrypoint.
  • commit_outbox covers AggregateRepository, not yet SnapshotAggregateRepository.
  • SQL-backed (postgres/sqlite) end-to-end test of commit_outbox (trait impls compile under those features; only HashMap exercised so far).
  • Immediate-publish lease customization (currently defaults; the Microservice setters configure the poller).

🤖 Generated with Claude Code

patrickleet and others added 7 commits June 3, 2026 14:43
OutboxCommit::commit now returns a CommitReceipt carrying the inserted
outbox message id(s) instead of (), so an after-commit dispatcher can
publish exactly the rows the transaction wrote. Source-compatible:
?-statement callers discard the receipt.

Step 1 of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Routes outbox-derived messages by MessageKind: commands to send_message
(point-to-point), events to publish_message (fan-out). This is the missing
adapter that lets the outbox dispatcher publish through any *Bus uniformly.

Step 2 of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
New trait abstracting 'produce a durable outbox store', resolving through the
AggregateRepository -> QueuedRepository -> leaf repo wrapper chain. Lets the
runtime build an OutboxDispatcher without naming the concrete repository type.
Impls for HashMap (and feature-gated Sqlite/Postgres) leaves + the wrappers.

Step 3 (store access) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Service::with_bus(bus) wraps the consumer Service into a Microservice carrying
the transport config. Microservice::dispatcher() assembles an OutboxDispatcher
over the service's own outbox store + a BusPublisher, so committed outbox rows
drain to the bus routed by kind. Test proves commit -> dispatch -> published
end to end over InMemoryBus.

Consume side (run() auto listen/subscribe) and the in-transaction commit_outbox
land next.

Step 6 (runtime, produce side) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claims the outbox row for publication in the same transaction that commits the
aggregate: the row inserts already InFlight under the worker's lease
(attempts = 1), so the after-commit publish needs no separate claim and cannot
race the poller. Returns the claimed message clone so the caller can build the
transport message and settle the claim. Test proves the row is in-flight,
leased, and not poller-claimable.

Step 4 (claim-in-transaction) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Wires the durable-enqueue command path end to end:
- DynPublisher: object-safe (boxed-future) form of AsyncMessagePublisher, so a
  publisher can sit behind Arc<dyn> without making Service generic over it.
- Service carries an optional ImmediatePublish (publisher + worker id + lease +
  attempts), set by with_bus; Context receives it.
- Context::commit_outbox: with a bus attached, claims the outbox row in the
  commit transaction then publishes immediately through the bus, completing or
  releasing the claim; with no bus, commits pending for the poller. Best-effort
  publish never rolls back the committed aggregate.

Test: dispatch -> commit_outbox -> row published immediately, none left pending.

Steps 3+5 (DynPublisher + commit_outbox) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
run() reads the service's subscription_plan and drives the consumers
concurrently on the caller's runtime: command handlers via competing listen,
event handlers via fan-out subscribe. Uses an executor-agnostic poll-join (no
spawn, no timer) so it works in core without pulling tokio. Returns on first
error or when the consumers stop. Derive Clone for RunOptions/ConsumerDeliveryMode
so one options value drives both consumers. Test: run() consumes a queued
command and the handler's commit_outbox publishes immediately.

Producing happy-path is commit_outbox (immediate); the backstop poll loop (needs
a timer) is driven from dispatcher() by a runtime that provides one.

Step 6 (runtime, consume side) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jun 3, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d4e4ce75-afa2-4ae6-8e13-161da2cb71c1

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/durable-enqueue-outbox-dispatch

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

patrickleet and others added 2 commits June 3, 2026 15:54
Exercises commit_outbox (claim-in-transaction + immediate publish) and run()
against a real SQL backend (in-memory SQLite), not just HashMapRepository.
Proves the HasOutboxStore impls and the SQL commit path persist the in-flight
claim and complete it. Also fixes a must_use warning on the finished-consumer
future in run().

[[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Generalize the durable-enqueue command path with an OutboxCommitting<A> trait
that commits an aggregate + outbox row in one transaction, staging whatever the
repo needs. Implemented for AggregateRepository (delegates to the existing
OutboxCommit) and SnapshotAggregateRepository (stages the snapshot + outbox row
together via CommitBatch — previously these could not compose). Context::commit_outbox
now binds D::Repo: OutboxCommitting<A> + HasOutboxStore instead of the concrete
AggregateRepository, so snapshot-backed services get claim-in-transaction +
immediate publish too. Test: snapshot-backed commit_outbox publishes immediately.

[[tasks/durable-enqueue-outbox-dispatch-impl]]
Builds on [[specs/transactional-commit-boundary]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant