Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 105 additions & 34 deletions packages/doeff-core-effects/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,9 +1127,10 @@ impl IRStreamProgram for LazyAskHandlerProgram {
cache_snapshot,
semaphore_snapshot,
};
let scope_fiber = k.fibers()[0];
return IRStreamStep::Yield(DoCtrl::EvalInScope {
expr: local_effect.sub_program,
scope: k,
scope_fiber,
bindings: local_effect.overrides,
metadata: None,
});
Expand Down Expand Up @@ -1225,9 +1226,10 @@ impl IRStreamProgram for LazyAskHandlerProgram {
source_id,
semaphore,
};
let scope_fiber = continuation.fibers()[0];
IRStreamStep::Yield(DoCtrl::EvalInScope {
expr,
scope: continuation,
scope_fiber,
bindings: HashMap::new(),
metadata: None,
})
Expand Down Expand Up @@ -1687,11 +1689,12 @@ impl IRStreamProgram for ResultSafeHandlerProgram {
if let Some(obj) = dispatch_into_python(effect.clone()) {
return match parse_result_safe_python_effect(&obj) {
Ok(Some(sub_program)) => {
let scope_fiber = k.fibers()[0];
let fiber_ids = k.fibers().to_vec();
self.phase = ResultSafePhase::AwaitEval { fiber_ids };
IRStreamStep::Yield(DoCtrl::EvalInScope {
expr: sub_program,
scope: k,
scope_fiber,
bindings: HashMap::new(),
metadata: None,
})
Expand Down Expand Up @@ -1921,15 +1924,14 @@ mod tests {
use crate::effect::{dispatch_from_shared, Effect};
use crate::ids::Marker;
use crate::ir_stream::{IRStream, IRStreamStep};
use crate::segment::Segment;
use pyo3::types::PyDictMethods;
use pyo3::{IntoPyObject, Python};

fn make_test_continuation() -> Continuation {
let marker = Marker::fresh();
let seg = Segment::new(marker, None);
let seg_id = crate::ids::SegmentId::from_index(0);
Continuation::capture(&seg, seg_id)
use std::sync::atomic::{AtomicUsize, Ordering};
static NEXT_FIBER: AtomicUsize = AtomicUsize::new(2000);
let idx = NEXT_FIBER.fetch_add(1, Ordering::Relaxed);
Continuation::from_fiber(FiberId::from_index(idx), None)
}

fn dispatch(effect: Effect) -> DispatchEffect {
Expand Down Expand Up @@ -1963,7 +1965,7 @@ mod tests {
let mut scope = ScopeStore::default();
let mut program = AwaitHandlerProgram::new();
let continuation = make_test_continuation();
let continuation_id = continuation.derived_cont_id();
let continuation_id = continuation.identity();
program.phase = AwaitPhase::AwaitResult { continuation };

let location = IRStream::debug_location(&program).expect("await debug location");
Expand All @@ -1972,14 +1974,14 @@ mod tests {

let step = IRStream::resume(&mut program, Value::Int(12), &mut store, &mut scope);
match step {
IRStreamStep::Yield(DoCtrl::Resume {
IRStreamStep::Yield(DoCtrl::Transfer {
continuation,
value,
}) => {
assert_eq!(continuation.derived_cont_id(), continuation_id);
assert_eq!(continuation.identity(), continuation_id);
assert_eq!(value.as_int(), Some(12));
}
_ => panic!("expected IRStream Yield(Resume)"),
_ => panic!("expected IRStream Yield(Transfer), got {:?}", step),
}

let location = IRStream::debug_location(&program).expect("await debug location");
Expand All @@ -1994,25 +1996,37 @@ mod tests {

let mut program = StateHandlerProgram::new();
let continuation = make_test_continuation();
let continuation_id = continuation.derived_cont_id();
let continuation_id = continuation.identity();
program.pending_key = Some("count".to_string());
program.pending_k = Some(continuation);
program.pending_old_value = Some(Value::Int(5));
program.phase = StatePhase::AwaitModifyWrite;

let location = IRStream::debug_location(&program).expect("state debug location");
assert_eq!(location.function_name, "StateHandler");
assert_eq!(location.phase.as_deref(), Some("ModifyApply"));

// resume with new value (from modifier) → WriteHandlerState
let step = IRStream::resume(&mut program, Value::Int(8), &mut store, &mut scope);
match &step {
IRStreamStep::Yield(DoCtrl::WriteHandlerState { key, value }) => {
assert_eq!(key, "count");
store.put(key.clone(), value.clone());
}
_ => panic!("expected IRStream Yield(WriteHandlerState), got {:?}", step),
}

// resume after write → Resume with old_value
let step = IRStream::resume(&mut program, Value::Unit, &mut store, &mut scope);
match step {
IRStreamStep::Yield(DoCtrl::Resume {
continuation,
value,
}) => {
assert_eq!(continuation.derived_cont_id(), continuation_id);
assert_eq!(continuation.identity(), continuation_id);
assert_eq!(value.as_int(), Some(5));
}
_ => panic!("expected IRStream Yield(Resume)"),
_ => panic!("expected IRStream Yield(Resume), got {:?}", step),
}

assert_eq!(store.get("count").and_then(Value::as_int), Some(8));
Expand All @@ -2031,6 +2045,7 @@ mod tests {

program.pending_key = Some("count".to_string());
program.pending_old_value = Some(Value::Int(5));
program.phase = StatePhase::AwaitModifyWrite;

let _ = IRStream::resume(&mut program, Value::Int(8), &mut store, &mut scope);
}
Expand Down Expand Up @@ -2132,7 +2147,7 @@ mod tests {
let mut scope = ScopeStore::default();
let mut program = LazyAskHandlerProgram::new(Arc::new(Mutex::new(LazyAskState::default())));
let continuation = make_test_continuation();
let continuation_id = continuation.derived_cont_id();
let continuation_id = continuation.identity();
program.phase = LazyAskPhase::AwaitRelease {
continuation,
outcome: Ok(Value::Int(44)),
Expand All @@ -2148,7 +2163,7 @@ mod tests {
continuation,
value,
}) => {
assert_eq!(continuation.derived_cont_id(), continuation_id);
assert_eq!(continuation.identity(), continuation_id);
assert_eq!(value.as_int(), Some(44));
}
_ => panic!("expected IRStream Yield(Resume)"),
Expand Down Expand Up @@ -2215,17 +2230,25 @@ mod tests {
store.put("key".to_string(), Value::Int(42));
let k = make_test_continuation();
let program_ref = StateHandlerFactory.create_program();
let step = {
let start_step = {
let mut guard = program_ref.lock().unwrap();
guard.start(py, dispatch(Effect::get("key")), k, &mut store, &mut scope)
};
match step {
assert!(
matches!(start_step, IRStreamStep::Yield(DoCtrl::ReadHandlerState { .. })),
"Expected Yield(ReadHandlerState), got {:?}", start_step
);
let resume_step = {
let mut guard = program_ref.lock().unwrap();
guard.resume(Value::Int(42), &mut store, &mut scope)
};
match resume_step {
IRStreamStep::Yield(DoCtrl::Resume { value, .. }) => {
assert_eq!(value.as_int(), Some(42));
}
_ => panic!(
"Expected Yield(Resume), got {:?}",
std::mem::discriminant(&step)
resume_step
),
}
});
Expand All @@ -2238,7 +2261,7 @@ mod tests {
let mut scope = ScopeStore::default();
let k = make_test_continuation();
let program_ref = StateHandlerFactory.create_program();
let step = {
let start_step = {
let mut guard = program_ref.lock().unwrap();
guard.start(
py,
Expand All @@ -2248,8 +2271,19 @@ mod tests {
&mut scope,
)
};
// Handler yields WriteHandlerState; simulate VM writing the state
match &start_step {
IRStreamStep::Yield(DoCtrl::WriteHandlerState { key, value }) => {
store.put(key.clone(), value.clone());
}
_ => panic!("Expected Yield(WriteHandlerState), got {:?}", start_step),
}
let resume_step = {
let mut guard = program_ref.lock().unwrap();
guard.resume(Value::Unit, &mut store, &mut scope)
};
assert!(matches!(
step,
resume_step,
IRStreamStep::Yield(DoCtrl::Resume {
value: Value::Unit,
..
Expand All @@ -2269,7 +2303,8 @@ mod tests {
let k = make_test_continuation();
let modifier = py.None().into_pyobject(py).unwrap().unbind().into_any();
let program_ref = StateHandlerFactory.create_program();
let step = {
// start returns ReadHandlerState
let start_step = {
let mut guard = program_ref.lock().unwrap();
guard.start(
py,
Expand All @@ -2282,6 +2317,15 @@ mod tests {
&mut scope,
)
};
assert!(
matches!(start_step, IRStreamStep::Yield(DoCtrl::ReadHandlerState { .. })),
"Expected Yield(ReadHandlerState), got {:?}", start_step
);
// resume with stored value yields NeedsPython
let step = {
let mut guard = program_ref.lock().unwrap();
guard.resume(Value::Int(10), &mut store, &mut scope)
};
match step {
IRStreamStep::NeedsPython(PythonCall::CallFunc { args, .. }) => {
assert_eq!(args.len(), 1);
Expand All @@ -2302,10 +2346,10 @@ mod tests {
let k = make_test_continuation();
let modifier = py.None().into_pyobject(py).unwrap().unbind().into_any();
let program_ref = StateHandlerFactory.create_program();
// start: returns NeedsPython
// start: returns ReadHandlerState
{
let mut guard = program_ref.lock().unwrap();
guard.start(
let step = guard.start(
py,
dispatch(Effect::Modify {
key: "key".to_string(),
Expand All @@ -2315,12 +2359,30 @@ mod tests {
&mut store,
&mut scope,
);
assert!(matches!(step, IRStreamStep::Yield(DoCtrl::ReadHandlerState { .. })));
}
// resume with new value
let step = {
// resume with old value from store → NeedsPython
{
let mut guard = program_ref.lock().unwrap();
let step = guard.resume(Value::Int(10), &mut store, &mut scope);
assert!(matches!(step, IRStreamStep::NeedsPython(PythonCall::CallFunc { .. })));
}
// resume with modifier result (new value) → WriteHandlerState
let write_step = {
let mut guard = program_ref.lock().unwrap();
guard.resume(Value::Int(20), &mut store, &mut scope)
};
match &write_step {
IRStreamStep::Yield(DoCtrl::WriteHandlerState { key, value }) => {
store.put(key.clone(), value.clone());
}
_ => panic!("Expected Yield(WriteHandlerState), got {:?}", write_step),
}
// resume after write → Resume with old_value
let step = {
let mut guard = program_ref.lock().unwrap();
guard.resume(Value::Unit, &mut store, &mut scope)
};
match step {
IRStreamStep::Yield(DoCtrl::Resume { value, .. }) => {
assert_eq!(value.as_int(), Some(10)); // old_value returned (SPEC-008 L1271)
Expand Down Expand Up @@ -2442,7 +2504,8 @@ mod tests {
Python::attach(|py| {
let mut store = VarStore::new();
let mut scope = ScopeStore::default();
let k = make_test_continuation();
let k1 = make_test_continuation();
let k2 = make_test_continuation();

let locals = pyo3::types::PyDict::new(py);
locals
Expand All @@ -2463,7 +2526,7 @@ mod tests {
let ok_program = ResultSafeHandlerFactory.create_program();
let start_step = {
let mut guard = ok_program.lock().unwrap();
guard.start(py, effect.clone(), k.clone(), &mut store, &mut scope)
guard.start(py, effect.clone(), k1, &mut store, &mut scope)
};
assert!(matches!(
start_step,
Expand All @@ -2475,7 +2538,11 @@ mod tests {
guard.resume(Value::Int(42), &mut store, &mut scope)
};
match ok_step {
IRStreamStep::Yield(DoCtrl::Resume {
IRStreamStep::Yield(DoCtrl::Transfer {
value: Value::Python(obj),
..
})
| IRStreamStep::Yield(DoCtrl::Resume {
value: Value::Python(obj),
..
}) => {
Expand All @@ -2485,13 +2552,13 @@ mod tests {
assert!(is_ok);
assert_eq!(inner.extract::<i64>().unwrap(), 42);
}
_ => panic!("expected Resume with Ok(value)"),
_ => panic!("expected Transfer/Resume with Ok(value), got {:?}", ok_step),
}

let err_program = ResultSafeHandlerFactory.create_program();
let _ = {
let mut guard = err_program.lock().unwrap();
guard.start(py, effect, k, &mut store, &mut scope)
guard.start(py, effect, k2, &mut store, &mut scope)
};

let err_step = {
Expand All @@ -2500,7 +2567,11 @@ mod tests {
};

match err_step {
IRStreamStep::Yield(DoCtrl::Resume {
IRStreamStep::Yield(DoCtrl::Transfer {
value: Value::Python(obj),
..
})
| IRStreamStep::Yield(DoCtrl::Resume {
value: Value::Python(obj),
..
}) => {
Expand All @@ -2511,7 +2582,7 @@ mod tests {
assert!(is_err);
assert!(msg.contains("boom"));
}
_ => panic!("expected Resume with Err(exception)"),
_ => panic!("expected Transfer/Resume with Err(exception), got {:?}", err_step),
}
});
}
Expand Down
Loading
Loading