Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/src/ai/agent_sdk/ambient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ impl AmbientAgentRunner {
println!("To increase your concurrent agent limit, upgrade your plan: {}", url);
}
}
AmbientAgentEvent::FollowupAccepted => {}
AmbientAgentEvent::StateChanged {
state,
status_message,
Expand Down
2 changes: 1 addition & 1 deletion app/src/ai/agent_sdk/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod snapshot;
pub(crate) mod terminal;

use environment::PrepareEnvironmentError;
pub(crate) use snapshot::upload_snapshot_for_handoff;
pub(crate) use snapshot::{upload_snapshot_for_handoff, HandoffSnapshotUploadOutcome};
use terminal::TerminalDriverEvent;

const MCP_SERVER_STARTUP_TIMEOUT: Duration = Duration::from_secs(20);
Expand Down
37 changes: 21 additions & 16 deletions app/src/ai/agent_sdk/driver/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,17 @@ struct SnapshotOutcome {
manifest_uploaded: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum HandoffSnapshotUploadOutcome {
SkippedEmptyWorkspace,
Uploaded {
initial_snapshot_token: InitialSnapshotToken,
partial_blob_upload: bool,
},
ManifestUploadFailed,
FailedBeforeManifest,
}

// --- Manifest schema ---

#[derive(serde::Serialize)]
Expand Down Expand Up @@ -730,26 +741,17 @@ async fn upload_snapshot_from_declarations_file(
/// contents, allocate an initial snapshot token plus presigned upload URLs via
/// `AIClient::upload_local_handoff_snapshot`, and upload the artifacts.
///
/// Returns:
/// - `Ok(Some(initial_snapshot_token))` when a token was minted **and the manifest landed in GCS**.
/// Individual blob uploads may still have failed; the manifest catalogues their status so the
/// cloud agent rehydrates against whatever did land, matching the cloud→cloud best-effort
/// posture.
/// - `Ok(None)` when the workspace was empty (no repos, no orphan files) **or** when the
/// manifest itself failed to upload. Without the manifest the snapshot is unusable, so
/// callers should spawn the cloud agent without an initial snapshot token instead of pointing
/// it at an incomplete prefix. Manifest-upload failures are also routed through
/// `report_error!` so on-call alerting catches the silent regression.
/// - `Err(_)` only for hard failures of `upload_local_handoff_snapshot` itself (auth, etc.).
/// Returns a structured outcome so the caller can distinguish empty workspaces,
/// manifest failures, partial blob uploads, and hard token-allocation failures.
pub(crate) async fn upload_snapshot_for_handoff(
repo_paths: Vec<PathBuf>,
orphan_file_paths: Vec<PathBuf>,
client: Arc<dyn AIClient>,
http: &http_client::Client,
) -> Result<Option<InitialSnapshotToken>> {
) -> Result<HandoffSnapshotUploadOutcome> {
if repo_paths.is_empty() && orphan_file_paths.is_empty() {
log::info!("Handoff snapshot has no declarations; skipping upload");
return Ok(None);
return Ok(HandoffSnapshotUploadOutcome::SkippedEmptyWorkspace);
}

let declarations: Vec<DeclarationEntry> = repo_paths
Expand Down Expand Up @@ -841,7 +843,7 @@ pub(crate) async fn upload_snapshot_for_handoff(
else {
// Manifest serialization failed (already reported via `report_error!` inside
// the helper). Without a manifest the snapshot is unusable, so refuse the token.
return Ok(None);
return Ok(HandoffSnapshotUploadOutcome::FailedBeforeManifest);
};

let summary = SnapshotSummary::from_entries(&outcome.entries, outcome.manifest_uploaded);
Expand All @@ -855,10 +857,13 @@ pub(crate) async fn upload_snapshot_for_handoff(
summary.uploaded,
summary.total,
));
return Ok(None);
return Ok(HandoffSnapshotUploadOutcome::ManifestUploadFailed);
}

Ok(Some(initial_snapshot_token))
Ok(HandoffSnapshotUploadOutcome::Uploaded {
initial_snapshot_token,
partial_blob_upload: !summary.all_uploaded(),
})
}

/// Core upload pipeline.
Expand Down
3 changes: 3 additions & 0 deletions app/src/ai/ambient_agents/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub enum AmbientAgentEvent {
},
/// Session started and join information became available.
SessionStarted { session_join_info: SessionJoinInfo },
/// Follow-up request was accepted by the API and polling has started.
FollowupAccepted,
/// Timed out waiting for the agent session to be ready.
TimedOut,
/// Cloud agent capacity limit has been reached. This does not block
Expand Down Expand Up @@ -147,6 +149,7 @@ pub fn submit_run_followup(
yield Err(err);
return;
}
yield Ok(AmbientAgentEvent::FollowupAccepted);

let mut stream = Box::pin(poll_run_until_joinable_session(
run_id,
Expand Down
30 changes: 30 additions & 0 deletions app/src/ai/ambient_agents/spawn_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ fn task_with(
}
}

async fn expect_followup_accepted<S>(stream: &mut S)
where
S: futures::Stream<Item = Result<AmbientAgentEvent, anyhow::Error>> + Unpin,
{
use futures::StreamExt;

let event = stream
.next()
.await
.expect("expected follow-up accepted")
.expect("expected ok");
assert!(matches!(event, AmbientAgentEvent::FollowupAccepted));
}

#[tokio::test]
async fn followup_submits_before_polling_and_ignores_previous_session_id() {
use futures::StreamExt;
Expand Down Expand Up @@ -102,6 +116,8 @@ async fn followup_submits_before_polling_and_ignores_previous_session_id() {
None,
));

expect_followup_accepted(&mut stream).await;

let event = stream
.next()
.await
Expand Down Expand Up @@ -194,6 +210,8 @@ async fn followup_terminal_failure_surfaces_status_message() {
None,
));

expect_followup_accepted(&mut stream).await;

let event = stream
.next()
.await
Expand Down Expand Up @@ -259,6 +277,8 @@ async fn followup_without_previous_session_id_accepts_joinable_session() {
None,
));

expect_followup_accepted(&mut stream).await;

let event = stream
.next()
.await
Expand Down Expand Up @@ -322,6 +342,8 @@ async fn followup_without_previous_session_id_errors_if_run_finishes_before_sess
None,
));

