# feat: cut per-turn LLM token cost via caching, truncation, and compaction (#140)

andersonleal wants to merge 2 commits.

## Conversation
Three independent token-reduction interventions on the harness chat
agent. Every long-running conversation currently re-ships the full
transcript uncached, untruncated, and unsummarised on every LLM call;
each piece below attacks a different cost driver. All three ship behind
their own env-var flag so any one can be disabled without a rebuild.
## 1. Anthropic prompt caching + OpenAI cache surfacing
- provider-anthropic/src/lib.rs: emit Anthropic's typed-block `system`
field with a `cache_control: ephemeral` marker, stamp the last entry
of the `tools` array (caches the whole tools array as a single span),
and anchor a rolling-transcript marker on the most recent "stable"
assistant turn — one whose tool_use blocks all have matching
downstream tool_result blocks. Never marks an in-flight tool call:
that prefix would invalidate on the next turn and defeat the point.
All gated on a 4096-byte serialised-size floor so short prefixes
can't trip Anthropic's per-model token minimum and 400 the request.
Disable with HARNESS_ANTHROPIC_CACHE=0.
- provider-{anthropic,openai}/crates/provider-base/src/openai_compat.rs:
extend merge_usage to read OpenAI's prompt_tokens_details.cached_tokens
into Usage.cache_read. OpenAI caches prefixes >1024 tokens
unilaterally; this just surfaces the existing savings to dashboards.
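The "stable anchor" rule above is the subtle part of the caching change. As a hedged sketch (the `Block` and `Msg` types and the `stable_anchor` name are illustrative stand-ins, not the real provider-anthropic message model): walk the transcript backwards and pick the newest assistant turn whose tool_use ids all have a matching tool_result somewhere downstream.

```rust
use std::collections::HashSet;

// Simplified stand-ins for the provider's content blocks.
enum Block {
    Text,
    ToolUse { id: String },
    ToolResult { tool_use_id: String },
}

struct Msg {
    role: &'static str, // "user" | "assistant"
    blocks: Vec<Block>,
}

/// Index of the newest assistant message that is safe to stamp with
/// `cache_control: ephemeral`, or None if every candidate still has an
/// in-flight tool call.
fn stable_anchor(msgs: &[Msg]) -> Option<usize> {
    // Collect every tool_use id that has a tool_result anywhere in the
    // transcript (results can only appear after their call, so a global
    // set is equivalent to "downstream").
    let resolved: HashSet<&str> = msgs
        .iter()
        .flat_map(|m| m.blocks.iter())
        .filter_map(|b| match b {
            Block::ToolResult { tool_use_id } => Some(tool_use_id.as_str()),
            _ => None,
        })
        .collect();

    msgs.iter().enumerate().rev().find_map(|(i, m)| {
        if m.role != "assistant" {
            return None;
        }
        let all_resolved = m.blocks.iter().all(|b| match b {
            Block::ToolUse { id } => resolved.contains(id.as_str()),
            _ => true,
        });
        all_resolved.then_some(i)
    })
}
```

An in-flight tool call fails the `all_resolved` check, so the marker never lands on a prefix that the next turn's tool_result would invalidate.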
## 2. Tool-result truncation + result::fetch escape hatch
- turn-orchestrator/src/states/functions.rs: cap each FunctionResult at
HARNESS_RESULT_TRUNCATE_BYTES (default 8 KB serialised). Oversized
payloads get stashed at session/<id>/result/<call_id>; the in-stream
content becomes a head+tail-elided preview plus a marker that tells
the model how to recover the full payload. UTF-8 char-boundary
helpers ensure the head/tail cuts never split a codepoint.
- turn-orchestrator/src/agent_call.rs: intercept the virtual
result::fetch function inside dispatch() before bus routing. Not
registered as an iii function (no engine::functions::list exposure),
discoverable only via the truncation marker plus a recovery clause
in agent_call_tool()'s description. The clause lives in the tool
description, not the system prompt — system_prompt::BASE_BODY has an
explicit test forbidding prose there.
- harness-types/src/function.rs (all 6 vendored copies kept
byte-identical): add Option<TruncationInfo> to FunctionResult with
#[serde(default, skip_serializing_if)] so the wire format stays
backward-compatible with existing sessions.
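The head+tail elision in the first bullet can be sketched as below. This is a minimal illustration, assuming nothing beyond what the bullet states; the helper names (`floor_boundary`, `ceil_boundary`, `elide`) and the marker format are invented here, not the orchestrator's actual API. The cut points are snapped to UTF-8 char boundaries so a multi-byte codepoint is never split.

```rust
/// Largest index <= i that is a UTF-8 char boundary of `s`.
fn floor_boundary(s: &str, mut i: usize) -> usize {
    if i >= s.len() {
        return s.len();
    }
    while !s.is_char_boundary(i) {
        i -= 1;
    }
    i
}

/// Smallest index >= i that is a UTF-8 char boundary of `s`.
fn ceil_boundary(s: &str, mut i: usize) -> usize {
    if i >= s.len() {
        return s.len();
    }
    while !s.is_char_boundary(i) {
        i += 1;
    }
    i
}

/// Keep roughly `head` bytes from the front and `tail` bytes from the back,
/// never splitting a codepoint, with an elision marker in between.
fn elide(s: &str, head: usize, tail: usize) -> String {
    if s.len() <= head + tail {
        return s.to_string();
    }
    let h = floor_boundary(s, head);
    let t = ceil_boundary(s, s.len() - tail);
    format!("{}… [{} bytes elided] …{}", &s[..h], t - h, &s[t..])
}
```

Because `floor_boundary` rounds down and `ceil_boundary` rounds up, the preview can be a byte or two smaller than the nominal head/tail budget, but slicing never panics on multi-byte input.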
## 3. context-compaction worker
New crate. Subscribes to agent::events on the iii bus, watches every
TurnEnd for transcript size (sum of input + output + cache_read — the
true context-window pressure metric, not just the uncached portion),
and when over COMPACT_TRIGGER_TOKENS (default 60k) runs an anchored-
summary compaction through router::stream_assistant against a cheap
fast model (default claude-haiku-4-5). The summariser receives the
older prefix; the trailing COMPACT_KEEP_RECENT_TURNS (default 3) turns
stay verbatim so the model retains its immediate working memory.
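The trigger arithmetic is simple, but the choice of inputs matters: cached reads still occupy the context window, so they count toward pressure. A sketch with assumed names (`Usage` here is a local stand-in, not the harness type):

```rust
// Stand-in for the per-turn usage numbers carried on TurnEnd.
struct Usage {
    input: u64,
    output: u64,
    cache_read: u64,
}

const COMPACT_TRIGGER_TOKENS: u64 = 60_000; // default; env-overridable in the worker

/// True context-window pressure: cached prefix tokens are cheaper but still
/// consume window, so they are included.
fn context_pressure(u: &Usage) -> u64 {
    u.input + u.output + u.cache_read
}

fn should_compact(u: &Usage, trigger: u64) -> bool {
    context_pressure(u) > trigger
}
```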
Concurrency: nonce-and-readback lease at
session/<id>/compaction_lease. The engine's state::* ops have no CAS
primitive, so each acquisition writes a unique pid-nanos-counter nonce
and confirms ownership via readback — state::set is last-write-wins,
exactly one writer sees its own nonce survive. TTL is 300s,
comfortably above the 120s summariser timeout so a slow LLM call
can't expire its own lease and let a peer start a duplicate
compaction. release_lease only clears the slot when the stored nonce
is still ours, so a TTL-expired predecessor can't wipe its successor.
read_lease_timestamp_secs accepts both the new {nonce, ts} shape and
the legacy bare-i64 shape so a worker upgraded mid-flight reads the
prior lease correctly.
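The lease protocol above can be sketched against an in-memory last-write-wins map standing in for state::get / state::set (all names here are illustrative; the real worker goes through the iii bus and stores a `{nonce, ts}` JSON value):

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

const LEASE_TTL_SECS: u64 = 300; // comfortably above the 120s summarizer timeout

#[derive(Clone, Debug, PartialEq)]
struct Lease {
    nonce: String,
    ts: u64,
}

// Stand-in for the state service: last-write-wins, no CAS primitive.
type Store = HashMap<String, Lease>;

fn now_secs() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}

/// Write our nonce, then read it back. On a last-write-wins store exactly
/// one of several racing writers sees its own nonce survive.
fn acquire_lease(store: &mut Store, key: &str, nonce: &str) -> bool {
    if let Some(existing) = store.get(key) {
        if now_secs().saturating_sub(existing.ts) < LEASE_TTL_SECS {
            return false; // a live lease is held by some worker
        }
    }
    store.insert(
        key.to_string(),
        Lease { nonce: nonce.to_string(), ts: now_secs() },
    );
    // Readback: trivially true for a local map, but this is the step that
    // disambiguates racing writers when the store is remote.
    store.get(key).map(|l| l.nonce == nonce).unwrap_or(false)
}

/// Clear the slot only if the stored nonce is still ours, so a TTL-expired
/// predecessor can't wipe its successor's lease.
fn release_lease(store: &mut Store, key: &str, nonce: &str) {
    if store.get(key).map(|l| l.nonce == nonce).unwrap_or(false) {
        store.remove(key);
    }
}
```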
Reload protocol: when a compaction lands, the worker stamps
session/<id>/last_compaction_at with chrono::Utc::now().timestamp_millis().
The orchestrator's handle_streaming rebuilds the hot
session/<id>/messages key from session-tree (whose load_messages
already filters Compaction entries out of the active path) when
last_compaction_at > last_compaction_consumed_at. Single writer per
key — no race between orchestrator and compactor.
Bundled via the harness Makefile (`make compaction`) as a background
process — not added to iii.worker.yaml dependencies because the
upstream worker registry doesn't index it yet. Mirrors the
iii-observability "optional, side-of-config" pattern; PID file lives
alongside engine.pid so `make stop` cleans it up uniformly.
## Tests
- provider-anthropic: +13 unit tests (cache marker placement,
in-flight tool_use guard, eligibility floor, OpenAI cached_tokens
surfacing on both vendored provider-base copies).
- turn-orchestrator: +6 unit + 1 integration (truncation render,
UTF-8 char-boundary helpers, result::fetch description recovery
clause, compaction watermark key namespacing).
- context-compaction: 27 unit + 1 integration. Nonce uniqueness
asserted via a 1000-call collision sweep; timestamp parsing pinned
for both shapes; the LEASE_TTL_SECS > SUMMARIZER_TIMEOUT_MS/1000
invariant pinned as a test so future drift trips CI not prod. The
manifest smoke test asserts functions=[] (the compactor is not
LLM-facing — registering one would expose it to
engine::functions::list) and that "agent::events" stays in the
subscription list.
- 170 unit tests pass across the four most-changed crates.
## Out of scope (deliberate)
- BASE_BODY system-prompt edits — replaced by the agent_call_tool()
description clause for the reasons noted above.
- Bus-mocked tests for acquire_lease / summarize_and_append /
handle_event / maybe_truncate_result — the async functions touching
the iii bus are unit-untested because the repo has no III mock
pattern. The two live-engine integration suites
(trace_correlation.rs, dual_write.rs) are environment-bound and
intermittently flaky; a proper mock surface is a separate piece of
work.
## Walkthrough

Adds a new context-compaction worker that subscribes to turn-end events, summarizes older turns when token thresholds are exceeded, appends session-tree compaction entries, introduces cross-crate function-result truncation with recovery, and implements Anthropic prompt-caching and related orchestration wiring.
### Sequence diagram

```mermaid
sequenceDiagram
    participant Agent as agent::events
    participant Compactor as context-compaction
    participant Router as router::stream_assistant
    participant SessionTree as session-tree
    participant Orchestrator as turn-orchestrator
    Agent->>Compactor: TurnEnd (usage metrics)
    Compactor->>Compactor: sum tokens
    Compactor->>Compactor: check threshold
    Compactor->>Compactor: acquire lease
    Compactor->>Router: summarize older messages (stream_assistant)
    Router-->>Compactor: summary text
    Compactor->>SessionTree: session-tree::compact(summary, tokens_before)
    Compactor->>Orchestrator: set session/<id>/last_compaction_at
    Orchestrator->>Orchestrator: maybe_reload_after_compaction -> rebuild messages
```
Estimated code review effort: 4 (Complex), ~60 minutes. Pre-merge checks: 5 passed.
skill-check — worker2 verified, 24 skipped (no docs/).
Three for three. Nicely done.
Actionable comments posted: 4
🧹 Nitpick comments (2)
context-compaction/src/lib.rs (1)

Lines 250-274: ⚡ Quick win: Consider adding explicit timeouts to state operations.

The `state_get` and `state_set` calls don't specify `timeout_ms`. If the state service becomes unresponsive, these operations could block indefinitely, potentially exhausting worker threads or delaying compaction. Adding explicit timeouts (e.g., 5-10 seconds) would make the worker more resilient to downstream service degradation.

⏱️ Proposed fix to add timeouts:

```diff
 async fn state_get(iii: &III, key: &str) -> Option<Value> {
     iii.trigger(TriggerRequest {
         function_id: "state::get".into(),
         payload: json!({ "scope": STATE_SCOPE, "key": key }),
         action: None,
-        timeout_ms: None,
+        timeout_ms: Some(5_000),
     })
     .await
     .ok()
     .filter(|v| !v.is_null())
 }

 async fn state_set(iii: &III, key: &str, value: Value) {
     if let Err(e) = iii
         .trigger(TriggerRequest {
             function_id: "state::set".into(),
             payload: json!({ "scope": STATE_SCOPE, "key": key, "value": value }),
             action: None,
-            timeout_ms: None,
+            timeout_ms: Some(5_000),
         })
         .await
```

provider-router/crates/harness-types/src/function.rs (1)

Lines 112-138: ⚡ Quick win: Consider adding serialization tests for the new truncation fields.

The existing test suite validates `FunctionCall` round-trip serialization but doesn't cover the new `FunctionResult.truncated` field or the `TruncationInfo` struct. Adding tests would ensure the serde attributes work correctly and maintain backward compatibility.

🧪 Example tests to add:

```rust
#[test]
fn function_result_with_truncation_roundtrip() {
    let result = FunctionResult {
        content: vec![],
        details: serde_json::json!({}),
        terminate: false,
        truncated: Some(TruncationInfo {
            original_bytes: 16384,
            call_id: "call_123".into(),
        }),
    };
    let json = serde_json::to_string(&result).unwrap();
    let back: FunctionResult = serde_json::from_str(&json).unwrap();
    assert_eq!(result, back);
}

#[test]
fn function_result_without_truncation_omits_field() {
    let result = FunctionResult {
        content: vec![],
        details: serde_json::json!({}),
        terminate: false,
        truncated: None,
    };
    let json = serde_json::to_string(&result).unwrap();
    assert!(!json.contains("truncated"));
}
```

Apply similar tests to `provider-anthropic/crates/harness-types`, `provider-openai/crates/harness-types`, and `harness/crates/harness-types`.
⛔ Files ignored due to path filters (1)

- `context-compaction/Cargo.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (36)

- `context-compaction/Cargo.toml`
- `context-compaction/README.md`
- `context-compaction/build.rs`
- `context-compaction/iii.worker.yaml`
- `context-compaction/prompts/compaction.txt`
- `context-compaction/skill.md`
- `context-compaction/src/config.rs`
- `context-compaction/src/lib.rs`
- `context-compaction/src/main.rs`
- `context-compaction/src/manifest.rs`
- `context-compaction/src/summarize.rs`
- `context-compaction/src/threshold.rs`
- `context-compaction/tests/manifest.rs`
- `harness/Makefile`
- `harness/crates/harness-types/src/function.rs`
- `harness/crates/harness-types/src/lib.rs`
- `provider-anthropic/crates/harness-types/src/function.rs`
- `provider-anthropic/crates/harness-types/src/lib.rs`
- `provider-anthropic/crates/provider-base/src/openai_compat.rs`
- `provider-anthropic/src/lib.rs`
- `provider-openai/crates/harness-types/src/function.rs`
- `provider-openai/crates/harness-types/src/lib.rs`
- `provider-openai/crates/provider-base/src/openai_compat.rs`
- `provider-router/crates/harness-types/src/function.rs`
- `provider-router/crates/harness-types/src/lib.rs`
- `session/crates/harness-types/src/function.rs`
- `session/crates/harness-types/src/lib.rs`
- `turn-orchestrator/crates/harness-types/src/function.rs`
- `turn-orchestrator/crates/harness-types/src/lib.rs`
- `turn-orchestrator/src/agent_call.rs`
- `turn-orchestrator/src/lib.rs`
- `turn-orchestrator/src/persistence.rs`
- `turn-orchestrator/src/state.rs`
- `turn-orchestrator/src/states/assistant.rs`
- `turn-orchestrator/src/states/functions.rs`
- `turn-orchestrator/tests/integration.rs`
In `context-compaction/Cargo.toml`:

```toml
[workspace]
```
Remove or clarify the empty workspace declaration.
An empty [workspace] section with no members serves no purpose. If context-compaction is intended to be a workspace member, this section should be removed (the parent workspace will manage it). If it's a standalone crate, this section is unnecessary.
🔧 Suggested fix:

```diff
-[workspace]
-
 [package]
 name = "context-compaction"
```
In `harness/Makefile`:

```make
# Requires the engine to be up first. The PID file lives under $(PIDS_DIR)
# alongside `engine.pid` / `web.pid` so `make stop` cleans it up uniformly.
COMPACTION_BIN := $(WORKERS_BIN)/context-compaction
compaction: ensure-dirs
```
Add engine dependency to enforce documented requirement.
The comment (lines 107-116) documents that the compaction worker "requires the engine to be up first," but the target doesn't enforce this dependency. When running make compaction directly (bypassing make all), the engine might not be running, causing the worker to fail during WebSocket connection with confusing errors.
🔗 Proposed fix:

```diff
-compaction: ensure-dirs
+compaction: ensure-dirs engine
```
🧰 Tools
🪛 checkmake (0.3.2)
[warning] 118-118: Target body for "compaction" exceeds allowed length of 5 lines (14).
(maxbodylength)
In `turn-orchestrator/src/persistence.rs`:

```rust
pub async fn maybe_reload_after_compaction(iii: &III, session_id: &str) {
    let last_key = crate::state::last_compaction_at_key(session_id);
    let watermark_key = crate::state::last_compaction_consumed_at_key(session_id);
    let last = state_get(iii, &last_key)
        .await
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    if last == 0 {
        return;
    }
    let consumed = state_get(iii, &watermark_key)
        .await
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    if last <= consumed {
        return;
    }
    let resp = match iii
        .trigger(TriggerRequest {
            function_id: "session-tree::messages".into(),
            payload: json!({ "session_id": session_id }),
            action: None,
            timeout_ms: Some(10_000),
        })
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!(error = %e, %session_id, "reload-after-compaction: session-tree::messages failed");
            return;
        }
    };
    // `session-tree::messages` returns `{messages: [{entry_id, message}, ...]}`.
    let Some(rows) = resp.get("messages").and_then(|v| v.as_array()) else {
        return;
    };
    let mut rebuilt: Vec<AgentMessage> = Vec::with_capacity(rows.len());
    for row in rows {
        let msg = row.get("message").cloned().unwrap_or(JsonValue::Null);
        if msg.is_null() {
            continue;
        }
        match serde_json::from_value::<AgentMessage>(msg) {
            Ok(m) => rebuilt.push(m),
            Err(e) => {
                tracing::warn!(error = %e, %session_id, "reload-after-compaction: failed to decode AgentMessage; aborting reload");
                return;
            }
        }
    }
    if rebuilt.is_empty() {
        return;
    }
    if let Ok(value) = serde_json::to_value(&rebuilt) {
        state_set(iii, &messages_key(session_id), value).await;
    }
    // Reset the mirror watermark so future `save_messages` calls don't try
    // to re-append messages the tree already has.
    let mirror_key = crate::state::last_session_tree_len_key(session_id);
    state_set(iii, &mirror_key, json!(rebuilt.len() as u64)).await;
    state_set(iii, &watermark_key, json!(last)).await;
    tracing::info!(
        %session_id,
        new_len = rebuilt.len(),
        "context-compaction landed; reloaded messages from session-tree"
    );
}
```
Consider logging when compaction reload yields no messages.
Lines 95-97 return early when rebuilt is empty without logging. If last_compaction_at is set, we expect the session-tree to contain compacted messages. An empty result after decoding might indicate:
- All messages in the tree were null (line 84-86)
- All messages failed to decode (though line 90-92 returns early and logs for that case)
Adding a warning would improve observability for edge cases where compaction appears to succeed but no messages are recovered.
📊 Proposed observability improvement:

```diff
     }
     if rebuilt.is_empty() {
+        tracing::warn!(
+            %session_id,
+            last_compaction = last,
+            "reload-after-compaction: session-tree returned no decodable messages despite compaction watermark"
+        );
         return;
     }
```
In `turn-orchestrator/src/states/functions.rs`:

```rust
async fn maybe_truncate_result(
    iii: &III,
    session_id: &str,
    call_id: &str,
    result: FunctionResult,
) -> FunctionResult {
    let threshold = truncate_threshold();
    let serialized_size = match serde_json::to_string(&result) {
        Ok(s) => s.len(),
        Err(_) => return result,
    };
    if serialized_size <= threshold {
        return result;
    }
    // Persist the full payload first; if state::set fails, fall through and
    // return the original result so we never lose data.
    let full_json = match serde_json::to_value(&result) {
        Ok(v) => v,
        Err(_) => return result,
    };
    persistence::save_full_result(iii, session_id, call_id, &full_json).await;

    let summary_text = render_truncated_text(&result, serialized_size, call_id);
    FunctionResult {
        content: vec![ContentBlock::Text(TextContent { text: summary_text })],
        details: json!({
            "truncated": true,
            "original_bytes": serialized_size,
            "call_id": call_id,
        }),
        terminate: result.terminate,
        truncated: Some(TruncationInfo {
            original_bytes: serialized_size as u64,
            call_id: call_id.to_string(),
        }),
    }
}
```
Check for persistence failure before returning truncated result.
Line 68 calls save_full_result but never checks whether persistence succeeded. If state::set fails internally, the full payload is lost while the function still returns a truncated FunctionResult with TruncationInfo. The model will see the truncation marker and attempt to call result::fetch, which will fail with result_not_found. This violates the recovery contract.
🛡️ Proposed fix to verify persistence before truncating
Modify save_full_result to return a Result<(), ()> and check it:
- persistence::save_full_result(iii, session_id, call_id, &full_json).await;
+ if persistence::save_full_result(iii, session_id, call_id, &full_json).await.is_err() {
+ // If we can't persist the full payload, return the original result
+ // rather than a truncated one the model can't recover.
+ return result;
+ }
let summary_text = render_truncated_text(&result, serialized_size, call_id);And update persistence::save_full_result to propagate errors:
-pub async fn save_full_result(iii: &III, session_id: &str, call_id: &str, payload: &Value) {
+pub async fn save_full_result(iii: &III, session_id: &str, call_id: &str, payload: &Value) -> Result<(), ()> {
let key = staging_key(session_id, &format!("result/{call_id}"));
- state_set(iii, &key, payload.clone()).await;
+ state_set(iii, &key, payload.clone()).await.map_err(|_| ())
 }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@turn-orchestrator/src/states/functions.rs` around lines 48-84, the
truncation path in maybe_truncate_result currently calls
persistence::save_full_result without checking its outcome, which can cause a
missing persisted payload while returning a truncated
FunctionResult/TruncationInfo; update persistence::save_full_result to return a
Result<(), E> (propagating underlying errors) and in maybe_truncate_result await
that Result and only proceed to construct and return the truncated
FunctionResult (with TruncationInfo) if save_full_result returns Ok(()); on
Err(_) simply return the original result so we never advertise a fetchable
truncated payload.
Adds context-compaction to the two worker-name lists in the CI release flow:

- release.yml: adds 'context-compaction/v*' to the on-push tag triggers so pushing a tag of that shape kicks off the binary build, GitHub release creation, and registry publish (parse_release_tag.py already understands the tag and picks up deploy=binary from iii.worker.yaml).
- create-tag.yml: adds 'context-compaction' to the workflow_dispatch worker choice so the standard "Bump version & create tag" UI can cut releases.

Both lists are alphabetical; the new entry slots between 'auth-credentials' and the harness fan-out comment / harness entry. The worker's iii.worker.yaml already declares `deploy: binary` and a matching `bin: context-compaction`, so no further pipeline configuration is needed — the existing _rust-binary.yml reusable handles the build.
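As a sketch, the release.yml change plausibly looks like the fragment below; the surrounding entries and exact structure are assumptions based on the description above, not the repository's actual file:

```yaml
# Hypothetical excerpt of release.yml — only the tag-trigger list is shown.
on:
  push:
    tags:
      - 'auth-credentials/v*'
      - 'context-compaction/v*'   # new entry, in its alphabetical slot
      # harness fan-out …
      - 'harness/v*'
```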
Summary
Three independent, env-gated token-reduction interventions on the harness chat agent.
1. Anthropic prompt caching + OpenAI cache surfacing — `provider-anthropic/src/lib.rs` emits the typed-block `system` field with `cache_control: ephemeral`, marks the last `tools` entry, and anchors a rolling-transcript marker on the most recent stable assistant turn (skips in-flight tool calls). A 4096-byte floor avoids 400s on too-short prefixes. `provider-base/openai_compat.rs` surfaces OpenAI's automatic `cached_tokens` into `Usage.cache_read`. Disable with `HARNESS_ANTHROPIC_CACHE=0`.
2. Tool-result truncation + `result::fetch` escape hatch — `turn-orchestrator/src/states/functions.rs` caps each `FunctionResult` at `HARNESS_RESULT_TRUNCATE_BYTES` (default 8 KB); oversized payloads get stashed at `session/<id>/result/<call_id>` with a head+tail-elided preview. The model recovers via the virtual `result::fetch` function intercepted in `agent_call::dispatch` before bus routing — not exposed via `engine::functions::list`, only discoverable from the marker text and a recovery clause in `agent_call_tool()`'s description.
3. `context-compaction` worker — new crate. Subscribes to `agent::events` on the iii bus, watches `TurnEnd` for transcript size (input + output + cache_read), and when over `COMPACT_TRIGGER_TOKENS` (default 60k) runs an anchored-summary compaction through `router::stream_assistant` against a cheap fast model (default `claude-haiku-4-5`). Concurrency: a nonce-and-readback lease — the engine has no CAS primitive, so each acquisition writes a unique `pid-nanos-counter` nonce and confirms ownership via readback (last-write-wins; exactly one writer sees its own nonce survive). TTL is 300 s, above the 120 s summariser timeout. Bundled via the harness `Makefile` (`make compaction`) as a background process — not in `iii.worker.yaml` `dependencies:` because the upstream registry doesn't index it yet.

Tests: +49 unit tests, +1 integration test; 170 unit tests pass across the four most-changed crates. CI's
`validate_worker.py` exits 0 for `context-compaction` (`README.md` and `tests/manifest.rs` land in this PR). See the squashed commit message for the full design rationale, including the deliberate "out of scope" decisions.

Test plan

- Run `make all` and confirm the harness boots with `context-compaction` alongside the engine (`make logs W=context-compaction`)
- Check `AssistantMessage.usage` — turn 2 onward should show non-zero `cache_read` for Anthropic; OpenAI should show non-zero after the first turn if the prefix is >1024 tokens
- Trigger an oversized tool result (e.g. `shell::fs::read`) and verify the `[result truncated …]` marker appears in the transcript; then ask the agent to fetch the full output and confirm `result::fetch` returns it verbatim
- Set `COMPACT_TRIGGER_TOKENS=5000` and run a long-enough session — verify a `Compaction` entry appears in the session-tree and the next turn's `messages` array shrinks
- Confirm `HARNESS_ANTHROPIC_CACHE=0` removes `cache_control` markers from the outgoing wire body
- Race two compaction workers on the same `TurnEnd` and verify the lease grants to exactly one (the loser logs `compaction lease held; skipping`)
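The nonce-and-readback lease from item 3 can be sketched as follows. The engine's state store is modeled here as a last-write-wins in-memory map, and all names (`try_acquire`, the nonce format, TTL handling, which is omitted) are illustrative rather than the worker's actual API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the engine's last-write-wins key-value state store.
type Store = Arc<Mutex<HashMap<String, String>>>;

fn state_write(store: &Store, key: &str, val: &str) {
    store.lock().unwrap().insert(key.to_string(), val.to_string());
}

fn state_read(store: &Store, key: &str) -> Option<String> {
    store.lock().unwrap().get(key).cloned()
}

/// Write a unique nonce, then read it back: under last-write-wins semantics,
/// the contender whose nonce survives the readback owns the lease.
fn try_acquire(store: &Store, key: &str, nonce: &str) -> bool {
    if state_read(store, key).is_some() {
        return false; // lease already held (TTL expiry omitted in this sketch)
    }
    state_write(store, key, nonce);
    state_read(store, key).as_deref() == Some(nonce)
}

fn main() {
    let store: Store = Arc::new(Mutex::new(HashMap::new()));
    assert!(try_acquire(&store, "compaction/lease", "pid1-100-0"));
    assert!(!try_acquire(&store, "compaction/lease", "pid2-101-0"));
}
```

This only illustrates the write-then-readback shape: when two contenders write concurrently, the later write overwrites the earlier one, so at most the last writer sees its own nonce on readback. The real worker presumably layers the described 300 s TTL and timing details on top of this.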