diff --git a/Cargo.lock b/Cargo.lock index 8d84ca7e8..2f4c34b98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,6 +321,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "async-task" version = "4.7.1" @@ -396,9 +418,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.39.0" +version = "0.39.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa7e52a4c5c547c741610a2c6f123f3881e409b714cd27e6798ef020c514f0a" +checksum = "83a25cf98105baa966497416dbd42565ce3a8cf8dbfd59803ec9ad46f3126399" dependencies = [ "cc", "cmake", @@ -406,13 +428,40 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + [[package]] name = "axum" version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ - "axum-core", + "axum-core 0.5.6", "base64 0.22.1", "bytes", "form_urlencoded", @@ -423,7 +472,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "multer", @@ -437,12 +486,32 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-tungstenite 0.28.0", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.5.6" @@ -2461,6 +2530,17 @@ dependencies = [ "system-deps", ] +[[package]] +name = "google-api-proto" +version = "1.710.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e8fbf69e5b682fa5d6ca8a19ebf4e6707ace9e15118d4750cd0b51321701ed0" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "governor" version = "0.10.4" @@ -2536,6 +2616,25 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.13.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2714,6 +2813,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", @@ -2743,6 +2843,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -3235,6 +3348,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "keyboard-types" version = "0.7.0" @@ -3518,6 +3646,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -3760,12 +3894,31 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -3977,11 +4130,10 @@ dependencies = [ [[package]] name = "openfang-api" -version = "0.5.5" +version = "0.5.1" dependencies = [ - "argon2", "async-trait", - "axum", + "axum 0.8.8", "base64 0.22.1", "chrono", "dashmap", @@ -3999,7 +4151,6 @@ dependencies = [ "openfang-skills", "openfang-types", "openfang-wire", - "rand 0.8.5", "reqwest 0.12.28", "serde", "serde_json", @@ -4011,7 +4162,7 @@ dependencies = [ "tokio-stream", "tokio-test", "toml 0.9.12+spec-1.1.0", - "tower", + "tower 0.5.3", "tower-http", "tracing", "uuid", @@ -4019,25 +4170,30 @@ dependencies = [ [[package]] name = "openfang-channels" -version = "0.5.5" +version = "0.5.1" dependencies = [ "aes", + "async-stream", "async-trait", - "axum", + "axum 0.8.8", "base64 0.22.1", "cbc", "chrono", + "chrono-tz", "dashmap", "futures", + "google-api-proto", "hex", "hmac", "html-escape", "imap", + "jsonwebtoken", "lettre", "mailparse", "native-tls", "openfang-types", "prost", + "prost-types", "regex-lite", "reqwest 0.12.28", "roxmltree", @@ -4050,6 +4206,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tokio-tungstenite 0.24.0", + "tonic", "tracing", "url", "uuid", @@ -4058,7 +4215,7 @@ dependencies = [ [[package]] name = "openfang-cli" -version = "0.5.5" +version = "0.5.1" dependencies = [ "clap", "clap_complete", @@ -4086,9 +4243,9 @@ dependencies = [ [[package]] name = "openfang-desktop" -version = "0.5.5" +version = "0.5.1" dependencies = [ - "axum", + "axum 0.8.8", "open", "openfang-api", "openfang-kernel", @@ -4112,11 +4269,11 @@ dependencies = [ [[package]] name = "openfang-extensions" -version = "0.5.5" +version = "0.5.1" dependencies = [ "aes-gcm", "argon2", - "axum", + "axum 0.8.8", "base64 0.22.1", "chrono", "dashmap", @@ -4140,7 +4297,7 @@ dependencies = [ [[package]] name = "openfang-hands" -version = "0.5.5" +version = "0.5.1" dependencies = [ "chrono", "dashmap", @@ -4157,7 +4314,7 @@ dependencies = [ [[package]] name = "openfang-kernel" -version = "0.5.5" +version = "0.5.1" dependencies = [ "async-trait", "chrono", @@ -4195,7 +4352,7 @@ dependencies = [ [[package]] name = "openfang-memory" -version = "0.5.5" +version = "0.5.1" dependencies = [ "async-trait", "chrono", @@ -4215,7 +4372,7 @@ dependencies = [ [[package]] name = "openfang-migrate" -version = "0.5.5" +version = "0.5.1" dependencies = [ "chrono", "dirs 6.0.0", @@ -4234,7 +4391,7 @@ dependencies = [ [[package]] name = "openfang-runtime" -version = "0.5.5" +version = "0.5.1" dependencies = [ "anyhow", "async-trait", @@ -4270,7 +4427,7 @@ dependencies = [ [[package]] name = "openfang-skills" -version = "0.5.5" +version = "0.5.1" dependencies = [ "chrono", "hex", @@ -4293,7 +4450,7 @@ dependencies = [ [[package]] name = "openfang-types" -version = "0.5.5" +version = "0.5.1" dependencies = [ "async-trait", "chrono", @@ -4312,7 +4469,7 @@ dependencies = [ [[package]] name = "openfang-wire" -version = "0.5.5" +version = "0.5.1" dependencies = [ "async-trait", "chrono", @@ -4509,6 +4666,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -4773,6 +4940,26 @@ dependencies = [ "siphasher 1.0.2", ] +[[package]] +name = "pin-project" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -5046,6 +5233,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "psm" version = "0.1.30" @@ -5517,7 +5713,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tokio-util", - "tower", + "tower 0.5.3", "tower-http", "tower-service", "url", @@ -5558,7 +5754,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.4", "tokio-util", - "tower", + "tower 0.5.3", "tower-http", "tower-service", "url", @@ -6345,6 +6541,18 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.18", + "time", +] + [[package]] name = "siphasher" version = "0.3.11" @@ -7503,6 +7711,58 @@ version = "1.1.0+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d282ade6016312faf3e41e57ebbba0c073e4056dab1232ab1cb624199648f8ed" +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.9", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-pemfile", + "socket2 0.5.10", + "tokio", + "tokio-rustls 0.26.4", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.3" @@ -7537,7 +7797,7 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", @@ -9368,7 +9628,7 @@ checksum = "b9cc00251562a284751c9973bace760d86c0276c471b4be569fe6b068ee97a56" [[package]] name = "xtask" -version = "0.5.5" +version = "0.5.1" [[package]] name = "yoke" diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index f8336ba59..0b4004d7b 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -880,6 +880,108 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { } } + async fn check_session_gap(&self, agent_id: AgentId) -> Option { + let gap_threshold = self.kernel.config.compaction.session_gap_secs; + if gap_threshold == 0 { + // Update timestamp even if gap detection is disabled + self.kernel + .last_message_at + .insert(agent_id, std::time::Instant::now()); + return None; + } + + let now = std::time::Instant::now(); + let gap_secs = self + .kernel + .last_message_at + .get(&agent_id) + .map(|t| now.duration_since(*t).as_secs()) + .unwrap_or(u64::MAX); // First message ever = treat as long gap + + // Update timestamp + self.kernel.last_message_at.insert(agent_id, now); + + if gap_secs < gap_threshold { + return None; // Short gap — continue normally + } + + tracing::info!( + agent = %agent_id, + gap_secs = gap_secs, + "Session gap detected, running compaction + context refresh" + ); + + // Run compaction first + if let Err(e) = self.kernel.compact_agent_session(agent_id).await { + tracing::warn!( + agent = %agent_id, + error = %e, + "Session gap compaction failed" + ); + } + + // Query context sources with bounded lookback + let context_sources = &self.kernel.config.compaction.context_sources; + if context_sources.is_empty() { + return None; + } + + let max_lookback = self.kernel.config.compaction.max_lookback_secs; + let lookback_secs = gap_secs.min(max_lookback); + let since = chrono::Utc::now() - chrono::Duration::seconds(lookback_secs as i64); + + // Query all context sources in parallel for speed + let mut handles = Vec::new(); + for source in context_sources { + let query = format!( + "{}\nSummarize anything relevant since {}.", + source.prompt, + since.format("%Y-%m-%d %H:%M %Z"), + ); + let kernel = self.kernel.clone(); + let hand = source.hand.clone(); + handles.push(tokio::spawn(async move { + let result = tokio::time::timeout( + std::time::Duration::from_secs(30), + openfang_runtime::kernel_handle::KernelHandle::send_to_agent( + kernel.as_ref(), + &hand, + &query, + ), + ) + .await; + (hand, result) + })); + } + + let mut parts = Vec::new(); + for handle in handles { + match handle.await { + Ok((hand, Ok(Ok(summary)))) if !summary.trim().is_empty() => { + tracing::info!( + hand = %hand, + summary_len = summary.len(), + "Session gap context source responded" + ); + parts.push(format!("{}: {}", hand, summary)); + } + Ok((hand, Ok(Err(e)))) => { + tracing::warn!(hand = %hand, error = %e, "Session gap context source failed"); + } + Ok((hand, Err(_))) => { + tracing::warn!(hand = %hand, "Session gap context source timed out"); + } + _ => {} + } + } + + if parts.is_empty() { + None + } else { + Some(parts.join("\n")) + } + } + async fn check_auto_reply(&self, agent_id: AgentId, message: &str) -> Option { // Check if auto-reply should fire for this message let channel_type = "bridge"; // Generic; the bridge layer handles specifics diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 7cf6870c6..08ab8d532 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -236,6 +236,13 @@ pub trait ChannelBridgeHandle: Send + Sync { // Default: no tracking } + /// Check if a session gap occurred since the last message to this agent. + /// If gap > threshold, triggers compaction + context source queries. + /// Returns an optional context preamble to prepend to the user's message. + async fn check_session_gap(&self, _agent_id: AgentId) -> Option { + None + } + /// Check if auto-reply is enabled and the message should trigger one. /// Returns Some(reply_text) if auto-reply fires, None otherwise. async fn check_auto_reply(&self, _agent_id: AgentId, _message: &str) -> Option { @@ -964,6 +971,15 @@ async fn dispatch_message( text.clone() }; + // Session gap detection: if the user hasn't interacted for a while, + // trigger compaction + context refresh before dispatching. + // The preamble (if any) is prepended so the agent sees it before the user's message. + let prefixed_text = if let Some(preamble) = handle.check_session_gap(agent_id).await { + format!("[Session context]: {preamble}\n\n{prefixed_text}") + } else { + prefixed_text + }; + // Send to agent and relay response let result = handle.send_message(agent_id, &prefixed_text).await; diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index d9fe60f97..904ed93a8 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -158,6 +158,12 @@ pub struct OpenFangKernel { /// Per-agent message locks — serializes LLM calls for the same agent to prevent /// session corruption when multiple messages arrive concurrently (e.g. rapid voice /// messages via Telegram). Different agents can still run in parallel. + /// Per-agent exchange counter for continuous compaction. + exchange_counters: dashmap::DashMap, + /// Per-agent timestamp of the last continuous compaction (for from_ts in context queries). + last_compaction_at: dashmap::DashMap>, + /// Per-agent last message timestamp for session gap detection. + pub last_message_at: dashmap::DashMap, agent_msg_locks: dashmap::DashMap>>, /// Weak self-reference for trigger dispatch (set after Arc wrapping). self_handle: OnceLock>, @@ -502,6 +508,106 @@ fn gethostname() -> Option { } } +// ── Continuous compaction helpers ──────────────────────────────────────────── + +/// Query configured context sources in parallel and return formatted parts. +/// +/// Each source receives a bounded time-window query (`from_ts` → `to_ts`). +/// Sources that time out or error are skipped; only non-empty responses are returned. +async fn query_context_sources_parallel( + kernel: &Arc, + from_ts: chrono::DateTime, + to_ts: chrono::DateTime, +) -> Vec { + let context_sources = kernel.config.compaction.context_sources.clone(); + if context_sources.is_empty() { + return Vec::new(); + } + + let mut handles = Vec::new(); + for source in context_sources { + let query = format!( + "{}\nTime window: from {} to {}.", + source.prompt, + from_ts.format("%Y-%m-%d %H:%M %Z"), + to_ts.format("%Y-%m-%d %H:%M %Z"), + ); + let k = kernel.clone(); + let hand = source.hand.clone(); + handles.push(tokio::spawn(async move { + let result = tokio::time::timeout( + std::time::Duration::from_secs(30), + openfang_runtime::kernel_handle::KernelHandle::send_to_agent( + k.as_ref(), + &hand, + &query, + ), + ) + .await; + (hand, result) + })); + } + + let mut parts = Vec::new(); + for handle in handles { + match handle.await { + Ok((hand, Ok(Ok(summary)))) if !summary.trim().is_empty() => { + info!(hand = %hand, summary_len = summary.len(), "Compaction context source responded"); + parts.push(format!("[{}]: {}", hand, summary)); + } + Ok((hand, Ok(Err(e)))) => { + warn!(hand = %hand, error = %e, "Compaction context source failed"); + } + Ok((hand, Err(_))) => { + warn!(hand = %hand, "Compaction context source timed out"); + } + _ => {} + } + } + parts +} + +/// Inject a context block into the agent's live session as a synthetic user message. +/// +/// This makes the context visible to the LLM on the next turn without requiring +/// the agent to call memory_recall. Appended after compaction so it stays in +/// the `keep_recent` window. +fn inject_context_into_session( + kernel: &OpenFangKernel, + agent_id: AgentId, + context_block: &str, + timestamp: chrono::DateTime, +) { + use openfang_types::message::{Message, MessageContent, Role}; + + let entry = match kernel.registry.get(agent_id) { + Some(e) => e, + None => return, + }; + + match kernel.memory.get_session(entry.session_id) { + Ok(Some(mut session)) => { + session.messages.push(Message { + role: Role::User, + content: MessageContent::Text(format!( + "[Context refresh — {}]\n{}", + timestamp.format("%Y-%m-%d %H:%M UTC"), + context_block, + )), + }); + if let Err(e) = kernel.memory.save_session(&session) { + warn!(agent_id = %agent_id, error = %e, "Failed to inject context into session"); + } else { + info!(agent_id = %agent_id, "Context injected into session"); + } + } + Ok(None) => {} + Err(e) => { + warn!(agent_id = %agent_id, error = %e, "Failed to load session for context injection"); + } + } +} + impl OpenFangKernel { /// Boot the kernel with configuration from the given path. pub fn boot(config_path: Option<&Path>) -> KernelResult { @@ -881,12 +987,12 @@ impl OpenFangKernel { // Auto-detect embedding provider by checking API key env vars in // priority order. First match wins. const API_KEY_PROVIDERS: &[(&str, &str)] = &[ - ("OPENAI_API_KEY", "openai"), - ("GROQ_API_KEY", "groq"), - ("MISTRAL_API_KEY", "mistral"), - ("TOGETHER_API_KEY", "together"), + ("OPENAI_API_KEY", "openai"), + ("GROQ_API_KEY", "groq"), + ("MISTRAL_API_KEY", "mistral"), + ("TOGETHER_API_KEY", "together"), ("FIREWORKS_API_KEY", "fireworks"), - ("COHERE_API_KEY", "cohere"), + ("COHERE_API_KEY", "cohere"), ]; let detected_from_key = API_KEY_PROVIDERS @@ -1084,6 +1190,9 @@ impl OpenFangKernel { whatsapp_gateway_pid: Arc::new(std::sync::Mutex::new(None)), channel_adapters: dashmap::DashMap::new(), default_model_override: std::sync::RwLock::new(None), + exchange_counters: dashmap::DashMap::new(), + last_compaction_at: dashmap::DashMap::new(), + last_message_at: dashmap::DashMap::new(), agent_msg_locks: dashmap::DashMap::new(), self_handle: OnceLock::new(), }; @@ -1127,8 +1236,7 @@ impl OpenFangKernel { != entry.manifest.tool_allowlist || disk_manifest.tool_blocklist != entry.manifest.tool_blocklist - || disk_manifest.skills - != entry.manifest.skills + || disk_manifest.skills != entry.manifest.skills || disk_manifest.mcp_servers != entry.manifest.mcp_servers; if changed { @@ -1640,6 +1748,81 @@ impl OpenFangKernel { "ok", ); + // Continuous compaction: increment exchange counter and check + { + let compaction_config = &self.config.compaction; + if compaction_config.continuous_interval > 0 { + let counter = self + .exchange_counters + .entry(agent_id) + .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0)); + let count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + + // Check message count from the registry entry's session + let msg_count = self + .registry + .get(agent_id) + .and_then(|e| { + self.memory + .get_session(e.session_id) + .ok() + .flatten() + .map(|s| s.messages.len()) + }) + .unwrap_or(0); + + if msg_count > compaction_config.keep_recent + && count.is_multiple_of(compaction_config.continuous_interval) + { + // Capture from_ts before compaction (start of the window being compacted) + let now = chrono::Utc::now(); + let from_ts = self + .last_compaction_at + .get(&agent_id) + .map(|t| *t) + .unwrap_or_else(|| { + now - chrono::Duration::seconds( + self.config.compaction.max_lookback_secs as i64, + ) + }); + self.last_compaction_at.insert(agent_id, now); + + let self_clone = self.self_handle.get().and_then(|w| w.upgrade()); + if let Some(kernel) = self_clone { + tokio::spawn(async move { + info!( + agent_id = %agent_id, + exchange_count = count, + "Continuous compaction triggered (non-streaming)" + ); + + // Run standard compaction + if let Err(e) = kernel.compact_agent_session(agent_id).await { + warn!( + agent_id = %agent_id, + "Continuous compaction failed: {e}" + ); + return; + } + + // Query context sources in parallel with proper time window + let parts = + query_context_sources_parallel(&kernel, from_ts, now).await; + if !parts.is_empty() { + let context_block = parts.join("\n\n"); + inject_context_into_session( + &kernel, + agent_id, + &context_block, + now, + ); + } + }); + } + } + } + } + Ok(result) } Err(e) => { @@ -2127,6 +2310,70 @@ impl OpenFangKernel { } } + // Continuous compaction: compact every N exchanges with optional hand context + { + let compaction_config = &kernel_clone.config.compaction; + if compaction_config.continuous_interval > 0 { + let counter = kernel_clone + .exchange_counters + .entry(agent_id) + .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0)); + let count = + counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + + let msg_count = session.messages.len(); + + if msg_count > compaction_config.keep_recent + && count.is_multiple_of(compaction_config.continuous_interval) + { + // Capture from_ts before compaction (start of the window being compacted) + let now = chrono::Utc::now(); + let from_ts = kernel_clone + .last_compaction_at + .get(&agent_id) + .map(|t| *t) + .unwrap_or_else(|| { + now - chrono::Duration::seconds( + kernel_clone.config.compaction.max_lookback_secs as i64, + ) + }); + kernel_clone.last_compaction_at.insert(agent_id, now); + + let kc = kernel_clone.clone(); + tokio::spawn(async move { + info!( + agent_id = %agent_id, + exchange_count = count, + "Continuous compaction triggered" + ); + + // First, run standard compaction + if let Err(e) = kc.compact_agent_session(agent_id).await { + warn!( + agent_id = %agent_id, + "Continuous compaction failed: {e}" + ); + return; + } + + // Query context sources in parallel with proper time window, + // then inject results directly into the session. + let parts = + query_context_sources_parallel(&kc, from_ts, now).await; + if !parts.is_empty() { + let context_block = parts.join("\n\n"); + inject_context_into_session( + &kc, + agent_id, + &context_block, + now, + ); + } + }); + } + } + } + Ok(result) } Err(e) => { diff --git a/crates/openfang-runtime/src/compactor.rs b/crates/openfang-runtime/src/compactor.rs index 3186e4f4a..feae91cf3 100644 --- a/crates/openfang-runtime/src/compactor.rs +++ b/crates/openfang-runtime/src/compactor.rs @@ -44,6 +44,8 @@ pub struct CompactionConfig { pub token_threshold_ratio: f64, /// Model context window size in tokens. pub context_window_tokens: usize, + /// Trigger continuous compaction every N exchanges. 0 = disabled. + pub continuous_interval: usize, } impl Default for CompactionConfig { @@ -60,6 +62,7 @@ impl Default for CompactionConfig { max_retries: 3, token_threshold_ratio: 0.7, context_window_tokens: 200_000, + continuous_interval: 0, } } } @@ -128,6 +131,17 @@ pub fn needs_compaction_by_tokens(estimated_tokens: usize, config: &CompactionCo estimated_tokens > threshold } +/// Check if continuous compaction should trigger. +pub fn needs_continuous_compaction( + exchange_count: usize, + session_message_count: usize, + config: &CompactionConfig, +) -> bool { + config.continuous_interval > 0 + && session_message_count > config.keep_recent + && exchange_count.is_multiple_of(config.continuous_interval) +} + // --------------------------------------------------------------------------- // Context Report // --------------------------------------------------------------------------- @@ -444,9 +458,10 @@ async fn summarize_messages( conversation_text = conversation_text[safe_start..].to_string(); } + let now = chrono::Utc::now().format("%Y-%m-%d %H:%M UTC"); let summarize_prompt = format!( - "Summarize the following conversation preserving key facts, decisions, user preferences, \ - and important context. Be concise but thorough. Output only the summary, no preamble.\n\n\ + "Current time: {now}\n\n\ + Summarize the following conversation. Output only the summary, no preamble.\n\n\ ---\n{conversation_text}---" ); @@ -463,8 +478,13 @@ async fn summarize_messages( max_tokens: config.max_summary_tokens, temperature: 0.3, system: Some( - "You are a conversation summarizer. Produce a concise summary that captures \ - all key facts, decisions, and context from the conversation." + "You are a conversation summarizer. Produce a concise summary that:\n\ + - Preserves key facts, decisions, user preferences, and ongoing tasks\n\ + - Drops facts and events that are no longer relevant to current context\n\ + - For calendar data: keep only events from the last 6h and next 24h\n\ + - For email data: keep only notable unread emails, drop ads and promotions\n\ + - Uses absolute dates/times (YYYY-MM-DD HH:MM), never relative\n\ + - Is concise — only what's still relevant matters" .to_string(), ), thinking: None, diff --git a/crates/openfang-runtime/src/mcp.rs b/crates/openfang-runtime/src/mcp.rs index b9f5f3819..3cb063d25 100644 --- a/crates/openfang-runtime/src/mcp.rs +++ b/crates/openfang-runtime/src/mcp.rs @@ -14,7 +14,6 @@ use rmcp::service::RunningService; use rmcp::{RoleClient, ServiceExt}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::sync::Arc; use tracing::{debug, info}; // --------------------------------------------------------------------------- @@ -307,11 +306,8 @@ impl McpConnection { } } - let config = StreamableHttpClientTransportConfig { - uri: Arc::from(url), - custom_headers, - ..Default::default() - }; + let config = + StreamableHttpClientTransportConfig::with_uri(url).custom_headers(custom_headers); let transport = StreamableHttpClientTransport::from_config(config); diff --git a/crates/openfang-runtime/src/web_fetch.rs b/crates/openfang-runtime/src/web_fetch.rs index 81021aefc..85e70b79a 100644 --- a/crates/openfang-runtime/src/web_fetch.rs +++ b/crates/openfang-runtime/src/web_fetch.rs @@ -506,7 +506,11 @@ mod tests { assert!(check_ssrf("http://169.254.169.254/latest/meta-data/", &allow).is_err()); // Also verify hostname-based metadata blocks let allow2 = vec!["metadata.google.internal".to_string()]; - assert!(check_ssrf("http://metadata.google.internal/computeMetadata/v1/", &allow2).is_err()); + assert!(check_ssrf( + "http://metadata.google.internal/computeMetadata/v1/", + &allow2 + ) + .is_err()); } #[test] diff --git a/crates/openfang-runtime/src/web_search.rs b/crates/openfang-runtime/src/web_search.rs index 28e92259e..11b2f5823 100644 --- a/crates/openfang-runtime/src/web_search.rs +++ b/crates/openfang-runtime/src/web_search.rs @@ -358,7 +358,10 @@ impl WebSearchEngine { let resp = self .client - .get(format!("{}/search", self.config.searxng.url.trim_end_matches('/'))) + .get(format!( + "{}/search", + self.config.searxng.url.trim_end_matches('/') + )) .query(&[ ("q", query), ("format", "json"), @@ -451,7 +454,10 @@ impl WebSearchEngine { let resp = self .client - .get(format!("{}/config", self.config.searxng.url.trim_end_matches('/'))) + .get(format!( + "{}/config", + self.config.searxng.url.trim_end_matches('/') + )) .header("User-Agent", "Mozilla/5.0 (compatible; OpenFangAgent/0.1)") .send() .await diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index f290bbf8c..8a18d788d 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -1130,6 +1130,48 @@ pub struct KernelConfig { /// Heartbeat monitor settings. #[serde(default)] pub heartbeat: HeartbeatSettings, + /// Compaction configuration (continuous compaction with contextual hand summaries). + #[serde(default)] + pub compaction: CompactionTomlConfig, +} + +/// A context source queried during continuous compaction to enrich summaries. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompactionContextSource { + /// Hand (agent name) to query for context. + pub hand: String, + /// Prompt to send to the hand. + pub prompt: String, +} + +/// Configuration for continuous compaction with optional contextual hand summaries. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct CompactionTomlConfig { + /// Trigger continuous compaction every N exchanges. 0 = disabled. + pub continuous_interval: usize, + /// Number of recent messages to keep verbatim. + pub keep_recent: usize, + /// Gap (seconds) between messages that triggers immediate compaction + context + /// refresh before dispatching. 0 = disabled. Default: 900 (15 minutes). + pub session_gap_secs: u64, + /// Maximum lookback (seconds) for context source queries. Caps the time window + /// so a 3-day absence doesn't dump 3 days of events. Default: 86400 (24 hours). + pub max_lookback_secs: u64, + /// Hands to query for contextual summaries after compaction. + pub context_sources: Vec, +} + +impl Default for CompactionTomlConfig { + fn default() -> Self { + Self { + continuous_interval: 0, + keep_recent: 6, + session_gap_secs: 900, // 15 minutes + max_lookback_secs: 86400, // 24 hours + context_sources: Vec::new(), + } + } } /// Heartbeat monitor settings exposed in `[heartbeat]` config section. @@ -1367,6 +1409,7 @@ impl Default for KernelConfig { auth: AuthConfig::default(), workflows_dir: None, heartbeat: HeartbeatSettings::default(), + compaction: CompactionTomlConfig::default(), } } }