Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions .semgrep.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,47 @@ rules:
languages: [rust]
severity: ERROR

- id: vm-gil-cleanup-no-caller-attach-on-kleisli-apply
pattern-either:
- pattern: Python::attach(|$PY| $KLEISLI.apply($PY, $ARGS))
- pattern: Python::attach(|$PY| $KLEISLI.apply_with_run_token($PY, $ARGS, $RUN_TOKEN))
message: |
GIL ownership belongs inside Kleisli implementations.
Call kleisli.apply(...) / apply_with_run_token(...) directly without forcing Python::attach
at the call site. Ref: vm-perf-gil-cleanup.
languages: [rust]
severity: ERROR
paths:
include:
- /packages/doeff-vm-core/src/**

- id: vm-gil-cleanup-no-caller-attach-on-ir-stream-program
pattern-either:
- pattern: |
Python::attach(|$PY| {
...
$PROGRAM.start($PY, $EFFECT, $K, $STORE, $SCOPE)
})
- pattern: |
Python::attach(|$PY| {
...
$PROGRAM.resume($VALUE, $STORE, $SCOPE)
})
- pattern: |
Python::attach(|$PY| {
...
$PROGRAM.throw($EXC, $STORE, $SCOPE)
})
message: |
IRStreamProgram implementations must acquire the GIL internally when needed.
Do not wrap start/resume/throw in Python::attach at the RustKleisliStream call site.
Ref: vm-perf-gil-cleanup.
languages: [rust]
severity: ERROR
paths:
include:
- /packages/doeff-vm-core/src/kleisli.rs

- id: no-getattr-in-classify-yielded
patterns:
- pattern-inside: |
Expand Down
5 changes: 4 additions & 1 deletion packages/doeff-core-effects/src/effects/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,10 @@ pub fn register_effect_classes(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyCreateExternalPromise>()?;
m.add_class::<PyCancelEffect>()?;
m.add_class::<PyTaskCompleted>()?;
m.add("TaskCancelledError", m.py().get_type::<TaskCancelledError>())?;
m.add(
"TaskCancelledError",
m.py().get_type::<TaskCancelledError>(),
)?;
m.add_class::<PySemaphore>()?;
m.add_class::<PyCreateSemaphore>()?;
m.add_class::<PyAcquireSemaphore>()?;
Expand Down
28 changes: 8 additions & 20 deletions packages/doeff-core-effects/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ impl AwaitHandlerProgram {
impl IRStreamProgram for AwaitHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
_store: &mut RustStore,
Expand Down Expand Up @@ -541,7 +540,6 @@ impl StateHandlerProgram {
impl IRStreamProgram for StateHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
store: &mut RustStore,
Expand Down Expand Up @@ -641,9 +639,10 @@ impl IRStreamProgram for StateHandlerProgram {
}
// Modify case: store modifier result but resume caller with OLD value.
// SPEC-008 L1271: Modify is read-then-modify, returns the old value.
let key = self.pending_key.take().expect(
"StateHandler Modify invariant violated: pending key missing during resume",
);
let key = self
.pending_key
.take()
.expect("StateHandler Modify invariant violated: pending key missing during resume");
let continuation = self.pending_k.take().expect(
"StateHandler Modify invariant violated: pending continuation missing during resume",
);
Expand Down Expand Up @@ -1020,7 +1019,6 @@ impl LazyAskHandlerProgram {
impl IRStreamProgram for LazyAskHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
store: &mut RustStore,
Expand Down Expand Up @@ -1310,7 +1308,6 @@ impl ReaderHandlerProgram {
impl IRStreamProgram for ReaderHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
store: &mut RustStore,
Expand Down Expand Up @@ -1446,7 +1443,6 @@ impl WriterHandlerProgram {
impl IRStreamProgram for WriterHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
store: &mut RustStore,
Expand Down Expand Up @@ -1614,7 +1610,6 @@ impl ResultSafeHandlerProgram {
impl IRStreamProgram for ResultSafeHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
_store: &mut RustStore,
Expand Down Expand Up @@ -1765,7 +1760,6 @@ impl std::fmt::Debug for DoubleCallHandlerProgram {
impl IRStreamProgram for DoubleCallHandlerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
_store: &mut RustStore,
Expand Down Expand Up @@ -2022,14 +2016,8 @@ mod tests {
let obj = locals.get_item("obj").unwrap().unwrap().unbind();
let effect = Effect::from_shared(PyShared::new(obj));

let step = IRStreamProgram::start(
&mut program,
py,
effect,
continuation,
&mut store,
&mut scope,
);
let step =
IRStreamProgram::start(&mut program, effect, continuation, &mut store, &mut scope);
assert!(matches!(
step,
IRStreamStep::Yield(DoCtrl::EvalInScope { .. })
Expand Down Expand Up @@ -2374,7 +2362,7 @@ mod tests {
let ok_program = ResultSafeHandlerFactory.create_program();
let start_step = {
let mut guard = ok_program.lock().unwrap();
guard.start(py, effect.clone(), k.clone(), &mut store, &mut scope)
guard.start(effect.clone(), k.clone(), &mut store, &mut scope)
};
assert!(matches!(
start_step,
Expand Down Expand Up @@ -2402,7 +2390,7 @@ mod tests {
let err_program = ResultSafeHandlerFactory.create_program();
let _ = {
let mut guard = err_program.lock().unwrap();
guard.start(py, effect, k, &mut store, &mut scope)
guard.start(effect, k, &mut store, &mut scope)
};

let err_step = {
Expand Down
4 changes: 3 additions & 1 deletion packages/doeff-core-effects/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ pub mod effect {
}

pub mod handler {
pub use doeff_vm_core::{IRStreamFactory, IRStreamFactoryRef, IRStreamProgram, IRStreamProgramRef};
pub use crate::handlers::*;
pub use doeff_vm_core::{
IRStreamFactory, IRStreamFactoryRef, IRStreamProgram, IRStreamProgramRef,
};
}

pub fn register_all(m: &pyo3::Bound<'_, pyo3::types::PyModule>) -> pyo3::PyResult<()> {
Expand Down
61 changes: 27 additions & 34 deletions packages/doeff-core-effects/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::effect::{
dispatch_from_shared, dispatch_into_python, dispatch_ref_as_python,
make_execution_context_object, DispatchEffect, PyAcquireSemaphore, PyCancelEffect,
PyCompletePromise, PyCreateExternalPromise, PyCreatePromise, PyCreateSemaphore, PyFailPromise,
PyGather, PyGetExecutionContext, PyRace, PyReleaseSemaphore, PySemaphore, PySpawn, PyTaskCompleted,
TaskCancelledError,
PyGather, PyGetExecutionContext, PyRace, PyReleaseSemaphore, PySemaphore, PySpawn,
PyTaskCompleted, TaskCancelledError,
};
use crate::error::VMError;
use crate::handler::{IRStreamFactory, IRStreamProgram, IRStreamProgramRef};
Expand Down Expand Up @@ -528,10 +528,13 @@ pub fn debug_semaphore_exists(semaphore_id: u64) -> bool {
registry
.iter()
.filter_map(|(state_id, weak_state)| {
weak_state.upgrade().map(|state| (*state_id, state)).or_else(|| {
stale_state_ids.push(*state_id);
None
})
weak_state
.upgrade()
.map(|state| (*state_id, state))
.or_else(|| {
stale_state_ids.push(*state_id);
None
})
})
.collect()
};
Expand Down Expand Up @@ -559,7 +562,10 @@ pub fn notify_semaphore_handle_dropped(state_id: u64, semaphore_id: u64) {
let mut notifications = semaphore_drop_notifications()
.lock()
.expect("Scheduler lock poisoned");
notifications.entry(state_id).or_default().push(semaphore_id);
notifications
.entry(state_id)
.or_default()
.push(semaphore_id);
}

fn parse_task_completed_result(
Expand All @@ -586,7 +592,9 @@ fn parse_task_completed_result(
}

fn extract_semaphore_id(obj: &Bound<'_, PyAny>) -> Option<u64> {
obj.extract::<PyRef<'_, PySemaphore>>().ok().map(|semaphore| semaphore.id)
obj.extract::<PyRef<'_, PySemaphore>>()
.ok()
.map(|semaphore| semaphore.id)
}

fn is_internal_source_file(source_file: &str) -> bool {
Expand Down Expand Up @@ -910,12 +918,12 @@ fn make_python_semaphore_value(semaphore_id: u64, state_id: u64) -> Result<Value
state_id,
},
)
.map_err(|e| {
PyException::runtime_error(format!(
"failed to instantiate runtime Semaphore({semaphore_id}): {e}"
))
})?
.into_any();
.map_err(|e| {
PyException::runtime_error(format!(
"failed to instantiate runtime Semaphore({semaphore_id}): {e}"
))
})?
.into_any();
Ok(Value::Python(PyShared::new(semaphore.unbind())))
})
}
Expand Down Expand Up @@ -1051,11 +1059,9 @@ impl SchedulerState {
}

fn has_external_waiters(&self) -> bool {
self.waiters
.iter()
.any(|(item, waiters)| {
matches!(item, Waitable::ExternalPromise(_)) && !waiters.is_empty()
})
self.waiters.iter().any(|(item, waiters)| {
matches!(item, Waitable::ExternalPromise(_)) && !waiters.is_empty()
})
}

fn parse_external_completion_item(
Expand Down Expand Up @@ -2661,7 +2667,6 @@ impl SchedulerProgram {
impl IRStreamProgram for SchedulerProgram {
fn start(
&mut self,
_py: Python<'_>,
effect: DispatchEffect,
k_user: Continuation,
store: &mut RustStore,
Expand Down Expand Up @@ -3815,7 +3820,6 @@ mod tests {

let step = IRStreamProgram::start(
&mut program,
py,
complete,
idle_k.clone(),
&mut store,
Expand Down Expand Up @@ -4045,7 +4049,6 @@ mod tests {

let first_step = IRStreamProgram::start(
&mut program,
py,
complete,
idle_driver_k.clone(),
&mut store,
Expand Down Expand Up @@ -4154,7 +4157,6 @@ mod tests {

let resolver_step = IRStreamProgram::start(
&mut resolver_program,
py,
complete,
resolver_k.clone(),
&mut store,
Expand Down Expand Up @@ -4298,7 +4300,6 @@ mod tests {
);
let first_step = IRStreamProgram::start(
&mut program,
py,
wake_normal,
idle_k.clone(),
&mut store,
Expand All @@ -4325,7 +4326,6 @@ mod tests {
);
let second_step = IRStreamProgram::start(
&mut program,
py,
wake_high,
normal_k.clone(),
&mut store,
Expand Down Expand Up @@ -4413,14 +4413,8 @@ mod tests {
.into_any();
let fail = make_fail_promise_effect(py, promise_obj, error_obj);

let step = IRStreamProgram::start(
&mut program,
py,
fail,
idle_k.clone(),
&mut store,
&mut _scope,
);
let step =
IRStreamProgram::start(&mut program, fail, idle_k.clone(), &mut store, &mut _scope);
assert!(
matches!(
&step,
Expand Down Expand Up @@ -4996,7 +4990,6 @@ mod tests {

let step = IRStreamProgram::start(
&mut program,
py,
make_acquire_semaphore_effect(py, make_semaphore_object(py, semaphore_id)),
driver_k,
&mut store,
Expand Down
10 changes: 3 additions & 7 deletions packages/doeff-vm-core/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

use std::sync::{Arc, Mutex};

use pyo3::prelude::*;

use crate::continuation::Continuation;
use crate::do_ctrl::DoCtrl;
use crate::effect::DispatchEffect;
Expand All @@ -20,7 +18,6 @@ use crate::value::Value;
pub trait IRStreamProgram: std::fmt::Debug + Send {
fn start(
&mut self,
py: Python<'_>,
effect: DispatchEffect,
k: Continuation,
store: &mut RustStore,
Expand Down Expand Up @@ -74,21 +71,20 @@ impl<T> Kleisli for T
where
T: IRStreamFactory + Clone + std::fmt::Debug + Send + Sync + 'static,
{
fn apply(&self, py: Python<'_>, args: Vec<Value>) -> Result<DoCtrl, VMError> {
self.apply_with_run_token(py, args, None)
fn apply(&self, args: Vec<Value>) -> Result<DoCtrl, VMError> {
self.apply_with_run_token(args, None)
}

fn apply_with_run_token(
&self,
py: Python<'_>,
args: Vec<Value>,
run_token: Option<u64>,
) -> Result<DoCtrl, VMError> {
let kleisli = RustKleisli::new(
Arc::new(self.clone()),
<Self as IRStreamFactory>::handler_name(self).to_string(),
);
kleisli.apply_with_run_token(py, args, run_token)
kleisli.apply_with_run_token(args, run_token)
}

fn debug_info(&self) -> KleisliDebugInfo {
Expand Down
Loading
Loading