expect_followup_accepted(&mut stream).await;

let event = stream
.next()
.await
Expand Down Expand Up @@ -408,6 +430,8 @@ async fn followup_skips_prior_terminal_state_until_working_then_attaches() {
None,
));

expect_followup_accepted(&mut stream).await;

// First emitted event must be `Pending` — the prior `Blocked` was suppressed.
let event = stream
.next()
Expand Down Expand Up @@ -497,6 +521,8 @@ async fn followup_skips_prior_terminal_then_surfaces_real_failure() {
None,
));

expect_followup_accepted(&mut stream).await;

let event = stream
.next()
.await
Expand Down Expand Up @@ -563,6 +589,8 @@ async fn followup_cancelled_state_breaks_skip_loop() {
None,
));

expect_followup_accepted(&mut stream).await;

let event = stream
.next()
.await
Expand Down Expand Up @@ -616,6 +644,8 @@ async fn followup_bounded_skip_for_server_stall() {
None,
));

expect_followup_accepted(&mut stream).await;

// After exhausting the bounded skips, the next poll falls through to the terminal
// branch, emitting a StateChanged for the stale state and a synthetic timeout error.
let event = stream
Expand Down
159 changes: 159 additions & 0 deletions app/src/ai/ambient_agents/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,51 @@ pub enum HandoffEntryPoint {
FooterChip,
}

/// Funnel stages for local-to-cloud handoff.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HandoffLocalCloudStage {
ForkConversation,
PaneOpened,
AutoSubmitQueued,
AutoSubmitStarted,
SessionReady,
}

/// Funnel stages for cloud-to-cloud follow-up handoff.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HandoffCloudCloudStage {
FollowupSubmitted,
PollTimeout,
SessionReady,
HotswapAttached,
}

/// Low-cardinality outcome values for handoff funnel events.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HandoffFunnelOutcome {
Started,
Succeeded,
Failed,
Queued,
}

/// Snapshot outcomes for local-to-cloud handoff.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HandoffSnapshotOutcome {
Started,
SkippedEmptyWorkspace,
TokenAllocationFailed,
Uploaded,
UploadedWithPartialBlobs,
ManifestUploadFailed,
Failed,
SpawnedWithoutSnapshot,
}

/// Telemetry events for client interactions with cloud agents.
#[derive(Debug, EnumDiscriminants)]
#[strum_discriminants(derive(EnumIter))]
Expand Down Expand Up @@ -90,6 +135,27 @@ pub enum CloudAgentTelemetryEvent {
/// Whether the handoff forked an existing conversation.
forked_existing_conversation: bool,
},
/// Local-to-cloud handoff funnel stage outcome.
#[cfg_attr(target_family = "wasm", allow(dead_code))]
HandoffLocalCloudFunnel {
stage: HandoffLocalCloudStage,
outcome: HandoffFunnelOutcome,
reason: Option<String>,
forked_existing_conversation: Option<bool>,
},
/// Local-to-cloud handoff snapshot pipeline outcome.
#[cfg_attr(target_family = "wasm", allow(dead_code))]
HandoffSnapshot {
outcome: HandoffSnapshotOutcome,
reason: Option<String>,
},
/// Cloud-to-cloud handoff follow-up funnel stage outcome.
#[cfg_attr(target_family = "wasm", allow(dead_code))]
HandoffCloudCloudFunnel {
stage: HandoffCloudCloudStage,
outcome: HandoffFunnelOutcome,
reason: Option<String>,
},
}

