diff --git a/backend/src/db.rs b/backend/src/db.rs index 501c6998..c28194ea 100644 --- a/backend/src/db.rs +++ b/backend/src/db.rs @@ -1643,6 +1643,21 @@ pub async fn ensure_indexes(db: &Database) -> Result<(), mongodb::error::Error> .build(), ) .await?; + // Session→worker affinity: the FIFO claim filters queued tasks by + // `required_worker_label` (null for fresh, the owning account for + // follow-ups) before sorting by `created_at`. + oracle_tasks + .create_index( + IndexModel::builder() + .keys(doc! { + "pool_id": 1, + "status": 1, + "required_worker_label": 1, + "created_at": 1, + }) + .build(), + ) + .await?; oracle_tasks .create_index( IndexModel::builder() diff --git a/backend/src/handlers/oracle_tasks.rs b/backend/src/handlers/oracle_tasks.rs index 7307cd5b..4271932b 100644 --- a/backend/src/handlers/oracle_tasks.rs +++ b/backend/src/handlers/oracle_tasks.rs @@ -501,6 +501,7 @@ mod tests { pdf_name: Some("a.pdf".to_string()), conversation_id: Some("conv_1".to_string()), is_followup: false, + required_worker_label: None, client_ref: None, status: OracleTaskStatus::Queued, phase: None, diff --git a/backend/src/models/oracle_session.rs b/backend/src/models/oracle_session.rs index 4b2b0464..fe558e18 100644 --- a/backend/src/models/oracle_session.rs +++ b/backend/src/models/oracle_session.rs @@ -32,6 +32,22 @@ pub struct OracleSession { /// Browser-side conversation URL pinned by the worker after turn 1. #[serde(default, skip_serializing_if = "Option::is_none")] pub chatgpt_url: Option, + /// Worker label of the account that created this conversation, stamped + /// on the first successful result. Follow-ups copy it onto their task as + /// `required_worker_label` so multi-turn lands back on the owning + /// account in a multi-account pool. `None` for legacy/unstamped + /// sessions (unpinned, today's behavior). + /// + /// Affinity keys on the worker *label*, so correctness rests on the + /// operational invariant that one stable label maps to one ChatGPT + /// account. 
Two tabs of the same account under different labels are + /// treated as different accounts (over-pinning — harmless beyond lost + /// load-balancing); two different accounts sharing a label would + /// reintroduce the misroute this pinning prevents. Worker label + /// assignment lives in the CDP/userscript clients, which already mint a + /// stable per-tab label (`?nyx=N` → `tab_N`). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub owner_worker_label: Option, pub turn_count: u64, #[serde(default, skip_serializing_if = "Option::is_none")] pub last_task_id: Option, @@ -62,6 +78,7 @@ mod tests { api_key_id: Some(uuid::Uuid::new_v4().to_string()), tag: Some("bedc-deep".to_string()), chatgpt_url: Some("https://chatgpt.com/c/abc".to_string()), + owner_worker_label: Some("tab_1".to_string()), turn_count: 3, last_task_id: Some("task-3".to_string()), closed_at: None, @@ -90,6 +107,7 @@ mod tests { api_key_id: None, tag: None, chatgpt_url: None, + owner_worker_label: None, turn_count: 0, last_task_id: None, closed_at: Some(Utc::now()), @@ -111,6 +129,7 @@ mod tests { api_key_id: None, tag: None, chatgpt_url: None, + owner_worker_label: None, turn_count: 0, last_task_id: None, closed_at: None, diff --git a/backend/src/models/oracle_task.rs b/backend/src/models/oracle_task.rs index 9befe54e..b3ca1a62 100644 --- a/backend/src/models/oracle_task.rs +++ b/backend/src/models/oracle_task.rs @@ -84,6 +84,12 @@ pub struct OracleTask { /// True when this task continues an existing conversation. #[serde(default)] pub is_followup: bool, + /// When set, only the worker with this label may claim the task. + /// Copied from the owning session for follow-ups so multi-turn lands + /// back on the account that created the conversation in a + /// multi-account pool. `None` = fresh task, claimable by any worker. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub required_worker_label: Option, /// Optional submitter-scoped idempotency key. 
#[serde(default, skip_serializing_if = "Option::is_none")] pub client_ref: Option, @@ -152,6 +158,7 @@ mod tests { pdf_name: None, conversation_id: None, is_followup: false, + required_worker_label: None, client_ref: None, status: OracleTaskStatus::Queued, phase: None, diff --git a/backend/src/services/oracle_task_service.rs b/backend/src/services/oracle_task_service.rs index 2de63c94..d6011080 100644 --- a/backend/src/services/oracle_task_service.rs +++ b/backend/src/services/oracle_task_service.rs @@ -180,51 +180,60 @@ pub async fn submit_task( // Session resolution (three-state conversation_id). let now = Utc::now(); - let (conversation_id, is_followup) = match input.conversation_id.as_deref() { - None => (None, false), - Some("") => { - let conv_id = mint_conversation_id(); - let session = OracleSession { - id: conv_id.clone(), - pool_id: pool.id.clone(), - owner_user_id: submitter.user_id.clone(), - origin: "nyxid".to_string(), - api_key_id: submitter.api_key_id.clone(), - tag: input.tag.clone(), - chatgpt_url: None, - turn_count: 0, - last_task_id: None, - closed_at: None, - created_at: now, - updated_at: now, - }; - db.collection::(ORACLE_SESSIONS) - .insert_one(&session) - .await?; - (Some(conv_id), false) - } - Some(conv_id) => { - let session = db - .collection::(ORACLE_SESSIONS) - .find_one(doc! { "_id": conv_id }) - .await? 
- .ok_or_else(|| AppError::OracleSessionNotFound(conv_id.to_string()))?; - if session.closed_at.is_some() { - return Err(AppError::OracleSessionClosed(conv_id.to_string())); + let (conversation_id, is_followup, required_worker_label) = + match input.conversation_id.as_deref() { + None => (None, false, None), + Some("") => { + let conv_id = mint_conversation_id(); + let session = OracleSession { + id: conv_id.clone(), + pool_id: pool.id.clone(), + owner_user_id: submitter.user_id.clone(), + origin: "nyxid".to_string(), + api_key_id: submitter.api_key_id.clone(), + tag: input.tag.clone(), + chatgpt_url: None, + owner_worker_label: None, + turn_count: 0, + last_task_id: None, + closed_at: None, + created_at: now, + updated_at: now, + }; + db.collection::(ORACLE_SESSIONS) + .insert_one(&session) + .await?; + (Some(conv_id), false, None) } - if session.pool_id != pool.id { - return Err(AppError::ValidationError( - "conversation belongs to a different pool".to_string(), - )); + Some(conv_id) => { + let session = db + .collection::(ORACLE_SESSIONS) + .find_one(doc! { "_id": conv_id }) + .await? + .ok_or_else(|| AppError::OracleSessionNotFound(conv_id.to_string()))?; + if session.closed_at.is_some() { + return Err(AppError::OracleSessionClosed(conv_id.to_string())); + } + if session.pool_id != pool.id { + return Err(AppError::ValidationError( + "conversation belongs to a different pool".to_string(), + )); + } + if session.owner_user_id != submitter.user_id { + return Err(AppError::Forbidden( + "only the session owner can continue it".to_string(), + )); + } + // Pin the follow-up to the account that owns this + // conversation (stamped on its first successful result). Fresh sessions + // with no owner yet stay unpinned. 
+ ( + Some(conv_id.to_string()), + session.turn_count > 0, + session.owner_worker_label.clone(), + ) } - if session.owner_user_id != submitter.user_id { - return Err(AppError::Forbidden( - "only the session owner can continue it".to_string(), - )); - } - (Some(conv_id.to_string()), session.turn_count > 0) - } - }; + }; let task = OracleTask { id: uuid::Uuid::new_v4().to_string(), @@ -244,6 +253,7 @@ pub async fn submit_task( pdf_name: input.pdf_name, conversation_id, is_followup, + required_worker_label, client_ref: input.client_ref, status: OracleTaskStatus::Queued, phase: None, @@ -495,6 +505,7 @@ pub async fn extract_url( pdf_name: None, conversation_id: None, is_followup: false, + required_worker_label: None, client_ref: None, status: OracleTaskStatus::Queued, phase: None, @@ -544,6 +555,7 @@ pub async fn attach_conversation( api_key_id: submitter.api_key_id.clone(), tag: tag.clone(), chatgpt_url: Some(chatgpt_url.to_string()), + owner_worker_label: None, turn_count: 0, last_task_id: None, closed_at: None, @@ -570,6 +582,7 @@ pub async fn attach_conversation( pdf_name: None, conversation_id: Some(session.id.clone()), is_followup: false, + required_worker_label: None, client_ref: None, status: OracleTaskStatus::Queued, phase: None, @@ -774,6 +787,43 @@ async fn requeue_expired_leases(db: &mongodb::Database, pool_id: &str) -> AppRes Ok(result.modified_count) } +/// Affinity escape hatch — the "lease/age fallback" the issue deferred. +/// +/// A follow-up pinned (`required_worker_label`) to an owning account whose +/// worker never comes back would otherwise sit queued forever: it is never +/// dispatched to a non-matching worker, so its lease never starts and +/// `requeue_expired_leases` never touches it, yet it keeps counting against +/// the submitter's inflight quota and the pool queue cap (see +/// `enforce_submit_quotas`). 
Once such a task has waited a full +/// `task_timeout_secs` window — generous enough to absorb a tab reload or +/// network blip — without its owner claiming it, drop the pin so any worker +/// may claim it. A non-owner that picks it up cannot reopen the other +/// account's `/c/` and will report an extraction failure, but that +/// surfaces a terminal error and frees the quota instead of leaking it. +async fn release_stale_affinity(db: &mongodb::Database, pool: &OraclePool) -> AppResult { + let now = Utc::now(); + let cutoff = now - Duration::seconds(pool.task_timeout_secs as i64); + let result = db + .collection::(ORACLE_TASKS) + .update_many( + doc! { + "pool_id": &pool.id, + "status": "queued", + "required_worker_label": { "$ne": null }, + "created_at": { "$lt": bson::DateTime::from_chrono(cutoff) }, + }, + doc! { + "$set": { + "phase": "affinity_released_after_grace", + "updated_at": bson::DateTime::from_chrono(now), + }, + "$unset": { "required_worker_label": "" }, + }, + ) + .await?; + Ok(result.modified_count) +} + /// The payload a worker receives for a claimed task. Field names mirror /// the local oracle servers' task dicts so the userscript port stays a /// thin diff. @@ -839,10 +889,13 @@ async fn worker_payload( }) } -/// Worker poll: requeue expired leases, resume the worker's own in-flight -/// task if any (idempotent re-claim — this is what lets a tab survive a -/// mid-task page reload), then atomically claim the oldest queued task if -/// the pool has dispatch capacity. `None` = idle. +/// Worker poll: requeue expired leases, release follow-ups whose owning +/// worker is long gone (affinity grace fallback), resume the worker's own +/// in-flight task if any (idempotent re-claim — this is what lets a tab +/// survive a mid-task page reload), then atomically claim the oldest queued +/// task if the pool has dispatch capacity. `None` = idle. 
Because every +/// live worker polls here continuously, the stale-affinity sweep runs as +/// long as any worker in the pool is alive. pub async fn claim_task( db: &mongodb::Database, pool: &OraclePool, @@ -852,6 +905,7 @@ pub async fn claim_task( ) -> AppResult> { validate_worker_label(worker_label)?; requeue_expired_leases(db, &pool.id).await?; + release_stale_affinity(db, pool).await?; let now = Utc::now(); let lease = now + Duration::seconds(pool.task_timeout_secs as i64); @@ -898,10 +952,21 @@ pub async fn claim_task( return Ok(None); } + // Fresh tasks (`required_worker_label` absent/null — `{ $eq: null }` + // matches both) are claimable by any worker; a follow-up pinned to a + // specific account is claimable only by that account's worker, so + // multi-turn lands back on the account that owns the conversation. let claimed = db .collection::(ORACLE_TASKS) .find_one_and_update( - doc! { "pool_id": &pool.id, "status": "queued" }, + doc! { + "pool_id": &pool.id, + "status": "queued", + "$or": [ + { "required_worker_label": null }, + { "required_worker_label": worker_label }, + ], + }, doc! { "$set": { "status": "dispatched", "assigned_worker_id": worker_label, @@ -1103,6 +1168,20 @@ pub async fn worker_submit_result( }, ) .await?; + + // Stamp the owning account on the first successful result: the + // worker that produced it created the `/c/` conversation, so + // follow-ups must pin to it. `{ owner_worker_label: null }` + // matches both unset and legacy-null docs; once stamped this is a + // no-op, so the first account to answer keeps ownership. + if !is_failure { + db.collection::(ORACLE_SESSIONS) + .update_one( + doc! { "_id": conv_id, "owner_worker_label": null }, + doc! 
{ "$set": { "owner_worker_label": worker_label } }, + ) + .await?; + } } Ok(if is_failure { @@ -1227,6 +1306,7 @@ pub async fn worker_submit_transcript( pdf_name: None, conversation_id: Some(session_id.clone()), is_followup: true, + required_worker_label: None, client_ref: None, status: OracleTaskStatus::Completed, phase: None, @@ -1271,6 +1351,15 @@ pub async fn worker_submit_transcript( ) .await?; + // The account that scraped the conversation physically owns its + // `/c/`; pin follow-ups (`oracle ask --conversation`) to it. + db.collection::(ORACLE_SESSIONS) + .update_one( + doc! { "_id": &session_id, "owner_worker_label": null }, + doc! { "$set": { "owner_worker_label": worker_label } }, + ) + .await?; + Ok(TranscriptOutcome::Imported { pairs: pair_count }) } @@ -1910,6 +1999,265 @@ mod tests { db.drop().await.ok(); } + #[tokio::test] + async fn followup_pins_to_owning_worker() { + let Some(db) = connect_test_database("oracle_task_affinity").await else { + return; + }; + let owner = uuid::Uuid::new_v4().to_string(); + let mut pool = test_pool(&owner); + pool.per_user_max_inflight = 5; + seed_pool(&db, &pool).await; + + // Open a session; tab_1 (account A) answers turn 1 and so becomes + // the conversation owner. + let t1 = submit_task( + &db, + &pool, + &submitter(&owner), + SubmitTaskInput { + conversation_id: Some(String::new()), + ..prompt_input("turn one") + }, + ) + .await + .unwrap(); + let conv_id = t1.task.conversation_id.clone().expect("conv id minted"); + assert!( + t1.task.required_worker_label.is_none(), + "fresh task unpinned" + ); + + let claimed = claim_task(&db, &pool, "tab_1", None, None) + .await + .unwrap() + .expect("claim turn one"); + assert_eq!(claimed.task_id, t1.task.id); + worker_submit_result( + &db, + &pool, + "tab_1", + &t1.task.id, + "turn one answer", + Some("https://chatgpt.com/c/xyz"), + None, + None, + 30, + ) + .await + .unwrap(); + + // Ownership is stamped on the session. 
+ let session = crate::services::oracle_session_service::get_session_for_consumer( + &db, &owner, &conv_id, + ) + .await + .unwrap(); + assert_eq!(session.owner_worker_label.as_deref(), Some("tab_1")); + + // A follow-up copies the owner onto the task. + let t2 = submit_task( + &db, + &pool, + &submitter(&owner), + SubmitTaskInput { + conversation_id: Some(conv_id.clone()), + ..prompt_input("turn two") + }, + ) + .await + .unwrap(); + assert!(t2.task.is_followup); + assert_eq!(t2.task.required_worker_label.as_deref(), Some("tab_1")); + + // A worker on a different account (tab_2) cannot claim the pinned + // follow-up — it idles instead of misrouting. + let other = claim_task(&db, &pool, "tab_2", None, None).await.unwrap(); + assert!(other.is_none(), "tab_2 must not claim tab_1's follow-up"); + + // The owning account's worker claims it. + let claimed2 = claim_task(&db, &pool, "tab_1", None, None) + .await + .unwrap() + .expect("tab_1 claims its follow-up"); + assert_eq!(claimed2.task_id, t2.task.id); + + // A fresh single-shot task stays competitively load-balanced: any + // worker, including tab_2, may claim it. + let fresh = submit_task(&db, &pool, &submitter(&owner), prompt_input("fresh")) + .await + .unwrap(); + assert!(fresh.task.required_worker_label.is_none()); + let claimed_fresh = claim_task(&db, &pool, "tab_2", None, None) + .await + .unwrap() + .expect("tab_2 claims a fresh task"); + assert_eq!(claimed_fresh.task_id, fresh.task.id); + + db.drop().await.ok(); + } + + #[tokio::test] + async fn stale_followup_affinity_is_released_after_grace() { + let Some(db) = connect_test_database("oracle_task_affinity_grace").await else { + return; + }; + let owner = uuid::Uuid::new_v4().to_string(); + let mut pool = test_pool(&owner); + pool.per_user_max_inflight = 5; + // task_timeout_secs is the affinity grace window (test_pool: 3600s). + seed_pool(&db, &pool).await; + + // tab_1 (account A) owns the conversation. 
+ let t1 = submit_task( + &db, + &pool, + &submitter(&owner), + SubmitTaskInput { + conversation_id: Some(String::new()), + ..prompt_input("turn one") + }, + ) + .await + .unwrap(); + let conv_id = t1.task.conversation_id.clone().unwrap(); + claim_task(&db, &pool, "tab_1", None, None) + .await + .unwrap() + .expect("claim turn one"); + worker_submit_result( + &db, + &pool, + "tab_1", + &t1.task.id, + "turn one answer", + Some("https://chatgpt.com/c/xyz"), + None, + None, + 30, + ) + .await + .unwrap(); + + // A follow-up pins to tab_1, which has now vanished. + let t2 = submit_task( + &db, + &pool, + &submitter(&owner), + SubmitTaskInput { + conversation_id: Some(conv_id.clone()), + ..prompt_input("turn two") + }, + ) + .await + .unwrap(); + assert_eq!(t2.task.required_worker_label.as_deref(), Some("tab_1")); + + // Before the grace elapses, a different account still cannot claim it. + assert!( + claim_task(&db, &pool, "tab_2", None, None) + .await + .unwrap() + .is_none(), + "pinned follow-up must not be claimable before grace" + ); + + // Age the follow-up past the grace window (simulates tab_1 never + // returning). + db.collection::(ORACLE_TASKS) + .update_one( + doc! { "_id": &t2.task.id }, + doc! { "$set": { "created_at": bson::DateTime::from_chrono(Utc::now() - Duration::seconds(pool.task_timeout_secs as i64 + 60)) } }, + ) + .await + .unwrap(); + + // Now any worker may claim it (affinity released), freeing the quota. 
+ let recovered = claim_task(&db, &pool, "tab_2", None, None) + .await + .unwrap() + .expect("released follow-up claimable by any worker"); + assert_eq!(recovered.task_id, t2.task.id); + let (claimed_doc, _) = get_task_for_consumer(&db, &owner, &t2.task.id) + .await + .unwrap(); + assert!(claimed_doc.required_worker_label.is_none()); + assert_eq!(claimed_doc.assigned_worker_id.as_deref(), Some("tab_2")); + + db.drop().await.ok(); + } + + #[tokio::test] + async fn failed_first_turn_leaves_session_unowned() { + let Some(db) = connect_test_database("oracle_task_failed_first_turn").await else { + return; + }; + let owner = uuid::Uuid::new_v4().to_string(); + let mut pool = test_pool(&owner); + pool.per_user_max_inflight = 5; + seed_pool(&db, &pool).await; + + let t1 = submit_task( + &db, + &pool, + &submitter(&owner), + SubmitTaskInput { + conversation_id: Some(String::new()), + ..prompt_input("turn one") + }, + ) + .await + .unwrap(); + let conv_id = t1.task.conversation_id.clone().unwrap(); + claim_task(&db, &pool, "tab_1", None, None) + .await + .unwrap() + .expect("claim turn one"); + + // Turn 1 fails even though a chat URL was reported. + let outcome = worker_submit_result( + &db, + &pool, + "tab_1", + &t1.task.id, + "ERROR: extraction failed", + Some("https://chatgpt.com/c/xyz"), + None, + None, + 30, + ) + .await + .unwrap(); + assert_eq!(outcome, ResultOutcome::Failed); + + // The turn is counted and the URL pinned, but ownership is NOT + // stamped on a failed first turn. + let session = crate::services::oracle_session_service::get_session_for_consumer( + &db, &owner, &conv_id, + ) + .await + .unwrap(); + assert_eq!(session.turn_count, 1); + assert!(session.owner_worker_label.is_none()); + + // So the next follow-up stays unpinned (any worker may serve it). 
+ let t2 = submit_task( + &db, + &pool, + &submitter(&owner), + SubmitTaskInput { + conversation_id: Some(conv_id.clone()), + ..prompt_input("turn two") + }, + ) + .await + .unwrap(); + assert!(t2.task.is_followup); + assert!(t2.task.required_worker_label.is_none()); + + db.drop().await.ok(); + } + #[tokio::test] async fn attach_conversation_imports_transcript_turns() { let Some(db) = connect_test_database("oracle_task_attach").await else {