Extract delivery hardening from #206 (without worker_events) #216
vsumner wants to merge 20 commits into spacedriveapp:main from
Conversation
Note: Reviews paused
It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
Walkthrough
Adds deterministic worker task contracts and durable terminal delivery receipts; introduces OutboundEnvelope and DeliveryOutcome; updates messaging adapters and tools to use envelopes/outcomes; adds DB migrations, timeline projections, and per-agent worker_contract config with hot-reload wiring.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
docs/content/docs/(configuration)/config.mdx (1)
527-538: ⚠️ Potential issue | 🟡 Minor
Document the per-agent `[agents.worker_contract]` override.
The `[[agents]]` section table and the Full Reference TOML example don't mention that `worker_contract` can be overridden per-agent. Since the code (`AgentConfig.worker_contract`) and the AI summary both confirm per-agent overrides are supported, users have no way to discover this from the docs. Consider adding a row to the `[[agents]]` table and an `[agents.worker_contract]` example block, similar to how `[agents.routing]` and `[agents.sandbox]` are documented.
[[agents]]table and a[agents.worker_contract]example block, similar to how[agents.routing]and[agents.sandbox]are documented.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `docs/content/docs/(configuration)/config.mdx` around lines 527-538: add documentation that `worker_contract` can be overridden per-agent by updating the `[[agents]]` table to include a `worker_contract` row and adding an `[agents.worker_contract]` example block in the Full Reference TOML, mirroring the style of `[agents.routing]` and `[agents.sandbox]`; reference the code symbol `AgentConfig.worker_contract` to ensure the table row name and example keys match the actual config field, and include a short description, type, default/inheritance behavior, and a minimal example showing the per-agent override.

src/tools/browser.rs (1)
956-977: ⚠️ Potential issue | 🟠 Major
Abort the CDP handler task when closing the browser.
Line 973 sets `_handler_task` to `None` without calling `abort()` on the stored `JoinHandle`. Dropping a `JoinHandle` detaches it, allowing the background handler task to continue running with no control path after the browser is closed.
Suggested fix
- state._handler_task = None;
+ if let Some(handler_task) = state._handler_task.take() {
+     handler_task.abort();
+ }
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 956 - 977, In handle_close, before setting state._handler_task = None, if state._handler_task contains a JoinHandle it must be aborted to stop the background CDP handler; obtain the JoinHandle from state._handler_task (e.g., take it or as_mut), call abort() on it (and optionally await or ignore the join result), then set state._handler_task to None so the handler is not left running; update the logic in the async fn handle_close to perform this abort step when closing the browser.
🧹 Nitpick comments (4)
src/llm/model.rs (1)
1576-1580: Nit: test name doesn't reflect the `None` assertion inside it.
`positive_max_tokens_omits_zero` also asserts the `None` input case. Consider renaming to `positive_max_tokens_omits_none_and_zero` (or splitting into two tests) so the name accurately describes coverage.
✏️ Suggested rename
- fn positive_max_tokens_omits_zero() {
+ fn positive_max_tokens_omits_none_and_zero() {
Verify each finding against the current code and only fix it if needed. In `src/llm/model.rs` around lines 1576-1580: rename or split the test to accurately reflect both assertions: either rename the test function `positive_max_tokens_omits_zero` to `positive_max_tokens_omits_none_and_zero`, or split into two tests (e.g., `positive_max_tokens_returns_none_for_none` and `positive_max_tokens_omits_zero`) so the behavior of `positive_max_tokens` when given `None` and `Some(0)` is clearly described; update the test function name(s) accordingly and ensure references to `positive_max_tokens` remain unchanged.

src/api/workers.rs (1)
172-193: Consider parallelizing the two timeline DB fetches.
`get_worker_task_contract_snapshot` and `get_worker_terminal_receipt_snapshot` are independent queries; running them sequentially adds unnecessary latency to the `include_timeline=true` path.
♻️ Proposed refactor using `tokio::try_join!`
- let contract = logger
-     .get_worker_task_contract_snapshot(&query.worker_id)
-     .await
-     .map_err(|error| {
-         tracing::warn!(
-             %error,
-             worker_id = %query.worker_id,
-             "failed to load worker task contract snapshot"
-         );
-         StatusCode::INTERNAL_SERVER_ERROR
-     })?;
- let receipt = logger
-     .get_worker_terminal_receipt_snapshot(&query.worker_id)
-     .await
-     .map_err(|error| {
-         tracing::warn!(
-             %error,
-             worker_id = %query.worker_id,
-             "failed to load worker receipt snapshot"
-         );
-         StatusCode::INTERNAL_SERVER_ERROR
-     })?;
+ let (contract, receipt) = tokio::try_join!(
+     async {
+         logger
+             .get_worker_task_contract_snapshot(&query.worker_id)
+             .await
+             .map_err(|error| {
+                 tracing::warn!(
+                     %error,
+                     worker_id = %query.worker_id,
+                     "failed to load worker task contract snapshot"
+                 );
+                 StatusCode::INTERNAL_SERVER_ERROR
+             })
+     },
+     async {
+         logger
+             .get_worker_terminal_receipt_snapshot(&query.worker_id)
+             .await
+             .map_err(|error| {
+                 tracing::warn!(
+                     %error,
+                     worker_id = %query.worker_id,
+                     "failed to load worker receipt snapshot"
+                 );
+                 StatusCode::INTERNAL_SERVER_ERROR
+             })
+     },
+ )?;
Verify each finding against the current code and only fix it if needed. In `src/api/workers.rs` around lines 172-193: the two independent DB calls `logger.get_worker_task_contract_snapshot(&query.worker_id)` and `logger.get_worker_terminal_receipt_snapshot(&query.worker_id)` are executed sequentially, causing extra latency; change this to run them concurrently using `tokio::try_join!` (or `futures::try_join`) to await both tasks in parallel, propagate/map errors the same way as before (preserving the `tracing::warn!` logs and the `StatusCode::INTERNAL_SERVER_ERROR` mapping), and then use the returned `(contract, receipt)` tuple in the rest of the handler.

src/messaging/slack.rs (1)
802-805: Add debug context when channel metadata extraction fails.
Line 804 maps `extract_channel_id` errors directly to `NotSurfaced`; logging the error and message ID would make non-surfacing regressions diagnosable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `src/messaging/slack.rs` around lines 802-805: when `extract_channel_id(message)` fails in the match that currently returns `DeliveryOutcome::NotSurfaced`, capture the `Err` value and log debug context before returning: include the error (from `extract_channel_id`), the message identifier (e.g., `message.id` or `message.msg_id`), and any relevant metadata from the message so failures are diagnosable. Update the match arm that handles `Err(_)` to `Err(e) => { log a debug/error via the existing logger with e and the message ID, then return Ok(DeliveryOutcome::NotSurfaced) }` so the behavior stays the same but emits useful diagnostics.

src/config.rs (1)
3160-3177: Consolidate worker-contract normalization to avoid drift.
The same field-by-field `WorkerContractConfig` mapping is duplicated in defaults and per-agent resolution. A small helper would reduce maintenance risk.
♻️ Suggested refactor
  let resolve_nonzero_secs = |value: Option<u64>, fallback: u64| {
      value
          .and_then(|configured| (configured > 0).then_some(configured))
          .unwrap_or(fallback)
  };
+ let resolve_worker_contract =
+     |configured: TomlWorkerContractConfig, fallback: WorkerContractConfig| {
+         WorkerContractConfig {
+             ack_secs: resolve_nonzero_secs(configured.ack_secs, fallback.ack_secs),
+             progress_secs: resolve_nonzero_secs(
+                 configured.progress_secs,
+                 fallback.progress_secs,
+             ),
+             tick_secs: resolve_nonzero_secs(configured.tick_secs, fallback.tick_secs),
+         }
+     };
@@
- worker_contract: toml
-     .defaults
-     .worker_contract
-     .map(|contract| WorkerContractConfig {
-         ack_secs: resolve_nonzero_secs(
-             contract.ack_secs,
-             base_defaults.worker_contract.ack_secs,
-         ),
-         progress_secs: resolve_nonzero_secs(
-             contract.progress_secs,
-             base_defaults.worker_contract.progress_secs,
-         ),
-         tick_secs: resolve_nonzero_secs(
-             contract.tick_secs,
-             base_defaults.worker_contract.tick_secs,
-         ),
-     })
-     .unwrap_or(base_defaults.worker_contract),
+ worker_contract: toml
+     .defaults
+     .worker_contract
+     .map(|contract| resolve_worker_contract(contract, base_defaults.worker_contract))
+     .unwrap_or(base_defaults.worker_contract),
@@
- worker_contract: a.worker_contract.map(|contract| WorkerContractConfig {
-     ack_secs: resolve_nonzero_secs(
-         contract.ack_secs,
-         defaults.worker_contract.ack_secs,
-     ),
-     progress_secs: resolve_nonzero_secs(
-         contract.progress_secs,
-         defaults.worker_contract.progress_secs,
-     ),
-     tick_secs: resolve_nonzero_secs(
-         contract.tick_secs,
-         defaults.worker_contract.tick_secs,
-     ),
- }),
+ worker_contract: a
+     .worker_contract
+     .map(|contract| resolve_worker_contract(contract, defaults.worker_contract)),
Also applies to: 3373-3386
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` around lines 3160 - 3177, The worker_contract mapping is duplicated; create a small helper (e.g., fn normalize_worker_contract_config(contract: WorkerContractConfig, base: WorkerContractConfig) -> WorkerContractConfig or fn resolve_worker_contract_option(opt: Option<WorkerContractConfig>, base: &WorkerContractConfig) -> WorkerContractConfig) that calls resolve_nonzero_secs for ack_secs, progress_secs and tick_secs and returns the normalized WorkerContractConfig, then replace the inline field-by-field mapping in both the defaults and per-agent resolution sites (references: WorkerContractConfig, resolve_nonzero_secs, worker_contract, base_defaults) to call this helper to avoid drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/channel.rs`:
- Around line 501-521: The periodic flushes (worker_receipt_dispatch_deadline /
flush_due_worker_delivery_receipts and worker_contract_tick_deadline /
flush_due_worker_task_contract_deadlines) unconditionally spawn work each tick,
causing overlapping runs and task pile-up; change the logic to guard each flush
with a concurrency control (e.g., an AtomicBool "is_flushing_receipts" and
"is_flushing_contracts" or a try_lock on a tokio::sync::Mutex) so you only
spawn/execute a flush when not already running, set the flag before starting and
clear it on completion (including on error), and avoid spawning additional
spawned tasks if the guard indicates a flush is in progress. Ensure the guards
are referenced where you call flush_due_worker_delivery_receipts and
flush_due_worker_task_contract_deadlines so concurrent ticks skip starting
duplicate flushes.
- Around line 476-483: On handling the RecvError::Lagged in the channel event
loop (where channel_id is logged), add a reconciliation step to avoid stale
WorkerComplete drops: when a lag is detected, iterate the shared worker_handles
collection and remove any JoinHandle entries where .is_finished() is true (or
otherwise not running), and then proceed (or alternatively trigger a background
periodic reconciler that runs the same cleanup). Ensure this logic references
WorkerComplete, worker_handles, and check_worker_limit so the admission control
reflects only actively running workers; if desired, also optionally fall back to
re-querying the authoritative worker count from storage when reconciliation
finds inconsistencies.
- Around line 2984-2991: The RecvError::Lagged branch currently resets the
inactivity deadline, which treats buffer lag as worker activity; change it to
not modify deadline and instead mirror the pattern used elsewhere by simply
logging the lag (using tracing::warn with worker_id and skipped) and leaving
deadline untouched so only is_worker_progress_event() updates deadline; update
the match arm handling tokio::sync::broadcast::error::RecvError::Lagged to
remove the assignment to deadline and ensure any existing state-change logic
remains only in branches that call is_worker_progress_event().
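The per-tick concurrency guard suggested for the flush calls above can be sketched roughly as follows. `FlushGuard` and its method names are illustrative, not symbols from the codebase; the real code would hold one guard per flush kind (receipts and contract deadlines):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical guard: a tick only starts a flush when none is in flight.
struct FlushGuard {
    is_flushing: Arc<AtomicBool>,
}

impl FlushGuard {
    fn new() -> Self {
        Self {
            is_flushing: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Atomically claims the flush slot; returns false if a flush is running,
    /// in which case the caller skips spawning a duplicate task.
    fn try_begin(&self) -> bool {
        self.is_flushing
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Must run when the flush finishes, on success and on error alike,
    /// so a failed flush does not wedge the guard closed.
    fn finish(&self) {
        self.is_flushing.store(false, Ordering::Release);
    }
}
```

Cloning the inner `Arc` into the spawned task and calling `finish()` in all exit paths (a drop guard works well for this) keeps overlapping ticks from piling up tasks.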
In `@src/main.rs`:
- Around line 1486-1491: The SSE is emitted prematurely via
emit_outbound_sse_event before route_outbound_response finishes, causing clients
to see OutboundMessage/TypingState even if adapter delivery or status surfacing
fails; modify the flow so route_outbound_response is awaited and its routing
outcome (success/failure or surfaced status) is checked, and only call
emit_outbound_sse_event when route_outbound_response indicates a successful
delivery/surfaced status; apply this change for the call site using
emit_outbound_sse_event at the shown location and replicate the same gating
logic for the other emission locations referenced (the block covering 1502-1547)
so SSE is emitted only on confirmed routed outcomes.
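The gating described above reduces to a small predicate over the routing outcome. The variant names here mirror the `DeliveryOutcome` concept this PR introduces, but the exact enum shape is an assumption:

```rust
/// Assumed shape of the routing outcome; the PR's real DeliveryOutcome
/// may carry more data or different variant names.
#[derive(Debug, PartialEq)]
enum DeliveryOutcome {
    Delivered,
    NotSurfaced,
    Failed,
}

/// Emit OutboundMessage/TypingState over SSE only once the adapter has
/// confirmed the response actually surfaced to a client.
fn should_emit_sse(outcome: &DeliveryOutcome) -> bool {
    matches!(outcome, DeliveryOutcome::Delivered)
}
```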
In `@src/messaging/discord.rs`:
- Around line 112-133: The write lock on progress_messages is held across async
Discord calls (channel_id.edit_message and channel_id.send_message); drop the
write guard before any .await and only re-acquire it for the short critical
sections that mutate the map (e.g., when progress_messages.remove(&key) or
progress_messages.insert(key, msg_id)). Concretely: read/clone any needed values
from progress_messages (or check presence) while holding the guard, then drop it
before calling edit_message or send_message, and after the await re-lock
progress_messages only to remove/insert the key; keep helper symbols in mind:
progress_messages, EditMessage, CreateMessage, extract_reply_message_id,
edit_message, send_message.
In `@src/tools/skip.rs`:
- Around line 87-90: Replace the silent discard pattern using "let _ = ..." when
sending the stop-typing status on the channel: locate the send call that sends
OutboundResponse::Status(crate::StatusUpdate::StopTyping) via self.response_tx
in the skip logic and change the await result handling to call .ok() on the
awaited Result (i.e., await.ok()) so the send remains fire-and-forget but
follows the guideline for channel sends where the receiver may be dropped.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (32)
docs/content/docs/(configuration)/config.mdx
docs/content/docs/(deployment)/roadmap.mdx
docs/content/docs/(features)/workers.mdx
migrations/20260224000002_worker_delivery_receipts.sql
migrations/20260224000003_worker_task_contracts.sql
src/agent/channel.rs
src/api/agents.rs
src/api/channels.rs
src/api/workers.rs
src/config.rs
src/conversation.rs
src/conversation/history.rs
src/cron/scheduler.rs
src/db.rs
src/lib.rs
src/llm/model.rs
src/main.rs
src/messaging/discord.rs
src/messaging/manager.rs
src/messaging/slack.rs
src/messaging/telegram.rs
src/messaging/traits.rs
src/messaging/webchat.rs
src/tools.rs
src/tools/browser.rs
src/tools/cancel.rs
src/tools/conclude_link.rs
src/tools/react.rs
src/tools/reply.rs
src/tools/send_file.rs
src/tools/skip.rs
src/tools/worker_inspect.rs
Addressed in Mapped fixes for review
Validation run:
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/tools/browser.rs (1)
663-669: ⚠️ Potential issue | 🟠 Major
Preserve tab tracking when close fails or times out.
On Line 665, the tab is removed from `state.pages` before the close operation at Line 668. If close fails, the tab may still exist in Chrome but becomes untracked in-memory.
Proposed fix
- let page = state
-     .pages
-     .remove(&id)
-     .ok_or_else(|| BrowserError::new(format!("no tab with target_id '{id}'")))?;
-
- Self::with_action_timeout("close tab", page.close()).await?;
+ let page = state
+     .pages
+     .get(&id)
+     .cloned()
+     .ok_or_else(|| BrowserError::new(format!("no tab with target_id '{id}'")))?;
+
+ Self::with_action_timeout("close tab", page.close()).await?;
+ state.pages.remove(&id);
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 663 - 669, The code removes the page from state.pages with state.pages.remove(&id) before attempting to close it via Self::with_action_timeout("close tab", page.close()), which causes the tab to become untracked if close fails or times out; change the logic so the page is only removed after a successful close (or if you must take it first, ensure you re-insert it on any error/timeout). Specifically, either fetch/clone/borrow the Page (instead of remove) and call Self::with_action_timeout("close tab", page.close()) and then remove it on success, or keep the removed page value but wrap the close call in a match/Result handler that on Err re-inserts the page back into state.pages (using the same id) before returning the BrowserError from BrowserError::new, so in all failure paths the in-memory tracking is preserved.
♻️ Duplicate comments (1)
src/messaging/discord.rs (1)
112-139: Lock-across-await issue from the prior review is resolved.
The read lock is now dropped before the `edit_message` and `send_message` awaits. Write locks for `.remove` and `.insert` are acquired only for the point mutations, with no awaits held.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/discord.rs` around lines 112 - 139, The read-lock/drop pattern around progress_messages in send/update flow is correct and no further changes are required: ensure the read lock around progress_messages (used to get existing_message_id) is dropped before awaiting edit_message or send_message, and keep the brief write locks only around the .remove(&key) in the Err branch and .insert(key, sent.id) after sending; verify functions EditMessage::new, extract_reply_message_id, CreateMessage::new, channel_id.edit_message, and channel_id.send_message retain this lock-free-across-await structure.
🧹 Nitpick comments (2)
src/messaging/discord.rs (1)
411: `let _ =` is unnecessary — drop it.
After `?` propagates the `Err` case, the surviving value is `DeliveryOutcome`, which is not `#[must_use]`. `let _ =` adds visual noise suggesting intentional error suppression. The idiomatic form is:
♻️ Proposed fix
- let _ = self.send_status(message, status).await?;
+ self.send_status(message, status).await?;
Verify each finding against the current code and only fix it if needed. In `src/messaging/discord.rs` at line 411: remove the unnecessary `let _ =` that precedes the expression which already uses the `?` operator and returns a `DeliveryOutcome`; the `?` will propagate `Err`, and the surviving `Ok` value (`DeliveryOutcome`) is not must_use, so just evaluate the expression without assigning to `_`. Locate the occurrence in the Discord messaging code (the function that produces a `DeliveryOutcome`, e.g., the send/deliver method where `let _ = ...?;` appears) and delete the `let _ = ` prefix so the line becomes just the expression ending with `?`.

src/tools/browser.rs (1)
31-32: Consider making the browser action timeout configurable per agent/environment.
The fixed 45s timeout is reasonable, but it often needs tuning by workload (slow pages, CI, remote browser hosts). Exposing it via `BrowserConfig` would reduce future churn.
Also applies to: 485-487
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tools/browser.rs` around lines 31 - 32, Replace the hard-coded BROWSER_ACTION_TIMEOUT_SECS constant with a configurable field on the BrowserConfig (e.g., browser_action_timeout_secs or action_timeout: Duration) and update any uses (references to BROWSER_ACTION_TIMEOUT_SECS at top-level and at lines referenced) to read from the BrowserConfig instance; add a default value of 45s in BrowserConfig::default() or builder so existing behavior is preserved, and update constructors/factories that create BrowserConfig to accept an optional override or env/config value.
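A rough sketch of the configurable-timeout suggestion. The field name `action_timeout`, the builder method, and the default are assumptions mirroring the review comment; the actual `BrowserConfig` in `src/tools/browser.rs` may differ:

```rust
use std::time::Duration;

/// Hypothetical config carrying the browser action timeout.
struct BrowserConfig {
    action_timeout: Duration,
}

impl Default for BrowserConfig {
    fn default() -> Self {
        // Preserve the current hard-coded 45s behavior as the default
        // so existing deployments are unaffected.
        Self {
            action_timeout: Duration::from_secs(45),
        }
    }
}

impl BrowserConfig {
    /// Builder-style override for slow pages, CI, or remote browser hosts.
    fn with_action_timeout(mut self, timeout: Duration) -> Self {
        self.action_timeout = timeout;
        self
    }
}
```

Call sites that currently reference the constant would read `config.action_timeout` instead.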
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/content/docs/`(configuration)/config.mdx:
- Around line 242-243: Update the hot-reload table row for contract timing to
indicate agent-level overrides also hot-reload: mention both
`[defaults.worker_contract]` and `[agents.worker_contract]` and list the fields
(`ack_secs`, `progress_secs`, `tick_secs`) so readers know that changes at
either defaults or per-agent level are applied without restart; alternatively
add a separate row for `[agents.worker_contract]` with the same "Yes" hot-reload
and description "Agent-level contract deadlines and polling update without
restart" to mirror the defaults entry.
In `@src/agent/channel.rs`:
- Around line 4174-4179: The code currently silences send Results with "let _ =
progress_tx.send(...)" (e.g., the send of ProcessEvent::WorkerStatus using
progress_tx.send) which breaks the repo error-handling guideline; replace these
silent discards with .ok() for channel send Results (e.g.,
progress_tx.send(...).ok()) and for any non-channel Results found nearby (the
other "let _ =" usages) replace with explicit handling—use match/if let to log,
propagate, or map the Err branch as appropriate rather than discarding; locate
occurrences around the progress_tx.send(ProcessEvent::WorkerStatus { ... }) call
and the other "let _ =" instances and apply .ok() for channel sends and explicit
Err handling for non-channel Results.
- Around line 3072-3077: The match arm handling
Err(tokio::sync::broadcast::error::RecvError::Closed) currently only logs and
continues, which causes the watcher loop that calls event_rx.recv() to spin and
repeatedly warn; change that arm to perform an early exit (break or return) from
the watcher loop or task instead of continuing so the loop stops polling
event_rx. Locate the loop that calls event_rx.recv() and update the
Err(RecvError::Closed) branch to stop the watcher (e.g., return from the async
function or break the loop) and optionally log once that the channel closed to
avoid repeated warn messages.
In `@src/main.rs`:
- Around line 627-785: Introduce a 3-strike circuit breaker in
spawn_worker_receipt_dispatch_loop: add a mutable counter (e.g.,
consecutive_failures) and a constant FAILURE_THRESHOLD = 3; increment the
counter whenever an unrecoverable error occurs (when
process_run_logger.claim_due_worker_terminal_receipts_any(...) returns Err, and
for each receipt when delivery_result is Err and/or when
fail_worker_delivery_receipt_attempt or ack_worker_delivery_receipt update
fails), reset the counter to 0 on successful operations (successful claim and
any successful delivery/ack path), and when consecutive_failures >=
FAILURE_THRESHOLD log a clear error including agent_id and stop the loop by
returning from the spawned task; make sure to reference these symbols in your
change: spawn_worker_receipt_dispatch_loop,
claim_due_worker_terminal_receipts_any, delivery_result,
ack_worker_delivery_receipt, fail_worker_delivery_receipt_attempt, and
next_prune_at.
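The 3-strike breaker described above reduces to a small piece of per-loop state. This is a sketch with illustrative names (`CircuitBreaker`, `record_failure`, `record_success`), not code from `src/main.rs`:

```rust
const FAILURE_THRESHOLD: u32 = 3;

/// Tracks consecutive unrecoverable errors in the receipt dispatch loop.
struct CircuitBreaker {
    consecutive_failures: u32,
}

impl CircuitBreaker {
    fn new() -> Self {
        Self {
            consecutive_failures: 0,
        }
    }

    /// Record an unrecoverable error (failed claim, delivery, or ack update);
    /// returns true when the loop should log and stop.
    fn record_failure(&mut self) -> bool {
        self.consecutive_failures += 1;
        self.consecutive_failures >= FAILURE_THRESHOLD
    }

    /// Any successful claim/delivery/ack path resets the streak.
    fn record_success(&mut self) {
        self.consecutive_failures = 0;
    }
}
```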
In `@src/messaging/slack.rs`:
- Around line 1278-1285: The function truncate_status_text currently uses
byte-based semantics (text.len(), floor_char_boundary) but its parameter is
named max_chars; either rename the parameter to max_bytes in
truncate_status_text and update all call sites to reflect byte semantics, or
change the implementation to operate on Unicode scalar counts: compute character
count with text.chars().count(), if within limit return text.to_string(),
otherwise iterate text.char_indices() to find the byte index of the character
boundary at (max_chars.saturating_sub(3)) and slice up to that byte index before
appending "...". Update all callers accordingly and ensure the helper still uses
floor_char_boundary only on byte indices when kept.
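The character-count variant described above could look roughly like this. `truncate_status_chars` is a hypothetical name; the real helper in `src/messaging/slack.rs` may keep byte semantics (with a renamed parameter) instead:

```rust
/// Truncate to at most `max_chars` Unicode scalar values, reserving
/// three of them for the "..." suffix when truncation happens.
fn truncate_status_chars(text: &str, max_chars: usize) -> String {
    if text.chars().count() <= max_chars {
        return text.to_string();
    }
    let keep = max_chars.saturating_sub(3);
    // Collecting chars rather than slicing bytes avoids panics on
    // multi-byte UTF-8 boundaries.
    let prefix: String = text.chars().take(keep).collect();
    format!("{prefix}...")
}
```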
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
docs/content/docs/(configuration)/config.mdx
src/agent/channel.rs
src/api/workers.rs
src/config.rs
src/conversation/history.rs
src/llm/model.rs
src/main.rs
src/messaging/discord.rs
src/messaging/slack.rs
src/tools/browser.rs
src/tools/skip.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/llm/model.rs
- src/api/workers.rs
Actionable comments posted: 6
♻️ Duplicate comments (1)
docs/content/docs/(configuration)/config.mdx (1)
243: Hot-reload table row for `[agents.worker_contract]` — LGTM.
The previously flagged omission (missing `[agents.worker_contract]` from the hot-reload row) has been resolved in this commit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(configuration)/config.mdx at line 243, The hot-reload table row has been updated to include `[agents.worker_contract]` alongside `[defaults.worker_contract]` for `ack_secs`, `progress_secs`, and `tick_secs`, but the PR still contains duplicate approval comments; confirm the corrected table row text (`[defaults.worker_contract]` / `[agents.worker_contract]` (`ack_secs`, `progress_secs`, `tick_secs`)) in docs/content/docs/(configuration)/config.mdx and remove the extraneous duplicate comment markers so the change is cleanly recorded.
🧹 Nitpick comments (4)
docs/content/docs/(configuration)/config.mdx (1)
546-546: Remove internal Rust type name from user-facing docs.
`AgentConfig.worker_contract` on Line 546 (`[[agents]]` table) and Line 552 (section header prose) leaks an internal struct path that means nothing to end users. The section heading and cross-link are self-describing; the parenthetical is noise.

✏️ Proposed fix

```diff
-| `worker_contract` | table | inherits | Per-agent worker contract override (`AgentConfig.worker_contract`) |
+| `worker_contract` | table | inherits | Per-agent worker contract timing override; unset keys inherit from `[defaults.worker_contract]` |
```

```diff
-Per-agent worker contract override for `AgentConfig.worker_contract`.
-Unset keys inherit from `[defaults.worker_contract]`.
+Per-agent worker contract timing override.
+Unset keys inherit from `[defaults.worker_contract]`.
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/content/docs/`(configuration)/config.mdx at line 546, Remove the internal Rust type path from the user-facing docs by deleting the parenthetical that exposes the struct path for AgentConfig.worker_contract: update the table row showing `worker_contract` (the `AgentConfig.worker_contract` / `[[agents]]` entry) to only show "Per-agent worker contract override" and remove the same parenthetical mention in the section header/prose where `AgentConfig.worker_contract` is referenced so the heading and cross-link remain self-describing without the internal Rust type name.src/config.rs (2)
3245-3248: Deduplicate browser timeout normalization to reduce drift risk.

The same non-zero fallback logic is implemented twice. Reusing `resolve_nonzero_secs` here would keep behavior centralized.

♻️ Proposed refactor

```diff
-            browser_action_timeout_secs: b
-                .browser_action_timeout_secs
-                .and_then(|secs| (secs > 0).then_some(secs))
-                .unwrap_or(base.browser_action_timeout_secs),
+            browser_action_timeout_secs: resolve_nonzero_secs(
+                b.browser_action_timeout_secs,
+                base.browser_action_timeout_secs,
+            ),
```

```diff
-            browser_action_timeout_secs: b
-                .browser_action_timeout_secs
-                .and_then(|secs| (secs > 0).then_some(secs))
-                .unwrap_or(defaults.browser.browser_action_timeout_secs),
+            browser_action_timeout_secs: resolve_nonzero_secs(
+                b.browser_action_timeout_secs,
+                defaults.browser.browser_action_timeout_secs,
+            ),
```

Also applies to: 3436-3439
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` around lines 3245 - 3248, Replace the duplicated non-zero fallback logic for browser_action_timeout_secs with a call to the existing helper resolve_nonzero_secs: instead of using .and_then(|secs| (secs > 0).then_some(secs)).unwrap_or(base.browser_action_timeout_secs), call resolve_nonzero_secs(b.browser_action_timeout_secs, base.browser_action_timeout_secs) (or the equivalent parameter order used by resolve_nonzero_secs) so behavior is centralized in the resolve_nonzero_secs function; make the same replacement for the other duplicated occurrence that normalizes browser_action_timeout_secs elsewhere in the file.
1862-1862: Add a regression test for `browser_action_timeout_secs = 0` fallback.

You already covered this fallback pattern for worker contracts; mirroring that coverage for browser timeout would prevent future regressions.
Also applies to: 3245-3248, 3436-3439, 5254-5303
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config.rs` at line 1862, Add a regression test that verifies the fallback behavior when browser_action_timeout_secs is set to 0: locate the Config struct and its parsing/defaulting logic that contains the browser_action_timeout_secs field, then add a unit test mirroring the worker-contracts test pattern which constructs a config/spec with browser_action_timeout_secs = 0, calls the same config-parsing or normalization function used at runtime (e.g., the Config::from_* or normalize_defaults helper around browser_action_timeout_secs) and asserts the effective timeout equals the expected fallback value (not zero); ensure the test covers both direct parsing and any code paths exercised at runtime so future changes do not regress this fallback.src/agent/channel.rs (1)
1640-1780: Consolidate fallback reply emission to one helper.

This block repeats extract/normalize/filter/log/send logic across multiple branches. A shared helper would reduce drift risk and keep fallback behavior consistent.
♻️ Refactor sketch
```diff
+async fn emit_fallback_reply(&self, raw_text: &str, context: &'static str) {
+    let extracted = extract_reply_from_tool_syntax(raw_text.trim());
+    let source = self
+        .conversation_id
+        .as_deref()
+        .and_then(|conversation_id| conversation_id.split(':').next())
+        .unwrap_or("unknown");
+    let final_text = crate::tools::reply::normalize_discord_mention_tokens(
+        extracted.as_deref().unwrap_or(raw_text.trim()),
+        source,
+    );
+    if final_text.is_empty() || crate::tools::reply::is_low_value_waiting_update(&final_text) {
+        return;
+    }
+    self.state.conversation_logger.log_bot_message(&self.state.channel_id, &final_text);
+    if let Err(error) = self.response_tx.send(OutboundEnvelope::from(OutboundResponse::Text(final_text))).await {
+        tracing::error!(%error, channel_id = %self.id, context, "failed to send fallback reply");
+    }
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 1640 - 1780, Multiple branches duplicate the same fallback emission logic (extract_reply_from_tool_syntax, normalize_discord_mention_tokens, is_low_value_waiting_update, logging via conversation_logger, and sending via response_tx with OutboundResponse::Text). Refactor by extracting a single helper function (e.g. emit_fallback_reply(channel: &ChannelState, raw_response: &str, source: &str) or similar) that performs: trim/extract, normalize, low-value check, warn on extracted syntax, conversation_logger.log_bot_message, and response_tx.send; replace the repeated blocks in the retrigger/skipped/else branches with calls to this helper to keep behavior consistent and avoid drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/content/docs/`(configuration)/config.mdx:
- Around line 555-566: Update the [agents.worker_contract] docs to include the
zero-value fallback caveat ("Setting ack_secs, progress_secs, or tick_secs to 0
is treated as unset and falls back to the resolved default") and make the three
key descriptions consistent with the [defaults.worker_contract] wording: use
"Deadline to confirm a worker start was surfaced" for ack_secs, keep
progress_secs description matching the defaults entry, and use "Poll interval
for worker contract deadline checks" for tick_secs; apply these changes to the
table and the example block so the per-agent section clearly matches the
defaults behavior and caveat.
In `@src/agent/channel.rs`:
- Around line 509-529: Add a 3‑strike circuit breaker around the recurring flush
loops: track consecutive failures for the two periodic blocks that call
flush_due_worker_delivery_receipts() and
flush_due_worker_task_contract_deadlines() (use two counters, e.g.
worker_receipt_failure_count and worker_contract_failure_count stored on the
same struct as worker_receipt_dispatch_deadline/worker_contract_tick_deadline),
increment the relevant counter when the flush call returns an error, reset the
counter to zero on success, and when a counter reaches 3 disable further
scheduling by skipping the periodic call and deadline reset, emitting a clear
log/metric (include the symbol names in the log) and keeping the disabled state
persistently until an explicit reset; provide a method (e.g.
reset_worker_receipt_circuit() / reset_worker_contract_circuit()) to clear the
counter and re-enable scheduling and ensure successful flushes auto-reset their
counter.
In `@src/main.rs`:
- Around line 423-451: The code currently records a generic reason when marking
a routed.is_status_update as unsurfaced in fail_worker_delivery_receipt_attempt;
modify the block that calls
context.outbound_process_logger.fail_worker_delivery_receipt_attempt (used when
routed.is_status_update && !routed.status_surfaced) to pass the original adapter
error string when delivery_result is Err instead of the fixed "status update not
surfaced by adapter" message: detect/unwrap the existing delivery_result error
(or convert it to a descriptive string) and supply that string as the failure
reason argument to fail_worker_delivery_receipt_attempt so the logged outcome
and rollback use the adapter's error detail (keep current tracing::warn branches
unchanged except they will now reflect the richer message).
- Around line 780-825: The circuit-breaker counter (consecutive_failures) is
being incremented for every delivery failure before attempting to persist the
failure, which allows a single bad batch to trip the breaker; change the logic
so consecutive_failures is NOT incremented at the start of the Err(error) arm
but is only incremented for infrastructure/update failures (i.e., in the
Err(update_error) branch returned by
process_run_logger.fail_worker_delivery_receipt_attempt), so move/remove the
initial consecutive_failures = consecutive_failures.saturating_add(1) and only
call saturating_add when handling update_error (and leave the Ok(outcome) path
unchanged); adjust references around FAILURE_THRESHOLD,
fail_worker_delivery_receipt_attempt, update_error, and tracing logs
accordingly.
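The two bullets above describe the same mechanism from both sides: a 3-strike breaker that counts only infrastructure failures and auto-resets on success. A minimal std-only sketch (all names hypothetical; the real code keeps separate counters per flush loop):

```rust
// Hypothetical 3-strike circuit breaker: consecutive infrastructure failures
// trip it, any success resets it, and it stays open until an explicit reset.
struct FlushCircuit {
    consecutive_failures: u32,
    threshold: u32,
}

impl FlushCircuit {
    fn new(threshold: u32) -> Self {
        Self { consecutive_failures: 0, threshold }
    }

    // Record one flush attempt; success auto-resets the counter.
    fn record(&mut self, flush_ok: bool) {
        if flush_ok {
            self.consecutive_failures = 0;
        } else {
            self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        }
    }

    // Once open, the caller skips scheduling the periodic flush.
    fn is_open(&self) -> bool {
        self.consecutive_failures >= self.threshold
    }

    // Explicit operator reset, mirroring reset_worker_receipt_circuit().
    fn reset(&mut self) {
        self.consecutive_failures = 0;
    }
}

fn main() {
    let mut circuit = FlushCircuit::new(3);
    circuit.record(false);
    circuit.record(false);
    circuit.record(true); // a success clears the streak
    assert!(!circuit.is_open());
    circuit.record(false);
    circuit.record(false);
    circuit.record(false); // third consecutive failure trips the breaker
    assert!(circuit.is_open());
    circuit.reset();
    assert!(!circuit.is_open());
}
```

The key design point from the review is that only the persistence (`update_error`) path should call `record(false)`; ordinary delivery errors that are durably recorded count as healthy infrastructure.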
---
Duplicate comments:
In `@docs/content/docs/`(configuration)/config.mdx:
- Line 243: The hot-reload table row has been updated to include
`[agents.worker_contract]` alongside `[defaults.worker_contract]` for
`ack_secs`, `progress_secs`, and `tick_secs`, but the PR still contains
duplicate approval comments; confirm the corrected table row text
(`[defaults.worker_contract]` / `[agents.worker_contract]` (`ack_secs`,
`progress_secs`, `tick_secs`)) in docs/content/docs/(configuration)/config.mdx
and remove the extraneous duplicate comment markers so the change is cleanly
recorded.
---
Nitpick comments:
In `@docs/content/docs/`(configuration)/config.mdx:
- Line 546: Remove the internal Rust type path from the user-facing docs by
deleting the parenthetical that exposes the struct path for
AgentConfig.worker_contract: update the table row showing `worker_contract` (the
`AgentConfig.worker_contract` / `[[agents]]` entry) to only show "Per-agent
worker contract override" and remove the same parenthetical mention in the
section header/prose where `AgentConfig.worker_contract` is referenced so the
heading and cross-link remain self-describing without the internal Rust type
name.
In `@src/agent/channel.rs`:
- Around line 1640-1780: Multiple branches duplicate the same fallback emission
logic (extract_reply_from_tool_syntax, normalize_discord_mention_tokens,
is_low_value_waiting_update, logging via conversation_logger, and sending via
response_tx with OutboundResponse::Text). Refactor by extracting a single helper
function (e.g. emit_fallback_reply(channel: &ChannelState, raw_response: &str,
source: &str) or similar) that performs: trim/extract, normalize, low-value
check, warn on extracted syntax, conversation_logger.log_bot_message, and
response_tx.send; replace the repeated blocks in the retrigger/skipped/else
branches with calls to this helper to keep behavior consistent and avoid drift.
In `@src/config.rs`:
- Around line 3245-3248: Replace the duplicated non-zero fallback logic for
browser_action_timeout_secs with a call to the existing helper
resolve_nonzero_secs: instead of using .and_then(|secs| (secs >
0).then_some(secs)).unwrap_or(base.browser_action_timeout_secs), call
resolve_nonzero_secs(b.browser_action_timeout_secs,
base.browser_action_timeout_secs) (or the equivalent parameter order used by
resolve_nonzero_secs) so behavior is centralized in the resolve_nonzero_secs
function; make the same replacement for the other duplicated occurrence that
normalizes browser_action_timeout_secs elsewhere in the file.
- Line 1862: Add a regression test that verifies the fallback behavior when
browser_action_timeout_secs is set to 0: locate the Config struct and its
parsing/defaulting logic that contains the browser_action_timeout_secs field,
then add a unit test mirroring the worker-contracts test pattern which
constructs a config/spec with browser_action_timeout_secs = 0, calls the same
config-parsing or normalization function used at runtime (e.g., the
Config::from_* or normalize_defaults helper around browser_action_timeout_secs)
and asserts the effective timeout equals the expected fallback value (not zero);
ensure the test covers both direct parsing and any code paths exercised at
runtime so future changes do not regress this fallback.
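Both config.rs bullets above reduce to one rule: a zero or missing seconds override is treated as unset and falls back to the default. A standalone sketch (the helper name `resolve_nonzero_secs` comes from the review; the exact signature is assumed):

```rust
// Zero or None means "unset": fall back to the resolved default.
fn resolve_nonzero_secs(override_secs: Option<u64>, default_secs: u64) -> u64 {
    override_secs
        .and_then(|secs| (secs > 0).then_some(secs))
        .unwrap_or(default_secs)
}

fn main() {
    assert_eq!(resolve_nonzero_secs(Some(30), 45), 30); // explicit override wins
    assert_eq!(resolve_nonzero_secs(Some(0), 45), 45);  // zero treated as unset
    assert_eq!(resolve_nonzero_secs(None, 45), 45);     // missing falls back
}
```

The three asserts double as the regression shape the review asks for: `browser_action_timeout_secs = 0` must resolve to the default, never to a zero timeout.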
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- docs/content/docs/(configuration)/config.mdx
- src/agent/channel.rs
- src/config.rs
- src/main.rs
- src/messaging/discord.rs
- src/messaging/slack.rs
- src/tools/browser.rs
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/main.rs (1)
811-856: `consecutive_failures` not reset in the `Ok(outcome)` arm — partial gap from the proposed fix.

The delivery-failure arm no longer increments `consecutive_failures` on normal delivery errors (good), but the `Ok(outcome)` path — where `fail_worker_delivery_receipt_attempt` succeeds and the failure is durably persisted — does not reset the counter to `0`. The proposed fix in the previous review included that reset as an explicit "infrastructure is healthy" signal. Without it, an ack-error from an earlier receipt in a different iteration carries forward and can combine with subsequent update-errors to trip the breaker sooner than the pure 3-strike intent.

🔧 Minimal addition to complete the proposed fix

```diff
 Ok(outcome) => {
+    consecutive_failures = 0;
     tracing::warn!(
         agent_id = %agent_id,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 811 - 856, The Ok(outcome) branch inside the Err(error) handling for fail_worker_delivery_receipt_attempt currently does not reset consecutive_failures; update the Ok(outcome) arm (the match on process_run_logger.fail_worker_delivery_receipt_attempt(&receipt.id, &error.to_string()).await) to set consecutive_failures = 0 when the update succeeds (i.e., in the Ok(outcome) block) so a successful durable persistence of the failure resets the circuit-breaker counter; keep existing logging there and only add the reset assignment.src/agent/channel.rs (2)
597-642: ⚠️ Potential issue | 🟠 Major

`reconcile_finished_workers_after_lag` silently drops terminal delivery side-effects

The reconciliation removes finished handles from admission-control maps (correct for `check_worker_limit`), but for workers that completed with `notify = true` during the lag window, none of the `WorkerComplete` handler's terminal side-effects are reproduced:

- No `WorkerStatus` terminal event emitted
- No `mark_worker_task_contract_terminal_pending` call
- No `upsert_worker_terminal_receipt` call
- No receipt-dispatch deadline bump

Users will never receive delivery of those workers' results, and their DB contracts remain in a non-terminal state indefinitely.

Emit synthetic terminal events (or directly perform the DB side-effects) for finished handles after removing them, mirroring the logic in the `WorkerComplete` branch of `handle_event`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 597 - 642, reconcile_finished_workers_after_lag currently removes finished handles from admission-control maps (worker_handles, active_workers, worker_inputs) but does not replay the terminal delivery side-effects that the WorkerComplete branch in handle_event performs; update reconcile_finished_workers_after_lag to, for each removed finished WorkerId (especially those whose Handle::is_finished() had notify == true), after removing the handle from the maps, run the same terminal-path logic as in handle_event's WorkerComplete case: emit a WorkerStatus terminal event, call mark_worker_task_contract_terminal_pending, call upsert_worker_terminal_receipt, and bump the receipt-dispatch deadline (or otherwise schedule dispatch) so DB contracts become terminal and deliveries occur; reuse or extract the WorkerComplete terminal handling code into a helper invoked both from handle_event and reconcile_finished_workers_after_lag to avoid duplication.
2307-2337: ⚠️ Potential issue | 🟠 Major

`worker_receipt_flush_in_progress` / `worker_contract_flush_in_progress` stay `true` permanently on task panic

The spawned task stores `false` as the last statement, but tokio catches panics at task boundaries before reaching that store. Any panic inside `flush_due_worker_delivery_receipts_task`, `apply_periodic_flush_circuit`, or the tracing calls permanently freezes the flag, making every subsequent tick a no-op (hit the early-return guard at `swap(true, AcqRel)`).

Wrap the flag reset in a RAII drop guard, as was originally suggested in review:

🛡️ Proposed fix: drop guard for both flush tasks

```diff
+ struct InProgressGuard(Arc<AtomicBool>);
+ impl Drop for InProgressGuard {
+     fn drop(&mut self) {
+         self.0.store(false, Ordering::Release);
+     }
+ }
  tokio::spawn(async move {
+     let _guard = InProgressGuard(in_progress.clone());
      let task_result = Self::flush_due_worker_delivery_receipts_task(
          run_logger,
          response_tx,
          channel_id.clone(),
      )
      .await;
      // ... circuit logic ...
-     in_progress.store(false, Ordering::Release);
  });
```

Apply the same pattern to the `flush_due_worker_task_contract_deadlines` spawn at Line 2428.

Also applies to: 2428-2457
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2307 - 2337, The in_progress atomic flag (worker_receipt_flush_in_progress / worker_contract_flush_in_progress) can remain true if the spawned task panics because the final in_progress.store(false, ...) is never reached; create a small RAII drop guard type (e.g., ResetFlag) that owns the Arc<AtomicBool> and sets it to false in Drop, then, immediately after successfully setting the flag true (the swap(true, AcqRel) check), instantiate this guard inside the spawned task so it will always reset the flag even if flush_due_worker_delivery_receipts_task, apply_periodic_flush_circuit, or tracing panics; apply the same pattern to the other spawn for flush_due_worker_task_contract_deadlines so both tasks use the guard to reliably clear their respective in_progress flags.
🧹 Nitpick comments (1)
src/agent/channel.rs (1)
2598-2617: Prefer.any()over.find().is_some_and()in visibility helpersBoth
worker_is_user_visible_in_status_blockandworker_is_user_visiblecan be simplified:♻️ Proposed simplification
- status_block - .active_workers - .iter() - .find(|worker| worker.id == worker_id) - .is_some_and(|worker| worker.notify_on_complete) + status_block + .active_workers + .iter() + .any(|worker| worker.id == worker_id && worker.notify_on_complete)Apply the same change to
worker_is_user_visibleat Lines 2610–2617.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2598 - 2617, Both visibility helpers use .find(...).is_some_and(...) which is more verbose; in worker_is_user_visible_in_status_block and worker_is_user_visible replace the iterator chain with .any(...) to check existence and flag in one step: call active_workers.iter().any(|worker| worker.id == worker_id && worker.notify_on_complete) (keeping the same read().await logic and types) to simplify the code and behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/channel.rs`:
- Around line 2583-2589: The loop iterating over due_terminal (results from
claim_due_worker_task_contract_terminal_deadlines) only logs warnings and drops
claimed rows; instead, for each due item (inspect channel_id and due.worker_id)
perform the same escalation used by the terminal delivery pipeline: mark the
corresponding contract/task as terminal (e.g., set state to timed_out/failed),
upsert a delivery receipt or failure record so clients see the outcome, and
reset or schedule any retries if that is the intended behavior; ensure the
handler updates DB state and emits a single definitive log/event so claimed rows
are not silently consumed or re-returned on subsequent ticks.
---
Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 597-642: reconcile_finished_workers_after_lag currently removes
finished handles from admission-control maps (worker_handles, active_workers,
worker_inputs) but does not replay the terminal delivery side-effects that the
WorkerComplete branch in handle_event performs; update
reconcile_finished_workers_after_lag to, for each removed finished WorkerId
(especially those whose Handle::is_finished() had notify == true), after
removing the handle from the maps, run the same terminal-path logic as in
handle_event's WorkerComplete case: emit a WorkerStatus terminal event, call
mark_worker_task_contract_terminal_pending, call upsert_worker_terminal_receipt,
and bump the receipt-dispatch deadline (or otherwise schedule dispatch) so DB
contracts become terminal and deliveries occur; reuse or extract the
WorkerComplete terminal handling code into a helper invoked both from
handle_event and reconcile_finished_workers_after_lag to avoid duplication.
- Around line 2307-2337: The in_progress atomic flag
(worker_receipt_flush_in_progress / worker_contract_flush_in_progress) can
remain true if the spawned task panics because the final
in_progress.store(false, ...) is never reached; create a small RAII drop guard
type (e.g., ResetFlag) that owns the Arc<AtomicBool> and sets it to false in
Drop, then, immediately after successfully setting the flag true (the swap(true,
AcqRel) check), instantiate this guard inside the spawned task so it will always
reset the flag even if flush_due_worker_delivery_receipts_task,
apply_periodic_flush_circuit, or tracing panics; apply the same pattern to the
other spawn for flush_due_worker_task_contract_deadlines so both tasks use the
guard to reliably clear their respective in_progress flags.
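The drop-guard pattern described above can be shown std-only. In this sketch the tokio task is replaced by `catch_unwind` to simulate a panic at the task boundary; `InProgressGuard` is the name proposed in the review:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// RAII guard: the flag is cleared in Drop, so it resets even when the
// owning task unwinds before reaching its final store.
struct InProgressGuard(Arc<AtomicBool>);

impl Drop for InProgressGuard {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

fn main() {
    let in_progress = Arc::new(AtomicBool::new(false));
    // Claim the slot, as the early-return swap(true, AcqRel) guard does.
    assert!(!in_progress.swap(true, Ordering::AcqRel));

    let flag = in_progress.clone();
    let result = std::panic::catch_unwind(move || {
        let _guard = InProgressGuard(flag);
        panic!("simulated flush panic");
    });

    assert!(result.is_err());
    // The guard ran during unwinding, so the next tick is not a no-op.
    assert!(!in_progress.load(Ordering::Acquire));
}
```

With tokio, the same guard inside the spawned future covers panics in the flush body, because the future's locals are dropped when the task unwinds.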
In `@src/main.rs`:
- Around line 811-856: The Ok(outcome) branch inside the Err(error) handling for
fail_worker_delivery_receipt_attempt currently does not reset
consecutive_failures; update the Ok(outcome) arm (the match on
process_run_logger.fail_worker_delivery_receipt_attempt(&receipt.id,
&error.to_string()).await) to set consecutive_failures = 0 when the update
succeeds (i.e., in the Ok(outcome) block) so a successful durable persistence of
the failure resets the circuit-breaker counter; keep existing logging there and
only add the reset assignment.
---
Nitpick comments:
In `@src/agent/channel.rs`:
- Around line 2598-2617: Both visibility helpers use .find(...).is_some_and(...)
which is more verbose; in worker_is_user_visible_in_status_block and
worker_is_user_visible replace the iterator chain with .any(...) to check
existence and flag in one step: call active_workers.iter().any(|worker|
worker.id == worker_id && worker.notify_on_complete) (keeping the same
read().await logic and types) to simplify the code and behavior.
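The suggested `.any(...)` form, reduced to a std-only sketch (the `Worker` struct here is hypothetical; the real helpers also take the status-block lock):

```rust
// Hypothetical stand-in for the status-block entries.
struct Worker {
    id: u64,
    notify_on_complete: bool,
}

// "Exists a worker with this id whose completion should notify the user"
// expressed in one .any() step instead of .find().is_some_and().
fn worker_is_user_visible(active_workers: &[Worker], worker_id: u64) -> bool {
    active_workers
        .iter()
        .any(|worker| worker.id == worker_id && worker.notify_on_complete)
}

fn main() {
    let workers = vec![
        Worker { id: 1, notify_on_complete: true },
        Worker { id: 2, notify_on_complete: false },
    ];
    assert!(worker_is_user_visible(&workers, 1));
    assert!(!worker_is_user_visible(&workers, 2)); // present but not notifying
    assert!(!worker_is_user_visible(&workers, 3)); // absent
}
```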
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
docs/content/docs/(configuration)/config.mdxsrc/agent/channel.rssrc/config.rssrc/main.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- docs/content/docs/(configuration)/config.mdx
- src/config.rs
There was a problem hiding this comment.
♻️ Duplicate comments (1)
src/agent/channel.rs (1)
2665-2671: `due_terminal` items are claimed but not escalated — open from previous review.

`claim_due_worker_task_contract_terminal_deadlines` destructively marks rows as claimed. The loop body only warns but takes no compensating action (no state transition, no receipt escalation), so terminal deadline violations are silently swallowed with no user-visible outcome.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 2665 - 2671, The for-loop over due_terminal (from claim_due_worker_task_contract_terminal_deadlines) only logs a warning and drops the claimed items; instead, for each due you must perform the compensating state transitions and receipt escalation: invoke the code-path that transitions the worker task contract to terminal_failed (e.g., call a helper like transition_contract_to_terminal_failed or the existing state-transition function used elsewhere) and mark its pending receipts as failed (e.g., mark_pending_receipts_failed or the receipts-escalation helper), passing the channel_id and due.worker_id / due.contract_id as needed; if such helpers don't exist, implement concise functions that atomically update the contract state and associated receipts and call them from this loop so deadline violations produce the same visible state changes as other failure cases.
🧹 Nitpick comments (2)
src/main.rs (1)
398-413: `delivery_outcome` is unconditionally `Surfaced` even when `respond()` fails.

The status path (lines 376-396) correctly sets `DeliveryOutcome::NotSurfaced` on error, but the non-status catch-all always claims `Surfaced`. Currently harmless because `delivery_result` guards all downstream behavior, but the inconsistency could mislead future code that inspects `delivery_outcome` alone.

Suggested fix

```diff
 response => {
     tracing::info!(
         conversation_id = %context.outbound_conversation_id,
         "routing outbound response to messaging adapter"
     );
+    let delivery_result = context
+        .messaging_for_outbound
+        .respond(context.current_message, response)
+        .await;
+    let delivery_outcome = if delivery_result.is_ok() {
+        spacebot::messaging::traits::DeliveryOutcome::Surfaced
+    } else {
+        spacebot::messaging::traits::DeliveryOutcome::NotSurfaced
+    };
     RoutedOutboundResponse {
-        delivery_result: context
-            .messaging_for_outbound
-            .respond(context.current_message, response)
-            .await,
-        delivery_outcome: spacebot::messaging::traits::DeliveryOutcome::Surfaced,
-        status_surfaced: true,
+        delivery_result,
+        delivery_outcome,
+        status_surfaced: delivery_outcome.is_surfaced(),
         is_status_update: false,
         acknowledged_worker_id,
     }
 }
```
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 398 - 413, The code unconditionally sets RoutedOutboundResponse.delivery_outcome to DeliveryOutcome::Surfaced even when context.messaging_for_outbound.respond(...).await fails; change the construction so you first capture the await result (e.g., let delivery_result = context.messaging_for_outbound.respond(...).await) and then set delivery_outcome = match &delivery_result { Ok(_) => DeliveryOutcome::Surfaced, Err(_) => DeliveryOutcome::NotSurfaced }, preserving the existing fields (delivery_result, status_surfaced, is_status_update, acknowledged_worker_id) so RoutedOutboundResponse reflects the real outcome.src/agent/channel.rs (1)
4392-4463: Test relies on real-time scheduling; consider using `start_paused = true`.

The test verifies that a progress heartbeat at 700 ms resets a 1 s timeout. Because `#[tokio::test]` uses real wall-clock time, a slow CI executor could let the 1 s deadline fire before the 700 ms heartbeat task is scheduled, making the assertion flaky.

💡 Suggested fix

```diff
- #[tokio::test]
+ #[tokio::test(start_paused = true)]
  async fn worker_timeout_resets_on_progress_events() {
```

With time paused, `tokio::time::sleep` advances only when explicitly yielded to, keeping the relative ordering deterministic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/channel.rs` around lines 4392 - 4463, Change the test to use paused time and explicitly advance it so scheduling is deterministic: update the test attribute to #[tokio::test(start_paused = true)], keep the spawned tasks (progress_task and the spawn_worker_task call) but replace reliance on real sleep by calling tokio::time::advance(Duration::from_millis(700)).await to trigger the progress heartbeat and then tokio::time::advance(Duration::from_millis(1000)).await (or enough to finish) before awaiting progress_task and handle; reference the test function worker_timeout_resets_on_progress_events, the spawned progress_task, and spawn_worker_task which currently use tokio::time::sleep so the explicit tokio::time::advance calls drive those sleeps deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 2665-2671: The for-loop over due_terminal (from
claim_due_worker_task_contract_terminal_deadlines) only logs a warning and drops
the claimed items; instead, for each due you must perform the compensating state
transitions and receipt escalation: invoke the code-path that transitions the
worker task contract to terminal_failed (e.g., call a helper like
transition_contract_to_terminal_failed or the existing state-transition function
used elsewhere) and mark its pending receipts as failed (e.g.,
mark_pending_receipts_failed or the receipts-escalation helper), passing the
channel_id and due.worker_id / due.contract_id as needed; if such helpers don't
exist, implement concise functions that atomically update the contract state and
associated receipts and call them from this loop so deadline violations produce
the same visible state changes as other failure cases.
---
Nitpick comments:
In `@src/agent/channel.rs`:
- Around line 4392-4463: Change the test to use paused time and explicitly
advance it so scheduling is deterministic: update the test attribute to
#[tokio::test(start_paused = true)], keep the spawned tasks (progress_task and
the spawn_worker_task call) but replace reliance on real sleep by calling
tokio::time::advance(Duration::from_millis(700)).await to trigger the progress
heartbeat and then tokio::time::advance(Duration::from_millis(1000)).await (or
enough to finish) before awaiting progress_task and handle; reference the test
function worker_timeout_resets_on_progress_events, the spawned progress_task,
and spawn_worker_task which currently use tokio::time::sleep so the explicit
tokio::time::advance calls drive those sleeps deterministically.
In `@src/main.rs`:
- Around line 398-413: The code unconditionally sets
RoutedOutboundResponse.delivery_outcome to DeliveryOutcome::Surfaced even when
context.messaging_for_outbound.respond(...).await fails; change the construction
so you first capture the await result (e.g., let delivery_result =
context.messaging_for_outbound.respond(...).await) and then set delivery_outcome
= match &delivery_result { Ok(_) => DeliveryOutcome::Surfaced, Err(_) =>
DeliveryOutcome::NotSurfaced }, preserving the existing fields (delivery_result,
status_surfaced, is_status_update, acknowledged_worker_id) so
RoutedOutboundResponse reflects the real outcome.
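The derivation the bullet describes can be shown std-only (the enum shape is assumed from the review; the real type lives in `spacebot::messaging::traits`):

```rust
// Minimal stand-in for the adapter outcome type.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DeliveryOutcome {
    Surfaced,
    NotSurfaced,
}

impl DeliveryOutcome {
    // Derive the outcome from the adapter result instead of hard-coding it.
    fn from_result<T, E>(result: &Result<T, E>) -> Self {
        if result.is_ok() { Self::Surfaced } else { Self::NotSurfaced }
    }

    fn is_surfaced(self) -> bool {
        matches!(self, Self::Surfaced)
    }
}

fn main() {
    let ok: Result<(), String> = Ok(());
    let err: Result<(), String> = Err("adapter send failed".into());
    assert_eq!(DeliveryOutcome::from_result(&ok), DeliveryOutcome::Surfaced);
    assert_eq!(DeliveryOutcome::from_result(&err), DeliveryOutcome::NotSurfaced);
    assert!(!DeliveryOutcome::from_result(&err).is_surfaced());
}
```

Keeping `delivery_outcome` and `status_surfaced` derived from the same `delivery_result` is what removes the inconsistency the review flags.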
Summary
`worker_events` as a second durable execution-history store (aligned with Workers tab: full transcript viewer, live SSE streaming, introspection tool #192).

Context
This PR is the follow-up extraction requested in the review thread for #206:
Included
- (`worker_task_contracts`) with ack/progress/terminal deadlines.
- (`worker_delivery_receipts`) with retry/backoff, ack/fail transitions, and pruning.
- `DeliveryOutcome` + `OutboundEnvelope` routing so status surfacing affects receipt handling.
- `workers/detail?include_timeline=true`
- `worker_inspect` projected timeline section.

Explicitly Excluded
- `worker_events` journal and related API/tool timeline surfaces.

Test Plan
- `cargo fmt && cargo check`
- `cargo test worker_timeline_projection_orders_transcript_then_delivery`
- `cargo test worker_terminal_convergence_requires_terminal_contract_state`
- `cargo test worker_task_timeout_emits_terminal_events`
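The retry/backoff on `worker_delivery_receipts` is not specified in this summary; purely as an illustration, a capped exponential schedule (all timings hypothetical) looks like:

```rust
use std::time::Duration;

// Hypothetical backoff: 5s base, doubling per attempt, capped at 5 minutes.
// The actual intervals used by the receipt flush loop are not stated here.
fn next_retry_delay(attempt: u32) -> Duration {
    let base_secs: u64 = 5;
    let cap_secs: u64 = 300;
    let exp = attempt.min(16); // bound the shift to avoid overflow
    Duration::from_secs(base_secs.saturating_mul(1u64 << exp).min(cap_secs))
}

fn main() {
    assert_eq!(next_retry_delay(0), Duration::from_secs(5));
    assert_eq!(next_retry_delay(1), Duration::from_secs(10));
    assert_eq!(next_retry_delay(2), Duration::from_secs(20));
    assert_eq!(next_retry_delay(10), Duration::from_secs(300)); // capped
}
```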
Automated Summary
This PR implements worker delivery hardening by introducing two new database tables for tracking worker task contracts and delivery receipts. Key changes include migration files for both tables, configuration additions for worker contract timing (ack/progress/tick intervals), and updates across the agent, API, messaging, and tool modules. The worker timeline projection is now exposed via the API and worker_inspect tool. Cancellation behavior and SSE event emission have been improved for terminal receipt delivery. Documentation has been updated to reflect the new worker contract configuration options and timeline visibility. The PR excludes the append-only worker_events journal to avoid creating a second durable execution history store.
Written by Tembo for commit ffd016ddc.