impl TelemetryEvent for CloudAgentTelemetryEvent {
Expand Down Expand Up @@ -136,6 +202,30 @@ impl TelemetryEvent for CloudAgentTelemetryEvent {
"entry_point": entry_point,
"forked_existing_conversation": forked_existing_conversation,
})),
CloudAgentTelemetryEvent::HandoffLocalCloudFunnel {
stage,
outcome,
reason,
forked_existing_conversation,
} => Some(json!({
"stage": stage,
"outcome": outcome,
"reason": reason,
"forked_existing_conversation": forked_existing_conversation,
})),
CloudAgentTelemetryEvent::HandoffSnapshot { outcome, reason } => Some(json!({
"outcome": outcome,
"reason": reason,
})),
CloudAgentTelemetryEvent::HandoffCloudCloudFunnel {
stage,
outcome,
reason,
} => Some(json!({
"stage": stage,
"outcome": outcome,
"reason": reason,
})),
}
}

Expand Down Expand Up @@ -178,6 +268,9 @@ impl TelemetryEventDesc for CloudAgentTelemetryEventDiscriminants {
}
Self::DispatchFailed => "AmbientAgent.DispatchFailed",
Self::HandoffInitiated => "AmbientAgent.Handoff.Initiated",
Self::HandoffLocalCloudFunnel => "AmbientAgent.Handoff.LocalCloud.Funnel",
Self::HandoffSnapshot => "AmbientAgent.Handoff.Snapshot",
Self::HandoffCloudCloudFunnel => "AmbientAgent.Handoff.CloudCloud.Funnel",
}
}

Expand All @@ -200,6 +293,9 @@ impl TelemetryEventDesc for CloudAgentTelemetryEventDiscriminants {
}
Self::DispatchFailed => "Ambient agent failed to dispatch or encountered an error",
Self::HandoffInitiated => "User initiated a local-to-cloud handoff",
Self::HandoffLocalCloudFunnel => "Local-to-cloud handoff funnel stage outcome",
Self::HandoffSnapshot => "Local-to-cloud handoff snapshot outcome",
Self::HandoffCloudCloudFunnel => "Cloud-to-cloud handoff funnel stage outcome",
}
}

Expand All @@ -209,3 +305,66 @@ impl TelemetryEventDesc for CloudAgentTelemetryEventDiscriminants {
}

warp_core::register_telemetry_event!(CloudAgentTelemetryEvent);

#[cfg(test)]
mod tests {
use super::*;
use warp_core::telemetry::TelemetryEvent;

#[test]
fn local_cloud_funnel_event_has_expected_name_and_payload() {
let event = CloudAgentTelemetryEvent::HandoffLocalCloudFunnel {
stage: HandoffLocalCloudStage::PaneOpened,
outcome: HandoffFunnelOutcome::Succeeded,
reason: None,
forked_existing_conversation: Some(true),
};

assert_eq!(event.name(), "AmbientAgent.Handoff.LocalCloud.Funnel");
assert_eq!(
event.payload(),
Some(json!({
"stage": "pane_opened",
"outcome": "succeeded",
"reason": null,
"forked_existing_conversation": true,
}))
);
}

#[test]
fn snapshot_event_serializes_manifest_failure_reason() {
let event = CloudAgentTelemetryEvent::HandoffSnapshot {
outcome: HandoffSnapshotOutcome::ManifestUploadFailed,
reason: Some("manifest_upload_failed".to_string()),
};

assert_eq!(event.name(), "AmbientAgent.Handoff.Snapshot");
assert_eq!(
event.payload(),
Some(json!({
"outcome": "manifest_upload_failed",
"reason": "manifest_upload_failed",
}))
);
}

#[test]
fn cloud_cloud_funnel_event_serializes_hotswap_stage() {
let event = CloudAgentTelemetryEvent::HandoffCloudCloudFunnel {
stage: HandoffCloudCloudStage::HotswapAttached,
outcome: HandoffFunnelOutcome::Succeeded,
reason: None,
};

assert_eq!(event.name(), "AmbientAgent.Handoff.CloudCloud.Funnel");
assert_eq!(
event.payload(),
Some(json!({
"stage": "hotswap_attached",
"outcome": "succeeded",
"reason": null,
}))
);
}
}
Loading