
feat: cut per-turn LLM token cost via caching, truncation, and compaction#140

Open
andersonleal wants to merge 2 commits into main from feat/token-reduction

Conversation

@andersonleal
Collaborator

@andersonleal andersonleal commented May 15, 2026

Summary

Three independent, env-gated token-reduction interventions on the harness chat agent.

1. Anthropic prompt caching + OpenAI cache surfacing. provider-anthropic/src/lib.rs emits the typed-block system field with cache_control: ephemeral, marks the last tools entry, and anchors a rolling-transcript marker on the most recent stable assistant turn (skipping in-flight tool calls). A 4096-byte floor avoids 400s on too-short prefixes. provider-base/openai_compat.rs surfaces OpenAI's automatic cached_tokens into Usage.cache_read. Disable with HARNESS_ANTHROPIC_CACHE=0.

2. Tool-result truncation + result::fetch escape hatch. turn-orchestrator/src/states/functions.rs caps each FunctionResult at HARNESS_RESULT_TRUNCATE_BYTES (default 8 KB); oversized payloads are stashed at session/<id>/result/<call_id>, leaving a head+tail-elided preview in the stream. The model recovers the full payload via the virtual result::fetch function, intercepted in agent_call::dispatch before bus routing. It is not exposed via engine::functions::list; it is discoverable only from the marker text plus a recovery clause in agent_call_tool()'s description.

3. context-compaction worker. New crate. Subscribes to agent::events on the iii bus, watches TurnEnd for transcript size (input + output + cache_read), and when over COMPACT_TRIGGER_TOKENS (default 60k) runs an anchored-summary compaction through router::stream_assistant against a cheap fast model (default claude-haiku-4-5). Concurrency uses a nonce-and-readback lease: the engine has no CAS primitive, so each acquisition writes a unique pid-nanos-counter nonce and confirms ownership via readback (last-write-wins; exactly one writer sees its own nonce survive). TTL is 300s, above the 120s summariser timeout. Bundled via the harness Makefile (make compaction) as a background process; not added to iii.worker.yaml dependencies: because the upstream registry doesn't index it yet.

Tests: +49 unit tests, +1 integration test, 170 unit tests pass across the four most-changed crates. CI's validate_worker.py exits 0 for context-compaction (README.md and tests/manifest.rs land in this PR). See the squashed commit message for the full design rationale, including the deliberate "out of scope" decisions.

Test plan

  • Run make all and confirm the harness boots with context-compaction alongside the engine (make logs W=context-compaction)
  • Run a 5+ turn conversation, inspect AssistantMessage.usage — turn 2 onward should show non-zero cache_read for Anthropic; OpenAI should show non-zero after the first turn if prefix >1024 tokens
  • Trigger a large tool result (e.g. read a >8 KB file via shell::fs::read) and verify the [result truncated …] marker appears in the transcript; then ask the agent to fetch the full output and confirm result::fetch returns it verbatim
  • Temporarily set COMPACT_TRIGGER_TOKENS=5000 and run a long-enough session — verify a Compaction entry appears in the session-tree and the next turn's messages array shrinks
  • Manual: confirm HARNESS_ANTHROPIC_CACHE=0 removes cache_control markers from the outgoing wire body
  • Manual: run two context-compaction processes against the same engine, fire a contending TurnEnd, verify the lease grants to exactly one (the loser logs `compaction lease held; skipping`)

Summary by CodeRabbit

  • New Features
    • Session history compaction worker: out-of-band summarization of older turns when token thresholds are exceeded; configurable thresholds, recent-turn retention, and summarizer provider/model.
    • Anthropic prompt caching: smarter request shaping and cache markers to reduce latency and token usage.
    • Tool result truncation + recovery: large function results are truncated to save context; full payloads can be fetched on demand.

Review Change Stack

feat: cut per-turn LLM token cost via caching, truncation, and compaction

Three independent token-reduction interventions on the harness chat
agent. Every long-running conversation currently re-ships the full
transcript uncached, untruncated, and unsummarised on every LLM call;
each piece below attacks a different cost driver. All three ship behind
their own env-var flag so any one can be disabled without a rebuild.

## 1. Anthropic prompt caching + OpenAI cache surfacing

- provider-anthropic/src/lib.rs: emit Anthropic's typed-block `system`
  field with a `cache_control: ephemeral` marker, stamp the last entry
  of the `tools` array (caches the whole tools array as a single span),
  and anchor a rolling-transcript marker on the most recent "stable"
  assistant turn — one whose tool_use blocks all have matching
  downstream tool_result blocks. Never marks an in-flight tool call:
  that prefix would invalidate on the next turn and defeat the point.
  All gated on a 4096-byte serialised-size floor so short prefixes
  can't trip Anthropic's per-model token minimum and 400 the request.
  Disable with HARNESS_ANTHROPIC_CACHE=0.
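The anchor-selection rule above can be sketched as a scan from the newest turn backwards, skipping any assistant turn with an unresolved tool call. This is a minimal illustration, not the crate's actual types: `Turn`, `last_stable_assistant`, and the id sets are hypothetical stand-ins.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a transcript entry; the real crate
/// uses its own message types.
struct Turn {
    is_assistant: bool,
    tool_use_ids: Vec<String>, // tool_use blocks emitted by this turn
}

/// Index of the most recent "stable" assistant turn: one whose tool_use
/// blocks all have matching downstream tool_result blocks. Anchoring the
/// cache marker on an in-flight tool call would invalidate the cached
/// prefix on the very next turn.
fn last_stable_assistant(turns: &[Turn], resolved: &HashSet<String>) -> Option<usize> {
    turns
        .iter()
        .enumerate()
        .rev()
        .find(|(_, t)| {
            t.is_assistant && t.tool_use_ids.iter().all(|id| resolved.contains(id))
        })
        .map(|(i, _)| i)
}

fn main() {
    let resolved: HashSet<String> = ["call_a".to_string()].into_iter().collect();
    let turns = vec![
        Turn { is_assistant: true, tool_use_ids: vec!["call_a".into()] }, // stable
        Turn { is_assistant: true, tool_use_ids: vec!["call_b".into()] }, // in-flight
    ];
    println!("anchor index: {:?}", last_stable_assistant(&turns, &resolved));
}
```

In the PR itself the anchor is additionally gated on the 4096-byte serialised-size floor before any marker is written.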

- provider-{anthropic,openai}/crates/provider-base/src/openai_compat.rs:
  extend merge_usage to read OpenAI's prompt_tokens_details.cached_tokens
  into Usage.cache_read. OpenAI caches prefixes >1024 tokens
  unilaterally; this just surfaces the existing savings to dashboards.
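The surfacing step is a straight field copy. A sketch under assumed shapes: the struct names here are hypothetical, though the wire field names (`prompt_tokens_details.cached_tokens`) follow OpenAI's usage payload.

```rust
/// Hypothetical model of the relevant slice of an OpenAI-compatible
/// usage payload; field names mirror the wire format.
#[derive(Default)]
struct PromptTokensDetails {
    cached_tokens: u64,
}

#[derive(Default)]
struct WireUsage {
    prompt_tokens: u64,
    completion_tokens: u64,
    prompt_tokens_details: Option<PromptTokensDetails>,
}

#[derive(Debug, Default, PartialEq)]
struct Usage {
    input: u64,
    output: u64,
    cache_read: u64,
}

/// Copy cached_tokens into Usage.cache_read. OpenAI caches long prefixes
/// unilaterally, so this does not change behaviour; it only reports
/// savings that were already happening.
fn merge_usage(w: &WireUsage) -> Usage {
    Usage {
        input: w.prompt_tokens,
        output: w.completion_tokens,
        cache_read: w.prompt_tokens_details.as_ref().map_or(0, |d| d.cached_tokens),
    }
}

fn main() {
    let wire = WireUsage {
        prompt_tokens: 2000,
        completion_tokens: 150,
        prompt_tokens_details: Some(PromptTokensDetails { cached_tokens: 1536 }),
    };
    println!("{:?}", merge_usage(&wire));
}
```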

## 2. Tool-result truncation + result::fetch escape hatch

- turn-orchestrator/src/states/functions.rs: cap each FunctionResult at
  HARNESS_RESULT_TRUNCATE_BYTES (default 8 KB serialised). Oversized
  payloads get stashed at session/<id>/result/<call_id>; the in-stream
  content becomes a head+tail-elided preview plus a marker that tells
  the model how to recover the full payload. UTF-8 char-boundary
  helpers ensure the head/tail cuts never split a codepoint.
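The char-boundary guarantee can be sketched with `str::is_char_boundary`: walk a byte index back until it lands on a boundary, then slice. Function names and the elision marker below are illustrative, not the PR's actual helpers.

```rust
/// Walk a byte index down to the nearest UTF-8 char boundary so a cut
/// never splits a codepoint. (Hypothetical helper name.)
fn floor_char_boundary(s: &str, i: usize) -> usize {
    let mut i = i.min(s.len());
    while !s.is_char_boundary(i) {
        i -= 1;
    }
    i
}

/// Keep roughly `head` bytes from the front and `tail` bytes from the
/// back, eliding the middle; returns the input unchanged if it fits.
fn elide_middle(s: &str, head: usize, tail: usize) -> String {
    if s.len() <= head + tail {
        return s.to_string();
    }
    let h = floor_char_boundary(s, head);
    let t = floor_char_boundary(s, s.len() - tail);
    format!("{}…[elided]…{}", &s[..h], &s[t..])
}

fn main() {
    let payload = "héllo wörld ".repeat(100);
    println!("{}", elide_middle(&payload, 16, 16));
}
```

Because the cut indices are floored rather than rounded, the preview may be a byte or two shorter than the configured head/tail, never longer.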

- turn-orchestrator/src/agent_call.rs: intercept the virtual
  result::fetch function inside dispatch() before bus routing. Not
  registered as an iii function (no engine::functions::list exposure),
  discoverable only via the truncation marker plus a recovery clause
  in agent_call_tool()'s description. The clause lives in the tool
  description, not the system prompt — system_prompt::BASE_BODY has an
  explicit test forbidding prose there.

- harness-types/src/function.rs (all 6 vendored copies kept
  byte-identical): add Option<TruncationInfo> to FunctionResult with
  #[serde(default, skip_serializing_if)] so the wire format stays
  backward-compatible with existing sessions.

## 3. context-compaction worker

New crate. Subscribes to agent::events on the iii bus, watches every
TurnEnd for transcript size (sum of input + output + cache_read — the
true context-window pressure metric, not just the uncached portion),
and when over COMPACT_TRIGGER_TOKENS (default 60k) runs an anchored-
summary compaction through router::stream_assistant against a cheap
fast model (default claude-haiku-4-5). The summariser receives the
older prefix; the trailing COMPACT_KEEP_RECENT_TURNS (default 3) turns
stay verbatim so the model retains its immediate working memory.
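The older/recent split is a single cut point counted from the tail. A sketch over a generic slice; the real summarize.rs operates on its own message types and `split_for_compaction` is a hypothetical name.

```rust
/// Split a transcript into (older, recent): everything before the cut goes
/// to the summariser, the trailing `keep_recent` turns stay verbatim.
/// saturating_sub keeps short transcripts entirely in `recent`.
fn split_for_compaction<T>(turns: &[T], keep_recent: usize) -> (&[T], &[T]) {
    let cut = turns.len().saturating_sub(keep_recent);
    turns.split_at(cut)
}

fn main() {
    let turns = vec!["t1", "t2", "t3", "t4", "t5"];
    let (older, recent) = split_for_compaction(&turns, 3);
    println!("summarise {:?}, keep {:?}", older, recent);
}
```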

Concurrency: nonce-and-readback lease at
session/<id>/compaction_lease. The engine's state::* ops have no CAS
primitive, so each acquisition writes a unique pid-nanos-counter nonce
and confirms ownership via readback — state::set is last-write-wins,
exactly one writer sees its own nonce survive. TTL is 300s,
comfortably above the 120s summariser timeout so a slow LLM call
can't expire its own lease and let a peer start a duplicate
compaction. release_lease only clears the slot when the stored nonce
is still ours, so a TTL-expired predecessor can't wipe its successor.
read_lease_timestamp_secs accepts both the new {nonce, ts} shape and
the legacy bare-i64 shape so a worker upgraded mid-flight reads the
prior lease correctly.
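The nonce-and-readback protocol above can be sketched against any last-write-wins store; here a `HashMap` stands in for the engine's state::set / state::get, and the function names are illustrative rather than the worker's actual API.

```rust
use std::collections::HashMap;
use std::process;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

static COUNTER: AtomicU64 = AtomicU64::new(0);

/// pid-nanos-counter nonce: unique per process *and* per call, so two
/// contending workers can never write the same value.
fn make_nonce() -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before epoch")
        .as_nanos();
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", process::id(), nanos, n)
}

/// Write the nonce, then read it back: with no CAS primitive, ownership is
/// confirmed only if our own nonce survived the last-write-wins race.
fn try_acquire(store: &mut HashMap<String, String>, key: &str, nonce: &str) -> bool {
    store.insert(key.to_string(), nonce.to_string()); // state::set analogue
    store.get(key).map(String::as_str) == Some(nonce) // readback confirms
}

fn main() {
    let mut store = HashMap::new();
    let key = "session/s1/compaction_lease";
    let (n1, n2) = (make_nonce(), make_nonce());
    try_acquire(&mut store, key, &n1);
    // A later contender overwrites: exactly one nonce survives.
    try_acquire(&mut store, key, &n2);
    println!("surviving owner: {:?}", store.get(key));
}
```

The same nonce comparison is what makes a safe release possible: clear the slot only if the stored nonce is still yours.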

Reload protocol: when a compaction lands, the worker stamps
session/<id>/last_compaction_at with chrono::Utc::now().timestamp_millis().
The orchestrator's handle_streaming rebuilds the hot
session/<id>/messages key from session-tree (whose load_messages
already filters Compaction entries out of the active path) when
last_compaction_at > last_compaction_consumed_at. Single writer per
key — no race between orchestrator and compactor.
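The reload trigger reduces to one watermark comparison. A sketch of the decision only (the name `should_reload` is hypothetical); the actual rebuild path lives in maybe_reload_after_compaction.

```rust
/// Rebuild the hot messages key only when a compaction landed after the
/// last one this orchestrator consumed. Zero means no compaction has
/// ever been stamped for the session.
fn should_reload(last_compaction_at: i64, last_consumed_at: i64) -> bool {
    last_compaction_at > 0 && last_compaction_at > last_consumed_at
}

fn main() {
    // No compaction has ever landed: skip.
    println!("{}", should_reload(0, 0));
    // A compaction landed after the consumed watermark: rebuild.
    println!("{}", should_reload(1_700_000_000_123, 1_699_999_999_000));
}
```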

Bundled via the harness Makefile (`make compaction`) as a background
process — not added to iii.worker.yaml dependencies because the
upstream worker registry doesn't index it yet. Mirrors the
iii-observability "optional, side-of-config" pattern; PID file lives
alongside engine.pid so `make stop` cleans it up uniformly.

## Tests

- provider-anthropic: +13 unit tests (cache marker placement,
  in-flight tool_use guard, eligibility floor, OpenAI cached_tokens
  surfacing on both vendored provider-base copies).
- turn-orchestrator: +6 unit + 1 integration (truncation render,
  UTF-8 char-boundary helpers, result::fetch description recovery
  clause, compaction watermark key namespacing).
- context-compaction: 27 unit + 1 integration. Nonce uniqueness
  asserted via a 1000-call collision sweep; timestamp parsing pinned
  for both shapes; the LEASE_TTL_SECS > SUMMARIZER_TIMEOUT_MS/1000
  invariant pinned as a test so future drift trips CI not prod. The
  manifest smoke test asserts functions=[] (the compactor is not
  LLM-facing — registering one would expose it to
  engine::functions::list) and that "agent::events" stays in the
  subscription list.
- 170 unit tests pass across the four most-changed crates.

## Out of scope (deliberate)

- BASE_BODY system-prompt edits — replaced by the agent_call_tool()
  description clause for the reasons noted above.
- Bus-mocked tests for acquire_lease / summarize_and_append /
  handle_event / maybe_truncate_result — the async functions touching
  the iii bus are unit-untested because the repo has no III mock
  pattern. The two live-engine integration suites
  (trace_correlation.rs, dual_write.rs) are environment-bound and
  intermittently flaky; a proper mock surface is a separate piece of
  work.
@coderabbitai

coderabbitai Bot commented May 15, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d70eb884-8df4-469d-b163-6a39e43f2b4e

📥 Commits

Reviewing files that changed from the base of the PR and between a91caab and 0795ac7.

📒 Files selected for processing (2)
  • .github/workflows/create-tag.yml
  • .github/workflows/release.yml
✅ Files skipped from review due to trivial changes (1)
  • .github/workflows/create-tag.yml

📝 Walkthrough

Walkthrough

Adds a new context-compaction worker that subscribes to turn-end events, summarizes older turns when token thresholds are exceeded, appends session-tree compaction entries, introduces cross-crate function-result truncation with recovery, and implements Anthropic prompt-caching and related orchestration wiring.

Changes

Context compaction and function truncation

Layer / File(s) Summary
Context-compaction worker packaging
context-compaction/Cargo.toml, context-compaction/build.rs, context-compaction/iii.worker.yaml, context-compaction/README.md, context-compaction/skill.md, context-compaction/src/main.rs, context-compaction/src/manifest.rs
Worker crate manifest, build script, CLI entry point, YAML deployment config, and inline/external documentation describing the compactor's subscription to agent::events, token-threshold triggering, and session-tree compaction entry appending.
Event handling and per-session lease coordination
context-compaction/src/config.rs, context-compaction/src/lib.rs, context-compaction/src/threshold.rs
Configuration helpers and core event handler that extracts token usage, checks thresholds, acquires nonce-based single-writer leases with TTL, triggers compaction, and stamps last_compaction_at; includes unit tests for payloads, usage summing, and lease invariants.
Summarization flow and session-tree append
context-compaction/src/summarize.rs
Loads active messages, splits into older/recent portions, calls router LLM with a compaction prompt, extracts summary text, appends a Compaction entry to session-tree, and stamps watermark; includes unit tests for splitting, prompt rendering, and extraction.
Function result truncation schema across crates
harness/crates/harness-types/src/function.rs, harness/crates/harness-types/src/lib.rs, provider-anthropic/crates/harness-types/src/function.rs, provider-anthropic/crates/harness-types/src/lib.rs, provider-openai/crates/harness-types/src/function.rs, provider-openai/crates/harness-types/src/lib.rs, provider-router/crates/harness-types/src/function.rs, provider-router/crates/harness-types/src/lib.rs, session/crates/harness-types/src/function.rs, session/crates/harness-types/src/lib.rs, turn-orchestrator/crates/harness-types/src/function.rs, turn-orchestrator/crates/harness-types/src/lib.rs
Adds optional truncated: Option<TruncationInfo> to FunctionResult with serde defaults and conditional serialization; introduces TruncationInfo carrying original_bytes and call_id, and re-exports across crate roots.
Function result truncation implementation
turn-orchestrator/src/states/functions.rs
Serializes large function results, compares against environment-configurable byte threshold, persists full payload under session/<id>/result/<call_id>, replaces in-stream result with compact preview containing TruncationInfo, and adds UTF-8-safe head/tail elision helpers with tests.
Truncated result recovery and agent tool mechanism
turn-orchestrator/src/agent_call.rs, turn-orchestrator/src/persistence.rs
Adds a virtual result::fetch tool to fetch stashed full results, updates tool description, consumes session_id for storage keys, and provides save_full_result/load_full_result helpers.
Compaction watermark coordination
turn-orchestrator/src/state.rs, turn-orchestrator/src/persistence.rs, turn-orchestrator/src/states/assistant.rs, turn-orchestrator/src/lib.rs
Exports last_compaction_at_key and last_compaction_consumed_at_key, implements maybe_reload_after_compaction to rebuild in-memory messages when a newer compaction exists, and calls the reload check during assistant streaming startup.
Anthropic prompt-caching support
provider-anthropic/crates/provider-base/src/openai_compat.rs, provider-anthropic/src/lib.rs
Extracts cached-token counts into Usage.cache_read, adds build_system_field, apply_tools_cache_control, and apply_messages_cache_anchor helpers, and integrates cache markers into outgoing Anthropic requests; includes tests.
Harness orchestration and testing
harness/Makefile, context-compaction/tests/manifest.rs, turn-orchestrator/tests/integration.rs, .github/workflows/create-tag.yml, .github/workflows/release.yml
Adds context-compaction to LOCAL_WORKERS, compaction make target (nohup/pidfile), updates all and logs targets; adds manifest smoke test and compaction watermark namespace/distinctness test; updates workflow dispatch options and release tags.

Sequence Diagram

sequenceDiagram
  participant Agent as agent::events
  participant Compactor as context-compaction
  participant Router as router::stream_assistant
  participant SessionTree as session-tree
  participant Orchestrator as turn-orchestrator

  Agent->>Compactor: TurnEnd (usage metrics)
  Compactor->>Compactor: sum tokens
  Compactor->>Compactor: check threshold
  Compactor->>Compactor: acquire lease
  Compactor->>Router: summarize older messages (stream_assistant)
  Router-->>Compactor: summary text
  Compactor->>SessionTree: session-tree::compact(summary, tokens_before)
  Compactor->>Orchestrator: set session/<id>/last_compaction_at
  Orchestrator->>Orchestrator: maybe_reload_after_compaction -> rebuild messages

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • iii-hq/workers#138: Shares Makefile wiring modifications for LOCAL_WORKERS and worker orchestration setup.
  • iii-hq/workers#80: Related to token-usage signal population (Usage.cache_read) used by the compaction trigger.

Suggested reviewers

  • sergiofilhowz
  • ytallo

Poem

🐇 I skitter through messages, quiet and neat,
Trimming old turns so the session stays fleet,
With nonces and leases I guard every race,
Stash full results for a tidy retrace,
Hooray — compacted history, light on its feet!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately captures the three main token-reduction interventions: caching (Anthropic/OpenAI), truncation (tool results), and compaction (session history). It directly reflects the PR's core objective.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@github-actions
Contributor

github-actions Bot commented May 15, 2026

skill-check — worker

2 verified, 24 skipped (no docs/).

Layer Result
structure ✅
vale ✅
ai ✅

Three for three. Nicely done.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
context-compaction/src/lib.rs (1)

250-274: ⚡ Quick win

Consider adding explicit timeouts to state operations.

The state_get and state_set calls don't specify timeout_ms. If the state service becomes unresponsive, these operations could block indefinitely, potentially exhausting worker threads or delaying compaction. Adding explicit timeouts (e.g., 5-10 seconds) would make the worker more resilient to downstream service degradation.

⏱️ Proposed fix to add timeouts
 async fn state_get(iii: &III, key: &str) -> Option<Value> {
     iii.trigger(TriggerRequest {
         function_id: "state::get".into(),
         payload: json!({ "scope": STATE_SCOPE, "key": key }),
         action: None,
-        timeout_ms: None,
+        timeout_ms: Some(5_000),
     })
     .await
     .ok()
     .filter(|v| !v.is_null())
 }

 async fn state_set(iii: &III, key: &str, value: Value) {
     if let Err(e) = iii
         .trigger(TriggerRequest {
             function_id: "state::set".into(),
             payload: json!({ "scope": STATE_SCOPE, "key": key, "value": value }),
             action: None,
-            timeout_ms: None,
+            timeout_ms: Some(5_000),
         })
         .await
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@context-compaction/src/lib.rs` around lines 250 - 274, state_get and
state_set call iii.trigger without a timeout_ms, risking indefinite blocking;
update both TriggerRequest constructions in the state_get and state_set
functions to set timeout_ms to a reasonable explicit value (e.g., Some(5000) or
Some(10000)) so the trigger future fails fast on unresponsive state service, and
ensure error handling/logging remains unchanged for state_set and state_get's
.ok().filter(...) path.
provider-router/crates/harness-types/src/function.rs (1)

112-138: ⚡ Quick win

Consider adding serialization tests for the new truncation fields.

The existing test suite validates FunctionCall roundtrip serialization but doesn't cover the new FunctionResult.truncated field or TruncationInfo struct. Adding tests would ensure serde attributes work correctly and maintain backward compatibility.

🧪 Example test to add
#[test]
fn function_result_with_truncation_roundtrip() {
    let result = FunctionResult {
        content: vec![],
        details: serde_json::json!({}),
        terminate: false,
        truncated: Some(TruncationInfo {
            original_bytes: 16384,
            call_id: "call_123".into(),
        }),
    };
    let json = serde_json::to_string(&result).unwrap();
    let back: FunctionResult = serde_json::from_str(&json).unwrap();
    assert_eq!(result, back);
}

#[test]
fn function_result_without_truncation_omits_field() {
    let result = FunctionResult {
        content: vec![],
        details: serde_json::json!({}),
        terminate: false,
        truncated: None,
    };
    let json = serde_json::to_string(&result).unwrap();
    assert!(!json.contains("truncated"));
}

Apply similar tests to provider-anthropic/crates/harness-types, provider-openai/crates/harness-types, and harness/crates/harness-types.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@provider-router/crates/harness-types/src/function.rs` around lines 112 - 138,
Add serde round-trip tests for the new truncation fields: create a test that
constructs a FunctionResult with truncated: Some(TruncationInfo {
original_bytes: ..., call_id: ... }), serializes with serde_json::to_string,
deserializes back and asserts equality (e.g., test name
function_result_with_truncation_roundtrip) and another test that constructs
FunctionResult with truncated: None, serializes and asserts the resulting JSON
does not contain the "truncated" field (e.g.,
function_result_without_truncation_omits_field); place these alongside the
existing FunctionCall tests and reference the FunctionResult and TruncationInfo
types so serde attributes and backward-compat behavior are validated.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@context-compaction/Cargo.toml`:
- Around line 1-2: The Cargo.toml contains an empty [workspace] table that
should be removed or clarified; either delete the standalone "[workspace]"
section from the context-compaction crate if it is not a workspace root, or
replace it with an explicit members = [...] list if this crate is intended to be
a workspace root—update the Cargo.toml accordingly (remove the empty table or
add concrete members) so the workspace declaration is not a no-op.

In `@harness/Makefile`:
- Line 118: The compaction Makefile target lacks the documented dependency on
the engine; update the compaction target (symbol: compaction) to depend on the
engine target so the engine is started before the compaction worker runs
(currently it only depends on ensure-dirs). Modify the dependency list for
compaction to include engine (e.g., change the target declaration from
"compaction: ensure-dirs" to include engine) so invoking make compaction will
start the engine first and avoid websocket connection failures.

In `@turn-orchestrator/src/persistence.rs`:
- Around line 45-111: The function maybe_reload_after_compaction early-returns
silently when the rebuilt Vec is empty; add a warning log just before the if
rebuilt.is_empty() return to surface unexpected empty reloads, including
session_id and the last compaction watermark (use last and/or keys from
last_compaction_at_key and last_compaction_consumed_at_key) so operators can
diagnose; place the tracing::warn! call immediately before the existing return
in the rebuilt.is_empty() branch in maybe_reload_after_compaction and do not
change the control flow or return value.

In `@turn-orchestrator/src/states/functions.rs`:
- Around line 48-84: The truncation path in maybe_truncate_result currently
calls persistence::save_full_result without checking its outcome, which can
cause a missing persisted payload while returning a truncated
FunctionResult/TruncationInfo; update persistence::save_full_result to return a
Result<(), E> (propagating underlying errors) and in maybe_truncate_result await
that Result and only proceed to construct and return the truncated
FunctionResult (with TruncationInfo) if save_full_result returns Ok(()); on
Err(_) simply return the original result so we never advertise a fetchable
truncated payload.

---

Nitpick comments:
In `@context-compaction/src/lib.rs`:
- Around line 250-274: state_get and state_set call iii.trigger without a
timeout_ms, risking indefinite blocking; update both TriggerRequest
constructions in the state_get and state_set functions to set timeout_ms to a
reasonable explicit value (e.g., Some(5000) or Some(10000)) so the trigger
future fails fast on unresponsive state service, and ensure error
handling/logging remains unchanged for state_set and state_get's
.ok().filter(...) path.

In `@provider-router/crates/harness-types/src/function.rs`:
- Around line 112-138: Add serde round-trip tests for the new truncation fields:
create a test that constructs a FunctionResult with truncated:
Some(TruncationInfo { original_bytes: ..., call_id: ... }), serializes with
serde_json::to_string, deserializes back and asserts equality (e.g., test name
function_result_with_truncation_roundtrip) and another test that constructs
FunctionResult with truncated: None, serializes and asserts the resulting JSON
does not contain the "truncated" field (e.g.,
function_result_without_truncation_omits_field); place these alongside the
existing FunctionCall tests and reference the FunctionResult and TruncationInfo
types so serde attributes and backward-compat behavior are validated.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b135c810-8dc5-456d-a4b9-49f16e55b7c4

📥 Commits

Reviewing files that changed from the base of the PR and between 7dadbd5 and a91caab.

⛔ Files ignored due to path filters (1)
  • context-compaction/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (36)
  • context-compaction/Cargo.toml
  • context-compaction/README.md
  • context-compaction/build.rs
  • context-compaction/iii.worker.yaml
  • context-compaction/prompts/compaction.txt
  • context-compaction/skill.md
  • context-compaction/src/config.rs
  • context-compaction/src/lib.rs
  • context-compaction/src/main.rs
  • context-compaction/src/manifest.rs
  • context-compaction/src/summarize.rs
  • context-compaction/src/threshold.rs
  • context-compaction/tests/manifest.rs
  • harness/Makefile
  • harness/crates/harness-types/src/function.rs
  • harness/crates/harness-types/src/lib.rs
  • provider-anthropic/crates/harness-types/src/function.rs
  • provider-anthropic/crates/harness-types/src/lib.rs
  • provider-anthropic/crates/provider-base/src/openai_compat.rs
  • provider-anthropic/src/lib.rs
  • provider-openai/crates/harness-types/src/function.rs
  • provider-openai/crates/harness-types/src/lib.rs
  • provider-openai/crates/provider-base/src/openai_compat.rs
  • provider-router/crates/harness-types/src/function.rs
  • provider-router/crates/harness-types/src/lib.rs
  • session/crates/harness-types/src/function.rs
  • session/crates/harness-types/src/lib.rs
  • turn-orchestrator/crates/harness-types/src/function.rs
  • turn-orchestrator/crates/harness-types/src/lib.rs
  • turn-orchestrator/src/agent_call.rs
  • turn-orchestrator/src/lib.rs
  • turn-orchestrator/src/persistence.rs
  • turn-orchestrator/src/state.rs
  • turn-orchestrator/src/states/assistant.rs
  • turn-orchestrator/src/states/functions.rs
  • turn-orchestrator/tests/integration.rs

Comment on lines +1 to +2
[workspace]


⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove or clarify the empty workspace declaration.

An empty [workspace] section with no members serves no purpose. If context-compaction is intended to be a workspace member, this section should be removed (the parent workspace will manage it). If it's a standalone crate, this section is unnecessary.

🔧 Suggested fix
-[workspace]
-
 [package]
 name = "context-compaction"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@context-compaction/Cargo.toml` around lines 1 - 2, The Cargo.toml contains an
empty [workspace] table that should be removed or clarified; either delete the
standalone "[workspace]" section from the context-compaction crate if it is not
a workspace root, or replace it with an explicit members = [...] list if this
crate is intended to be a workspace root—update the Cargo.toml accordingly
(remove the empty table or add concrete members) so the workspace declaration is
not a no-op.

Comment thread harness/Makefile
# Requires the engine to be up first. The PID file lives under $(PIDS_DIR)
# alongside `engine.pid` / `web.pid` so `make stop` cleans it up uniformly.
COMPACTION_BIN := $(WORKERS_BIN)/context-compaction
compaction: ensure-dirs

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add engine dependency to enforce documented requirement.

The comment (lines 107-116) documents that the compaction worker "requires the engine to be up first," but the target doesn't enforce this dependency. When running make compaction directly (bypassing make all), the engine might not be running, causing the worker to fail during WebSocket connection with confusing errors.

🔗 Proposed fix
-compaction: ensure-dirs
+compaction: ensure-dirs engine
🧰 Tools
🪛 checkmake (0.3.2)

[warning] 118-118: Target body for "compaction" exceeds allowed length of 5 lines (14).

(maxbodylength)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@harness/Makefile` at line 118, The compaction Makefile target lacks the
documented dependency on the engine; update the compaction target (symbol:
compaction) to depend on the engine target so the engine is started before the
compaction worker runs (currently it only depends on ensure-dirs). Modify the
dependency list for compaction to include engine (e.g., change the target
declaration from "compaction: ensure-dirs" to include engine) so invoking make
compaction will start the engine first and avoid websocket connection failures.

Comment on lines +45 to +111
pub async fn maybe_reload_after_compaction(iii: &III, session_id: &str) {
let last_key = crate::state::last_compaction_at_key(session_id);
let watermark_key = crate::state::last_compaction_consumed_at_key(session_id);
let last = state_get(iii, &last_key)
.await
.and_then(|v| v.as_i64())
.unwrap_or(0);
if last == 0 {
return;
}
let consumed = state_get(iii, &watermark_key)
.await
.and_then(|v| v.as_i64())
.unwrap_or(0);
if last <= consumed {
return;
}
let resp = match iii
.trigger(TriggerRequest {
function_id: "session-tree::messages".into(),
payload: json!({ "session_id": session_id }),
action: None,
timeout_ms: Some(10_000),
})
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, %session_id, "reload-after-compaction: session-tree::messages failed");
return;
}
};
// `session-tree::messages` returns `{messages: [{entry_id, message}, ...]}`.
let Some(rows) = resp.get("messages").and_then(|v| v.as_array()) else {
return;
};
let mut rebuilt: Vec<AgentMessage> = Vec::with_capacity(rows.len());
for row in rows {
let msg = row.get("message").cloned().unwrap_or(JsonValue::Null);
if msg.is_null() {
continue;
}
match serde_json::from_value::<AgentMessage>(msg) {
Ok(m) => rebuilt.push(m),
Err(e) => {
tracing::warn!(error = %e, %session_id, "reload-after-compaction: failed to decode AgentMessage; aborting reload");
return;
}
}
}
if rebuilt.is_empty() {
return;
}
if let Ok(value) = serde_json::to_value(&rebuilt) {
state_set(iii, &messages_key(session_id), value).await;
}
// Reset the mirror watermark so future `save_messages` calls don't try
// to re-append messages the tree already has.
let mirror_key = crate::state::last_session_tree_len_key(session_id);
state_set(iii, &mirror_key, json!(rebuilt.len() as u64)).await;
state_set(iii, &watermark_key, json!(last)).await;
tracing::info!(
%session_id,
new_len = rebuilt.len(),
"context-compaction landed; reloaded messages from session-tree"
);
}
⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Consider logging when compaction reload yields no messages.

Lines 95-97 return early when rebuilt is empty without logging. If last_compaction_at is set, we expect the session-tree to contain compacted messages. An empty result after decoding might indicate:

  • All messages in the tree were null (line 84-86)
  • All messages failed to decode (though line 90-92 returns early and logs for that case)

Adding a warning would improve observability for edge cases where compaction appears to succeed but no messages are recovered.

📊 Proposed observability improvement
     }
     if rebuilt.is_empty() {
+        tracing::warn!(
+            %session_id,
+            last_compaction = last,
+            "reload-after-compaction: session-tree returned no decodable messages despite compaction watermark"
+        );
         return;
     }
📝 Committable suggestion


Suggested change
pub async fn maybe_reload_after_compaction(iii: &III, session_id: &str) {
let last_key = crate::state::last_compaction_at_key(session_id);
let watermark_key = crate::state::last_compaction_consumed_at_key(session_id);
let last = state_get(iii, &last_key)
.await
.and_then(|v| v.as_i64())
.unwrap_or(0);
if last == 0 {
return;
}
let consumed = state_get(iii, &watermark_key)
.await
.and_then(|v| v.as_i64())
.unwrap_or(0);
if last <= consumed {
return;
}
let resp = match iii
.trigger(TriggerRequest {
function_id: "session-tree::messages".into(),
payload: json!({ "session_id": session_id }),
action: None,
timeout_ms: Some(10_000),
})
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, %session_id, "reload-after-compaction: session-tree::messages failed");
return;
}
};
// `session-tree::messages` returns `{messages: [{entry_id, message}, ...]}`.
let Some(rows) = resp.get("messages").and_then(|v| v.as_array()) else {
return;
};
let mut rebuilt: Vec<AgentMessage> = Vec::with_capacity(rows.len());
for row in rows {
let msg = row.get("message").cloned().unwrap_or(JsonValue::Null);
if msg.is_null() {
continue;
}
match serde_json::from_value::<AgentMessage>(msg) {
Ok(m) => rebuilt.push(m),
Err(e) => {
tracing::warn!(error = %e, %session_id, "reload-after-compaction: failed to decode AgentMessage; aborting reload");
return;
}
}
}
if rebuilt.is_empty() {
tracing::warn!(
%session_id,
last_compaction = last,
"reload-after-compaction: session-tree returned no decodable messages despite compaction watermark"
);
return;
}
if let Ok(value) = serde_json::to_value(&rebuilt) {
state_set(iii, &messages_key(session_id), value).await;
}
// Reset the mirror watermark so future `save_messages` calls don't try
// to re-append messages the tree already has.
let mirror_key = crate::state::last_session_tree_len_key(session_id);
state_set(iii, &mirror_key, json!(rebuilt.len() as u64)).await;
state_set(iii, &watermark_key, json!(last)).await;
tracing::info!(
%session_id,
new_len = rebuilt.len(),
"context-compaction landed; reloaded messages from session-tree"
);
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@turn-orchestrator/src/persistence.rs` around lines 45 - 111, The function
maybe_reload_after_compaction early-returns silently when the rebuilt Vec is
empty; add a warning log just before the if rebuilt.is_empty() return to surface
unexpected empty reloads, including session_id and the last compaction watermark
(use last and/or keys from last_compaction_at_key and
last_compaction_consumed_at_key) so operators can diagnose; place the
tracing::warn! call immediately before the existing return in the
rebuilt.is_empty() branch in maybe_reload_after_compaction and do not change the
control flow or return value.

Comment on lines +48 to +84
async fn maybe_truncate_result(
iii: &III,
session_id: &str,
call_id: &str,
result: FunctionResult,
) -> FunctionResult {
let threshold = truncate_threshold();
let serialized_size = match serde_json::to_string(&result) {
Ok(s) => s.len(),
Err(_) => return result,
};
if serialized_size <= threshold {
return result;
}
// Persist the full payload first; if state::set fails, fall through and
// return the original result so we never lose data.
let full_json = match serde_json::to_value(&result) {
Ok(v) => v,
Err(_) => return result,
};
persistence::save_full_result(iii, session_id, call_id, &full_json).await;

let summary_text = render_truncated_text(&result, serialized_size, call_id);
FunctionResult {
content: vec![ContentBlock::Text(TextContent { text: summary_text })],
details: json!({
"truncated": true,
"original_bytes": serialized_size,
"call_id": call_id,
}),
terminate: result.terminate,
truncated: Some(TruncationInfo {
original_bytes: serialized_size as u64,
call_id: call_id.to_string(),
}),
}
}

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Check for persistence failure before returning truncated result.

Line 68 awaits save_full_result but never checks whether persistence succeeded. If state::set fails internally, the full payload is lost, yet the function still returns a truncated FunctionResult carrying TruncationInfo. The model then sees the truncation marker and calls result::fetch, which fails with result_not_found. This violates the recovery contract.

🛡️ Proposed fix to verify persistence before truncating

Modify save_full_result to return a Result<(), ()> and check it:

-    persistence::save_full_result(iii, session_id, call_id, &full_json).await;
+    if persistence::save_full_result(iii, session_id, call_id, &full_json).await.is_err() {
+        // If we can't persist the full payload, return the original result
+        // rather than a truncated one the model can't recover.
+        return result;
+    }

     let summary_text = render_truncated_text(&result, serialized_size, call_id);

And update persistence::save_full_result to propagate errors:

-pub async fn save_full_result(iii: &III, session_id: &str, call_id: &str, payload: &Value) {
+pub async fn save_full_result(iii: &III, session_id: &str, call_id: &str, payload: &Value) -> Result<(), ()> {
     let key = staging_key(session_id, &format!("result/{call_id}"));
-    state_set(iii, &key, payload.clone()).await;
+    state_set(iii, &key, payload.clone()).await.map_err(|_| ())
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@turn-orchestrator/src/states/functions.rs` around lines 48 - 84, The
truncation path in maybe_truncate_result currently calls
persistence::save_full_result without checking its outcome, which can cause a
missing persisted payload while returning a truncated
FunctionResult/TruncationInfo; update persistence::save_full_result to return a
Result<(), E> (propagating underlying errors) and in maybe_truncate_result await
that Result and only proceed to construct and return the truncated
FunctionResult (with TruncationInfo) if save_full_result returns Ok(()); on
Err(_) simply return the original result so we never advertise a fetchable
truncated payload.
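For context, the "head+tail-elided preview" that render_truncated_text produces can be sketched as a small pure function. This is an assumption-laden illustration (the marker format and byte budget here are made up; only the head+tail shape comes from the PR description), with care taken to cut on UTF-8 char boundaries:

```rust
// Keep the first and last `keep` bytes of `s` and elide the middle,
// snapping both cut points to char boundaries so multi-byte UTF-8
// sequences are never split. Marker text is illustrative only.
fn head_tail_preview(s: &str, keep: usize) -> String {
    if s.len() <= keep * 2 {
        return s.to_string(); // small enough: no elision needed
    }
    let mut head_end = keep;
    while !s.is_char_boundary(head_end) {
        head_end -= 1;
    }
    let mut tail_start = s.len() - keep;
    while !s.is_char_boundary(tail_start) {
        tail_start += 1;
    }
    let elided = tail_start - head_end;
    format!(
        "{}\n…[{} bytes elided]…\n{}",
        &s[..head_end],
        elided,
        &s[tail_start..]
    )
}

fn main() {
    assert_eq!(head_tail_preview("short", 10), "short");
    let long = "x".repeat(20);
    let p = head_tail_preview(&long, 4);
    assert!(p.starts_with("xxxx") && p.ends_with("xxxx"));
    assert!(p.contains("[12 bytes elided]"));
}
```

The persistence-before-preview ordering in the fix above matters precisely because this preview is lossy: once the middle is elided, the stashed copy under session/&lt;id&gt;/result/&lt;call_id&gt; is the only route back to the full payload.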

Adds context-compaction to the two worker-name lists in the CI release
flow:

- release.yml: adds 'context-compaction/v*' to the on-push tag triggers
  so pushing a tag of that shape kicks off the binary build, GitHub
  release creation, and registry publish (parse_release_tag.py already
  understands the tag, picks up deploy=binary from iii.worker.yaml).
- create-tag.yml: adds 'context-compaction' to the workflow_dispatch
  worker choice so the standard "Bump version & create tag" UI can
  cut releases.

Both lists are alphabetical; the new entry slots between
'auth-credentials' and the harness fan-out comment / harness entry.

The worker's iii.worker.yaml already declares `deploy: binary` and a
matching `bin: context-compaction`, so no further pipeline configuration
is needed — the existing _rust-binary.yml reusable handles the build.
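The `context-compaction/v*` tag shape the release trigger matches on can be illustrated with a small parser. This is a hedged sketch only: parse_release_tag.py is the real parser, and its actual rules (pre-release suffixes, validation) may differ from this simplified `<worker>/v<semver>` split:

```rust
// Split a release tag of the form "<worker>/v<version>" into its parts.
// Accepts only dot-and-digit versions; anything else returns None.
// Illustrative stand-in for the repo's parse_release_tag.py.
fn parse_release_tag(tag: &str) -> Option<(&str, &str)> {
    let (worker, version) = tag.rsplit_once("/v")?;
    if worker.is_empty() || version.is_empty() {
        return None;
    }
    if !version.chars().all(|c| c.is_ascii_digit() || c == '.') {
        return None;
    }
    Some((worker, version))
}

fn main() {
    assert_eq!(
        parse_release_tag("context-compaction/v0.3.1"),
        Some(("context-compaction", "0.3.1"))
    );
    assert_eq!(parse_release_tag("main"), None);
    assert_eq!(parse_release_tag("/v1.0.0"), None);
}
```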

vercel Bot commented May 15, 2026

The latest updates on your projects:

| Project | Deployment | Actions | Updated (UTC) |
| --- | --- | --- | --- |
| workers | Error | Error | May 15, 2026 9:59pm |
