diff --git a/packages/doeff-core-effects/src/handlers/mod.rs b/packages/doeff-core-effects/src/handlers/mod.rs index 26f110e7..3afb9c7e 100644 --- a/packages/doeff-core-effects/src/handlers/mod.rs +++ b/packages/doeff-core-effects/src/handlers/mod.rs @@ -1127,9 +1127,10 @@ impl IRStreamProgram for LazyAskHandlerProgram { cache_snapshot, semaphore_snapshot, }; + let scope_fiber = k.fibers()[0]; return IRStreamStep::Yield(DoCtrl::EvalInScope { expr: local_effect.sub_program, - scope: k, + scope_fiber, bindings: local_effect.overrides, metadata: None, }); @@ -1225,9 +1226,10 @@ impl IRStreamProgram for LazyAskHandlerProgram { source_id, semaphore, }; + let scope_fiber = continuation.fibers()[0]; IRStreamStep::Yield(DoCtrl::EvalInScope { expr, - scope: continuation, + scope_fiber, bindings: HashMap::new(), metadata: None, }) @@ -1687,11 +1689,12 @@ impl IRStreamProgram for ResultSafeHandlerProgram { if let Some(obj) = dispatch_into_python(effect.clone()) { return match parse_result_safe_python_effect(&obj) { Ok(Some(sub_program)) => { + let scope_fiber = k.fibers()[0]; let fiber_ids = k.fibers().to_vec(); self.phase = ResultSafePhase::AwaitEval { fiber_ids }; IRStreamStep::Yield(DoCtrl::EvalInScope { expr: sub_program, - scope: k, + scope_fiber, bindings: HashMap::new(), metadata: None, }) @@ -1921,15 +1924,14 @@ mod tests { use crate::effect::{dispatch_from_shared, Effect}; use crate::ids::Marker; use crate::ir_stream::{IRStream, IRStreamStep}; - use crate::segment::Segment; use pyo3::types::PyDictMethods; use pyo3::{IntoPyObject, Python}; fn make_test_continuation() -> Continuation { - let marker = Marker::fresh(); - let seg = Segment::new(marker, None); - let seg_id = crate::ids::SegmentId::from_index(0); - Continuation::capture(&seg, seg_id) + use std::sync::atomic::{AtomicUsize, Ordering}; + static NEXT_FIBER: AtomicUsize = AtomicUsize::new(2000); + let idx = NEXT_FIBER.fetch_add(1, Ordering::Relaxed); + Continuation::from_fiber(FiberId::from_index(idx), None) } fn dispatch(effect: Effect) -> DispatchEffect { @@ -1963,7 +1965,7 @@ mod tests { let mut scope = ScopeStore::default(); let mut program = AwaitHandlerProgram::new(); let continuation = make_test_continuation(); - let continuation_id = continuation.derived_cont_id(); + let continuation_id = continuation.identity(); program.phase = AwaitPhase::AwaitResult { continuation }; let location = IRStream::debug_location(&program).expect("await debug location"); @@ -1972,14 +1974,14 @@ mod tests { let step = IRStream::resume(&mut program, Value::Int(12), &mut store, &mut scope); match step { - IRStreamStep::Yield(DoCtrl::Resume { + IRStreamStep::Yield(DoCtrl::Transfer { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), continuation_id); + assert_eq!(continuation.identity(), continuation_id); assert_eq!(value.as_int(), Some(12)); } - _ => panic!("expected IRStream Yield(Resume)"), + _ => panic!("expected IRStream Yield(Transfer), got {:?}", step), } let location = IRStream::debug_location(&program).expect("await debug location"); @@ -1994,25 +1996,37 @@ mod tests { let mut program = StateHandlerProgram::new(); let continuation = make_test_continuation(); - let continuation_id = continuation.derived_cont_id(); + let continuation_id = continuation.identity(); program.pending_key = Some("count".to_string()); program.pending_k = Some(continuation); program.pending_old_value = Some(Value::Int(5)); + program.phase = StatePhase::AwaitModifyWrite; let location = IRStream::debug_location(&program).expect("state debug location"); assert_eq!(location.function_name, "StateHandler"); assert_eq!(location.phase.as_deref(), Some("ModifyApply")); + // resume with new value (from modifier) → WriteHandlerState let step = IRStream::resume(&mut program, Value::Int(8), &mut store, &mut scope); + match &step { + IRStreamStep::Yield(DoCtrl::WriteHandlerState { key, value }) => { + assert_eq!(key, "count"); + store.put(key.clone(), value.clone()); + } + _ => panic!("expected IRStream Yield(WriteHandlerState), got {:?}", step), + } + + // resume after write → Resume with old_value + let step = IRStream::resume(&mut program, Value::Unit, &mut store, &mut scope); match step { IRStreamStep::Yield(DoCtrl::Resume { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), continuation_id); + assert_eq!(continuation.identity(), continuation_id); assert_eq!(value.as_int(), Some(5)); } - _ => panic!("expected IRStream Yield(Resume)"), + _ => panic!("expected IRStream Yield(Resume), got {:?}", step), } assert_eq!(store.get("count").and_then(Value::as_int), Some(8)); @@ -2031,6 +2045,7 @@ mod tests { program.pending_key = Some("count".to_string()); program.pending_old_value = Some(Value::Int(5)); + program.phase = StatePhase::AwaitModifyWrite; let _ = IRStream::resume(&mut program, Value::Int(8), &mut store, &mut scope); } @@ -2132,7 +2147,7 @@ mod tests { let mut scope = ScopeStore::default(); let mut program = LazyAskHandlerProgram::new(Arc::new(Mutex::new(LazyAskState::default()))); let continuation = make_test_continuation(); - let continuation_id = continuation.derived_cont_id(); + let continuation_id = continuation.identity(); program.phase = LazyAskPhase::AwaitRelease { continuation, outcome: Ok(Value::Int(44)), @@ -2148,7 +2163,7 @@ mod tests { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), continuation_id); + assert_eq!(continuation.identity(), continuation_id); assert_eq!(value.as_int(), Some(44)); } _ => panic!("expected IRStream Yield(Resume)"), @@ -2215,17 +2230,25 @@ mod tests { store.put("key".to_string(), Value::Int(42)); let k = make_test_continuation(); let program_ref = StateHandlerFactory.create_program(); - let step = { + let start_step = { let mut guard = program_ref.lock().unwrap(); guard.start(py, dispatch(Effect::get("key")), k, &mut store, &mut scope) }; - match step { + assert!( + matches!(start_step, IRStreamStep::Yield(DoCtrl::ReadHandlerState { .. })), + "Expected Yield(ReadHandlerState), got {:?}", start_step + ); + let resume_step = { + let mut guard = program_ref.lock().unwrap(); + guard.resume(Value::Int(42), &mut store, &mut scope) + }; + match resume_step { IRStreamStep::Yield(DoCtrl::Resume { value, .. }) => { assert_eq!(value.as_int(), Some(42)); } _ => panic!( "Expected Yield(Resume), got {:?}", - std::mem::discriminant(&step) + resume_step ), } }); @@ -2238,7 +2261,7 @@ mod tests { let mut scope = ScopeStore::default(); let k = make_test_continuation(); let program_ref = StateHandlerFactory.create_program(); - let step = { + let start_step = { let mut guard = program_ref.lock().unwrap(); guard.start( py, @@ -2248,8 +2271,19 @@ mod tests { &mut scope, ) }; + // Handler yields WriteHandlerState; simulate VM writing the state + match &start_step { + IRStreamStep::Yield(DoCtrl::WriteHandlerState { key, value }) => { + store.put(key.clone(), value.clone()); + } + _ => panic!("Expected Yield(WriteHandlerState), got {:?}", start_step), + } + let resume_step = { + let mut guard = program_ref.lock().unwrap(); + guard.resume(Value::Unit, &mut store, &mut scope) + }; assert!(matches!( - step, + resume_step, IRStreamStep::Yield(DoCtrl::Resume { value: Value::Unit, .. @@ -2269,7 +2303,8 @@ mod tests { let k = make_test_continuation(); let modifier = py.None().into_pyobject(py).unwrap().unbind().into_any(); let program_ref = StateHandlerFactory.create_program(); - let step = { + // start returns ReadHandlerState + let start_step = { let mut guard = program_ref.lock().unwrap(); guard.start( py, @@ -2282,6 +2317,15 @@ mod tests { &mut scope, ) }; + assert!( + matches!(start_step, IRStreamStep::Yield(DoCtrl::ReadHandlerState { .. })), + "Expected Yield(ReadHandlerState), got {:?}", start_step + ); + // resume with stored value yields NeedsPython + let step = { + let mut guard = program_ref.lock().unwrap(); + guard.resume(Value::Int(10), &mut store, &mut scope) + }; match step { IRStreamStep::NeedsPython(PythonCall::CallFunc { args, .. }) => { assert_eq!(args.len(), 1); @@ -2302,10 +2346,10 @@ mod tests { let k = make_test_continuation(); let modifier = py.None().into_pyobject(py).unwrap().unbind().into_any(); let program_ref = StateHandlerFactory.create_program(); - // start: returns NeedsPython + // start: returns ReadHandlerState { let mut guard = program_ref.lock().unwrap(); - guard.start( + let step = guard.start( py, dispatch(Effect::Modify { key: "key".to_string(), @@ -2315,12 +2359,30 @@ mod tests { &mut store, &mut scope, ); + assert!(matches!(step, IRStreamStep::Yield(DoCtrl::ReadHandlerState { .. }))); } - // resume with new value - let step = { + // resume with old value from store → NeedsPython + { + let mut guard = program_ref.lock().unwrap(); + let step = guard.resume(Value::Int(10), &mut store, &mut scope); + assert!(matches!(step, IRStreamStep::NeedsPython(PythonCall::CallFunc { .. }))); + } + // resume with modifier result (new value) → WriteHandlerState + let write_step = { let mut guard = program_ref.lock().unwrap(); guard.resume(Value::Int(20), &mut store, &mut scope) }; + match &write_step { + IRStreamStep::Yield(DoCtrl::WriteHandlerState { key, value }) => { + store.put(key.clone(), value.clone()); + } + _ => panic!("Expected Yield(WriteHandlerState), got {:?}", write_step), + } + // resume after write → Resume with old_value + let step = { + let mut guard = program_ref.lock().unwrap(); + guard.resume(Value::Unit, &mut store, &mut scope) + }; match step { IRStreamStep::Yield(DoCtrl::Resume { value, .. }) => { assert_eq!(value.as_int(), Some(10)); // old_value returned (SPEC-008 L1271) @@ -2442,7 +2504,8 @@ mod tests { Python::attach(|py| { let mut store = VarStore::new(); let mut scope = ScopeStore::default(); - let k = make_test_continuation(); + let k1 = make_test_continuation(); + let k2 = make_test_continuation(); let locals = pyo3::types::PyDict::new(py); locals @@ -2463,7 +2526,7 @@ mod tests { let ok_program = ResultSafeHandlerFactory.create_program(); let start_step = { let mut guard = ok_program.lock().unwrap(); - guard.start(py, effect.clone(), k.clone(), &mut store, &mut scope) + guard.start(py, effect.clone(), k1, &mut store, &mut scope) }; assert!(matches!( start_step, @@ -2475,7 +2538,11 @@ mod tests { guard.resume(Value::Int(42), &mut store, &mut scope) }; match ok_step { - IRStreamStep::Yield(DoCtrl::Resume { + IRStreamStep::Yield(DoCtrl::Transfer { + value: Value::Python(obj), + .. + }) + | IRStreamStep::Yield(DoCtrl::Resume { value: Value::Python(obj), .. }) => { @@ -2485,13 +2552,13 @@ mod tests { assert!(is_ok); assert_eq!(inner.extract::().unwrap(), 42); } - _ => panic!("expected Resume with Ok(value)"), + _ => panic!("expected Transfer/Resume with Ok(value), got {:?}", ok_step), } let err_program = ResultSafeHandlerFactory.create_program(); let _ = { let mut guard = err_program.lock().unwrap(); - guard.start(py, effect, k, &mut store, &mut scope) + guard.start(py, effect, k2, &mut store, &mut scope) }; let err_step = { @@ -2500,7 +2567,11 @@ mod tests { }; match err_step { - IRStreamStep::Yield(DoCtrl::Resume { + IRStreamStep::Yield(DoCtrl::Transfer { + value: Value::Python(obj), + .. + }) + | IRStreamStep::Yield(DoCtrl::Resume { value: Value::Python(obj), .. }) => { @@ -2511,7 +2582,7 @@ mod tests { assert!(is_err); assert!(msg.contains("boom")); } - _ => panic!("expected Resume with Err(exception)"), + _ => panic!("expected Transfer/Resume with Err(exception), got {:?}", err_step), } }); } diff --git a/packages/doeff-core-effects/src/scheduler/mod.rs b/packages/doeff-core-effects/src/scheduler/mod.rs index 22172ceb..0028a452 100644 --- a/packages/doeff-core-effects/src/scheduler/mod.rs +++ b/packages/doeff-core-effects/src/scheduler/mod.rs @@ -12,7 +12,7 @@ use pyo3::prelude::*; use pyo3::types::{PyDict, PyTuple}; use crate::capture::{SpawnSite, TraceHop}; -use crate::continuation::{Continuation, OwnedControlContinuation}; +use crate::continuation::{Continuation, OwnedControlContinuation, PendingContinuation}; use crate::doeff_generator::DoeffGeneratorFn; use crate::effect::{ dispatch_into_python, dispatch_ref_as_python, make_execution_context_object, DispatchEffect, @@ -23,7 +23,7 @@ use crate::effect::{ use crate::error::VMError; use crate::handler::{IRStreamFactory, IRStreamProgram, IRStreamProgramRef}; use crate::handlers::AwaitHandlerFactory; -use crate::ids::{ContId, FiberId, PromiseId, TaskId}; +use crate::ids::{FiberId, PromiseId, TaskId}; use crate::ir_stream::{IRStream, IRStreamStep, StreamLocation}; use crate::kleisli::{DgfnKleisli, KleisliRef}; use crate::py_shared::PyShared; @@ -185,21 +185,21 @@ enum WaitMode { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] enum WaitOwner { - Task { task_id: TaskId, cont_id: ContId }, - Root { cont_id: ContId }, + Task { task_id: TaskId, fiber_id: Option }, + Root { fiber_id: Option }, } impl WaitOwner { - fn cont_id(&self) -> ContId { + fn fiber_id(&self) -> Option { match self { - WaitOwner::Task { cont_id, .. } | WaitOwner::Root { cont_id } => *cont_id, + WaitOwner::Task { fiber_id, .. } | WaitOwner::Root { fiber_id } => *fiber_id, } } } #[derive(Debug)] struct WaitRequest { - cont_id: ContId, + fiber_id: Option, continuation: Option, items: Vec, mode: WaitMode, @@ -214,10 +214,10 @@ impl WaitRequest { match self.waiting_task { Some(task_id) => WaitOwner::Task { task_id, - cont_id: self.cont_id, + fiber_id: self.fiber_id, }, None => WaitOwner::Root { - cont_id: self.cont_id, + fiber_id: self.fiber_id, }, } } @@ -291,7 +291,7 @@ struct ReadyRootResume { #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum ReadyTarget { Task(TaskId), - Root(ContId), + Root(Option), } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -305,11 +305,11 @@ impl Ord for ReadyEntry { fn cmp(&self, other: &Self) -> CmpOrdering { let self_target = match self.target { ReadyTarget::Task(task_id) => task_id.raw(), - ReadyTarget::Root(cont_id) => cont_id.raw(), + ReadyTarget::Root(fiber_id) => fiber_id.map(|f| f.index() as u64).unwrap_or(0), }; let other_target = match other.target { ReadyTarget::Task(task_id) => task_id.raw(), - ReadyTarget::Root(cont_id) => cont_id.raw(), + ReadyTarget::Root(fiber_id) => fiber_id.map(|f| f.index() as u64).unwrap_or(0), }; self.priority .cmp(&other.priority) @@ -415,17 +415,17 @@ fn started(k: Continuation) -> OwnedControlContinuation { OwnedControlContinuation::Started(k) } -fn step_targets_cont_id(step: &IRStreamStep, cont_id: ContId) -> bool { +fn step_targets_fiber_id(step: &IRStreamStep, fiber_id: Option) -> bool { match step { IRStreamStep::Yield(DoCtrl::Resume { continuation, .. }) | IRStreamStep::Yield(DoCtrl::ResumeThrow { continuation, .. }) | IRStreamStep::Yield(DoCtrl::Transfer { continuation, .. }) | IRStreamStep::Yield(DoCtrl::Discontinue { continuation, .. }) | IRStreamStep::Yield(DoCtrl::TransferThrow { continuation, .. }) => { - continuation.derived_cont_id() == cont_id + continuation.identity() == fiber_id } IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, .. }) => { - continuation.cont_id() == cont_id + continuation.identity() == fiber_id } IRStreamStep::Yield( DoCtrl::Pure { .. } @@ -506,7 +506,7 @@ fn shared_handler_suffix_len(current: &[KleisliRef], stored: &[KleisliRef]) -> u pub struct SchedulerState { ready: ReadySet, ready_task_ids: HashSet, - ready_root_resumes: HashMap, + ready_root_resumes: HashMap, ReadyRootResume>, pub tasks: HashMap, task_metadata: HashMap, pub promises: HashMap, @@ -1310,11 +1310,11 @@ impl SchedulerState { self.ready.push(entry); } - fn enqueue_ready_root(&mut self, cont_id: ContId) { + fn enqueue_ready_root(&mut self, fiber_id: Option) { let entry = ReadyEntry { priority: PRIORITY_NORMAL, sequence: self.next_ready_sequence, - target: ReadyTarget::Root(cont_id), + target: ReadyTarget::Root(fiber_id), }; self.next_ready_sequence += 1; self.ready.push(entry); @@ -1391,12 +1391,12 @@ impl SchedulerState { fn finalize_task_cancellation(&mut self, task_id: TaskId) { let task_exists = self.tasks.contains_key(&task_id); - let cont_id = match self.tasks.get(&task_id) { - Some(TaskState::Pending { cont, .. }) => Some(cont.cont_id()), + let fiber_id = match self.tasks.get(&task_id) { + Some(TaskState::Pending { cont, .. }) => Some(cont.identity()), Some(TaskState::Done { .. }) | None => None, }; - if let Some(cont_id) = cont_id { - self.clear_waiters_for_owner(Some(task_id), cont_id); + if let Some(fiber_id) = fiber_id { + self.clear_waiters_for_owner(Some(task_id), fiber_id); } self.remove_ready_task(task_id); @@ -1431,9 +1431,9 @@ impl SchedulerState { return false; } - let (cont_id, priority, started) = match task_state { + let (fiber_id, priority, started) = match task_state { TaskState::Pending { cont, priority, .. } => { - (cont.cont_id(), *priority, cont.is_started()) + (cont.identity(), *priority, cont.is_started()) } TaskState::Done { .. } => return false, }; @@ -1442,7 +1442,7 @@ impl SchedulerState { return false; } - self.clear_waiters_for_owner(Some(task_id), cont_id); + self.clear_waiters_for_owner(Some(task_id), fiber_id); self.cancel_requested.remove(&task_id); let cancelled_waiters = self.remove_semaphore_waiters_for_task(task_id); @@ -1721,7 +1721,7 @@ impl SchedulerState { let (waiter_id, waiting_task, mode, item_count, items) = { let waiter = self.wait_requests.get(&owner)?; ( - waiter.cont_id, + waiter.fiber_id, waiter.waiting_task, waiter.mode, waiter.items.len(), @@ -1748,7 +1748,7 @@ impl SchedulerState { pending_log_merge_items, .. }) => { - if cont.cont_id() != waiter_id { + if cont.identity() != waiter_id { return None; } *resume_outcome = Some(outcome.clone()); @@ -1801,7 +1801,7 @@ impl SchedulerState { } for owner in waiters_for_item { - let waiter_id = owner.cont_id(); + let waiter_id = owner.fiber_id(); let already_ready = match owner { WaitOwner::Task { task_id, .. } => self.ready_task_ids.contains(&task_id), WaitOwner::Root { .. } => self.ready_root_resumes.contains_key(&waiter_id), @@ -1872,11 +1872,11 @@ impl SchedulerState { fn clear_waiters_for_owner( &mut self, waiting_task: Option, - cont_id: ContId, + fiber_id: Option, ) -> Option { let owner = match waiting_task { - Some(task_id) => WaitOwner::Task { task_id, cont_id }, - None => WaitOwner::Root { cont_id }, + Some(task_id) => WaitOwner::Task { task_id, fiber_id }, + None => WaitOwner::Root { fiber_id }, }; let removed_request = self.wait_requests.remove(&owner); self.pending_gather_fail_fast.remove(&owner); @@ -1888,13 +1888,13 @@ impl SchedulerState { } else { self.waitables_by_owner.remove(&owner).unwrap_or_default() }; - if let Some(ready_resume) = self.ready_root_resumes.get(&cont_id) { + if let Some(ready_resume) = self.ready_root_resumes.get(&fiber_id) { if ready_resume.waiting_task == waiting_task { - self.ready_root_resumes.remove(&cont_id); + self.ready_root_resumes.remove(&fiber_id); } } self.ready.retain(|entry| match entry.target { - ReadyTarget::Root(root_id) => root_id != cont_id, + ReadyTarget::Root(root_id) => root_id != fiber_id, ReadyTarget::Task(_) => true, }); @@ -1907,7 +1907,7 @@ impl SchedulerState { .. }) = self.tasks.get_mut(&task_id) { - if cont.cont_id() == cont_id { + if cont.identity() == fiber_id { *resume_outcome = None; *pending_log_merge_items = None; } @@ -1920,7 +1920,7 @@ impl SchedulerState { .filter(|task_id| { matches!( self.tasks.get(task_id), - Some(TaskState::Pending { cont, .. }) if cont.cont_id() == cont_id + Some(TaskState::Pending { cont, .. }) if cont.identity() == fiber_id ) }) .collect(); @@ -2001,7 +2001,7 @@ impl SchedulerState { WaitOwner::Task { task_id, .. } => Some(task_id), WaitOwner::Root { .. } => None, }, - owner.cont_id(), + owner.fiber_id(), ) .expect("fail-fast waiter must still be registered"); let mut pending_cleanup_tasks = HashSet::new(); @@ -2059,7 +2059,7 @@ impl SchedulerState { } let waiter = fail_fast.waiter; - let waiter_id = waiter.cont_id; + let waiter_id = waiter.fiber_id; self.execution_context_task_override = Some(fail_fast.failed_task); if let Some(waiting_task) = waiter.waiting_task { @@ -2071,7 +2071,7 @@ impl SchedulerState { pending_log_merge_items, .. }) => { - if cont.cont_id() != waiter_id { + if cont.identity() != waiter_id { return None; } *resume_outcome = Some(Err(fail_fast.error)); @@ -2179,7 +2179,7 @@ impl SchedulerState { fn register_waiter( &mut self, items: &[Waitable], - cont_id: ContId, + fiber_id: Option, continuation: Option, store: &VarStore, mode: WaitMode, @@ -2196,8 +2196,8 @@ impl SchedulerState { self.current_task.is_none() && self.root_wait_should_restore_store(items); let owner = match self.current_task { - Some(task_id) => WaitOwner::Task { task_id, cont_id }, - None => WaitOwner::Root { cont_id }, + Some(task_id) => WaitOwner::Task { task_id, fiber_id }, + None => WaitOwner::Root { fiber_id }, }; self.active_wait_owners.insert(owner); assert!( @@ -2212,7 +2212,7 @@ impl SchedulerState { WaitMode::Any => 1, }; let waiter = WaitRequest { - cont_id, + fiber_id, continuation, items: items.to_vec(), mode, @@ -2234,21 +2234,21 @@ impl SchedulerState { pub fn wait_on_all( &mut self, items: &[Waitable], - cont_id: ContId, + fiber_id: Option, continuation: Option, store: &VarStore, ) { - self.register_waiter(items, cont_id, continuation, store, WaitMode::All); + self.register_waiter(items, fiber_id, continuation, store, WaitMode::All); } pub fn wait_on_any( &mut self, items: &[Waitable], - cont_id: ContId, + fiber_id: Option, continuation: Option, store: &VarStore, ) { - self.register_waiter(items, cont_id, continuation, store, WaitMode::Any); + self.register_waiter(items, fiber_id, continuation, store, WaitMode::Any); } fn root_wait_should_restore_store(&self, items: &[Waitable]) -> bool { @@ -2353,8 +2353,8 @@ impl SchedulerState { selected_ready = Some(entry); break; } - ReadyTarget::Root(cont_id) => { - if !self.ready_root_resumes.contains_key(&cont_id) { + ReadyTarget::Root(fiber_id) => { + if !self.ready_root_resumes.contains_key(&fiber_id) { continue; } selected_ready = Some(entry); @@ -2421,13 +2421,13 @@ impl SchedulerState { Value::Unit, )); } - ReadyTarget::Root(cont_id) => { - let Some(ready_root) = self.ready_root_resumes.remove(&cont_id) else { + ReadyTarget::Root(fiber_id) => { + let Some(ready_root) = self.ready_root_resumes.remove(&fiber_id) else { continue; }; self.clear_waiters_for_owner( ready_root.waiting_task, - cont_id, + fiber_id, ); if let Some(waiting_task) = ready_root.waiting_task { if let Err(error) = self.load_task_store(waiting_task, store) { @@ -2484,16 +2484,16 @@ impl SchedulerState { store: &mut VarStore, ) -> Result, PyException> { match owner { - WaitOwner::Task { task_id, cont_id } => { + WaitOwner::Task { task_id, fiber_id } => { if !self.ready_task_ids.contains(&task_id) { return Ok(None); } - let task_cont_id = match self.tasks.get(&task_id) { - Some(TaskState::Pending { cont, .. }) => cont.cont_id(), + let task_fiber_id = match self.tasks.get(&task_id) { + Some(TaskState::Pending { cont, .. }) => cont.identity(), _ => return Ok(None), }; - if task_cont_id != cont_id { + if task_fiber_id != fiber_id { return Ok(None); } let (task_k, resume_outcome) = self.take_task_continuation(task_id)?; @@ -2526,13 +2526,13 @@ impl SchedulerState { }; Ok(Some(step)) } - WaitOwner::Root { cont_id } => { - let Some(ready_root) = self.ready_root_resumes.remove(&cont_id) else { + WaitOwner::Root { fiber_id } => { + let Some(ready_root) = self.ready_root_resumes.remove(&fiber_id) else { return Ok(None); }; self.clear_waiters_for_owner( ready_root.waiting_task, - cont_id, + fiber_id, ); if let Some(waiting_task) = ready_root.waiting_task { self.load_task_store(waiting_task, store)?; @@ -2698,7 +2698,7 @@ impl SchedulerProgram { fn wait_owner( &self, waiting_task: Option, - cont_id: ContId, + fiber_id: Option, active_driver_owner: Option, ) -> WaitOwner { match (&self.phase, waiting_task) { @@ -2713,8 +2713,8 @@ impl SchedulerProgram { active_driver_owner.expect("guarded by is_some") } _ => match waiting_task { - Some(task_id) => WaitOwner::Task { task_id, cont_id }, - None => WaitOwner::Root { cont_id }, + Some(task_id) => WaitOwner::Task { task_id, fiber_id }, + None => WaitOwner::Root { fiber_id }, }, } } @@ -2763,9 +2763,9 @@ impl SchedulerProgram { ) -> IRStreamStep { let mut state = self.state.lock().expect("Scheduler lock poisoned"); let waiting_task = state.current_task; - let cont_id = k_user.derived_cont_id(); + let fiber_id = k_user.identity(); if let Some(aggregate) = state.collect_all_result(&items) { - state.clear_waiters_for_owner(waiting_task, cont_id); + state.clear_waiters_for_owner(waiting_task, fiber_id); return match aggregate { Ok(results) => resume_to_continuation(started(k_user), results), Err(error) => throw_to_continuation(started(k_user), error), @@ -2780,8 +2780,8 @@ impl SchedulerProgram { Some(k_user) }; - state.wait_on_all(&items, cont_id, root_wait_continuation, store); - let owner = self.wait_owner(waiting_task, cont_id, state.active_driver_owner); + state.wait_on_all(&items, fiber_id, root_wait_continuation, store); + let owner = self.wait_owner(waiting_task, fiber_id, state.active_driver_owner); drop(state); self.continue_wait_transfer( owner, @@ -2811,7 +2811,7 @@ impl SchedulerProgram { ) -> IRStreamStep { match outcome { TransferNextOutcome::Step(step) => { - let resumed_waiting_owner = step_targets_cont_id(&step, owner.cont_id()); + let resumed_waiting_owner = step_targets_fiber_id(&step, owner.fiber_id()); let keep_driving = next_running_task.is_some() && !resumed_waiting_owner && step_switches_into_task_body(&step); @@ -2949,7 +2949,7 @@ impl SchedulerProgram { other => other, }; let next_running_task = state.current_task; - let resumed_preempted_caller = step_targets_cont_id(&step, k_user.derived_cont_id()); + let resumed_preempted_caller = step_targets_fiber_id(&step, k_user.identity()); let switched_into_task_body = step_switches_into_task_body(&step); let keep_preemptive_transfer = next_running_task.is_some() && switched_into_task_body @@ -3010,9 +3010,9 @@ impl SchedulerProgram { let mut state = self.state.lock().expect("Scheduler lock poisoned"); let items = [item]; let waiting_task = state.current_task; - let cont_id = k_user.derived_cont_id(); + let fiber_id = k_user.identity(); if let Some(result) = state.collect_any_result(&items) { - state.clear_waiters_for_owner(waiting_task, cont_id); + state.clear_waiters_for_owner(waiting_task, fiber_id); return match result { Ok(value) => resume_to_continuation(started(k_user), value), Err(error) => throw_to_continuation(started(k_user), error), @@ -3027,8 +3027,8 @@ impl SchedulerProgram { Some(k_user) }; - state.wait_on_any(&items, cont_id, root_wait_continuation, store); - let owner = self.wait_owner(waiting_task, cont_id, state.active_driver_owner); + state.wait_on_any(&items, fiber_id, root_wait_continuation, store); + let owner = self.wait_owner(waiting_task, fiber_id, state.active_driver_owner); drop(state); self.continue_wait_transfer( owner, @@ -3058,9 +3058,9 @@ impl SchedulerProgram { ) -> IRStreamStep { let mut state = self.state.lock().expect("Scheduler lock poisoned"); let waiting_task = state.current_task; - let cont_id = k_user.derived_cont_id(); + let fiber_id = k_user.identity(); if let Some(first) = state.collect_any_result(&items) { - state.clear_waiters_for_owner(waiting_task, cont_id); + state.clear_waiters_for_owner(waiting_task, fiber_id); return match first { Ok(value) => resume_to_continuation(started(k_user), value), Err(error) => throw_to_continuation(started(k_user), error), @@ -3075,8 +3075,8 @@ impl SchedulerProgram { Some(k_user) }; - state.wait_on_any(&items, cont_id, root_wait_continuation, store); - let owner = self.wait_owner(waiting_task, cont_id, state.active_driver_owner); + state.wait_on_any(&items, fiber_id, root_wait_continuation, store); + let owner = self.wait_owner(waiting_task, fiber_id, state.active_driver_owner); drop(state); self.continue_wait_transfer( owner, @@ -3216,11 +3216,12 @@ impl SchedulerProgram { (Some(current), Some(woken)) if woken.priority > current ) && !nested_preemptive_transfer_active; if should_preempt { - let transfer_k = Continuation::from_fiber(k_user.fibers()[0], None); + let transfer_fiber_id = k_user.fibers()[0]; if let Err(error) = state.park_current_with_value(k_user, Value::Unit) { return IRStreamStep::Throw(error); } drop(state); + let transfer_k = Continuation::from_fiber(transfer_fiber_id, None); self.continue_preemptive_transfer_step(transfer_k, store) } else { resume_to_continuation(started(k_user), Value::Unit) @@ -3289,7 +3290,7 @@ impl SchedulerProgram { match state.transfer_next(store) { TransferNextOutcome::Step(step) => { let next_running_task = state.current_task; - let resumed_waiting_owner = step_targets_cont_id(&step, owner.cont_id()); + let resumed_waiting_owner = step_targets_fiber_id(&step, owner.fiber_id()); let keep_driving = next_running_task.is_some() && !resumed_waiting_owner && step_switches_into_task_body(&step); @@ -3501,7 +3502,7 @@ impl IRStreamProgram for SchedulerProgram { let mut state = self.state.lock().expect("Scheduler lock poisoned"); let items = [Waitable::Promise(promise_id)]; let waiting_task = state.current_task; - let cont_id = k_user.derived_cont_id(); + let fiber_id = k_user.identity(); let root_wait_continuation = if let Some(waiting_task) = state.current_task { if let Err(error) = state.suspend_task_for_wait(waiting_task, k_user) { @@ -3511,9 +3512,9 @@ impl IRStreamProgram for SchedulerProgram { } else { Some(k_user) }; - state.wait_on_any(&items, cont_id, root_wait_continuation, store); + state.wait_on_any(&items, fiber_id, root_wait_continuation, store); let owner = - self.wait_owner(waiting_task, cont_id, state.active_driver_owner); + self.wait_owner(waiting_task, fiber_id, state.active_driver_owner); drop(state); self.continue_wait_transfer( owner, @@ -3793,13 +3794,14 @@ impl IRStreamProgram for SchedulerProgram { let current_priority = state.current_task_priority(); let task_value = Value::Task(TaskHandle { id: task_id }); if matches!(current_priority, Some(current) if priority > current) { - let transfer_k = Continuation::from_fiber(k_user.fibers()[0], None); + let transfer_fiber_id = k_user.fibers()[0]; if let Err(error) = state.park_current_with_value(k_user, task_value) { return IRStreamStep::Throw(error); } drop(state); + let transfer_k = Continuation::from_fiber(transfer_fiber_id, None); self.continue_preemptive_transfer_step(transfer_k, store) } else { resume_to_continuation(started(k_user), task_value) @@ -4038,17 +4040,23 @@ mod tests { use pyo3::{IntoPyObject, PyClassInitializer, Python}; fn make_test_continuation() -> Continuation { - use crate::ids::{Marker, SegmentId}; - use crate::segment::Segment; + use std::sync::atomic::{AtomicUsize, Ordering}; + static NEXT_FIBER: AtomicUsize = AtomicUsize::new(1000); + let idx = NEXT_FIBER.fetch_add(1, Ordering::Relaxed); + Continuation::from_fiber(FiberId::from_index(idx), None) + } - let marker = Marker::fresh(); - let seg = Segment::new(marker, None); - let seg_id = SegmentId::from_index(0); - Continuation::capture(&seg, seg_id) + /// Create a test continuation with a specific FiberId. + /// Use this when multiple continuations need the same identity (replacing clone). + fn make_test_continuation_with_fiber(fiber_id: FiberId) -> Continuation { + Continuation::from_fiber(fiber_id, None) } - fn make_unstarted_test_continuation() -> Continuation { - Python::attach(|py| Continuation::create_unstarted(PyShared::new(py.None()), Vec::new())) + fn make_unstarted_test_continuation() -> OwnedControlContinuation { + Python::attach(|py| { + let pc = PendingContinuation::create(PyShared::new(py.None()), Vec::new()); + OwnedControlContinuation::Pending(pc) + }) } fn named_rust_handler(name: &str) -> KleisliRef { @@ -4082,10 +4090,7 @@ mod tests { let program = SchedulerProgram::new(state); let code = pyo3::ffi::c_str!( - "from doeff.handlers.await_handlers import AWAIT_SHIM_ATTR\n\ - def handler(effect, k):\n\ - yield effect\n\ - setattr(handler, AWAIT_SHIM_ATTR, True)\n" + "from doeff.handlers.await_handlers import AWAIT_SHIM_ATTR\ndef handler(effect, k):\n yield effect\nsetattr(handler, AWAIT_SHIM_ATTR, True)\n" ); let filename = pyo3::ffi::c_str!("test_sync_await_shim.py"); let module_name = pyo3::ffi::c_str!("test_sync_await_shim"); @@ -4193,15 +4198,15 @@ mod tests { #[test] fn test_transfer_to_continuation_started_emits_transfer() { let cont = make_test_continuation(); - let cont_id = cont.derived_cont_id(); - let step = transfer_to_continuation(cont, Value::Int(123)); + let fiber_id = cont.identity(); + let step = transfer_to_continuation(OwnedControlContinuation::Started(cont), Value::Int(123)); match step { IRStreamStep::Yield(DoCtrl::Transfer { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), cont_id); + assert_eq!(continuation.identity(), fiber_id); assert_eq!(value.as_int(), Some(123)); } _ => panic!("started continuation must emit DoCtrl::Transfer"), @@ -4211,15 +4216,16 @@ mod tests { #[test] fn test_transfer_to_continuation_unstarted_emits_resume_continuation() { let cont = make_unstarted_test_continuation(); - let cont_id = cont.derived_cont_id(); + let fiber_id = cont.identity(); let step = transfer_to_continuation(cont, Value::Int(456)); + match step { IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value, }) => { - assert_eq!(continuation.cont_id(), cont_id); + assert_eq!(continuation.identity(), fiber_id); assert_eq!(value.as_int(), Some(456)); } _ => panic!("unstarted continuation must emit DoCtrl::ResumeContinuation"), @@ -4228,14 +4234,13 @@ mod tests { #[test] fn test_step_is_terminal() { - let cont = make_test_continuation(); let terminal_steps = vec![ IRStreamStep::Yield(DoCtrl::Transfer { - continuation: cont.clone(), + continuation: make_test_continuation(), value: Value::Unit, }), IRStreamStep::Yield(DoCtrl::TransferThrow { - continuation: cont.clone(), + continuation: make_test_continuation(), exception: PyException::runtime_error("boom".to_string()), }), IRStreamStep::Yield(DoCtrl::Pass), @@ -4249,15 +4254,15 @@ mod tests { let non_terminal_steps = vec![ IRStreamStep::Yield(DoCtrl::Delegate), IRStreamStep::Yield(DoCtrl::Resume { - continuation: cont.clone(), + continuation: make_test_continuation(), value: Value::Unit, }), IRStreamStep::Yield(DoCtrl::ResumeThrow { - continuation: cont.clone(), + continuation: make_test_continuation(), exception: PyException::runtime_error("boom".to_string()), }), IRStreamStep::Yield(DoCtrl::GetTraceback { - continuation: cont.clone(), + continuation: make_test_continuation(), }), IRStreamStep::Yield(DoCtrl::GetHandlers), IRStreamStep::Yield(DoCtrl::CreateContinuation { @@ -4267,7 +4272,7 @@ mod tests { outside_scope: None, }), IRStreamStep::Yield(DoCtrl::ResumeContinuation { - continuation: cont.clone(), + continuation: OwnedControlContinuation::Started(make_test_continuation()), value: Value::Unit, }), IRStreamStep::Yield(DoCtrl::Expand { @@ -4292,10 +4297,11 @@ mod tests { let mut fresh_state = SchedulerState::new(); let fresh_task = fresh_state.alloc_task_id(); let fresh_cont = make_test_continuation(); + let fresh_cont_id = fresh_cont.identity(); fresh_state.tasks.insert( fresh_task, TaskState::Pending { - cont: fresh_cont.clone(), + cont: OwnedControlContinuation::Started(fresh_cont), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -4307,16 +4313,17 @@ mod tests { assert!(matches!( fresh_step, IRStreamStep::Yield(DoCtrl::Transfer { continuation, .. }) - if continuation.derived_cont_id() == fresh_cont.derived_cont_id() + if continuation.identity() == fresh_cont_id )); let mut woke_ok_state = SchedulerState::new(); let woke_ok_task = woke_ok_state.alloc_task_id(); let woke_ok_cont = make_test_continuation(); + let woke_ok_cont_id = woke_ok_cont.identity(); woke_ok_state.tasks.insert( woke_ok_task, TaskState::Pending { - cont: woke_ok_cont.clone(), + cont: OwnedControlContinuation::Started(woke_ok_cont), store: TaskStore::Shared, resume_outcome: Some(Ok(Value::Int(7))), priority: PRIORITY_NORMAL, @@ -4328,16 +4335,17 @@ mod tests { assert!(matches!( woke_ok_step, IRStreamStep::Yield(DoCtrl::Transfer { continuation, value }) - if continuation.derived_cont_id() == woke_ok_cont.derived_cont_id() && value.as_int() == Some(7) + if continuation.identity() == woke_ok_cont_id && value.as_int() == Some(7) )); let mut woke_err_state = SchedulerState::new(); let woke_err_task = woke_err_state.alloc_task_id(); let woke_err_cont = make_test_continuation(); + let woke_err_cont_id = woke_err_cont.identity(); woke_err_state.tasks.insert( woke_err_task, TaskState::Pending { - cont: woke_err_cont.clone(), + cont: OwnedControlContinuation::Started(woke_err_cont), store: TaskStore::Shared, resume_outcome: Some(Err(PyException::runtime_error("boom".to_string()))), priority: PRIORITY_NORMAL, @@ -4349,7 +4357,7 @@ mod tests { assert!(matches!( woke_err_step, IRStreamStep::Yield(DoCtrl::TransferThrow { continuation, .. }) - if continuation.derived_cont_id() == woke_err_cont.derived_cont_id() + if continuation.identity() == woke_err_cont_id )); } @@ -4357,7 +4365,7 @@ mod tests { fn test_ready_set_fifo_push_pop_and_retain() { let t0 = TaskId::from_raw(0); let t1 = TaskId::from_raw(1); - let mut ready = ReadySet::Fifo(VecDeque::new()); + let mut ready = ReadySet(BinaryHeap::new()); ready.push(ReadyEntry { priority: PRIORITY_NORMAL, sequence: 0, @@ -4383,7 +4391,7 @@ mod tests { let t0 = TaskId::from_raw(0); let t1 = TaskId::from_raw(1); let t2 = TaskId::from_raw(2); - let mut ready = ReadySet::Priority(BinaryHeap::new()); + let mut ready = ReadySet(BinaryHeap::new()); ready.push(ReadyEntry { priority: PRIORITY_NORMAL, sequence: 1, @@ -4424,6 +4432,7 @@ mod tests { let mut store = VarStore::new(); let mut _scope = ScopeStore::default(); let k_user = make_test_continuation(); + let k_user_id = k_user.identity(); let expected_scope = k_user.segment_id(); let spawn_program = py.None().into_pyobject(py).unwrap().unbind().into_any(); let spawn_effect = Py::new( @@ -4493,12 +4502,15 @@ mod tests { | IRStreamStep::Yield(DoCtrl::Transfer { continuation, value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { + }) => { + assert_eq!(continuation.identity(), k_user_id); + assert!(matches!(value, Value::Task(_))); + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), k_user_id); + assert_eq!(continuation.identity(), k_user_id); assert!(matches!(value, Value::Task(_))); } _ => panic!("expected IRStream Yield(Resume|Transfer|ResumeContinuation)"), @@ -4517,15 +4529,17 @@ mod tests { let mut _scope = ScopeStore::default(); let caller_k = make_test_continuation(); + let caller_k_id = caller_k.identity(); let spawned_k = make_test_continuation(); + let spawned_k_id = spawned_k.identity(); - let current_task = { + let (current_task, spawned_task_id) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); let task_id = guard.alloc_task_id(); guard.tasks.insert( task_id, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -4533,25 +4547,27 @@ mod tests { }, ); guard.current_task = Some(task_id); - task_id + let spawned_task_id = guard.alloc_task_id(); + (task_id, spawned_task_id) }; program.phase = SchedulerPhase::SpawnAwaitContinuation { - k_user: caller_k.clone(), + k_user: caller_k, store_mode: StoreMode::Shared, store_snapshot: None, priority: PRIORITY_NORMAL, spawn_site: None, + task_id: spawned_task_id, }; let step = IRStream::resume( &mut program, - Value::Continuation(spawned_k.clone()), + Value::Continuation(spawned_k), &mut store, &mut _scope, ); assert!( - step_targets_cont_id(&step, spawned_k.derived_cont_id()), + step_targets_fiber_id(&step, spawned_k_id), "higher-priority spawned task must run before caller, got {:?}", step ); @@ -4570,7 +4586,7 @@ mod tests { else { panic!("current task should remain pending after preemption"); }; - assert_eq!(cont.cont_id(), caller_k.derived_cont_id()); + assert_eq!(cont.identity(), caller_k_id); assert!( matches!(resume_outcome, Some(Ok(Value::Task(_)))), "caller must be parked with spawned task handle" @@ -4590,7 +4606,9 @@ mod tests { let mut _scope = ScopeStore::default(); let waiter_k = make_test_continuation(); + let waiter_k_id = waiter_k.identity(); let idle_k = make_test_continuation(); + let idle_k_id = idle_k.identity(); let (promise_id, waiter_task, idle_task) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); @@ -4601,7 +4619,7 @@ mod tests { guard.tasks.insert( waiter_task, TaskState::Pending { - cont: waiter_k.clone(), + cont: OwnedControlContinuation::Started(waiter_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -4609,13 +4627,13 @@ mod tests { }, ); guard.current_task = Some(waiter_task); - guard.wait_on_any(&[Waitable::Promise(promise_id)], waiter_k.clone(), &store); + guard.wait_on_any(&[Waitable::Promise(promise_id)], waiter_k_id, None, &store); let idle_task = guard.alloc_task_id(); guard.tasks.insert( idle_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -4641,12 +4659,12 @@ mod tests { &mut program, py, complete, - idle_k.clone(), + idle_k, &mut store, &mut _scope, ); assert!( - step_targets_cont_id(&step, waiter_k.derived_cont_id()), + step_targets_fiber_id(&step, waiter_k_id), "higher-priority waiter must run first after promise completion, got {:?}", step ); @@ -4661,7 +4679,7 @@ mod tests { else { panic!("idle caller should remain pending after promise completion"); }; - assert_eq!(cont.cont_id(), idle_k.derived_cont_id()); + assert_eq!(cont.identity(), idle_k_id); assert!( matches!(resume_outcome, Some(Ok(Value::Unit))), "idle caller should be parked with unit resume value" @@ -4677,23 +4695,23 @@ mod tests { fn test_driving_phase_contains_wait_owner_identity_only() { let waiting_task = TaskId::from_raw(7); let running_task = TaskId::from_raw(9); - let waiting_cont_id = ContId::from_raw(11); + let waiting_fiber_id = Some(FiberId::from_index(11)); let phase = SchedulerPhase::Driving { owner: WaitOwner::Task { task_id: waiting_task, - cont_id: waiting_cont_id, + fiber_id: waiting_fiber_id, }, running_task, }; match phase { SchedulerPhase::Driving { - owner: WaitOwner::Task { task_id, cont_id }, + owner: WaitOwner::Task { task_id, fiber_id }, running_task: active_task, } => { assert_eq!(task_id, waiting_task); - assert_eq!(cont_id, waiting_cont_id); + assert_eq!(fiber_id, waiting_fiber_id); assert_eq!(active_task, running_task); } other => panic!("expected Driving phase, got {:?}", other), @@ -4704,7 +4722,7 @@ mod tests { SchedulerPhase::Driving { owner: WaitOwner::Task { task_id: waiting_task, - cont_id: waiting_cont_id, + fiber_id: waiting_fiber_id, }, running_task, } @@ -4721,6 +4739,8 @@ mod tests { let mut _scope = ScopeStore::default(); let waiter_k = make_test_continuation(); + let waiter_k_id = waiter_k.identity(); + let waiter_k_fiber = waiter_k_id.unwrap(); let running_k = make_test_continuation(); let (waiting_task, running_task) = { @@ -4731,7 +4751,7 @@ mod tests { guard.tasks.insert( waiting_task, TaskState::Pending { - cont: waiter_k.clone(), + cont: OwnedControlContinuation::Started(waiter_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -4741,7 +4761,7 @@ mod tests { guard.tasks.insert( running_task, TaskState::Pending { - cont: running_k, + cont: OwnedControlContinuation::Started(running_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -4751,9 +4771,9 @@ mod tests { guard.current_task = Some(waiting_task); guard - .suspend_task_for_wait(waiting_task, waiter_k.clone()) + .suspend_task_for_wait(waiting_task, make_test_continuation_with_fiber(waiter_k_fiber)) .expect("waiting task should suspend"); - guard.wait_on_all(&[Waitable::Task(running_task)], waiter_k.clone(), &store); + guard.wait_on_all(&[Waitable::Task(running_task)], waiter_k_id, None, &store); guard.current_task = Some(running_task); (waiting_task, running_task) @@ -4762,37 +4782,31 @@ mod tests { program.phase = SchedulerPhase::Driving { owner: WaitOwner::Task { task_id: waiting_task, - cont_id: waiter_k.derived_cont_id(), + fiber_id: waiter_k_id, }, running_task, }; let step = IRStream::resume(&mut program, Value::Int(55), &mut store, &mut _scope); - match step { - IRStreamStep::Yield(DoCtrl::Resume { - continuation, - value, - }) - | IRStreamStep::Yield(DoCtrl::Transfer { - continuation, - value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { - continuation, - value, - }) => { - assert_eq!(continuation.derived_cont_id(), waiter_k.derived_cont_id()); - match value { - Value::List(values) => { - assert_eq!(values.len(), 1); - assert_eq!(values[0].as_int(), Some(55)); - } - other => panic!( - "waiter must resume with gathered list via ready queue, got {other:?}" - ), - } + let (step_identity, step_value) = match &step { + IRStreamStep::Yield(DoCtrl::Resume { continuation, value }) + | IRStreamStep::Yield(DoCtrl::Transfer { continuation, value }) => { + (continuation.identity(), value) + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value }) => { + (continuation.identity(), value) } other => panic!("expected waiter continuation dequeue step, got {:?}", other), + }; + assert_eq!(step_identity, waiter_k_id); + match step_value { + Value::List(values) => { + assert_eq!(values.len(), 1); + assert_eq!(values[0].as_int(), Some(55)); + } + other => panic!( + "waiter must resume with gathered list via ready queue, got {other:?}" + ), } assert!(matches!(program.phase, SchedulerPhase::Idle)); @@ -4821,7 +4835,9 @@ mod tests { let mut _scope = ScopeStore::default(); let normal_k = make_test_continuation(); + let normal_k_id = normal_k.identity(); let idle_driver_k = make_test_continuation(); + let idle_driver_k_id = idle_driver_k.identity(); let (promise_id, normal_task, idle_task) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); @@ -4832,7 +4848,7 @@ mod tests { guard.tasks.insert( normal_task, TaskState::Pending { - cont: normal_k.clone(), + cont: OwnedControlContinuation::Started(normal_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -4840,13 +4856,13 @@ mod tests { }, ); guard.current_task = Some(normal_task); - guard.wait_on_any(&[Waitable::Promise(promise_id)], normal_k.clone(), &store); + guard.wait_on_any(&[Waitable::Promise(promise_id)], normal_k_id, None, &store); let idle_task = guard.alloc_task_id(); guard.tasks.insert( idle_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -4871,24 +4887,24 @@ mod tests { &mut program, py, complete, - idle_driver_k.clone(), + idle_driver_k, &mut store, &mut _scope, ); assert!( - step_targets_cont_id(&first_step, normal_k.derived_cont_id()), + step_targets_fiber_id(&first_step, normal_k_id), "NORMAL waiter should preempt IDLE driver, got {:?}", first_step ); assert!(matches!( &program.phase, SchedulerPhase::PreemptiveTransfer { k_user } - if k_user.derived_cont_id() == idle_driver_k.derived_cont_id() + if k_user.identity() == idle_driver_k_id )); let second_step = IRStream::resume(&mut program, Value::Unit, &mut store, &mut _scope); assert!( - step_targets_cont_id(&second_step, idle_driver_k.derived_cont_id()), + step_targets_fiber_id(&second_step, idle_driver_k_id), "IDLE driver should resume after NORMAL task hands control back, got {:?}", second_step ); @@ -4916,6 +4932,8 @@ mod tests { let mut _scope = ScopeStore::default(); let waiter_k = make_test_continuation(); + let waiter_k_id = waiter_k.identity(); + let waiter_k_fiber = waiter_k_id.unwrap(); let running_k = make_test_continuation(); let resolver_k = make_test_continuation(); @@ -4926,7 +4944,7 @@ mod tests { guard.tasks.insert( waiting_task, TaskState::Pending { - cont: waiter_k.clone(), + cont: OwnedControlContinuation::Started(waiter_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -4938,7 +4956,7 @@ mod tests { guard.tasks.insert( running_task, TaskState::Pending { - cont: running_k, + cont: OwnedControlContinuation::Started(running_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -4951,9 +4969,9 @@ mod tests { guard.current_task = Some(waiting_task); guard - .suspend_task_for_wait(waiting_task, waiter_k.clone()) + .suspend_task_for_wait(waiting_task, make_test_continuation_with_fiber(waiter_k_fiber)) .expect("waiting task should suspend"); - guard.wait_on_any(&[Waitable::Promise(promise_id)], waiter_k.clone(), &store); + guard.wait_on_any(&[Waitable::Promise(promise_id)], waiter_k_id, None, &store); guard.current_task = None; (promise_id, waiting_task, running_task) @@ -4962,7 +4980,7 @@ mod tests { driving_program.phase = SchedulerPhase::Driving { owner: WaitOwner::Task { task_id: waiting_task, - cont_id: waiter_k.derived_cont_id(), + fiber_id: waiter_k_id, }, running_task, }; @@ -4980,7 +4998,7 @@ mod tests { &mut resolver_program, py, complete, - resolver_k.clone(), + resolver_k, &mut store, &mut _scope, ); @@ -5006,13 +5024,13 @@ mod tests { IRStream::resume(&mut driving_program, Value::Unit, &mut store, &mut _scope); let mut waiter_activation_count = 0; - if step_targets_cont_id(&resolver_step, waiter_k.derived_cont_id()) { + if step_targets_fiber_id(&resolver_step, waiter_k_id) { waiter_activation_count += 1; } - if step_targets_cont_id(&dequeue_step, waiter_k.derived_cont_id()) { + if step_targets_fiber_id(&dequeue_step, waiter_k_id) { waiter_activation_count += 1; } - if step_targets_cont_id(&stale_step, waiter_k.derived_cont_id()) { + if step_targets_fiber_id(&stale_step, waiter_k_id) { waiter_activation_count += 1; } @@ -5021,11 +5039,11 @@ mod tests { "waiter continuation must be activated exactly once across scheduler instances" ); assert!( - step_targets_cont_id(&dequeue_step, waiter_k.derived_cont_id()), + step_targets_fiber_id(&dequeue_step, waiter_k_id), "single activation must happen via ready-queue dequeue" ); assert!( - !step_targets_cont_id(&stale_step, waiter_k.derived_cont_id()), + !step_targets_fiber_id(&stale_step, waiter_k_id), "stale Driving instance must not reactivate waiter continuation" ); }); @@ -5040,8 +5058,11 @@ mod tests { let mut _scope = ScopeStore::default(); let idle_k = make_test_continuation(); + let idle_k_id = idle_k.identity(); let normal_k = make_test_continuation(); + let normal_k_id = normal_k.identity(); let high_k = make_test_continuation(); + let high_k_id = high_k.identity(); let (wake_normal_promise, wake_high_promise, high_task, normal_task) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); @@ -5059,7 +5080,7 @@ mod tests { guard.tasks.insert( normal_task, TaskState::Pending { - cont: normal_k.clone(), + cont: OwnedControlContinuation::Started(normal_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5069,7 +5090,8 @@ mod tests { guard.current_task = Some(normal_task); guard.wait_on_any( &[Waitable::Promise(wake_normal_promise)], - normal_k.clone(), + normal_k_id, + None, &store, ); @@ -5077,7 +5099,7 @@ mod tests { guard.tasks.insert( high_task, TaskState::Pending { - cont: high_k.clone(), + cont: OwnedControlContinuation::Started(high_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_HIGH, @@ -5087,7 +5109,8 @@ mod tests { guard.current_task = Some(high_task); guard.wait_on_any( &[Waitable::Promise(wake_high_promise)], - high_k.clone(), + high_k_id, + None, &store, ); @@ -5095,7 +5118,7 @@ mod tests { guard.tasks.insert( idle_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -5124,19 +5147,19 @@ mod tests { &mut program, py, wake_normal, - idle_k.clone(), + idle_k, &mut store, &mut _scope, ); assert!( - step_targets_cont_id(&first_step, normal_k.derived_cont_id()), + step_targets_fiber_id(&first_step, normal_k_id), "first promise completion should preempt IDLE and run NORMAL, got {:?}", first_step ); assert!(matches!( &program.phase, SchedulerPhase::PreemptiveTransfer { k_user, .. } - if k_user.derived_cont_id() == idle_k.derived_cont_id() + if k_user.identity() == idle_k_id )); let wake_high = make_complete_promise_effect( @@ -5147,23 +5170,24 @@ mod tests { .expect("None must convert") .unbind(), ); + let normal_k_for_start = make_test_continuation_with_fiber(normal_k_id.unwrap()); let second_step = IRStreamProgram::start( &mut program, py, wake_high, - normal_k.clone(), + normal_k_for_start, &mut store, &mut _scope, ); assert!( - step_targets_cont_id(&second_step, normal_k.derived_cont_id()), + step_targets_fiber_id(&second_step, normal_k_id), "nested preemption must be blocked while transfer is active, got {:?}", second_step ); assert!(matches!( &program.phase, SchedulerPhase::PreemptiveTransfer { k_user, .. } - if k_user.derived_cont_id() == idle_k.derived_cont_id() + if k_user.identity() == idle_k_id )); let guard = state.lock().expect("Scheduler lock poisoned"); @@ -5188,7 +5212,9 @@ mod tests { let mut _scope = ScopeStore::default(); let waiter_k = make_test_continuation(); + let waiter_k_id = waiter_k.identity(); let idle_k = make_test_continuation(); + let idle_k_id = idle_k.identity(); let (promise_id, waiter_task, idle_task) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); @@ -5199,7 +5225,7 @@ mod tests { guard.tasks.insert( waiter_task, TaskState::Pending { - cont: waiter_k.clone(), + cont: OwnedControlContinuation::Started(waiter_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5207,13 +5233,13 @@ mod tests { }, ); guard.current_task = Some(waiter_task); - guard.wait_on_any(&[Waitable::Promise(promise_id)], waiter_k.clone(), &store); + guard.wait_on_any(&[Waitable::Promise(promise_id)], waiter_k_id, None, &store); let idle_task = guard.alloc_task_id(); guard.tasks.insert( idle_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_IDLE, @@ -5241,7 +5267,7 @@ mod tests { &mut program, py, fail, - idle_k.clone(), + idle_k, &mut store, &mut _scope, ); @@ -5249,7 +5275,7 @@ mod tests { matches!( &step, IRStreamStep::Yield(DoCtrl::ResumeThrow { continuation, .. }) - if continuation.derived_cont_id() == waiter_k.derived_cont_id() + if continuation.identity() == waiter_k_id ), "higher-priority waiter should receive non-terminal throw transfer, got {:?}", step @@ -5265,7 +5291,7 @@ mod tests { else { panic!("idle caller should remain pending after promise failure"); }; - assert_eq!(cont.cont_id(), idle_k.derived_cont_id()); + assert_eq!(cont.identity(), idle_k_id); assert!( matches!(resume_outcome, Some(Ok(Value::Unit))), "idle caller should be parked with unit resume value" @@ -5285,15 +5311,16 @@ mod tests { let mut _scope = ScopeStore::default(); let caller_k = make_test_continuation(); + let caller_k_id = caller_k.identity(); let spawned_k = make_test_continuation(); - let current_task = { + let (current_task, spawned_task_id) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); let task_id = guard.alloc_task_id(); guard.tasks.insert( task_id, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5301,15 +5328,17 @@ mod tests { }, ); guard.current_task = Some(task_id); - task_id + let spawned_task_id = guard.alloc_task_id(); + (task_id, spawned_task_id) }; program.phase = SchedulerPhase::SpawnAwaitContinuation { - k_user: caller_k.clone(), + k_user: caller_k, store_mode: StoreMode::Shared, store_snapshot: None, priority: PRIORITY_NORMAL, spawn_site: None, + task_id: spawned_task_id, }; let step = IRStream::resume( @@ -5319,7 +5348,7 @@ mod tests { &mut _scope, ); assert!( - step_targets_cont_id(&step, caller_k.derived_cont_id()), + step_targets_fiber_id(&step, caller_k_id), "same-priority spawn should resume caller immediately, got {:?}", step ); @@ -5340,15 +5369,16 @@ mod tests { let mut _scope = ScopeStore::default(); let caller_k = make_test_continuation(); + let caller_k_id = caller_k.identity(); let spawned_k = make_test_continuation(); - let current_task = { + let (current_task, spawned_task_id) = { let mut guard = state.lock().expect("Scheduler lock poisoned"); let task_id = guard.alloc_task_id(); guard.tasks.insert( task_id, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5356,15 +5386,17 @@ mod tests { }, ); guard.current_task = Some(task_id); - task_id + let spawned_task_id = guard.alloc_task_id(); + (task_id, spawned_task_id) }; program.phase = SchedulerPhase::SpawnAwaitContinuation { - k_user: caller_k.clone(), + k_user: caller_k, store_mode: StoreMode::Shared, store_snapshot: None, priority: PRIORITY_IDLE, spawn_site: None, + task_id: spawned_task_id, }; let step = IRStream::resume( @@ -5374,7 +5406,7 @@ mod tests { &mut _scope, ); assert!( - step_targets_cont_id(&step, caller_k.derived_cont_id()), + step_targets_fiber_id(&step, caller_k_id), "lower-priority spawn should resume caller immediately, got {:?}", step ); @@ -5432,17 +5464,17 @@ mod tests { let mut state = SchedulerState::new(); let mut store = VarStore::new(); let mut _scope = ScopeStore::default(); - let scheduler_k = make_test_continuation(); - let task0 = state.alloc_task_id(); let task1 = state.alloc_task_id(); let cont0 = make_test_continuation(); + let cont0_id = cont0.identity(); let cont1 = make_test_continuation(); + let cont1_id = cont1.identity(); state.tasks.insert( task0, TaskState::Pending { - cont: cont0.clone(), + cont: OwnedControlContinuation::Started(cont0), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5452,7 +5484,7 @@ mod tests { state.tasks.insert( task1, TaskState::Pending { - cont: cont1.clone(), + cont: OwnedControlContinuation::Started(cont1), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5462,21 +5494,29 @@ mod tests { for i in 0..128 { let (task, expected_cont) = if i % 2 == 0 { - (task0, cont0.derived_cont_id()) + (task0, cont0_id) } else { - (task1, cont1.derived_cont_id()) + (task1, cont1_id) }; + // Re-park continuation before enqueuing (simulate task yielding back) + if let Some(TaskState::Pending { cont, .. }) = state.tasks.get_mut(&task) { + if cont.is_placeholder() { + *cont = OwnedControlContinuation::Started( + make_test_continuation_with_fiber(expected_cont.unwrap()), + ); + } + } state.enqueue_ready_task(task, PRIORITY_NORMAL); - let step = state.transfer_next_or(scheduler_k.clone(), &mut store); + let step = state.transfer_next_or(make_test_continuation(), &mut store); match step { IRStreamStep::Yield(DoCtrl::Transfer { continuation, .. }) => { - assert_eq!(continuation.derived_cont_id(), expected_cont); + assert_eq!(continuation.identity(), expected_cont); } IRStreamStep::Yield(DoCtrl::Resume { .. }) => { panic!("task switches must not emit DoCtrl::Resume") } - _ => panic!("task switches must emit DoCtrl::Transfer"), + _ => panic!("task switches must emit DoCtrl::Transfer, got {:?}", step), } // Simulate that the resumed task yielded back to scheduler. @@ -5494,7 +5534,7 @@ mod tests { state.tasks.insert( task_id, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5503,20 +5543,23 @@ mod tests { ); let waiter = make_test_continuation(); - state.wait_on_all(&[Waitable::Task(task_id)], waiter.clone(), &store); + let waiter_id = waiter.identity(); + let waiter_fiber = waiter_id.unwrap(); + let waiter_for_wait = make_test_continuation_with_fiber(waiter_fiber); + state.wait_on_all(&[Waitable::Task(task_id)], waiter_id, Some(waiter_for_wait), &store); state .mark_task_done(task_id, Ok(Value::Int(7))) .expect("task should exist when marking done"); state.wake_waiters(Waitable::Task(task_id)); // transfer_next_or should resume whichever waiter is globally ready. - let step = state.transfer_next_or(waiter.clone(), &mut store); + let step = state.transfer_next_or(waiter, &mut store); match step { IRStreamStep::Yield(DoCtrl::Transfer { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), waiter.derived_cont_id()); + assert_eq!(continuation.identity(), waiter_id); match value { Value::List(values) => { assert_eq!(values.len(), 1); @@ -5612,7 +5655,7 @@ mod tests { #[test] fn test_scheduler_state_ready_defaults_to_priority_set() { let state = SchedulerState::new(); - assert!(matches!(state.ready, ReadySet::Priority(_))); + assert!(matches!(state.ready, ReadySet(_))); } #[test] @@ -5751,8 +5794,9 @@ mod tests { let mut _scope = ScopeStore::default(); let gather_owner_k = make_test_continuation(); - let waiter_k = make_test_continuation(); + let gather_owner_k_id = gather_owner_k.identity(); let runnable_k = make_test_continuation(); + let runnable_k_id = runnable_k.identity(); let driver_k = make_test_continuation(); let (semaphore_id, state_id, waiting_task, runnable_task) = { @@ -5764,7 +5808,7 @@ mod tests { guard.tasks.insert( holder_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5781,7 +5825,7 @@ mod tests { guard.tasks.insert( waiting_task, TaskState::Pending { - cont: waiter_k.clone(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5793,7 +5837,7 @@ mod tests { guard.tasks.insert( runnable_task, TaskState::Pending { - cont: runnable_k.clone(), + cont: OwnedControlContinuation::Started(runnable_k), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5808,7 +5852,7 @@ mod tests { program.phase = SchedulerPhase::Driving { owner: WaitOwner::Root { - cont_id: gather_owner_k.derived_cont_id(), + fiber_id: gather_owner_k_id, }, running_task: waiting_task, }; @@ -5826,16 +5870,16 @@ mod tests { ); assert!( - step_targets_cont_id(&step, runnable_k.derived_cont_id()), + step_targets_fiber_id(&step, runnable_k_id), "blocked acquire should transfer into another runnable task, got {:?}", step ); assert!(matches!( &program.phase, SchedulerPhase::Driving { - owner: WaitOwner::Root { cont_id }, + owner: WaitOwner::Root { fiber_id }, running_task, - } if *cont_id == gather_owner_k.derived_cont_id() + } if *fiber_id == gather_owner_k_id && *running_task == runnable_task )); }); @@ -5910,11 +5954,10 @@ mod tests { }, ); // t1 is Pending — gather should return None - let cont = make_test_continuation(); state.tasks.insert( t1, TaskState::Pending { - cont, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -5971,11 +6014,10 @@ mod tests { let t2 = state.alloc_task_id(); // t0 still pending - let cont = make_test_continuation(); state.tasks.insert( t0, TaskState::Pending { - cont, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6015,12 +6057,10 @@ mod tests { let t0 = state.alloc_task_id(); let t1 = state.alloc_task_id(); - let cont0 = make_test_continuation(); - let cont1 = make_test_continuation(); state.tasks.insert( t0, TaskState::Pending { - cont: cont0, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6030,7 +6070,7 @@ mod tests { state.tasks.insert( t1, TaskState::Pending { - cont: cont1, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6101,11 +6141,10 @@ mod tests { let mut state = sched_state.lock().unwrap(); let t0 = state.alloc_task_id(); let t1 = state.alloc_task_id(); - let cont = make_test_continuation(); state.tasks.insert( t0, TaskState::Pending { - cont, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6147,11 +6186,10 @@ mod tests { }, ); // t1 still pending - let cont = make_test_continuation(); state.tasks.insert( t1, TaskState::Pending { - cont, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6162,10 +6200,11 @@ mod tests { // Register a waiter on t1 (simulating what wait_on_all does // for the one remaining pending item) let waiter = make_test_continuation(); - let waiter_id = waiter.derived_cont_id(); + let waiter_id = waiter.identity(); state.wait_on_all( &[Waitable::Task(t0), Waitable::Task(t1)], - waiter, + waiter_id, + Some(waiter), &VarStore::new(), ); @@ -6207,12 +6246,10 @@ mod tests { let t0 = state.alloc_task_id(); let t1 = state.alloc_task_id(); - let cont0 = make_test_continuation(); - let cont1 = make_test_continuation(); state.tasks.insert( t0, TaskState::Pending { - cont: cont0, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6222,7 +6259,7 @@ mod tests { state.tasks.insert( t1, TaskState::Pending { - cont: cont1, + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6232,10 +6269,11 @@ mod tests { // Register race waiter on both let waiter = make_test_continuation(); - let waiter_id = waiter.derived_cont_id(); + let waiter_id = waiter.identity(); state.wait_on_any( &[Waitable::Task(t0), Waitable::Task(t1)], - waiter, + waiter_id, + Some(waiter), &VarStore::new(), ); @@ -6264,17 +6302,12 @@ mod tests { store.put("key".to_string(), Value::Int(1)); // Create a task with isolated store - use crate::ids::Marker; - use crate::segment::Segment; - let marker = Marker::fresh(); - let seg = Segment::new(marker, None); - let seg_id = crate::ids::SegmentId::from_index(0); - let cont = Continuation::capture(&seg, seg_id); + let cont = make_test_continuation(); state.tasks.insert( tid, TaskState::Pending { - cont, + cont: OwnedControlContinuation::Started(cont), store: TaskStore::Isolated { store: store.clone(), merge: StoreMergePolicy::LogsOnly, @@ -6323,7 +6356,7 @@ mod tests { let k_user = make_test_continuation(); s.ready_root_resumes.insert( - k_user.derived_cont_id(), + k_user.identity(), ReadyRootResume { fiber_ids: k_user.fibers().to_vec(), outcome: Ok(Value::List(vec![Value::Int(1), Value::Int(2)])), @@ -6339,18 +6372,18 @@ mod tests { let mut program = SchedulerProgram::new(state.clone()); let mut store = VarStore::new(); let mut _scope = ScopeStore::default(); - let k_cont_id = k_user.derived_cont_id(); + let k_fiber_id = k_user.identity(); let step = program.handle_gather( k_user, vec![Waitable::Task(t0), Waitable::Task(t1)], &mut store, ); - assert!(step_targets_cont_id(&step, k_cont_id)); + assert!(step_targets_fiber_id(&step, k_fiber_id)); let s = state.lock().unwrap(); assert!( - !s.ready_root_resumes.contains_key(&k_cont_id), + !s.ready_root_resumes.contains_key(&k_fiber_id), "stale ready_root_resume for already-resumed continuation must be removed" ); } @@ -6361,11 +6394,15 @@ mod tests { let mut store = VarStore::new(); let mut _scope = ScopeStore::default(); + let waiter_cont = make_test_continuation(); + let waiter_cont_id = waiter_cont.identity(); + let waiter_cont_fiber = waiter_cont_id.unwrap(); + let waiting_task = state.alloc_task_id(); state.tasks.insert( waiting_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(waiter_cont), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6373,17 +6410,17 @@ mod tests { }, ); - let waiter_cont = make_test_continuation(); let promise_id = state.alloc_promise_id(); state.promises.insert(promise_id, PromiseState::Pending); state.current_task = Some(waiting_task); state - .suspend_task_for_wait(waiting_task, waiter_cont.clone()) + .suspend_task_for_wait(waiting_task, make_test_continuation_with_fiber(waiter_cont_fiber)) .expect("waiting task should be suspendable"); state.wait_on_any( &[Waitable::Promise(promise_id)], - waiter_cont.clone(), + waiter_cont_id, + None, &store, ); state.current_task = None; @@ -6400,12 +6437,15 @@ mod tests { | IRStreamStep::Yield(DoCtrl::Resume { continuation, value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { + }) => { + assert_eq!(continuation.identity(), waiter_cont_id); + assert_eq!(value.as_int(), Some(99)); + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), waiter_cont.derived_cont_id()); + assert_eq!(continuation.identity(), waiter_cont_id); assert_eq!(value.as_int(), Some(99)); } other => panic!("expected waiter continuation to run, got {:?}", other), @@ -6420,6 +6460,7 @@ mod tests { store.put("ephemeral".to_string(), Value::Int(1)); let waiter_cont = make_test_continuation(); + let waiter_cont_id = waiter_cont.identity(); let promise_id = state.alloc_promise_id(); state.promises.insert(promise_id, PromiseState::Pending); @@ -6430,7 +6471,8 @@ mod tests { // from the staged waiter payload when resumed. state.wait_on_any( &[Waitable::Promise(promise_id)], - waiter_cont.clone(), + waiter_cont_id, + Some(waiter_cont), &waiting_store, ); state.mark_promise_done(promise_id, Ok(Value::Int(7))); @@ -6445,12 +6487,15 @@ mod tests { | IRStreamStep::Yield(DoCtrl::Transfer { continuation, value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { + }) => { + assert_eq!(continuation.identity(), waiter_cont_id); + assert_eq!(value.as_int(), Some(7)); + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), waiter_cont.derived_cont_id()); + assert_eq!(continuation.identity(), waiter_cont_id); assert_eq!(value.as_int(), Some(7)); } other => panic!( @@ -6470,11 +6515,12 @@ mod tests { store.put("counter".to_string(), Value::Int(0)); let waiter_cont = make_test_continuation(); + let waiter_cont_id = waiter_cont.identity(); let task_id = state.alloc_task_id(); state.tasks.insert( task_id, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(make_test_continuation()), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6484,8 +6530,8 @@ mod tests { state.wait_on_all( &[Waitable::Task(task_id)], - waiter_cont.derived_cont_id(), - Some(waiter_cont.clone()), + waiter_cont_id, + Some(waiter_cont), &store, ); @@ -6498,26 +6544,26 @@ mod tests { let foreign_owner = make_test_continuation(); let step = state.transfer_next_or(foreign_owner, &mut store); - match step { - IRStreamStep::Yield(DoCtrl::Resume { - continuation, - value, - }) - | IRStreamStep::Yield(DoCtrl::Transfer { - continuation, - value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { - continuation, - value, - }) => { - assert_eq!(continuation.derived_cont_id(), waiter_cont.derived_cont_id()); - assert_eq!(value.as_int(), Some(7)); + let (step_identity, step_value) = match &step { + IRStreamStep::Yield(DoCtrl::Resume { continuation, value }) + | IRStreamStep::Yield(DoCtrl::Transfer { continuation, value }) => { + (continuation.identity(), value) + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value }) => { + (continuation.identity(), value) } other => panic!( "expected shared-task waiter continuation to run, got {:?}", other ), + }; + assert_eq!(step_identity, waiter_cont_id); + match step_value { + Value::List(values) => { + assert_eq!(values.len(), 1); + assert_eq!(values[0].as_int(), Some(7)); + } + other => panic!("expected gathered list result, got {:?}", other), } assert_eq!(store.get("counter").and_then(Value::as_int), Some(3)); @@ -6531,10 +6577,12 @@ mod tests { let owner_a_task = state.alloc_task_id(); let owner_a_cont = make_test_continuation(); + let owner_a_cont_id = owner_a_cont.identity(); + let owner_a_fiber = owner_a_cont_id.unwrap(); state.tasks.insert( owner_a_task, TaskState::Pending { - cont: owner_a_cont.clone(), + cont: OwnedControlContinuation::Started(owner_a_cont), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6545,21 +6593,24 @@ mod tests { state.promises.insert(promise_a, PromiseState::Pending); state.current_task = Some(owner_a_task); state - .suspend_task_for_wait(owner_a_task, owner_a_cont.clone()) + .suspend_task_for_wait(owner_a_task, make_test_continuation_with_fiber(owner_a_fiber)) .expect("owner A task should be suspendable"); state.wait_on_any( &[Waitable::Promise(promise_a)], - owner_a_cont.clone(), + owner_a_cont_id, + None, &store, ); state.current_task = None; let owner_b_task = state.alloc_task_id(); let owner_b_cont = make_test_continuation(); + let owner_b_cont_id = owner_b_cont.identity(); + let owner_b_fiber = owner_b_cont_id.unwrap(); state.tasks.insert( owner_b_task, TaskState::Pending { - cont: owner_b_cont.clone(), + cont: OwnedControlContinuation::Started(owner_b_cont), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6570,18 +6621,20 @@ mod tests { state.promises.insert(promise_b, PromiseState::Pending); state.current_task = Some(owner_b_task); state - .suspend_task_for_wait(owner_b_task, owner_b_cont.clone()) + .suspend_task_for_wait(owner_b_task, make_test_continuation_with_fiber(owner_b_fiber)) .expect("owner B task should be suspendable"); state.wait_on_any( &[Waitable::Promise(promise_b)], - owner_b_cont.clone(), + owner_b_cont_id, + None, &store, ); state.current_task = None; state.mark_promise_done(promise_a, Ok(Value::Int(11))); - let step = state.transfer_next_or(owner_b_cont, &mut store); + let owner_b_cont_for_transfer = make_test_continuation(); + let step = state.transfer_next_or(owner_b_cont_for_transfer, &mut store); match step { IRStreamStep::Yield(DoCtrl::Transfer { continuation, @@ -6590,12 +6643,15 @@ mod tests { | IRStreamStep::Yield(DoCtrl::Resume { continuation, value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { + }) => { + assert_eq!(continuation.identity(), owner_a_cont_id); + assert_eq!(value.as_int(), Some(11)); + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), owner_a_cont.derived_cont_id()); + assert_eq!(continuation.identity(), owner_a_cont_id); assert_eq!(value.as_int(), Some(11)); } other => panic!( @@ -6611,19 +6667,21 @@ mod tests { let mut store = VarStore::new(); let mut _scope = ScopeStore::default(); + let waiter_cont = make_test_continuation(); + let waiter_cont_id = waiter_cont.identity(); + let waiter_cont_fiber = waiter_cont_id.unwrap(); + let waiting_task = state.alloc_task_id(); state.tasks.insert( waiting_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(waiter_cont), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, pending_log_merge_items: None, }, ); - - let waiter_cont = make_test_continuation(); let p1 = state.alloc_promise_id(); let p2 = state.alloc_promise_id(); state.promises.insert(p1, PromiseState::Pending); @@ -6631,50 +6689,46 @@ mod tests { state.current_task = Some(waiting_task); state - .suspend_task_for_wait(waiting_task, waiter_cont.clone()) + .suspend_task_for_wait(waiting_task, make_test_continuation_with_fiber(waiter_cont_fiber)) .expect("waiting task should be suspendable"); state.wait_on_all( &[Waitable::Promise(p1), Waitable::Promise(p2)], - waiter_cont.clone(), + waiter_cont_id, + None, &store, ); state.current_task = None; state.mark_promise_done(p1, Ok(Value::Int(1))); let owner_after_first = make_test_continuation(); - let first_step = state.transfer_next_or(owner_after_first.clone(), &mut store); + let owner_after_first_id = owner_after_first.identity(); + let first_step = state.transfer_next_or(owner_after_first, &mut store); assert!( - step_targets_cont_id(&first_step, owner_after_first.derived_cont_id()), + step_targets_fiber_id(&first_step, owner_after_first_id), "waiter must not be ready after only first dependency completes" ); state.mark_promise_done(p2, Ok(Value::Int(2))); let owner_after_second = make_test_continuation(); let second_step = state.transfer_next_or(owner_after_second, &mut store); - match second_step { - IRStreamStep::Yield(DoCtrl::Transfer { - continuation, - value, - }) - | IRStreamStep::Yield(DoCtrl::Resume { - continuation, - value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { - continuation, - value, - }) => { - assert_eq!(continuation.derived_cont_id(), waiter_cont.derived_cont_id()); - match value { - Value::List(values) => { - assert_eq!(values.len(), 2); - assert_eq!(values[0].as_int(), Some(1)); - assert_eq!(values[1].as_int(), Some(2)); - } - other => panic!("gather should resume with list result, got {:?}", other), - } + let (step_identity, step_value) = match &second_step { + IRStreamStep::Yield(DoCtrl::Transfer { continuation, value }) + | IRStreamStep::Yield(DoCtrl::Resume { continuation, value }) => { + (continuation.identity(), value) + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value }) => { + (continuation.identity(), value) } other => panic!("expected waiter continuation to run, got {:?}", other), + }; + assert_eq!(step_identity, waiter_cont_id); + match step_value { + Value::List(values) => { + assert_eq!(values.len(), 2); + assert_eq!(values[0].as_int(), Some(1)); + assert_eq!(values[1].as_int(), Some(2)); + } + other => panic!("gather should resume with list result, got {:?}", other), } } @@ -6684,11 +6738,15 @@ mod tests { let mut store = VarStore::new(); let mut _scope = ScopeStore::default(); + let waiter_cont = make_test_continuation(); + let waiter_cont_id = waiter_cont.identity(); + let waiter_cont_fiber = waiter_cont_id.unwrap(); + let waiting_task = state.alloc_task_id(); state.tasks.insert( waiting_task, TaskState::Pending { - cont: make_test_continuation(), + cont: OwnedControlContinuation::Started(waiter_cont), store: TaskStore::Shared, resume_outcome: None, priority: PRIORITY_NORMAL, @@ -6696,7 +6754,6 @@ mod tests { }, ); - let waiter_cont = make_test_continuation(); let p1 = state.alloc_promise_id(); let p2 = state.alloc_promise_id(); state.promises.insert(p1, PromiseState::Pending); @@ -6704,11 +6761,12 @@ mod tests { state.current_task = Some(waiting_task); state - .suspend_task_for_wait(waiting_task, waiter_cont.clone()) + .suspend_task_for_wait(waiting_task, make_test_continuation_with_fiber(waiter_cont_fiber)) .expect("waiting task should be suspendable"); state.wait_on_any( &[Waitable::Promise(p1), Waitable::Promise(p2)], - waiter_cont.clone(), + waiter_cont_id, + None, &store, ); state.current_task = None; @@ -6725,12 +6783,15 @@ mod tests { | IRStreamStep::Yield(DoCtrl::Resume { continuation, value, - }) - | IRStreamStep::Yield(DoCtrl::ResumeContinuation { + }) => { + assert_eq!(continuation.identity(), waiter_cont_id); + assert_eq!(value.as_int(), Some(7)); + } + IRStreamStep::Yield(DoCtrl::ResumeContinuation { continuation, value, }) => { - assert_eq!(continuation.derived_cont_id(), waiter_cont.derived_cont_id()); + assert_eq!(continuation.identity(), waiter_cont_id); assert_eq!(value.as_int(), Some(7)); } other => panic!("expected waiter continuation to run, got {:?}", other), diff --git a/packages/doeff-vm-core/src/continuation.rs b/packages/doeff-vm-core/src/continuation.rs index 1548edab..b84db3bd 100644 --- a/packages/doeff-vm-core/src/continuation.rs +++ b/packages/doeff-vm-core/src/continuation.rs @@ -4,7 +4,7 @@ use pyo3::prelude::*; use pyo3::types::{PyDict, PyList}; use crate::frame::CallMetadata; -use crate::ids::{ContId, FiberId, SegmentId}; +use crate::ids::{fresh_pending_cont_id, FiberId, SegmentId}; use crate::kleisli::KleisliRef; use crate::memory_stats; use crate::py_shared::PyShared; @@ -27,7 +27,7 @@ pub enum OwnedControlContinuation { impl OwnedControlContinuation { /// Returns the identity of this control continuation. /// For started continuations, this is fibers[0]. - /// For pending continuations, this is the ContId (no fibers yet). + /// For pending continuations, this is None (no fibers yet). pub fn identity(&self) -> Option { match self { Self::Started(continuation) => continuation.identity(), @@ -35,18 +35,6 @@ impl OwnedControlContinuation { } } - /// Returns a ContId for scheduler/tracking use. - /// For started continuations: derived from fibers[0]. - /// For pending continuations: the stored ContId. - pub fn cont_id(&self) -> ContId { - match self { - Self::Started(continuation) => ContId::from_raw( - continuation.identity().map(|f| f.index() as u64).unwrap_or(0), - ), - Self::Pending(pending) => pending.cont_id, - } - } - pub fn is_started(&self) -> bool { matches!(self, Self::Started(continuation) if continuation.is_started()) } @@ -115,10 +103,10 @@ impl PyK { } /// Returns a display-friendly identity for this PyK. - /// Uses FiberId index for started continuations, ContId raw for pending. + /// Uses FiberId index for started continuations, pending_id for pending. pub fn display_id(&self) -> u64 { if let Some(pending) = &self.pending { - return pending.cont_id.raw(); + return pending.pending_id; } self.continuation .identity() @@ -161,7 +149,7 @@ impl PyK { #[derive(Debug, Clone)] pub struct PendingContinuation { - pub cont_id: ContId, + pub pending_id: u64, program: PyShared, handlers: Vec, handler_identities: Vec>, @@ -188,7 +176,7 @@ impl PendingContinuation { handler_identities }; Self { - cont_id: ContId::fresh(), + pending_id: fresh_pending_cont_id(), program: expr, handlers, handler_identities: normalized_identities, @@ -259,7 +247,7 @@ impl PendingContinuation { pub fn to_pyobject<'py>(&self, py: Python<'py>) -> PyResult> { let dict = PyDict::new(py); - dict.set_item("cont_id", self.cont_id.raw())?; + dict.set_item("cont_id", self.pending_id)?; dict.set_item("started", false)?; dict.set_item("program", self.program.bind(py))?; let handlers = PyList::empty(py); @@ -316,7 +304,7 @@ impl Continuation { } /// Returns the first FiberId as a natural unique identity for this continuation. - /// Used as dispatch identity (replaces ContId per SPEC-VM-021 Step 6). + /// Used as dispatch identity (SPEC-VM-021). pub fn identity(&self) -> Option { self.fibers.as_ref().and_then(|f| f.first().copied()) } @@ -368,12 +356,6 @@ impl Continuation { .map(|f| Self::new_captured(f[1..].to_vec())) } - /// Returns a ContId derived from fibers[0] for scheduler/tracking compatibility. - /// Continuation no longer stores ContId; this derives one from the natural fiber identity. - pub fn derived_cont_id(&self) -> ContId { - ContId::from_raw(self.identity().map(|f| f.index() as u64).unwrap_or(0)) - } - pub fn consumed(&self) -> bool { self.fibers.is_none() } diff --git a/packages/doeff-vm-core/src/do_ctrl.rs b/packages/doeff-vm-core/src/do_ctrl.rs index 8684c817..bb8a1ee1 100644 --- a/packages/doeff-vm-core/src/do_ctrl.rs +++ b/packages/doeff-vm-core/src/do_ctrl.rs @@ -10,7 +10,7 @@ use crate::continuation::{Continuation, OwnedControlContinuation}; use crate::driver::PyException; use crate::effect::DispatchEffect; use crate::frame::CallMetadata; -use crate::ids::{SegmentId, VarId}; +use crate::ids::{FiberId, SegmentId, VarId}; use crate::ir_stream::IRStreamRef; use crate::kleisli::KleisliRef; use crate::py_key::HashedPyKey; @@ -299,7 +299,7 @@ pub enum DoCtrl { }, EvalInScope { expr: PyShared, - scope: Continuation, + scope_fiber: FiberId, bindings: HashMap, metadata: Option, }, diff --git a/packages/doeff-vm-core/src/ids.rs b/packages/doeff-vm-core/src/ids.rs index adb6704a..7d753df7 100644 --- a/packages/doeff-vm-core/src/ids.rs +++ b/packages/doeff-vm-core/src/ids.rs @@ -17,12 +17,6 @@ pub struct Marker(pub u64); #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub struct FiberId(pub u32); -/// Unique identifier for continuations (one-shot tracking). -/// -/// Each captured continuation gets a unique ContId to enforce one-shot semantics. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub struct ContId(pub u64); - /// Unique identifier for runnable continuations. #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub struct RunnableId(pub u64); @@ -49,7 +43,7 @@ pub struct VarId { // Global counters for ID generation static MARKER_COUNTER: AtomicU64 = AtomicU64::new(1); static VAR_ID_COUNTER: AtomicU64 = AtomicU64::new(1); -static CONT_ID_COUNTER: AtomicU64 = AtomicU64::new(1); +static PENDING_CONT_COUNTER: AtomicU64 = AtomicU64::new(1); static RUNNABLE_ID_COUNTER: AtomicU64 = AtomicU64::new(1); impl Marker { @@ -95,20 +89,9 @@ impl VarId { } } -impl ContId { - /// Create a fresh unique ContId. - pub fn fresh() -> Self { - ContId(CONT_ID_COUNTER.fetch_add(1, Ordering::Relaxed)) - } - - /// Get the raw value. - pub fn raw(&self) -> u64 { - self.0 - } - - pub fn from_raw(value: u64) -> Self { - ContId(value) - } +/// Generate a fresh unique pending continuation ID. +pub fn fresh_pending_cont_id() -> u64 { + PENDING_CONT_COUNTER.fetch_add(1, Ordering::Relaxed) } impl RunnableId { @@ -171,9 +154,9 @@ mod tests { } #[test] - fn test_cont_id_fresh_is_unique() { - let c1 = ContId::fresh(); - let c2 = ContId::fresh(); + fn test_pending_cont_id_fresh_is_unique() { + let c1 = fresh_pending_cont_id(); + let c2 = fresh_pending_cont_id(); assert_ne!(c1, c2); } diff --git a/packages/doeff-vm-core/src/lib.rs b/packages/doeff-vm-core/src/lib.rs index addfefb4..d17834a6 100644 --- a/packages/doeff-vm-core/src/lib.rs +++ b/packages/doeff-vm-core/src/lib.rs @@ -94,7 +94,7 @@ pub use frame::Frame; #[cfg(feature = "python_bridge")] pub use handler::{IRStreamFactory, IRStreamFactoryRef, IRStreamProgram, IRStreamProgramRef}; #[cfg(feature = "python_bridge")] -pub use ids::{ContId, FiberId, Marker, PromiseId, RunnableId, SegmentId, TaskId, VarId}; +pub use ids::{fresh_pending_cont_id, FiberId, Marker, PromiseId, RunnableId, SegmentId, TaskId, VarId}; #[cfg(feature = "python_bridge")] pub use ir_stream::{IRStream, IRStreamRef, IRStreamStep, PythonGeneratorStream, StreamLocation}; #[cfg(feature = "python_bridge")] diff --git a/packages/doeff-vm-core/src/vm/dispatch.rs b/packages/doeff-vm-core/src/vm/dispatch.rs index 14e581a1..6d014f9f 100644 --- a/packages/doeff-vm-core/src/vm/dispatch.rs +++ b/packages/doeff-vm-core/src/vm/dispatch.rs @@ -1409,12 +1409,17 @@ impl VM { pub(super) fn eval_in_scope_chain_start_segment( &self, - scope: &Continuation, + scope_fiber: FiberId, ) -> Option { - // Lexical scope must anchor to the immediate captured scope - // continuation. Dynamic handler/interceptor visibility is handled - // separately via `child.parent`. - self.continuation_chain_segment_id(scope) + // Lexical scope must anchor to the immediate captured scope fiber. + // Dynamic handler/interceptor visibility is handled separately via `child.parent`. + Some(scope_fiber).filter(|seg_id| self.segments.get(*seg_id).is_some()).or_else(|| { + // Fall back to parent hint if the fiber itself is not found + self.segments + .get(scope_fiber) + .and_then(|s| s.parent) + .and_then(|p| self.normalize_live_parent_hint(Some(p))) + }) } fn continuation_parent_hint(&self, continuation: &Continuation) -> Option { @@ -1467,7 +1472,9 @@ impl VM { .or_else(|| self.continuation_chain_segment_id(continuation)) } - fn return_to_continuation(&self) -> Option { + /// Returns the FiberId of the return-to target found in the current frame chain. + /// Does NOT create a Continuation — callers use the FiberId directly for topology queries. + fn return_to_fiber_id(&self) -> Option { let mut cursor = self.current_segment; while let Some(seg_id) = cursor { let Some(seg) = self.segments.get(seg_id) else { @@ -1477,7 +1484,7 @@ impl VM { Frame::EvalReturn(eval_return) => match eval_return.as_ref() { EvalReturnContinuation::ReturnToContinuation { fiber_ids } | EvalReturnContinuation::EvalInScopeReturn { fiber_ids } => { - Some(fiber_ids.clone()) + Some(fiber_ids) } EvalReturnContinuation::ResumeToContinuation { .. } | EvalReturnContinuation::ApplyResolveFunction { .. } @@ -1496,14 +1503,62 @@ impl VM { | Frame::FlatMapBindResult | Frame::FlatMapBindSource { .. } => None, }) { - let captured_caller = self.segments.get(fiber_ids[0]).and_then(|s| s.parent); - return Some(Continuation::from_fiber(fiber_ids[0], captured_caller)); + return fiber_ids.first().copied(); } cursor = seg.parent; } None } + /// Dispatch view lookup for a single fiber ID (used for return-to queries). + fn dispatch_view_for_single_fiber(&self, fiber_id: FiberId) -> Option { + let single = [fiber_id]; + self.dispatch_lookup_candidates() + .into_iter() + .find(|view| { + Self::fiber_ids_match_slice(&view.dispatch.origin_fiber_ids, &single) + || Self::fiber_ids_match_slice(&view.dispatch.handler_fiber_ids, &single) + }) + .or_else(|| { + self.dispatch_lookup_candidates() + .into_iter() + .filter(|view| { + let h = &view.dispatch.handler_fiber_ids; + !h.is_empty() && *h.last().unwrap() == fiber_id + }) + .min_by_key(|view| view.dispatch.handler_fiber_ids.len()) + }) + } + + fn fiber_ids_match_slice(stored: &[FiberId], target: &[FiberId]) -> bool { + !stored.is_empty() && stored == target + } + + /// Handler chain start for a single return-to fiber. + fn handler_chain_start_for_return_to_fiber(&self, fiber_id: FiberId) -> Option { + // Equivalent to live_handler_chain_start_for_return_to + continuation_handler_chain_start + // for a single-fiber case. + let parent_hint = self + .segments + .get(fiber_id) + .and_then(|s| s.parent) + .and_then(|p| self.normalize_live_parent_hint(Some(p))); + let chain_seg = Some(fiber_id) + .filter(|seg_id| self.segments.get(*seg_id).is_some()) + .or(parent_hint); + let handler_chain_start = parent_hint.or(chain_seg); + + // Also check via dispatch origin (mirrors live_handler_chain_start_for_return_to) + handler_chain_start.or_else(|| { + self.dispatch_view_for_single_fiber(fiber_id) + .and_then(|view| view.dispatch.parent_dispatch_id) + .and_then(|origin_dispatch_id| { + self.dispatch_origin_for_origin_dispatch_id_anywhere(origin_dispatch_id) + }) + .and_then(|origin| self.fiber_ids_handler_chain_start(&origin.origin_fiber_ids)) + }) + } + fn is_internal_doeff_handler_source_file(source_file: &str) -> bool { let normalized = source_file.replace('\\', "/").to_lowercase(); if !normalized.contains("/doeff/") { @@ -1651,9 +1706,11 @@ impl VM { self.dispatch_ref_in_segment(seg_id) .map(|dispatch| dispatch.origin_dispatch_id) .or_else(|| { - self.return_to_continuation().and_then(|continuation| { - self.continuation_parent_dispatch_id(&continuation) - .or_else(|| self.continuation_dispatch_id(&continuation)) + self.return_to_fiber_id().and_then(|fiber_id| { + let view = self.dispatch_view_for_single_fiber(fiber_id)?; + view.dispatch + .parent_dispatch_id + .or(Some(view.dispatch.origin_dispatch_id)) }) }) .or_else(|| { @@ -1844,11 +1901,8 @@ impl VM { { full_current_entries .get_or_insert_with(|| self.full_handler_entries_for_segment(seg_id)); - self.return_to_continuation() - .and_then(|continuation| { - self.live_handler_chain_start_for_return_to(&continuation) - .or_else(|| self.continuation_handler_chain_start(&continuation)) - }) + self.return_to_fiber_id() + .and_then(|fid| self.handler_chain_start_for_return_to_fiber(fid)) .map(|outer_start| self.handlers_in_caller_chain(outer_start)) .unwrap_or_default() } else { @@ -1876,15 +1930,13 @@ impl VM { selected_from_current_chain = false; handler_count = 0; } - let fallback_return_to = (selected.is_none()) - .then(|| self.return_to_continuation()) + let fallback_return_to_fiber = (selected.is_none()) + .then(|| self.return_to_fiber_id()) .flatten(); if selected.is_none() { - let mut cursor = fallback_return_to.as_ref().and_then(|continuation| { - self.live_handler_chain_start_for_return_to(continuation) - .or_else(|| self.continuation_handler_chain_start(continuation)) - }); + let mut cursor = fallback_return_to_fiber + .and_then(|fid| self.handler_chain_start_for_return_to_fiber(fid)); while let Some(cursor_id) = cursor { let Some(seg) = self.segments.get(cursor_id) else { break; @@ -1937,7 +1989,7 @@ impl VM { } } - if cacheable_current_chain && selected_from_current_chain && fallback_return_to.is_none() { + if cacheable_current_chain && selected_from_current_chain && fallback_return_to_fiber.is_none() { if let Some((_, _, prompt_seg_id, _)) = selected.as_ref() { self.cache_current_chain_handler_resolution(seg_id, effect_type_id, *prompt_seg_id); } @@ -2020,7 +2072,8 @@ impl VM { } else { self.capture_live_continuation(seg_id) }; - if let Some(return_to) = fallback_return_to { + if let Some(return_to_fid) = fallback_return_to_fiber { + let return_to = self.capture_live_continuation(return_to_fid); k_user.append_owned_fibers(return_to); } if let Some(seg_id) = self.current_segment { @@ -3390,12 +3443,10 @@ impl VM { fn current_handler_chain_with_live_prefix(&self) -> Vec { let base_entries = self.current_handler_chain(); - let Some(return_to) = self.return_to_continuation() else { + let Some(return_to_fid) = self.return_to_fiber_id() else { return base_entries; }; - let Some(outer_start) = self - .live_handler_chain_start_for_return_to(&return_to) - .or_else(|| self.continuation_handler_chain_start(&return_to)) + let Some(outer_start) = self.handler_chain_start_for_return_to_fiber(return_to_fid) else { return base_entries; }; diff --git a/packages/doeff-vm-core/src/vm/step.rs b/packages/doeff-vm-core/src/vm/step.rs index 34d8c611..4ea29613 100644 --- a/packages/doeff-vm-core/src/vm/step.rs +++ b/packages/doeff-vm-core/src/vm/step.rs @@ -880,9 +880,10 @@ impl VM { self.mode = Mode::Throw(exc); return StepEvent::Continue; } - if let Some(continuation) = - self.handler_stream_throw_continuation(&stream, handler_kind) + if let Some(throw_fiber_id) = + self.handler_stream_throw_fiber_id(&stream, handler_kind) { + let continuation = self.capture_live_continuation(throw_fiber_id); self.mode = Mode::HandleYield(DoCtrl::TransferThrow { continuation, exception: exc, @@ -1527,10 +1528,10 @@ impl VM { DoCtrl::Eval { expr, metadata } => self.handle_yield_eval(expr, metadata), DoCtrl::EvalInScope { expr, - scope, + scope_fiber, bindings, metadata, - } => self.handle_yield_eval_in_scope(expr, scope, bindings, metadata), + } => self.handle_yield_eval_in_scope(expr, scope_fiber, bindings, metadata), DoCtrl::AllocVar { initial } => self.handle_yield_alloc_var(initial), DoCtrl::ReadVar { var } => self.handle_yield_read_var(var), DoCtrl::WriteVar { var, value } => self.handle_yield_write_var(var, value), @@ -1908,7 +1909,7 @@ impl VM { pub(super) fn handle_yield_eval_in_scope( &mut self, expr: PyShared, - scope: Continuation, + scope_fiber: FiberId, bindings: HashMap, metadata: Option, ) -> StepEvent { @@ -1920,9 +1921,7 @@ impl VM { let Some(_current_seg) = self.segments.get(current_seg_id) else { return StepEvent::Error(VMError::internal("EvalInScope current segment not found")); }; - let captured_caller = self.parent_segment(current_seg_id); - let return_to = Continuation::from_fiber(current_seg_id, captured_caller); - let Some(scope_parent_seg_id) = self.eval_in_scope_chain_start_segment(&scope) else { + let Some(scope_parent_seg_id) = self.eval_in_scope_chain_start_segment(scope_fiber) else { return StepEvent::Error(VMError::internal( "EvalInScope received scope from unknown segment", )); @@ -1934,7 +1933,7 @@ impl VM { }); child_seg.push_frame(Frame::EvalReturn(Box::new( EvalReturnContinuation::EvalInScopeReturn { - fiber_ids: return_to.fibers().to_vec(), + fiber_ids: vec![current_seg_id], }, ))); let child_seg_id = self.alloc_segment(child_seg); @@ -2677,9 +2676,10 @@ impl VM { self.emit_handler_threw_for_dispatch(origin_cont_id, &exception); } } - if let Some(continuation) = - self.handler_stream_throw_continuation(&stream, handler_kind) + if let Some(throw_fiber_id) = + self.handler_stream_throw_fiber_id(&stream, handler_kind) { + let continuation = self.capture_live_continuation(throw_fiber_id); self.mode = Mode::HandleYield(DoCtrl::TransferThrow { continuation, exception, diff --git a/packages/doeff-vm-core/src/vm/vm_trace.rs b/packages/doeff-vm-core/src/vm/vm_trace.rs index 576a10a1..47f12969 100644 --- a/packages/doeff-vm-core/src/vm/vm_trace.rs +++ b/packages/doeff-vm-core/src/vm/vm_trace.rs @@ -270,11 +270,13 @@ impl VM { }) } - pub(super) fn handler_stream_throw_continuation( + /// Returns the FiberId for a handler stream throw target. + /// Callers create the Continuation via capture_live_continuation (requires &mut self). + pub(super) fn handler_stream_throw_fiber_id( &self, stream: &IRStreamRef, handler_kind: Option, - ) -> Option { + ) -> Option { handler_kind?; let origin_dispatch_id = self @@ -306,10 +308,7 @@ impl VM { origin.origin_fiber_ids.clone() }; self.fiber_ids_dispatch_is_live(&fiber_ids) - .then(|| { - let captured_caller = self.segments.get(fiber_ids[0]).and_then(|s| s.parent); - Continuation::from_fiber(fiber_ids[0], captured_caller) - }) + .then(|| fiber_ids[0]) } pub(super) fn active_error_dispatch_original_exception(&self) -> Option { diff --git a/packages/doeff-vm-core/src/vm_tests.rs b/packages/doeff-vm-core/src/vm_tests.rs index 3d498d83..90c83a61 100644 --- a/packages/doeff-vm-core/src/vm_tests.rs +++ b/packages/doeff-vm-core/src/vm_tests.rs @@ -592,17 +592,11 @@ fn test_eval_in_scope_uses_scope_chain_for_dynamic_handler_lookup() { let scope_parent_id = vm.alloc_segment(Segment::new(None)); let scope_seg_id = vm.alloc_segment(Segment::new(Some(scope_parent_id))); - let scope_seg = vm - .segments - .get(scope_seg_id) - .expect("scope segment must exist for continuation capture"); - let scope = Continuation::capture(scope_seg, scope_seg_id); - let current_seg_id = vm.alloc_segment(Segment::new(Some(scope_parent_id))); vm.current_segment = Some(current_seg_id); let expr = Python::attach(|py| PyShared::new(py.None())); - let event = vm.handle_yield_eval_in_scope(expr, scope, std::collections::HashMap::new(), None); + let event = vm.handle_yield_eval_in_scope(expr, scope_seg_id, std::collections::HashMap::new(), None); assert!(matches!(event, StepEvent::NeedsPython(_))); let child_seg_id = vm diff --git a/packages/doeff-vm/src/lib.rs b/packages/doeff-vm/src/lib.rs index 9c4146d0..2b4b7c4b 100644 --- a/packages/doeff-vm/src/lib.rs +++ b/packages/doeff-vm/src/lib.rs @@ -98,7 +98,7 @@ pub use effect::*; pub use error::VMError; pub use frame::Frame; pub use handler::*; -pub use ids::{ContId, FiberId, Marker, PromiseId, RunnableId, SegmentId, TaskId, VarId}; +pub use ids::{fresh_pending_cont_id, FiberId, Marker, PromiseId, RunnableId, SegmentId, TaskId, VarId}; pub use ir_stream::{IRStream, IRStreamRef, IRStreamStep, PythonGeneratorStream, StreamLocation}; pub use kleisli::{Kleisli, KleisliDebugInfo, KleisliRef, PyKleisli, RustKleisli}; pub use py_key::HashedPyKey; diff --git a/packages/doeff-vm/src/pyvm.rs b/packages/doeff-vm/src/pyvm.rs index 225b91d1..aff10a2d 100644 --- a/packages/doeff-vm/src/pyvm.rs +++ b/packages/doeff-vm/src/pyvm.rs @@ -1648,11 +1648,12 @@ pub(crate) fn doctrl_to_pyexpr_for_vm(yielded: &DoCtrl) -> Result { - let k = pyk_from_started(py, scope) + let scope_cont = Continuation::from_fiber(*scope_fiber, None); + let k = pyk_from_started(py, &scope_cont) .map_err(|err| PyException::runtime_error(format!("{err}")))?; Some( Bound::new( @@ -2066,9 +2067,12 @@ pub(crate) fn classify_yielded_bound( let eval: PyRef<'_, PyEvalInScope> = obj.extract()?; let expr = eval.expr.clone_ref(py); let scope = borrow_control_continuation(py, &eval.scope, "EvalInScope.scope")?; + let scope_fiber = scope.segment_id().ok_or_else(|| { + PyRuntimeError::new_err("EvalInScope.scope must have a fiber identity") + })?; Ok(DoCtrl::EvalInScope { expr: PyShared::new(expr), - scope, + scope_fiber, bindings: scope_bindings_from_pyany(eval.bindings.bind(py).as_any())?, metadata: None, })