feat: durable enqueue outbox dispatch — publish-in-commit via attached bus#54
Open
patrickleet wants to merge 9 commits into
Open
Conversation
OutboxCommit::commit now returns a CommitReceipt carrying the inserted outbox message id(s) instead of (), so an after-commit dispatcher can publish exactly the rows the transaction wrote. Source-compatible: ?-statement callers discard the receipt. Step 1 of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Routes outbox-derived messages by MessageKind: commands to send_message (point-to-point), events to publish_message (fan-out). This is the missing adapter that lets the outbox dispatcher publish through any *Bus uniformly. Step 2 of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
New trait abstracting 'produce a durable outbox store', resolving through the AggregateRepository -> QueuedRepository -> leaf repo wrapper chain. Lets the runtime build an OutboxDispatcher without naming the concrete repository type. Impls for HashMap (and feature-gated Sqlite/Postgres) leaves + the wrappers. Step 3 (store access) of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Service::with_bus(bus) wraps the consumer Service into a Microservice carrying the transport config. Microservice::dispatcher() assembles an OutboxDispatcher over the service's own outbox store + a BusPublisher, so committed outbox rows drain to the bus routed by kind. Test proves commit -> dispatch -> published end to end over InMemoryBus. Consume side (run() auto listen/subscribe) and the in-transaction commit_outbox land next. Step 6 (runtime, produce side) of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claims the outbox row for publication in the same transaction that commits the aggregate: the row inserts already InFlight under the worker's lease (attempts = 1), so the after-commit publish needs no separate claim and cannot race the poller. Returns the claimed message clone so the caller can build the transport message and settle the claim. Test proves the row is in-flight, leased, and not poller-claimable. Step 4 (claim-in-transaction) of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Wires the durable-enqueue command path end to end: - DynPublisher: object-safe (boxed-future) form of AsyncMessagePublisher, so a publisher can sit behind Arc<dyn> without making Service generic over it. - Service carries an optional ImmediatePublish (publisher + worker id + lease + attempts), set by with_bus; Context receives it. - Context::commit_outbox: with a bus attached, claims the outbox row in the commit transaction then publishes immediately through the bus, completing or releasing the claim; with no bus, commits pending for the poller. Best-effort publish never rolls back the committed aggregate. Test: dispatch -> commit_outbox -> row published immediately, none left pending. Steps 3+5 (DynPublisher + commit_outbox) of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
run() reads the service's subscription_plan and drives the consumers concurrently on the caller's runtime: command handlers via competing listen, event handlers via fan-out subscribe. Uses an executor-agnostic poll-join (no spawn, no timer) so it works in core without pulling tokio. Returns on first error or when the consumers stop. Derive Clone for RunOptions/ConsumerDeliveryMode so one options value drives both consumers. Test: run() consumes a queued command and the handler's commit_outbox publishes immediately. Producing happy-path is commit_outbox (immediate); the backstop poll loop (needs a timer) is driven from dispatcher() by a runtime that provides one. Step 6 (runtime, consume side) of [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Exercises commit_outbox (claim-in-transaction + immediate publish) and run() against a real SQL backend (in-memory SQLite), not just HashMapRepository. Proves the HasOutboxStore impls and the SQL commit path persist the in-flight claim and complete it. Also fixes a must_use warning on the finished-consumer future in run(). [[tasks/durable-enqueue-outbox-dispatch-impl]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Generalize the durable-enqueue command path with an OutboxCommitting<A> trait that commits an aggregate + outbox row in one transaction, staging whatever the repo needs. Implemented for AggregateRepository (delegates to the existing OutboxCommit) and SnapshotAggregateRepository (stages the snapshot + outbox row together via CommitBatch — previously these could not compose). Context::commit_outbox now binds D::Repo: OutboxCommitting<A> + HasOutboxStore instead of the concrete AggregateRepository, so snapshot-backed services get claim-in-transaction + immediate publish too. Test: snapshot-backed commit_outbox publishes immediately. [[tasks/durable-enqueue-outbox-dispatch-impl]] Builds on [[specs/transactional-commit-boundary]] Implements [[specs/durable-enqueue-outbox-dispatch]] Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stacked on #53 (base =
codex/hops-service-create-microsvc-scaffold); rebases ontomainonce #53 merges. Implements[[specs/durable-enqueue-outbox-dispatch]].What this does
Connects the producing side of the bus to a service so one bus config drives both consume and produce, with publish-in-commit as the primary path:
The claim-in-transaction model means the after-commit publish needs no separate claim and cannot race the poller; the lease is the crash-handoff to the worker.
Commits
CommitReceiptfromOutboxCommit::commit— the inserted-ids seam (source-compatible across all 52 callers).BusPublisher—Bus → AsyncMessagePublisheradapter routingCommand/Eventtosend_message/publish_message(the missing piece that lets the dispatcher publish through any*Bus).HasOutboxStore— capability resolving the outbox store through theAggregateRepository → QueuedRepository → leafwrapper chain.Service::with_bus+Microservice+dispatcher()— transport config on the service (produce side).OutboxCommit::commit_claimed— claim-in-transaction (row inserts InFlight/leased, not poller-claimable).Context::commit_outbox+DynPublisher— publish-in-commit end to end.DynPublisheris an object-safe (boxed-future) shim aroundAsyncMessagePublishersoService<D>keeps its type rather than becomingService<D, P>.Microservice::run— derives listen/subscribe from the registered handlers, executor-agnostic poll-join (no tokio pulled into core).Clonederived forRunOptions.Tests
cargo test --lib→ 236 passed, 0 failed. All integration test targets compile. New coverage:commit_receipt,bus_publisherrouting,has_outbox_storewrapper resolution,commit_claimed(in-flight/leased/not-poller-claimable),commit_outboximmediate publish e2e,run()consuming a queued command.Not in this PR (follow-ups, tracked in
tasks/durable-enqueue-outbox-dispatch-impl)run(): needs an async timer (tokio), which isn't in core.dispatcher()is exposed to drive it; the immediate path covers the happy case, so the loop is the crash backstop only. Open: gate behind aruntimefeature vs. a separate worker entrypoint.commit_outboxcoversAggregateRepository, not yetSnapshotAggregateRepository.commit_outbox(trait impls compile under those features; only HashMap exercised so far).Microservicesetters configure the poller).🤖 Generated with Claude Code