Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 289 additions & 29 deletions Cargo.lock

Large diffs are not rendered by default.

102 changes: 102 additions & 0 deletions crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,108 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
}
}

async fn check_session_gap(&self, agent_id: AgentId) -> Option<String> {
let gap_threshold = self.kernel.config.compaction.session_gap_secs;
if gap_threshold == 0 {
// Update timestamp even if gap detection is disabled
self.kernel
.last_message_at
.insert(agent_id, std::time::Instant::now());
return None;
}

let now = std::time::Instant::now();
let gap_secs = self
.kernel
.last_message_at
.get(&agent_id)
.map(|t| now.duration_since(*t).as_secs())
.unwrap_or(u64::MAX); // First message ever = treat as long gap

// Update timestamp
self.kernel.last_message_at.insert(agent_id, now);

if gap_secs < gap_threshold {
return None; // Short gap — continue normally
}

tracing::info!(
agent = %agent_id,
gap_secs = gap_secs,
"Session gap detected, running compaction + context refresh"
);

// Run compaction first
if let Err(e) = self.kernel.compact_agent_session(agent_id).await {
tracing::warn!(
agent = %agent_id,
error = %e,
"Session gap compaction failed"
);
}

// Query context sources with bounded lookback
let context_sources = &self.kernel.config.compaction.context_sources;
if context_sources.is_empty() {
return None;
}

let max_lookback = self.kernel.config.compaction.max_lookback_secs;
let lookback_secs = gap_secs.min(max_lookback);
let since = chrono::Utc::now() - chrono::Duration::seconds(lookback_secs as i64);

// Query all context sources in parallel for speed
let mut handles = Vec::new();
for source in context_sources {
let query = format!(
"{}\nSummarize anything relevant since {}.",
source.prompt,
since.format("%Y-%m-%d %H:%M %Z"),
);
let kernel = self.kernel.clone();
let hand = source.hand.clone();
handles.push(tokio::spawn(async move {
let result = tokio::time::timeout(
std::time::Duration::from_secs(30),
openfang_runtime::kernel_handle::KernelHandle::send_to_agent(
kernel.as_ref(),
&hand,
&query,
),
)
.await;
(hand, result)
}));
}

let mut parts = Vec::new();
for handle in handles {
match handle.await {
Ok((hand, Ok(Ok(summary)))) if !summary.trim().is_empty() => {
tracing::info!(
hand = %hand,
summary_len = summary.len(),
"Session gap context source responded"
);
parts.push(format!("{}: {}", hand, summary));
}
Ok((hand, Ok(Err(e)))) => {
tracing::warn!(hand = %hand, error = %e, "Session gap context source failed");
}
Ok((hand, Err(_))) => {
tracing::warn!(hand = %hand, "Session gap context source timed out");
}
_ => {}
}
}

if parts.is_empty() {
None
} else {
Some(parts.join("\n"))
}
}

async fn check_auto_reply(&self, agent_id: AgentId, message: &str) -> Option<String> {
// Check if auto-reply should fire for this message
let channel_type = "bridge"; // Generic; the bridge layer handles specifics
Expand Down
16 changes: 16 additions & 0 deletions crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ pub trait ChannelBridgeHandle: Send + Sync {
// Default: no tracking
}

/// Check if a session gap occurred since the last message to this agent.
/// If gap > threshold, triggers compaction + context source queries.
/// Returns an optional context preamble to prepend to the user's message.
async fn check_session_gap(&self, _agent_id: AgentId) -> Option<String> {
None
}

/// Check if auto-reply is enabled and the message should trigger one.
/// Returns Some(reply_text) if auto-reply fires, None otherwise.
async fn check_auto_reply(&self, _agent_id: AgentId, _message: &str) -> Option<String> {
Expand Down Expand Up @@ -964,6 +971,15 @@ async fn dispatch_message(
text.clone()
};

// Session gap detection: if the user hasn't interacted for a while,
// trigger compaction + context refresh before dispatching.
// The preamble (if any) is prepended so the agent sees it before the user's message.
let prefixed_text = if let Some(preamble) = handle.check_session_gap(agent_id).await {
format!("[Session context]: {preamble}\n\n{prefixed_text}")
} else {
prefixed_text
};

// Send to agent and relay response
let result = handle.send_message(agent_id, &prefixed_text).await;

Expand Down
Loading
Loading