From 47995dc942648dc7b4002a8a4457d3222a76ef25 Mon Sep 17 00:00:00 2001 From: proboscis Date: Sat, 21 Mar 2026 15:21:05 +0900 Subject: [PATCH 1/4] Fix vm dispatch memory retention --- packages/doeff-vm-core/src/continuation.rs | 92 ++-- packages/doeff-vm-core/src/frame.rs | 4 +- packages/doeff-vm-core/src/ir_stream.rs | 94 ++++- packages/doeff-vm-core/src/kleisli.rs | 8 +- packages/doeff-vm-core/src/lib.rs | 4 + packages/doeff-vm-core/src/memory_stats.rs | 58 +++ packages/doeff-vm-core/src/segment.rs | 49 ++- packages/doeff-vm-core/src/vm/dispatch.rs | 415 ++++++++++++------- packages/doeff-vm-core/src/vm/step.rs | 123 +++--- packages/doeff-vm-core/src/vm/vm_trace.rs | 2 +- packages/doeff-vm/Cargo.lock | 1 + packages/doeff-vm/Cargo.toml | 1 + packages/doeff-vm/doeff_vm/__init__.py | 1 + packages/doeff-vm/src/pyvm.rs | 143 ++++++- packages/doeff-vm/tests/test_memory_stats.py | 27 ++ 15 files changed, 739 insertions(+), 283 deletions(-) create mode 100644 packages/doeff-vm-core/src/memory_stats.rs create mode 100644 packages/doeff-vm/tests/test_memory_stats.py diff --git a/packages/doeff-vm-core/src/continuation.rs b/packages/doeff-vm-core/src/continuation.rs index cdaebee8..2736bc33 100644 --- a/packages/doeff-vm-core/src/continuation.rs +++ b/packages/doeff-vm-core/src/continuation.rs @@ -8,14 +8,15 @@ use pyo3::types::{PyDict, PyList}; use crate::frame::CallMetadata; use crate::frame::Frame; +use crate::ids::VarId; use crate::ids::{ContId, DispatchId, Marker, ScopeId, SegmentId}; use crate::kleisli::KleisliRef; +use crate::memory_stats; use crate::py_key::HashedPyKey; use crate::py_shared::PyShared; use crate::segment::Segment; use crate::step::PyException; use crate::value::Value; -use crate::ids::VarId; #[pyclass(name = "K")] pub struct PyK { @@ -62,7 +63,7 @@ pub(crate) struct DispatchHandlerHint { pub(crate) prompt_seg_id: SegmentId, } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Continuation { pub cont_id: ContId, dispatch_id: Option, @@ -80,20 +81,7 @@ pub struct Continuation { impl Continuation { fn captured_segment_snapshot(segment: &Segment) -> Box { - Box::new(Segment { - scope_id: segment.scope_id, - persistent_epoch: segment.persistent_epoch, - marker: segment.marker, - frames: segment.frames.clone(), - parent: segment.parent, - state_store: segment.state_store.clone(), - writer_log: segment.writer_log.clone(), - kind: segment.kind.clone(), - pending_error_context: segment.pending_error_context.clone(), - throw_parent: segment.throw_parent.clone(), - interceptor_eval_depth: segment.interceptor_eval_depth, - interceptor_skip_stack: segment.interceptor_skip_stack.clone(), - }) + Box::new(segment.clone()) } pub fn capture( @@ -101,6 +89,7 @@ impl Continuation { segment_id: SegmentId, dispatch_id: Option, ) -> Self { + memory_stats::register_continuation(); Continuation { cont_id: ContId::fresh(), dispatch_id, @@ -122,6 +111,7 @@ impl Continuation { segment_id: SegmentId, dispatch_id: Option, ) -> Self { + memory_stats::register_continuation(); Continuation { cont_id, dispatch_id, @@ -148,6 +138,7 @@ impl Continuation { outside_scope: Option, ) -> Self { let handler_count = handlers.len(); + memory_stats::register_continuation(); Continuation { cont_id: ContId::fresh(), dispatch_id: None, @@ -190,6 +181,7 @@ impl Continuation { metadata: Option, outside_scope: Option, ) -> Self { + memory_stats::register_continuation(); Continuation { cont_id: ContId::fresh(), dispatch_id: None, @@ -325,19 +317,10 @@ impl Continuation { } pub(crate) fn clone_for_dispatch(&self, dispatch_id: Option) -> Self { - Continuation { - cont_id: ContId::fresh(), - dispatch_id, - resume_dispatch_id: self.resume_dispatch_id, - dispatch_handler_hint: self.dispatch_handler_hint, - segment_id: self.segment_id, - segment_snapshot: self.segment_snapshot.clone(), - scope_parent_snapshot: self.scope_parent_snapshot, - scope_bindings_snapshot: self.scope_bindings_snapshot.clone(), - var_overrides_snapshot: self.var_overrides_snapshot.clone(), - unstarted: self.unstarted.clone(), - parent: self.parent.clone(), - } + let mut cloned = self.clone(); + cloned.cont_id = ContId::fresh(); + cloned.dispatch_id = dispatch_id; + cloned } pub(crate) fn refresh_persistent_segment_state( @@ -432,7 +415,7 @@ impl Continuation { } pub(crate) fn into_unstarted_parts( - self, + mut self, ) -> Option<( PyShared, Vec, @@ -440,7 +423,7 @@ impl Continuation { Option, Option, )> { - self.unstarted.map( + self.unstarted.take().map( |UnstartedContinuation { program, handlers, @@ -486,6 +469,31 @@ impl Continuation { } } +impl Clone for Continuation { + fn clone(&self) -> Self { + memory_stats::register_continuation(); + Continuation { + cont_id: self.cont_id, + dispatch_id: self.dispatch_id, + resume_dispatch_id: self.resume_dispatch_id, + dispatch_handler_hint: self.dispatch_handler_hint, + segment_id: self.segment_id, + segment_snapshot: self.segment_snapshot.clone(), + scope_parent_snapshot: self.scope_parent_snapshot, + scope_bindings_snapshot: self.scope_bindings_snapshot.clone(), + var_overrides_snapshot: self.var_overrides_snapshot.clone(), + unstarted: self.unstarted.clone(), + parent: self.parent.clone(), + } + } +} + +impl Drop for Continuation { + fn drop(&mut self) { + memory_stats::unregister_continuation(); + } +} + #[cfg(test)] mod tests { use super::*; @@ -493,6 +501,7 @@ mod tests { use crate::error::VMError; use crate::ids::Marker; use crate::kleisli::{Kleisli, KleisliDebugInfo}; + use crate::memory_stats::live_object_counts; use crate::segment::SegmentKind; use crate::value::Value; @@ -551,11 +560,15 @@ mod tests { #[test] fn test_unstarted_continuation_has_no_segment_snapshot() { Python::attach(|py| { + let baseline = live_object_counts().live_continuations; let cont = Continuation::create_unstarted(PyShared::new(py.None()), Vec::new()); assert!(!cont.is_started()); assert!(cont.segment_id().is_none()); assert!(cont.segment().is_none()); assert!(cont.frames().is_none()); + assert_eq!(live_object_counts().live_continuations, baseline + 1); + drop(cont); + assert_eq!(live_object_counts().live_continuations, baseline); }); } @@ -642,4 +655,21 @@ mod tests { assert_eq!(snapshot.writer_log, vec![Value::Int(20)]); assert_eq!(snapshot.persistent_epoch, 2); } + + #[test] + fn test_continuation_live_count_tracks_clone_lifetime() { + let baseline = live_object_counts().live_continuations; + let (seg, seg_id) = make_test_segment(); + let cont = Continuation::capture(&seg, seg_id, None); + assert_eq!(live_object_counts().live_continuations, baseline + 1); + + let cont_clone = cont.clone(); + assert_eq!(live_object_counts().live_continuations, baseline + 2); + + drop(cont_clone); + assert_eq!(live_object_counts().live_continuations, baseline + 1); + + drop(cont); + assert_eq!(live_object_counts().live_continuations, baseline); + } } diff --git a/packages/doeff-vm-core/src/frame.rs b/packages/doeff-vm-core/src/frame.rs index 02d47f7a..477711d4 100644 --- a/packages/doeff-vm-core/src/frame.rs +++ b/packages/doeff-vm-core/src/frame.rs @@ -270,9 +270,7 @@ mod tests { #[test] fn test_program_frame_is_program() { - let stream = std::sync::Arc::new(std::sync::Mutex::new( - Box::new(DummyStream) as Box - )); + let stream = IRStreamRef::new(Box::new(DummyStream) as Box); let frame = Frame::program(stream, None); assert!(frame.is_program()); } diff --git a/packages/doeff-vm-core/src/ir_stream.rs b/packages/doeff-vm-core/src/ir_stream.rs index 25c3928a..adb1d7b9 100644 --- a/packages/doeff-vm-core/src/ir_stream.rs +++ b/packages/doeff-vm-core/src/ir_stream.rs @@ -1,12 +1,14 @@ //! Stream abstraction for stepping AST/program sources. use std::fmt; +use std::ops::Deref; use std::sync::{Arc, Mutex}; use pyo3::prelude::*; use crate::do_ctrl::DoCtrl; use crate::driver::PyException; +use crate::memory_stats; use crate::py_shared::PyShared; use crate::python_call::PythonCall; use crate::segment::ScopeStore; @@ -37,7 +39,44 @@ pub trait IRStream: fmt::Debug + Send { } } -pub type IRStreamRef = Arc>>; +#[derive(Debug)] +struct TrackedIRStream { + stream: Mutex>, +} + +impl Drop for TrackedIRStream { + fn drop(&mut self) { + memory_stats::unregister_ir_stream(); + } +} + +#[derive(Debug, Clone)] +pub struct IRStreamRef(Arc); + +impl IRStreamRef { + pub fn new(stream: Box) -> Self { + memory_stats::register_ir_stream(); + IRStreamRef(Arc::new(TrackedIRStream { + stream: Mutex::new(stream), + })) + } + + pub fn ptr_eq(lhs: &Self, rhs: &Self) -> bool { + Arc::ptr_eq(&lhs.0, &rhs.0) + } + + pub fn strong_count(&self) -> usize { + Arc::strong_count(&self.0) + } +} + +impl Deref for IRStreamRef { + type Target = Mutex>; + + fn deref(&self) -> &Self::Target { + &self.0.stream + } +} #[derive(Debug)] pub enum IRStreamStep { @@ -117,9 +156,17 @@ impl PythonGeneratorStream { let mut saw_resume = false; for instruction in instructions.try_iter().ok()? { let instruction = instruction.ok()?; - let offset = instruction.getattr("offset").ok()?.extract::().ok()?; + let offset = instruction + .getattr("offset") + .ok()? + .extract::() + .ok()?; if saw_resume { - let opname = instruction.getattr("opname").ok()?.extract::().ok()?; + let opname = instruction + .getattr("opname") + .ok()? + .extract::() + .ok()?; return Some(opname == "RETURN_VALUE"); } if offset == lasti { @@ -170,6 +217,7 @@ impl IRStream for PythonGeneratorStream { #[cfg(test)] mod tests { + use crate::memory_stats::live_object_counts; use pyo3::types::PyDict; use super::*; @@ -284,4 +332,44 @@ mod tests { assert_eq!(location.phase, None); }); } + + #[test] + fn test_ir_stream_live_count_tracks_underlying_stream() { + #[derive(Debug)] + struct DummyStream; + + impl IRStream for DummyStream { + fn resume( + &mut self, + _value: Value, + _store: &mut RustStore, + _scope: &mut ScopeStore, + ) -> IRStreamStep { + IRStreamStep::Return(Value::Unit) + } + + fn throw( + &mut self, + exc: PyException, + _store: &mut RustStore, + _scope: &mut ScopeStore, + ) -> IRStreamStep { + IRStreamStep::Throw(exc) + } + } + + let baseline = live_object_counts().live_ir_streams; + let stream = IRStreamRef::new(Box::new(DummyStream)); + assert_eq!(live_object_counts().live_ir_streams, baseline + 1); + + let stream_clone = stream.clone(); + assert_eq!(live_object_counts().live_ir_streams, baseline + 1); + assert_eq!(stream_clone.strong_count(), 2); + + drop(stream_clone); + assert_eq!(live_object_counts().live_ir_streams, baseline + 1); + + drop(stream); + assert_eq!(live_object_counts().live_ir_streams, baseline); + } } diff --git a/packages/doeff-vm-core/src/kleisli.rs b/packages/doeff-vm-core/src/kleisli.rs index d09667f8..e263a65d 100644 --- a/packages/doeff-vm-core/src/kleisli.rs +++ b/packages/doeff-vm-core/src/kleisli.rs @@ -1,6 +1,6 @@ //! Kleisli arrow types for IR-level callables (SPEC-VM-017). -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; @@ -447,7 +447,7 @@ impl Kleisli for PyKleisli { }; let stream = PythonGeneratorStream::new(generator, get_frame); - let stream_ref: IRStreamRef = Arc::new(Mutex::new(Box::new(stream))); + let stream_ref = IRStreamRef::new(Box::new(stream)); let metadata = CallMetadata::new( self.name.clone(), self.file.clone().unwrap_or_else(|| "".to_string()), @@ -714,11 +714,11 @@ impl Kleisli for RustKleisli { }; let program = self.factory.create_program_for_run(run_token); - let stream: IRStreamRef = Arc::new(Mutex::new(Box::new(RustKleisliStream::new( + let stream = IRStreamRef::new(Box::new(RustKleisliStream::new( program, effect, continuation, - )))); + ))); Ok(DoCtrl::IRStream { stream, diff --git a/packages/doeff-vm-core/src/lib.rs b/packages/doeff-vm-core/src/lib.rs index 4f03da83..58c71fe5 100644 --- a/packages/doeff-vm-core/src/lib.rs +++ b/packages/doeff-vm-core/src/lib.rs @@ -38,6 +38,8 @@ pub mod ir_stream; #[cfg(feature = "python_bridge")] pub mod kleisli; #[cfg(feature = "python_bridge")] +pub mod memory_stats; +#[cfg(feature = "python_bridge")] pub mod py_key; #[cfg(feature = "python_bridge")] pub mod python_call; @@ -102,6 +104,8 @@ pub use ir_stream::{IRStream, IRStreamRef, IRStreamStep, PythonGeneratorStream, #[cfg(feature = "python_bridge")] pub use kleisli::{Kleisli, KleisliDebugInfo, KleisliRef, PyKleisli, RustKleisli}; #[cfg(feature = "python_bridge")] +pub use memory_stats::{live_object_counts, VmLiveObjectCounts}; +#[cfg(feature = "python_bridge")] pub use py_key::HashedPyKey; #[cfg(feature = "python_bridge")] pub use python_call::{PendingPython, PyCallOutcome, PythonCall}; diff --git a/packages/doeff-vm-core/src/memory_stats.rs b/packages/doeff-vm-core/src/memory_stats.rs new file mode 100644 index 00000000..4e333e01 --- /dev/null +++ b/packages/doeff-vm-core/src/memory_stats.rs @@ -0,0 +1,58 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct VmLiveObjectCounts { + pub live_segments: usize, + pub live_continuations: usize, + pub live_ir_streams: usize, + pub in_place_reentries: usize, + pub abandoned_transfer_branch_frees: usize, +} + +static LIVE_SEGMENTS: AtomicUsize = AtomicUsize::new(0); +static LIVE_CONTINUATIONS: AtomicUsize = AtomicUsize::new(0); +static LIVE_IR_STREAMS: AtomicUsize = AtomicUsize::new(0); +static IN_PLACE_REENTRIES: AtomicUsize = AtomicUsize::new(0); +static ABANDONED_TRANSFER_BRANCH_FREES: AtomicUsize = AtomicUsize::new(0); + +pub fn live_object_counts() -> VmLiveObjectCounts { + VmLiveObjectCounts { + live_segments: LIVE_SEGMENTS.load(Ordering::Relaxed), + live_continuations: LIVE_CONTINUATIONS.load(Ordering::Relaxed), + live_ir_streams: LIVE_IR_STREAMS.load(Ordering::Relaxed), + in_place_reentries: IN_PLACE_REENTRIES.load(Ordering::Relaxed), + abandoned_transfer_branch_frees: ABANDONED_TRANSFER_BRANCH_FREES.load(Ordering::Relaxed), + } +} + +pub(crate) fn register_segment() { + LIVE_SEGMENTS.fetch_add(1, Ordering::Relaxed); +} + +pub(crate) fn unregister_segment() { + LIVE_SEGMENTS.fetch_sub(1, Ordering::Relaxed); +} + +pub(crate) fn register_continuation() { + LIVE_CONTINUATIONS.fetch_add(1, Ordering::Relaxed); +} + +pub(crate) fn unregister_continuation() { + LIVE_CONTINUATIONS.fetch_sub(1, Ordering::Relaxed); +} + +pub(crate) fn register_ir_stream() { + LIVE_IR_STREAMS.fetch_add(1, Ordering::Relaxed); +} + +pub(crate) fn unregister_ir_stream() { + LIVE_IR_STREAMS.fetch_sub(1, Ordering::Relaxed); +} + +pub(crate) fn record_in_place_reentry() { + IN_PLACE_REENTRIES.fetch_add(1, Ordering::Relaxed); +} + +pub(crate) fn record_abandoned_transfer_branch_free() { + ABANDONED_TRANSFER_BRANCH_FREES.fetch_add(1, Ordering::Relaxed); +} diff --git a/packages/doeff-vm-core/src/segment.rs b/packages/doeff-vm-core/src/segment.rs index ba01f1ce..8cf50899 100644 --- a/packages/doeff-vm-core/src/segment.rs +++ b/packages/doeff-vm-core/src/segment.rs @@ -8,6 +8,7 @@ use crate::frame::CallMetadata; use crate::frame::Frame; use crate::ids::{FiberId, Marker, ScopeId}; use crate::kleisli::KleisliRef; +use crate::memory_stats; use crate::py_key::HashedPyKey; use crate::py_shared::PyShared; use crate::step::PyException; @@ -40,7 +41,7 @@ pub struct ScopeStore { pub scope_bindings: Vec>>, } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Fiber { pub scope_id: ScopeId, pub persistent_epoch: u64, @@ -58,6 +59,7 @@ pub struct Fiber { impl Fiber { pub fn new(marker: Marker, parent: Option) -> Self { + memory_stats::register_segment(); Fiber { scope_id: ScopeId::fresh(), persistent_epoch: 0, @@ -80,6 +82,7 @@ impl Fiber { handled_marker: Marker, handler: KleisliRef, ) -> Self { + memory_stats::register_segment(); Fiber { scope_id: ScopeId::fresh(), persistent_epoch: 0, @@ -107,6 +110,7 @@ impl Fiber { handler: KleisliRef, types: Option>, ) -> Self { + memory_stats::register_segment(); Fiber { scope_id: ScopeId::fresh(), persistent_epoch: 0, @@ -157,12 +161,39 @@ impl Fiber { } } +impl Clone for Fiber { + fn clone(&self) -> Self { + memory_stats::register_segment(); + Fiber { + scope_id: self.scope_id, + persistent_epoch: self.persistent_epoch, + marker: self.marker, + frames: self.frames.clone(), + parent: self.parent, + state_store: self.state_store.clone(), + writer_log: self.writer_log.clone(), + kind: self.kind.clone(), + pending_error_context: self.pending_error_context.clone(), + throw_parent: self.throw_parent.clone(), + interceptor_eval_depth: self.interceptor_eval_depth, + interceptor_skip_stack: self.interceptor_skip_stack.clone(), + } + } +} + +impl Drop for Fiber { + fn drop(&mut self) { + memory_stats::unregister_segment(); + } +} + pub type Segment = Fiber; pub type SegmentKind = FiberKind; #[cfg(test)] mod tests { use super::*; + use crate::memory_stats::live_object_counts; use pyo3::Python; use crate::do_ctrl::DoCtrl; @@ -224,4 +255,20 @@ mod tests { assert!(!seg.has_frames()); assert!(seg.pop_frame().is_none()); } + + #[test] + fn test_segment_live_count_tracks_clones() { + let baseline = live_object_counts().live_segments; + let seg = Fiber::new(Marker::fresh(), None); + assert_eq!(live_object_counts().live_segments, baseline + 1); + + let seg_clone = seg.clone(); + assert_eq!(live_object_counts().live_segments, baseline + 2); + + drop(seg_clone); + assert_eq!(live_object_counts().live_segments, baseline + 1); + + drop(seg); + assert_eq!(live_object_counts().live_segments, baseline); + } } diff --git a/packages/doeff-vm-core/src/vm/dispatch.rs b/packages/doeff-vm-core/src/vm/dispatch.rs index a136cda1..52c6e5df 100644 --- a/packages/doeff-vm-core/src/vm/dispatch.rs +++ b/packages/doeff-vm-core/src/vm/dispatch.rs @@ -22,14 +22,17 @@ impl VM { seg_id: SegmentId, ) -> Option<(DispatchId, &crate::dispatch_observer::DispatchContext)> { let dispatch_id = self.dispatch_origin_id_in_segment(seg_id)?; - let dispatch = self.dispatch_observer.dispatch(dispatch_id).unwrap_or_else(|| { - panic!( - "dispatch observer invariant violated: segment {} references dispatch {} but \ + let dispatch = self + .dispatch_observer + .dispatch(dispatch_id) + .unwrap_or_else(|| { + panic!( + "dispatch observer invariant violated: segment {} references dispatch {} but \ observer has no context", - seg_id.index(), - dispatch_id.raw() - ) - }); + seg_id.index(), + dispatch_id.raw() + ) + }); Some((dispatch_id, dispatch)) } @@ -40,7 +43,10 @@ impl VM { fn dispatch_origin_view(&self, dispatch_id: DispatchId) -> Option { let dispatch = self.dispatch_observer.dispatch(dispatch_id)?; - Some(Self::dispatch_origin_view_from_context(dispatch_id, dispatch)) + Some(Self::dispatch_origin_view_from_context( + dispatch_id, + dispatch, + )) } fn dispatch_origins_from_segment( @@ -53,7 +59,10 @@ impl VM { while let Some(seg_id) = cursor { if let Some((dispatch_id, dispatch)) = self.dispatch_context_for_segment(seg_id) { if seen.insert(dispatch_id) { - origins.push(Self::dispatch_origin_view_from_context(dispatch_id, dispatch)); + origins.push(Self::dispatch_origin_view_from_context( + dispatch_id, + dispatch, + )); } } cursor = self.segments.get(seg_id).and_then(|seg| seg.parent); @@ -183,7 +192,10 @@ impl VM { let mut cursor = self.current_segment; while let Some(seg_id) = cursor { if let Some((dispatch_id, dispatch)) = self.dispatch_context_for_segment(seg_id) { - return Some(Self::dispatch_origin_view_from_context(dispatch_id, dispatch)); + return Some(Self::dispatch_origin_view_from_context( + dispatch_id, + dispatch, + )); } cursor = self.segments.get(seg_id).and_then(|seg| seg.parent); } @@ -227,11 +239,7 @@ impl VM { EvalReturnContinuation::ResumeToContinuation { continuation } | EvalReturnContinuation::ReturnToContinuation { continuation } | EvalReturnContinuation::EvalInScopeReturn { continuation } => { - Self::continuation_is_in_origin_chain( - continuation, - target_cont_id, - visited, - ) + Self::continuation_is_in_origin_chain(continuation, target_cont_id, visited) } EvalReturnContinuation::ApplyResolveFunction { .. } | EvalReturnContinuation::ApplyResolveArg { .. } @@ -324,13 +332,14 @@ impl VM { dispatch_id: DispatchId, ) -> Option<(SegmentId, Continuation, Marker)> { let dispatch = self.dispatch_observer.dispatch(dispatch_id)?; - self.handler_dispatch_is_live(&dispatch.active_handler.continuation).then(|| { - ( - dispatch.active_handler.segment_id, - dispatch.active_handler.continuation.clone(), - dispatch.active_handler.marker, - ) - }) + self.handler_dispatch_is_live(&dispatch.active_handler.continuation) + .then(|| { + ( + dispatch.active_handler.segment_id, + dispatch.active_handler.continuation.clone(), + dispatch.active_handler.marker, + ) + }) } pub(super) fn handler_dispatch_for_any( @@ -605,9 +614,9 @@ impl VM { self.marker_handler_trace_info(marker) .is_some_and(|(_, kind, file, _)| { kind == HandlerKind::Python - && file.as_deref().is_some_and(|path| { - !Self::is_internal_doeff_handler_source_file(path) - }) + && file + .as_deref() + .is_some_and(|path| !Self::is_internal_doeff_handler_source_file(path)) }) } @@ -630,7 +639,8 @@ impl VM { let seg_id = target_seg_id?; if let Some(dispatch_id) = self.dispatch_observer.segment_dispatch_id(seg_id) { - if let Some((active_seg_id, continuation, _)) = self.handler_dispatch_for_any(dispatch_id) + if let Some((active_seg_id, continuation, _)) = + self.handler_dispatch_for_any(dispatch_id) { if active_seg_id == seg_id && self.handler_dispatch_is_live(&continuation) { return Some(continuation); @@ -1028,14 +1038,12 @@ impl VM { fn annotate_live_continuation(&self, continuation: &mut Continuation, seg_id: SegmentId) { continuation.set_resume_dispatch_id(self.current_segment_dispatch_id_any()); - continuation.set_dispatch_handler_hint( - self.handlers_in_caller_chain(seg_id) - .first() - .map(|entry| crate::continuation::DispatchHandlerHint { - marker: entry.marker, - prompt_seg_id: entry.prompt_seg_id, - }), - ); + continuation.set_dispatch_handler_hint(self.handlers_in_caller_chain(seg_id).first().map( + |entry| crate::continuation::DispatchHandlerHint { + marker: entry.marker, + prompt_seg_id: entry.prompt_seg_id, + }, + )); } pub(crate) fn capture_live_continuation( @@ -1165,17 +1173,18 @@ impl VM { self.dispatch_observer .segment_dispatch_id(seg_id) .and_then(|dispatch_id| { - self.dispatch_origin_for_dispatch_id(dispatch_id).map(|origin| { - self.handlers_in_caller_chain( - origin - .k_origin - .segment_id() - .expect("dispatch origin continuations must be captured"), - ) - .into_iter() - .map(|entry| entry.prompt_seg_id) - .collect() - }) + self.dispatch_origin_for_dispatch_id(dispatch_id) + .map(|origin| { + self.handlers_in_caller_chain( + origin + .k_origin + .segment_id() + .expect("dispatch origin continuations must be captured"), + ) + .into_iter() + .map(|entry| entry.prompt_seg_id) + .collect() + }) }) .unwrap_or_default() } else { @@ -1205,47 +1214,50 @@ impl VM { let resume_dispatch_id = self .current_segment .and_then(|current_seg_id| self.dispatch_observer.segment_dispatch_id(current_seg_id)); - let mut k_user = if Self::is_execution_context_effect(&effect) && original_exception.is_some() { - let reusable_origin = self.current_dispatch_origin().filter(|origin| { - let Some(current_original) = original_exception.as_ref() else { - return false; - }; - let Some(origin_original) = origin.original_exception.as_ref() else { - return false; - }; - let same_original_exception = match (origin_original, current_original) { - ( - PyException::Materialized { - exc_value: origin_value, - .. - }, - PyException::Materialized { - exc_value: current_value, - .. - }, - ) => Python::attach(|py| { - origin_value.bind(py).as_ptr() == current_value.bind(py).as_ptr() - }), - _ => false, - }; - if !same_original_exception { - return false; - } - let Some(origin_seg_id) = origin.k_origin.segment_id() else { - return false; - }; - let Some(current_hint) = self.first_handler_hint_in_caller_chain(seg_id) else { - return true; - }; - self.find_prompt_boundary_in_caller_chain(origin_seg_id, current_hint.marker) - .is_some() - }); - reusable_origin - .map(|origin| origin.k_origin.clone_for_dispatch(Some(dispatch_id))) - .unwrap_or_else(|| self.capture_live_continuation(current_seg, seg_id, Some(dispatch_id))) - } else { - self.capture_live_continuation(current_seg, seg_id, Some(dispatch_id)) - }; + let mut k_user = + if Self::is_execution_context_effect(&effect) && original_exception.is_some() { + let reusable_origin = self.current_dispatch_origin().filter(|origin| { + let Some(current_original) = original_exception.as_ref() else { + return false; + }; + let Some(origin_original) = origin.original_exception.as_ref() else { + return false; + }; + let same_original_exception = match (origin_original, current_original) { + ( + PyException::Materialized { + exc_value: origin_value, + .. + }, + PyException::Materialized { + exc_value: current_value, + .. + }, + ) => Python::attach(|py| { + origin_value.bind(py).as_ptr() == current_value.bind(py).as_ptr() + }), + _ => false, + }; + if !same_original_exception { + return false; + } + let Some(origin_seg_id) = origin.k_origin.segment_id() else { + return false; + }; + let Some(current_hint) = self.first_handler_hint_in_caller_chain(seg_id) else { + return true; + }; + self.find_prompt_boundary_in_caller_chain(origin_seg_id, current_hint.marker) + .is_some() + }); + reusable_origin + .map(|origin| origin.k_origin.clone_for_dispatch(Some(dispatch_id))) + .unwrap_or_else(|| { + self.capture_live_continuation(current_seg, seg_id, Some(dispatch_id)) + }) + } else { + self.capture_live_continuation(current_seg, seg_id, Some(dispatch_id)) + }; k_user.set_resume_dispatch_id(resume_dispatch_id); k_user.set_dispatch_handler_hint(self.first_handler_hint_in_caller_chain(seg_id)); if let Some(seg) = self.current_segment_mut() { @@ -1561,11 +1573,21 @@ impl VM { ) { let exec_seg = Self::continuation_exec_segment(k, caller, dispatch_id); let exec_seg_id = self.alloc_segment(exec_seg); + self.configure_continuation_segment(exec_seg_id, k, dispatch_id); + } + + fn configure_continuation_segment( + &mut self, + exec_seg_id: SegmentId, + k: &Continuation, + dispatch_id: Option, + ) { self.set_scope_parent(exec_seg_id, k.scope_parent_snapshot()); self.replace_scope_bindings(exec_seg_id, k.scope_bindings_snapshot().clone()); self.replace_segment_var_overrides(exec_seg_id, k.var_overrides_snapshot().clone()); if let Some(dispatch_id) = dispatch_id { - self.dispatch_observer.bind_segment(exec_seg_id, dispatch_id); + self.dispatch_observer + .bind_segment(exec_seg_id, dispatch_id); if let Some(hint) = k.dispatch_handler_hint() { let restoring_outer_dispatch = k.dispatch_id() != Some(dispatch_id); let resuming_user_defined_python_handler = @@ -1608,6 +1630,8 @@ impl VM { ); } } + } else { + self.dispatch_observer.unbind_segment(exec_seg_id); } self.current_segment = Some(exec_seg_id); } @@ -1631,7 +1655,8 @@ impl VM { let anchor_seg_id = self.alloc_segment(anchor); self.set_scope_parent(anchor_seg_id, caller); if let Some(dispatch_id) = dispatch_id { - self.dispatch_observer.bind_segment(anchor_seg_id, dispatch_id); + self.dispatch_observer + .bind_segment(anchor_seg_id, dispatch_id); } anchor_seg_id } @@ -1649,18 +1674,20 @@ impl VM { let anchor_seg_id = self.alloc_segment(anchor); self.set_scope_parent(anchor_seg_id, caller); if let Some(dispatch_id) = dispatch_id { - self.dispatch_observer.bind_segment(anchor_seg_id, dispatch_id); + self.dispatch_observer + .bind_segment(anchor_seg_id, dispatch_id); } anchor_seg_id } fn park_segment_after_capture(&mut self, seg_id: SegmentId) { - let Some(seg) = self.segments.get_mut(seg_id) else { + let Some(parent) = self.segments.get(seg_id).and_then(|seg| seg.parent) else { return; }; - seg.frames.clear(); - seg.pending_error_context = None; - seg.throw_parent = None; + self.free_segment(seg_id); + if self.current_segment == Some(seg_id) { + self.current_segment = Some(parent); + } } fn segment_is_tail_resume_return(&self, seg_id: SegmentId) -> bool { @@ -1691,10 +1718,6 @@ impl VM { caller: Option, dispatch_id: Option, ) -> Option { - if caller != k.captured_caller() { - return None; - } - let seg_id = k.segment_id()?; let snapshot = k.segment()?; if matches!(&snapshot.kind, SegmentKind::PromptBoundary { .. }) @@ -1707,12 +1730,17 @@ impl VM { return None; } let live = self.segments.get(seg_id)?; - if !self.current_segment_is_transition_anchor_for(seg_id) { + if self.current_segment_is_transition_anchor_for(seg_id) { + return (live.marker == snapshot.marker + && self.dispatch_observer.segment_dispatch_id(seg_id) == dispatch_id) + .then_some(seg_id); + } + if caller != k.captured_caller() { return None; } (live.marker == snapshot.marker && self.dispatch_observer.segment_dispatch_id(seg_id) == dispatch_id) - .then_some(seg_id) + .then_some(seg_id) } fn current_segment_is_transition_anchor_for(&self, target_seg_id: SegmentId) -> bool { @@ -1748,12 +1776,79 @@ impl VM { self.free_segment(current_seg_id); } + fn abandoned_branch_root_for_transfer( + &self, + preserved_ancestor: Option, + ) -> Option { + let current_seg_id = self.current_segment?; + let mut cursor = current_seg_id; + let mut child_below_preserved = None; + + loop { + if Some(cursor) == preserved_ancestor { + return child_below_preserved; + } + let parent = self.segments.get(cursor).and_then(|segment| segment.parent); + child_below_preserved = Some(cursor); + match parent { + Some(parent_id) => cursor = parent_id, + None => return Some(current_seg_id), + } + } + } + + fn free_segment_subtree(&mut self, root_seg_id: SegmentId) { + let mut stack = vec![root_seg_id]; + let mut order = Vec::new(); + let mut seen = HashSet::new(); + + while let Some(seg_id) = stack.pop() { + if !seen.insert(seg_id) { + continue; + } + order.push(seg_id); + for (child_id, segment) in self.segments.iter() { + if segment.parent == Some(seg_id) { + stack.push(child_id); + } + } + } + + for seg_id in order.into_iter().rev() { + self.free_segment(seg_id); + } + } + + fn abandon_current_live_branch_for_transfer(&mut self, preserved_ancestor: Option) { + let Some(root_seg_id) = self.abandoned_branch_root_for_transfer(preserved_ancestor) else { + return; + }; + crate::memory_stats::record_abandoned_transfer_branch_free(); + self.free_segment_subtree(root_seg_id); + self.current_segment = + preserved_ancestor.filter(|seg_id| self.segments.get(*seg_id).is_some()); + } + fn enter_or_reenter_continuation_segment_with_dispatch( &mut self, k: &Continuation, caller: Option, dispatch_id: Option, ) { + if let Some(exec_seg_id) = self.live_segment_id_for_in_place_reentry(k, caller, dispatch_id) + { + crate::memory_stats::record_in_place_reentry(); + self.free_current_transition_anchor_for(exec_seg_id); + let exec_seg = Self::continuation_exec_segment(k, caller, dispatch_id); + self.register_segment_persistent_state(&exec_seg); + let live_seg = self + .segments + .get_mut(exec_seg_id) + .expect("live continuation segment must exist for in-place reentry"); + *live_seg = exec_seg; + self.configure_continuation_segment(exec_seg_id, k, dispatch_id); + return; + } self.enter_continuation_segment_with_dispatch(k, caller, dispatch_id); } @@ -1826,6 +1921,13 @@ impl VM { } } }; + if kind.is_transferred() { + let preserved_ancestor = self + .current_handler_dispatch() + .map(|(_, _, _, _, prompt_seg_id)| prompt_seg_id) + .or(caller); + self.abandon_current_live_branch_for_transfer(preserved_ancestor); + } self.enter_or_reenter_continuation_segment_with_dispatch(&k, caller, dispatch_id); self.mode = Mode::Deliver(value); StepEvent::Continue @@ -1840,29 +1942,34 @@ impl VM { .and_then(|dispatch_id| { self.current_handler_dispatch() .filter(|(_, current_dispatch_id, ..)| *current_dispatch_id == dispatch_id) - .and_then(|(handler_seg_id, _, _continuation, marker, _prompt_seg_id)| { - if exact_origin_target && self.is_user_defined_python_handler_marker(marker) - { - if self.segment_is_tail_resume_return(handler_seg_id) { - let anchor_seg_id = - self.alloc_tail_resume_anchor(k.captured_caller(), Some(dispatch_id)); + .and_then( + |(handler_seg_id, _, _continuation, marker, _prompt_seg_id)| { + if exact_origin_target + && self.is_user_defined_python_handler_marker(marker) + { + if self.segment_is_tail_resume_return(handler_seg_id) { + let anchor_seg_id = self.alloc_tail_resume_anchor( + k.captured_caller(), + Some(dispatch_id), + ); + self.park_segment_after_capture(handler_seg_id); + return Some(anchor_seg_id); + } + let handler_return = self + .capture_continuation(Some(dispatch_id)) + .expect("dispatch resume requires a live handler segment"); + let anchor_seg_id = self.alloc_resume_return_anchor( + k.captured_caller(), + handler_return, + Some(dispatch_id), + ); self.park_segment_after_capture(handler_seg_id); return Some(anchor_seg_id); } - let handler_return = self - .capture_continuation(Some(dispatch_id)) - .expect("dispatch resume requires a live handler segment"); - let anchor_seg_id = self.alloc_resume_return_anchor( - k.captured_caller(), - handler_return, - Some(dispatch_id), - ); - self.park_segment_after_capture(handler_seg_id); - return Some(anchor_seg_id); - } - self.is_user_defined_python_handler_marker(marker) - .then_some(handler_seg_id) - }) + self.is_user_defined_python_handler_marker(marker) + .then_some(handler_seg_id) + }, + ) }) .or_else(|| k.captured_caller()); self.activate_continuation(ContinuationActivationKind::Resume, k, value, caller) @@ -2148,7 +2255,8 @@ impl VM { ) .or_else(|| self.continuation_chain_segment_id(parent_k_user)) .unwrap_or(prompt_seg_id); - let eval_return = if Self::continuation_chain_contains_return_to_continuation(parent_k_user) { + let eval_return = if Self::continuation_chain_contains_return_to_continuation(parent_k_user) + { EvalReturnContinuation::ReturnToContinuation { continuation: parent_k_user.clone(), } @@ -2159,19 +2267,13 @@ impl VM { }; pass_seg.push_frame(Frame::EvalReturn(Box::new(eval_return))); let pass_cont_id = ContId::fresh(); - let mut pass_cont = Continuation::with_id( - pass_cont_id, - &pass_seg, - pass_segment_id, - Some(dispatch_id), - ); + let mut pass_cont = + Continuation::with_id(pass_cont_id, &pass_seg, pass_segment_id, Some(dispatch_id)); pass_cont.set_resume_dispatch_id(parent_k_user.resume_dispatch_id()); - pass_cont.set_dispatch_handler_hint(Some( - crate::continuation::DispatchHandlerHint { - marker: handler_marker, - prompt_seg_id, - }, - )); + pass_cont.set_dispatch_handler_hint(Some(crate::continuation::DispatchHandlerHint { + marker: handler_marker, + prompt_seg_id, + })); Ok(pass_cont) } @@ -2437,15 +2539,14 @@ impl VM { .filter(|entry| !matches!(entry, ActiveChainEntry::ContextEntry { .. })) .collect(); self.finish_dispatch_tracking(dispatch_id); - self.mode = - match TraceState::enrich_original_exception_with_context( - original, - value, - active_chain, - ) { - Ok(exception) => Mode::Throw(exception), - Err(effect_err) => Mode::Throw(effect_err), - }; + self.mode = match TraceState::enrich_original_exception_with_context( + original, + value, + active_chain, + ) { + Ok(exception) => Mode::Throw(exception), + Err(effect_err) => Mode::Throw(effect_err), + }; return StepEvent::Continue; } self.finish_dispatch_tracking(dispatch_id); @@ -2478,12 +2579,14 @@ impl VM { .filter(|entry| !matches!(entry, ActiveChainEntry::ContextEntry { .. })) .collect(); self.finish_dispatch_tracking(dispatch_id); - self.mode = - match TraceState::enrich_original_exception_with_context(original, value, active_chain) - { - Ok(exception) => Mode::Throw(exception), - Err(effect_err) => Mode::Throw(effect_err), - }; + self.mode = match TraceState::enrich_original_exception_with_context( + original, + value, + active_chain, + ) { + Ok(exception) => Mode::Throw(exception), + Err(effect_err) => Mode::Throw(effect_err), + }; return StepEvent::Continue; } @@ -2560,21 +2663,21 @@ impl VM { // visible chain from the running segment. During dispatch we keep the // existing Delegate-aware behavior so handler code sees the same // caller-visible stack as the effect site. - let chain_start = if let Some((_, _, continuation, _, _)) = self.current_handler_dispatch() { + let chain_start = if let Some((_, _, continuation, _, _)) = self.current_handler_dispatch() + { self.root_delegate_parent_segment_id( &continuation, "GetHandlers parent chain must be Delegate-created continuations", ) .or_else(|| continuation.segment_id()) .or_else(|| { - self.current_dispatch_origin() - .and_then(|origin| { - self.root_delegate_parent_segment_id( - &origin.k_origin, - "GetHandlers parent chain must be Delegate-created continuations", - ) - .or_else(|| origin.k_origin.segment_id()) - }) + self.current_dispatch_origin().and_then(|origin| { + self.root_delegate_parent_segment_id( + &origin.k_origin, + "GetHandlers parent chain must be Delegate-created continuations", + ) + .or_else(|| origin.k_origin.segment_id()) + }) }) .expect("dispatch origin continuations must be captured") } else { diff --git a/packages/doeff-vm-core/src/vm/step.rs b/packages/doeff-vm-core/src/vm/step.rs index f58b6136..1c18f303 100644 --- a/packages/doeff-vm-core/src/vm/step.rs +++ b/packages/doeff-vm-core/src/vm/step.rs @@ -59,10 +59,15 @@ impl VM { } fn should_treat_python_handler_gen_return_as_handler_completion(&self) -> bool { - self.current_seg() - .frames - .iter() - .all(|frame| !matches!(frame, Frame::Program { handler_kind: Some(_), .. })) + self.current_seg().frames.iter().all(|frame| { + !matches!( + frame, + Frame::Program { + handler_kind: Some(_), + .. + } + ) + }) } /// Set mode to Throw with a RuntimeError and return Continue. @@ -163,10 +168,10 @@ impl VM { ))); } - let stream = Arc::new(std::sync::Mutex::new(Box::new(PythonGeneratorStream::new( + let stream = IRStreamRef::new(Box::new(PythonGeneratorStream::new( PyShared::new(wrapped.generator.clone_ref(py)), PyShared::new(wrapped.get_frame.clone_ref(py)), - )) as Box)); + )) as Box); Ok(( stream, Self::merged_metadata_from_doeff( @@ -235,8 +240,7 @@ impl VM { let caller = segment.parent; let scope_parent = self.scope_parent(seg_id); let throw_parent = segment.throw_parent.clone(); - let mode = - std::mem::replace(&mut self.mode, Mode::Deliver(Value::Unit)); + let mode = std::mem::replace(&mut self.mode, Mode::Deliver(Value::Unit)); match mode { Mode::Deliver(value) => { // Don't free here — step_return reads the segment's caller. @@ -263,19 +267,20 @@ impl VM { let active_chain = self .assemble_active_chain(Some(&exc)) .into_iter() - .filter(|entry| !matches!(entry, ActiveChainEntry::ContextEntry { .. })) + .filter(|entry| { + !matches!(entry, ActiveChainEntry::ContextEntry { .. }) + }) .collect::>(); - let exc = - if Self::should_enrich_uncaught_exception_with_active_chain( - &active_chain, - ) { - self.enrich_uncaught_exception_with_active_chain( - exc, - active_chain.clone(), - ) - } else { - exc - }; + let exc = if Self::should_enrich_uncaught_exception_with_active_chain( + &active_chain, + ) { + self.enrich_uncaught_exception_with_active_chain( + exc, + active_chain.clone(), + ) + } else { + exc + }; self.completed_segment = Some(seg_id); self.store_completed_outputs_from(seg_id); self.reparent_children(seg_id, None, scope_parent); @@ -460,8 +465,7 @@ impl VM { } match mode { Mode::Deliver(value) => { - self.mode = - self.handle_interceptor_apply_result(continuation, value); + self.mode = self.handle_interceptor_apply_result(continuation, value); StepEvent::Continue } Mode::Throw(exc) => { @@ -487,8 +491,7 @@ impl VM { seg.interceptor_eval_depth = seg.interceptor_eval_depth.saturating_sub(1); match mode { Mode::Deliver(value) => { - self.mode = - self.handle_interceptor_eval_result(continuation, value); + self.mode = self.handle_interceptor_eval_result(continuation, value); StepEvent::Continue } Mode::Throw(exc) => { @@ -521,9 +524,7 @@ impl VM { StepEvent::Continue } Mode::HandleYield(yielded) => { - unreachable!( - "tail-resume return frame received HandleYield mode: {yielded:?}" - ) + unreachable!("tail-resume return frame received HandleYield mode: {yielded:?}") } Mode::Return(value) => { unreachable!("tail-resume return frame received Return mode: {value:?}") @@ -578,8 +579,7 @@ impl VM { } match mode { Mode::Deliver(value) => { - self.mode = - self.mode_from_eval_return_continuation(continuation, value); + self.mode = self.mode_from_eval_return_continuation(continuation, value); StepEvent::Continue } Mode::Throw(exc) => { @@ -858,7 +858,10 @@ impl VM { self.handler_stream_throw_continuation(&stream, handler_kind) { self.mode = Mode::HandleYield( - if self.exact_dispatch_origin_for_continuation(&continuation).is_some() { + if self + .exact_dispatch_origin_for_continuation(&continuation) + .is_some() + { DoCtrl::TransferThrow { continuation, exception: exc, @@ -906,13 +909,12 @@ impl VM { | PythonCall::EvalExpr { .. } | PythonCall::CallAsync { .. } => None, }; - self.pending_python = - Some(PendingPython::StepUserGenerator { - stream, - metadata, - handler_kind, - incoming_throw, - }); + self.pending_python = Some(PendingPython::StepUserGenerator { + stream, + metadata, + handler_kind, + incoming_throw, + }); return StepEvent::NeedsPython(call); } @@ -957,8 +959,7 @@ impl VM { ), ) }; - self.pending_python = - Some(PendingPython::RustProgramContinuation { marker, k }); + self.pending_python = Some(PendingPython::RustProgramContinuation { marker, k }); StepEvent::NeedsPython(call) } } @@ -1435,14 +1436,13 @@ impl VM { } fn step_handle_yield(&mut self) -> StepEvent { - let yielded = - match std::mem::replace(&mut self.mode, Mode::Deliver(Value::Unit)) { - Mode::HandleYield(y) => y, - other => { - self.mode = other; - return StepEvent::Error(VMError::internal("invalid mode for handle_yield")); - } - }; + let yielded = match std::mem::replace(&mut self.mode, Mode::Deliver(Value::Unit)) { + Mode::HandleYield(y) => y, + other => { + self.mode = other; + return StepEvent::Error(VMError::internal("invalid mode for handle_yield")); + } + }; match yielded { DoCtrl::Pure { value } => self.handle_yield_pure(value), DoCtrl::Map { @@ -2248,14 +2248,13 @@ impl VM { } fn step_return(&mut self) -> StepEvent { - let value = - match std::mem::replace(&mut self.mode, Mode::Deliver(Value::Unit)) { - Mode::Return(v) => v, - other => { - self.mode = other; - return StepEvent::Error(VMError::internal("invalid mode for return")); - } - }; + let value = match std::mem::replace(&mut self.mode, Mode::Deliver(Value::Unit)) { + Mode::Return(v) => v, + other => { + self.mode = other; + return StepEvent::Error(VMError::internal("invalid mode for return")); + } + }; let seg_id = match self.current_segment { Some(id) => id, @@ -2354,8 +2353,7 @@ impl VM { if let Some(ref m) = metadata { self.emit_frame_exited_due_to_error(None, m, None, &exception); } - self.mode = - self.mode_after_generror(GenErrorSite::EvalExpr, exception, false); + self.mode = self.mode_after_generror(GenErrorSite::EvalExpr, exception, false); } PyCallOutcome::GenReturn(value) | PyCallOutcome::Value(value) => { if metadata.is_some() && matches!(value, Value::Python(_)) { @@ -2637,8 +2635,7 @@ impl VM { return; } - self.mode = - self.mode_after_generror(GenErrorSite::ExpandReturnProgram, exception, false); + self.mode = self.mode_after_generror(GenErrorSite::ExpandReturnProgram, exception, false); } fn receive_step_user_generator_result( @@ -2708,7 +2705,10 @@ impl VM { self.handler_stream_throw_continuation(&stream, handler_kind) { self.mode = Mode::HandleYield( - if self.exact_dispatch_origin_for_continuation(&continuation).is_some() { + if self + .exact_dispatch_origin_for_continuation(&continuation) + .is_some() + { DoCtrl::TransferThrow { continuation, exception, @@ -2797,8 +2797,7 @@ impl VM { self.mode = Mode::Deliver(result); } PyCallOutcome::GenError(exception) => { - self.mode = - self.mode_after_generror(GenErrorSite::AsyncEscape, exception, false); + self.mode = self.mode_after_generror(GenErrorSite::AsyncEscape, exception, false); } PyCallOutcome::GenYield(_) | PyCallOutcome::GenReturn(_) => { self.receive_unexpected_outcome(); diff --git a/packages/doeff-vm-core/src/vm/vm_trace.rs b/packages/doeff-vm-core/src/vm/vm_trace.rs index 6c7665bc..873482e5 100644 --- a/packages/doeff-vm-core/src/vm/vm_trace.rs +++ b/packages/doeff-vm-core/src/vm/vm_trace.rs @@ -8,7 +8,7 @@ impl VM { Frame::Program { stream: snapshot_stream, .. - } => Arc::ptr_eq(&snapshot_stream, stream), + } => IRStreamRef::ptr_eq(snapshot_stream, stream), Frame::InterceptorApply(_) | Frame::InterceptorEval(_) | Frame::EvalReturn(_) diff --git a/packages/doeff-vm/Cargo.lock b/packages/doeff-vm/Cargo.lock index 7b176cc6..9d9c13fd 100644 --- a/packages/doeff-vm/Cargo.lock +++ b/packages/doeff-vm/Cargo.lock @@ -16,6 +16,7 @@ version = "0.1.0" dependencies = [ "doeff-core-effects", "doeff-vm-core", + "libc", "pyo3", ] diff --git a/packages/doeff-vm/Cargo.toml b/packages/doeff-vm/Cargo.toml index 6a65c721..9d933ee1 100644 --- a/packages/doeff-vm/Cargo.toml +++ b/packages/doeff-vm/Cargo.toml @@ -20,6 +20,7 @@ vm_debug_logs = [] pyo3 = { version = "0.28", features = ["py-clone"] } doeff-vm-core = { path = "../doeff-vm-core", features = ["python_bridge"] } doeff-core-effects = { path = "../doeff-core-effects" } +libc = "0.2" [dev-dependencies] pyo3 = { version = "0.28", features = ["auto-initialize", "py-clone"] } diff --git a/packages/doeff-vm/doeff_vm/__init__.py b/packages/doeff-vm/doeff_vm/__init__.py index cee5e873..454906f3 100644 --- a/packages/doeff-vm/doeff_vm/__init__.py +++ b/packages/doeff-vm/doeff_vm/__init__.py @@ -176,6 +176,7 @@ def validated_nesting_to_generator(self): async_run = _ext.async_run +memory_stats = _ext.memory_stats state = _ext.state diff --git a/packages/doeff-vm/src/pyvm.rs b/packages/doeff-vm/src/pyvm.rs index 23e164fb..524d1b19 100644 --- a/packages/doeff-vm/src/pyvm.rs +++ b/packages/doeff-vm/src/pyvm.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use pyo3::exceptions::{ PyAttributeError, PyBaseException, PyRuntimeError, PyStopIteration, PyTypeError, @@ -21,7 +21,7 @@ use crate::effect::{ use crate::error::VMError; use crate::frame::CallMetadata; use crate::ids::{Marker, SegmentId}; -use crate::ir_stream::{IRStream, PythonGeneratorStream}; +use crate::ir_stream::{IRStream, IRStreamRef, PythonGeneratorStream}; use crate::kleisli::{DgfnKleisli, IdentityKleisli, KleisliRef, PyKleisli}; use crate::py_key::HashedPyKey; use crate::py_shared::PyShared; @@ -33,8 +33,8 @@ use crate::vm::VM; use doeff_core_effects::scheduler::{set_run_external_wait_mode, ExternalWaitMode}; use doeff_core_effects::sentinels::PyRustHandlerSentinel; use doeff_vm_core::{ - install_vm_hooks, DoExprTag, PyDoCtrlBase, PyDoExprBase, PyEffectBase, PyK, PyResultErr, - PyResultOk, PyTraceFrame, PyTraceHop, PyVar, VmHooks, + install_vm_hooks, live_object_counts, DoExprTag, PyDoCtrlBase, PyDoExprBase, PyEffectBase, PyK, + PyResultErr, PyResultOk, PyTraceFrame, PyTraceHop, PyVar, VmHooks, }; fn ensure_vm_core_hooks_installed() { @@ -44,6 +44,17 @@ fn ensure_vm_core_hooks_installed() { }); } +#[cfg(target_os = "linux")] +fn current_rust_heap_bytes() -> usize { + let info = unsafe { libc::mallinfo2() }; + (info.uordblks as usize).saturating_add(info.hblkhd as usize) +} + +#[cfg(not(target_os = "linux"))] +fn current_rust_heap_bytes() -> usize { + 0 +} + fn build_traceback_data_pyobject( py: Python<'_>, trace: Vec, @@ -503,6 +514,68 @@ impl PyVM { self.vm.continuation_registry.len() } + pub fn memory_stats(&self, py: Python<'_>) -> PyResult> { + let counts = live_object_counts(); + let mut normal_segments = 0usize; + let mut prompt_segments = 0usize; + let mut interceptor_segments = 0usize; + let mut mask_segments = 0usize; + let mut segments_with_frames = 0usize; + let mut empty_segments = 0usize; + let mut program_frames = 0usize; + let mut interceptor_frames = 0usize; + let mut eval_return_frames = 0usize; + let mut other_frames = 0usize; + for (_, segment) in self.vm.segments.iter() { + match &segment.kind { + SegmentKind::Normal => normal_segments += 1, + SegmentKind::PromptBoundary { .. } => prompt_segments += 1, + SegmentKind::InterceptorBoundary { .. } => interceptor_segments += 1, + SegmentKind::MaskBoundary { .. } => mask_segments += 1, + } + if segment.frames.is_empty() { + empty_segments += 1; + } else { + segments_with_frames += 1; + } + for frame in &segment.frames { + match frame { + crate::frame::Frame::Program { .. } => program_frames += 1, + crate::frame::Frame::InterceptorApply(_) + | crate::frame::Frame::InterceptorEval(_) => interceptor_frames += 1, + crate::frame::Frame::EvalReturn(_) => eval_return_frames += 1, + crate::frame::Frame::MapReturn { .. } + | crate::frame::Frame::FlatMapBindResult + | crate::frame::Frame::FlatMapBindSource { .. } + | crate::frame::Frame::InterceptBodyReturn { .. } => other_frames += 1, + } + } + } + let dict = PyDict::new(py); + dict.set_item("arena_segments", self.vm.segments.len())?; + dict.set_item("continuation_registry", self.vm.continuation_registry.len())?; + dict.set_item("normal_segments", normal_segments)?; + dict.set_item("prompt_segments", prompt_segments)?; + dict.set_item("interceptor_segments", interceptor_segments)?; + dict.set_item("mask_segments", mask_segments)?; + dict.set_item("segments_with_frames", segments_with_frames)?; + dict.set_item("empty_segments", empty_segments)?; + dict.set_item("program_frames", program_frames)?; + dict.set_item("interceptor_frames", interceptor_frames)?; + dict.set_item("eval_return_frames", eval_return_frames)?; + dict.set_item("other_frames", other_frames)?; + dict.set_item("live_segments", counts.live_segments)?; + dict.set_item("live_continuations", counts.live_continuations)?; + dict.set_item("live_ir_streams", counts.live_ir_streams)?; + dict.set_item("in_place_reentries", counts.in_place_reentries)?; + dict.set_item( + "abandoned_transfer_branch_frees", + counts.abandoned_transfer_branch_frees, + )?; + dict.set_item("rust_heap_bytes", current_rust_heap_bytes())?; + Ok(dict.into()) + } + pub fn enable_debug(&mut self, level: String) { use crate::vm::DebugConfig; let config = match level.as_str() { @@ -1605,7 +1678,10 @@ fn scope_bindings_from_pyany(obj: &Bound<'_, PyAny>) -> PyResult>> = - Arc::new(Mutex::new(Box::new(PythonGeneratorStream::new( - PyShared::new(wrapped.generator.clone_ref(py)), - PyShared::new(wrapped.get_frame.clone_ref(py)), - )) as Box)); + let stream = IRStreamRef::new(Box::new(PythonGeneratorStream::new( + PyShared::new(wrapped.generator.clone_ref(py)), + PyShared::new(wrapped.get_frame.clone_ref(py)), + )) as Box); Ok(DoCtrl::IRStream { stream, @@ -1931,16 +2006,22 @@ pub(crate) fn classify_yielded_bound( } DoExprTag::ReadVar => { let read: PyRef<'_, PyReadVar> = obj.extract()?; - let var: PyRef<'_, PyVar> = read.var.bind(py).extract().map_err(|_| { - PyTypeError::new_err("ReadVar.var must be Var") - })?; - Ok(DoCtrl::ReadVar { var: var.to_var_id() }) + let var: PyRef<'_, PyVar> = read + .var + .bind(py) + .extract() + .map_err(|_| PyTypeError::new_err("ReadVar.var must be Var"))?; + Ok(DoCtrl::ReadVar { + var: var.to_var_id(), + }) } DoExprTag::WriteVar => { let write: PyRef<'_, PyWriteVar> = obj.extract()?; - let var: PyRef<'_, PyVar> = write.var.bind(py).extract().map_err(|_| { - PyTypeError::new_err("WriteVar.var must be Var") - })?; + let var: PyRef<'_, PyVar> = write + .var + .bind(py) + .extract() + .map_err(|_| PyTypeError::new_err("WriteVar.var must be Var"))?; Ok(DoCtrl::WriteVar { var: var.to_var_id(), value: Value::from_pyobject(write.value.bind(py)), @@ -1948,9 +2029,11 @@ pub(crate) fn classify_yielded_bound( } DoExprTag::WriteVarNonlocal => { let write: PyRef<'_, PyWriteVarNonlocal> = obj.extract()?; - let var: PyRef<'_, PyVar> = write.var.bind(py).extract().map_err(|_| { - PyTypeError::new_err("WriteVarNonlocal.var must be Var") - })?; + let var: PyRef<'_, PyVar> = write + .var + .bind(py) + .extract() + .map_err(|_| PyTypeError::new_err("WriteVarNonlocal.var must be Var"))?; Ok(DoCtrl::WriteVarNonlocal { var: var.to_var_id(), value: Value::from_pyobject(write.value.bind(py)), @@ -3466,7 +3549,7 @@ mod tests { handlers_list.unbind().into(), None, ) - .unwrap(), + .unwrap(), ) .unwrap() .into_any(); @@ -4089,7 +4172,6 @@ mod tests { }); } - #[test] fn test_r13i_effect_base_tag() { Python::attach(|py| { @@ -4446,6 +4528,22 @@ fn async_run<'py>( Ok(ns.get_item("_coro")?.unwrap().into_any()) } +#[pyfunction] +fn memory_stats(py: Python<'_>) -> PyResult> { + let counts = live_object_counts(); + let dict = PyDict::new(py); + dict.set_item("live_segments", counts.live_segments)?; + dict.set_item("live_continuations", counts.live_continuations)?; + dict.set_item("live_ir_streams", counts.live_ir_streams)?; + dict.set_item("in_place_reentries", counts.in_place_reentries)?; + dict.set_item( + "abandoned_transfer_branch_frees", + counts.abandoned_transfer_branch_frees, + )?; + dict.set_item("rust_heap_bytes", current_rust_heap_bytes())?; + Ok(dict.into()) +} + #[pymodule] pub fn doeff_vm(m: &Bound<'_, PyModule>) -> PyResult<()> { ensure_vm_core_hooks_installed(); @@ -4544,5 +4642,6 @@ pub fn doeff_vm(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("TAG_UNKNOWN", DoExprTag::Unknown as u8)?; m.add_function(wrap_pyfunction!(run, m)?)?; m.add_function(wrap_pyfunction!(async_run, m)?)?; + m.add_function(wrap_pyfunction!(memory_stats, m)?)?; Ok(()) } diff --git a/packages/doeff-vm/tests/test_memory_stats.py b/packages/doeff-vm/tests/test_memory_stats.py new file mode 100644 index 00000000..60085418 --- /dev/null +++ b/packages/doeff-vm/tests/test_memory_stats.py @@ -0,0 +1,27 @@ +import doeff_vm + + +def test_memory_stats_exported_with_expected_keys(): + stats = doeff_vm.memory_stats() + + assert callable(doeff_vm.memory_stats) + assert set(stats) >= { + "live_segments", + "live_continuations", + "live_ir_streams", + "rust_heap_bytes", + } + assert all(isinstance(stats[key], int) for key in stats) + + +def test_memory_stats_counts_return_to_baseline_after_run(): + before = doeff_vm.memory_stats() + + result = doeff_vm.run(doeff_vm.Pure(7)) + after = doeff_vm.memory_stats() + + assert result.is_ok() + assert result.value == 7 + assert after["live_segments"] == before["live_segments"] + assert after["live_continuations"] == before["live_continuations"] + assert after["live_ir_streams"] == before["live_ir_streams"] From 6b02e3e3e4b99f414ae37deb8ec0158423b15ed3 Mon Sep 17 00:00:00 2001 From: proboscis Date: Sat, 21 Mar 2026 16:30:53 +0900 Subject: [PATCH 2/4] Tighten vm run-session cleanup --- packages/doeff-vm-core/src/vm.rs | 4 ++ packages/doeff-vm/tests/test_memory_stats.py | 67 ++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/packages/doeff-vm-core/src/vm.rs b/packages/doeff-vm-core/src/vm.rs index a219b1b2..01a2b6ea 100644 --- a/packages/doeff-vm-core/src/vm.rs +++ b/packages/doeff-vm-core/src/vm.rs @@ -367,9 +367,13 @@ impl VM { self.continuation_registry.clear(); self.consumed_cont_ids.clear(); self.dispatch_observer.clear(); + self.segments.clear(); self.var_store.clear(); self.mode = Mode::Deliver(Value::Unit); self.pending_python = None; + self.current_segment = None; + self.completed_segment = None; + self.trace_state.clear(); self.scope_parents.clear(); self.scope_state_store.clear(); self.scope_writer_logs.clear(); diff --git a/packages/doeff-vm/tests/test_memory_stats.py b/packages/doeff-vm/tests/test_memory_stats.py index 60085418..e416d458 100644 --- a/packages/doeff-vm/tests/test_memory_stats.py +++ b/packages/doeff-vm/tests/test_memory_stats.py @@ -1,4 +1,13 @@ +from dataclasses import dataclass +from pathlib import Path + import doeff_vm +from doeff import Gather, Pass, Resume, Spawn, WithHandler, do +from doeff.effects.base import Effect, EffectBase +from doeff.handlers import sqlite_cache_handler +from doeff.handlers.cache_handlers import memo_rewriters +from doeff.rust_vm import default_handlers +from doeff.rust_vm import run as vm_run def test_memory_stats_exported_with_expected_keys(): @@ -25,3 +34,61 @@ def test_memory_stats_counts_return_to_baseline_after_run(): assert after["live_segments"] == before["live_segments"] assert after["live_continuations"] == before["live_continuations"] assert after["live_ir_streams"] == before["live_ir_streams"] + + +def test_memory_stats_counts_return_to_baseline_after_deep_handler_spawn_chain( + tmp_path: Path, +): + cache_path = tmp_path / "vm_memory_stats.sqlite3" + + @dataclass(frozen=True, kw_only=True) + class SyntheticQuery(EffectBase): + key: str + + def synthetic_query_handler(): + @do + def _handler(effect: Effect, k): + if not isinstance(effect, SyntheticQuery): + yield Pass() + return + return (yield Resume(k, effect.key)) + + return _handler + + @do + def worker(batch_index: int, task_index: int): + return (yield SyntheticQuery(key=f"{batch_index}:{task_index}")) + + @do + def scenario(): + for batch_index in range(2): + tasks = [] + for task_index in range(20): + task = yield Spawn( + worker(batch_index=batch_index, task_index=task_index), + daemon=False, + ) + tasks.append(task) + values = yield Gather(*tasks) + if len(values) != 20: + raise AssertionError(f"expected 20 values, got {len(values)}") + return None + + wrapped = scenario() + for handler in reversed( + ( + synthetic_query_handler(), + *memo_rewriters(SyntheticQuery), + sqlite_cache_handler(cache_path), + ) + ): + wrapped = WithHandler(handler, wrapped) + + before = doeff_vm.memory_stats() + result = vm_run(wrapped, handlers=default_handlers()) + after = doeff_vm.memory_stats() + + assert result.is_ok() + assert after["live_segments"] == before["live_segments"] + assert after["live_continuations"] == before["live_continuations"] + assert after["live_ir_streams"] == before["live_ir_streams"] From 5e55acb93ddbad8ee749bc52ef60ebf6ba1ac666 Mon Sep 17 00:00:00 2001 From: proboscis Date: Sat, 21 Mar 2026 16:51:50 +0900 Subject: [PATCH 3/4] Shrink vm runtime buffers after run --- packages/doeff-vm-core/src/arena.rs | 9 ++ packages/doeff-vm-core/src/debug_state.rs | 4 + .../doeff-vm-core/src/dispatch_observer.rs | 21 ++++ packages/doeff-vm-core/src/trace_state.rs | 21 ++++ packages/doeff-vm-core/src/var_store.rs | 6 ++ packages/doeff-vm-core/src/vm.rs | 95 +++++++++++++++++++ packages/doeff-vm/src/pyvm.rs | 59 ++++++++++++ packages/doeff-vm/tests/test_memory_stats.py | 62 ++++++++++++ 8 files changed, 277 insertions(+) diff --git a/packages/doeff-vm-core/src/arena.rs b/packages/doeff-vm-core/src/arena.rs index 39adc709..277e56db 100644 --- a/packages/doeff-vm-core/src/arena.rs +++ b/packages/doeff-vm-core/src/arena.rs @@ -84,6 +84,10 @@ impl FiberArena { } pub fn capacity(&self) -> usize { + self.fibers.capacity() + } + + pub fn slot_count(&self) -> usize { self.fibers.len() } @@ -91,6 +95,11 @@ impl FiberArena { self.fibers.clear(); self.free_list.clear(); } + + pub fn shrink_to_fit(&mut self) { + self.fibers.shrink_to_fit(); + self.free_list.shrink_to_fit(); + } } impl Default for FiberArena { diff --git a/packages/doeff-vm-core/src/debug_state.rs b/packages/doeff-vm-core/src/debug_state.rs index f1365824..07b66880 100644 --- a/packages/doeff-vm-core/src/debug_state.rs +++ b/packages/doeff-vm-core/src/debug_state.rs @@ -46,6 +46,10 @@ impl DebugState { self.trace_events.clear(); } + pub(crate) fn shrink_to_fit(&mut self) { + self.trace_events.shrink_to_fit(); + } + pub(crate) fn trace_events(&self) -> &[TraceEvent] { &self.trace_events } diff --git a/packages/doeff-vm-core/src/dispatch_observer.rs b/packages/doeff-vm-core/src/dispatch_observer.rs index d2e86284..deb7fa7e 100644 --- a/packages/doeff-vm-core/src/dispatch_observer.rs +++ b/packages/doeff-vm-core/src/dispatch_observer.rs @@ -33,6 +33,27 @@ impl DispatchObserver { self.segment_dispatch_ids.clear(); } + pub(crate) fn shrink_to_fit(&mut self) { + self.dispatches.shrink_to_fit(); + self.segment_dispatch_ids.shrink_to_fit(); + } + + pub(crate) fn dispatch_count(&self) -> usize { + self.dispatches.len() + } + + pub(crate) fn segment_binding_count(&self) -> usize { + self.segment_dispatch_ids.len() + } + + pub(crate) fn dispatch_capacity(&self) -> usize { + self.dispatches.capacity() + } + + pub(crate) fn segment_binding_capacity(&self) -> usize { + self.segment_dispatch_ids.capacity() + } + pub(crate) fn start_dispatch( &mut self, dispatch_id: DispatchId, diff --git a/packages/doeff-vm-core/src/trace_state.rs b/packages/doeff-vm-core/src/trace_state.rs index 3fff1174..3fb16879 100644 --- a/packages/doeff-vm-core/src/trace_state.rs +++ b/packages/doeff-vm-core/src/trace_state.rs @@ -75,6 +75,22 @@ impl Default for TraceState { } impl TraceState { + pub(crate) fn frame_stack_len(&self) -> usize { + self.frame_stack.len() + } + + pub(crate) fn dispatch_display_count(&self) -> usize { + self.dispatch_displays.len() + } + + pub(crate) fn frame_stack_capacity(&self) -> usize { + self.frame_stack.capacity() + } + + pub(crate) fn dispatch_display_capacity(&self) -> usize { + self.dispatch_displays.capacity() + } + pub(crate) fn dispatch_has_terminal_result(&self, dispatch_id: DispatchId) -> bool { self.dispatch_display(dispatch_id) .is_some_and(Self::dispatch_result_prevents_throw_overwrite) @@ -84,6 +100,11 @@ impl TraceState { *self = Self::default(); } + pub(crate) fn shrink_to_fit(&mut self) { + self.frame_stack.shrink_to_fit(); + self.dispatch_displays.shrink_to_fit(); + } + pub(crate) fn finish_dispatch(&mut self, dispatch_id: DispatchId) { let keep_for_traceback = self .dispatch_displays diff --git a/packages/doeff-vm-core/src/var_store.rs b/packages/doeff-vm-core/src/var_store.rs index 8fc9d984..30fbcd26 100644 --- a/packages/doeff-vm-core/src/var_store.rs +++ b/packages/doeff-vm-core/src/var_store.rs @@ -18,6 +18,12 @@ impl VarStore { self.overrides_by_segment.clear(); } + pub fn shrink_to_fit(&mut self) { + self.cells.shrink_to_fit(); + self.bindings_by_segment.shrink_to_fit(); + self.overrides_by_segment.shrink_to_fit(); + } + pub fn init_segment(&mut self, seg_id: SegmentId) { self.bindings_by_segment.entry(seg_id).or_default(); self.overrides_by_segment.entry(seg_id).or_default(); diff --git a/packages/doeff-vm-core/src/vm.rs b/packages/doeff-vm-core/src/vm.rs index 01a2b6ea..394a2107 100644 --- a/packages/doeff-vm-core/src/vm.rs +++ b/packages/doeff-vm-core/src/vm.rs @@ -364,23 +364,38 @@ impl VM { handler.on_run_end(run_token); } self.run_handlers.clear(); + self.run_handlers.shrink_to_fit(); self.continuation_registry.clear(); + self.continuation_registry.shrink_to_fit(); self.consumed_cont_ids.clear(); + self.consumed_cont_ids.shrink_to_fit(); self.dispatch_observer.clear(); + self.dispatch_observer.shrink_to_fit(); self.segments.clear(); + self.segments.shrink_to_fit(); self.var_store.clear(); + self.var_store.shrink_to_fit(); self.mode = Mode::Deliver(Value::Unit); self.pending_python = None; self.current_segment = None; self.completed_segment = None; self.trace_state.clear(); + self.trace_state.shrink_to_fit(); + self.debug.shrink_to_fit(); self.scope_parents.clear(); + self.scope_parents.shrink_to_fit(); self.scope_state_store.clear(); + self.scope_state_store.shrink_to_fit(); self.scope_writer_logs.clear(); + self.scope_writer_logs.shrink_to_fit(); self.scope_persistent_epochs.clear(); + self.scope_persistent_epochs.shrink_to_fit(); self.retired_scope_state_store.clear(); + self.retired_scope_state_store.shrink_to_fit(); self.retired_scope_writer_logs.clear(); + self.retired_scope_writer_logs.shrink_to_fit(); self.retired_scope_persistent_epochs.clear(); + self.retired_scope_persistent_epochs.shrink_to_fit(); } pub fn enable_trace(&mut self, enabled: bool) { @@ -391,6 +406,86 @@ impl VM { self.debug.trace_events() } + pub fn dispatch_count(&self) -> usize { + self.dispatch_observer.dispatch_count() + } + + pub fn segment_dispatch_binding_count(&self) -> usize { + self.dispatch_observer.segment_binding_count() + } + + pub fn dispatch_capacity(&self) -> usize { + self.dispatch_observer.dispatch_capacity() + } + + pub fn segment_dispatch_binding_capacity(&self) -> usize { + self.dispatch_observer.segment_binding_capacity() + } + + pub fn trace_frame_stack_count(&self) -> usize { + self.trace_state.frame_stack_len() + } + + pub fn trace_dispatch_display_count(&self) -> usize { + self.trace_state.dispatch_display_count() + } + + pub fn trace_frame_stack_capacity(&self) -> usize { + self.trace_state.frame_stack_capacity() + } + + pub fn trace_dispatch_display_capacity(&self) -> usize { + self.trace_state.dispatch_display_capacity() + } + + pub fn scope_state_count(&self) -> usize { + self.scope_state_store.len() + } + + pub fn scope_writer_log_count(&self) -> usize { + self.scope_writer_logs.len() + } + + pub fn scope_epoch_count(&self) -> usize { + self.scope_persistent_epochs.len() + } + + pub fn retired_scope_state_count(&self) -> usize { + self.retired_scope_state_store.len() + } + + pub fn retired_scope_writer_log_count(&self) -> usize { + self.retired_scope_writer_logs.len() + } + + pub fn retired_scope_epoch_count(&self) -> usize { + self.retired_scope_persistent_epochs.len() + } + + pub fn scope_state_capacity(&self) -> usize { + self.scope_state_store.capacity() + } + + pub fn scope_writer_log_capacity(&self) -> usize { + self.scope_writer_logs.capacity() + } + + pub fn scope_epoch_capacity(&self) -> usize { + self.scope_persistent_epochs.capacity() + } + + pub fn retired_scope_state_capacity(&self) -> usize { + self.retired_scope_state_store.capacity() + } + + pub fn retired_scope_writer_log_capacity(&self) -> usize { + self.retired_scope_writer_logs.capacity() + } + + pub fn retired_scope_epoch_capacity(&self) -> usize { + self.retired_scope_persistent_epochs.capacity() + } + pub fn py_store(&self) -> Option<&PyStore> { self.py_store.as_ref() } diff --git a/packages/doeff-vm/src/pyvm.rs b/packages/doeff-vm/src/pyvm.rs index 524d1b19..aec0bf55 100644 --- a/packages/doeff-vm/src/pyvm.rs +++ b/packages/doeff-vm/src/pyvm.rs @@ -553,7 +553,66 @@ impl PyVM { } let dict = PyDict::new(py); dict.set_item("arena_segments", self.vm.segments.len())?; + dict.set_item("arena_slots", self.vm.segments.slot_count())?; + dict.set_item("arena_capacity", self.vm.segments.capacity())?; dict.set_item("continuation_registry", self.vm.continuation_registry.len())?; + dict.set_item("dispatch_count", self.vm.dispatch_count())?; + dict.set_item("dispatch_capacity", self.vm.dispatch_capacity())?; + dict.set_item( + "segment_dispatch_bindings", + self.vm.segment_dispatch_binding_count(), + )?; + dict.set_item( + "segment_dispatch_binding_capacity", + self.vm.segment_dispatch_binding_capacity(), + )?; + dict.set_item("trace_frame_stack", self.vm.trace_frame_stack_count())?; + dict.set_item( + "trace_frame_stack_capacity", + self.vm.trace_frame_stack_capacity(), + )?; + dict.set_item( + "trace_dispatch_displays", + self.vm.trace_dispatch_display_count(), + )?; + dict.set_item( + "trace_dispatch_display_capacity", + self.vm.trace_dispatch_display_capacity(), + )?; + dict.set_item("debug_trace_events", self.vm.trace_events().len())?; + dict.set_item("scope_state_count", self.vm.scope_state_count())?; + dict.set_item("scope_state_capacity", self.vm.scope_state_capacity())?; + dict.set_item("scope_writer_log_count", self.vm.scope_writer_log_count())?; + dict.set_item( + "scope_writer_log_capacity", + self.vm.scope_writer_log_capacity(), + )?; + dict.set_item("scope_epoch_count", self.vm.scope_epoch_count())?; + dict.set_item("scope_epoch_capacity", self.vm.scope_epoch_capacity())?; + dict.set_item( + "retired_scope_state_count", + self.vm.retired_scope_state_count(), + )?; + dict.set_item( + "retired_scope_state_capacity", + self.vm.retired_scope_state_capacity(), + )?; + dict.set_item( + "retired_scope_writer_log_count", + self.vm.retired_scope_writer_log_count(), + )?; + dict.set_item( + "retired_scope_writer_log_capacity", + self.vm.retired_scope_writer_log_capacity(), + )?; + dict.set_item( + "retired_scope_epoch_count", + self.vm.retired_scope_epoch_count(), + )?; + dict.set_item( + "retired_scope_epoch_capacity", + self.vm.retired_scope_epoch_capacity(), + )?; dict.set_item("normal_segments", normal_segments)?; dict.set_item("prompt_segments", prompt_segments)?; dict.set_item("interceptor_segments", interceptor_segments)?; diff --git a/packages/doeff-vm/tests/test_memory_stats.py b/packages/doeff-vm/tests/test_memory_stats.py index e416d458..e62ceee6 100644 --- a/packages/doeff-vm/tests/test_memory_stats.py +++ b/packages/doeff-vm/tests/test_memory_stats.py @@ -92,3 +92,65 @@ def scenario(): assert after["live_segments"] == before["live_segments"] assert after["live_continuations"] == before["live_continuations"] assert after["live_ir_streams"] == before["live_ir_streams"] + + +def test_pyvm_run_releases_internal_vm_capacities_after_deep_handler_spawn_chain( + tmp_path: Path, +): + cache_path = tmp_path / "vm_memory_stats_pyvm.sqlite3" + + @dataclass(frozen=True, kw_only=True) + class SyntheticQuery(EffectBase): + key: str + + def synthetic_query_handler(): + @do + def _handler(effect: Effect, k): + if not isinstance(effect, SyntheticQuery): + yield Pass() + return + return (yield Resume(k, effect.key)) + + return _handler + + @do + def worker(batch_index: int, task_index: int): + return (yield SyntheticQuery(key=f"{batch_index}:{task_index}")) + + @do + def scenario(): + for batch_index in range(2): + tasks = [] + for task_index in range(20): + task = yield Spawn( + worker(batch_index=batch_index, task_index=task_index), + daemon=False, + ) + tasks.append(task) + values = yield Gather(*tasks) + if len(values) != 20: + raise AssertionError(f"expected 20 values, got {len(values)}") + return None + + program = scenario() + for handler in reversed( + ( + synthetic_query_handler(), + *memo_rewriters(SyntheticQuery), + sqlite_cache_handler(cache_path), + *default_handlers(), + ) + ): + program = WithHandler(handler, program) + + vm = doeff_vm.PyVM() + vm.run(program) + after = vm.memory_stats() + + assert after["arena_capacity"] == 0 + assert after["dispatch_capacity"] == 0 + assert after["segment_dispatch_binding_capacity"] == 0 + assert after["scope_state_capacity"] == 0 + assert after["scope_writer_log_capacity"] == 0 + assert after["retired_scope_state_capacity"] == 0 + assert after["retired_scope_writer_log_capacity"] == 0 From d6ae0c81715a53c563cdf55087b139a9ad6c7987 Mon Sep 17 00:00:00 2001 From: proboscis Date: Sat, 21 Mar 2026 20:40:52 +0900 Subject: [PATCH 4/4] Restore vendored Result compatibility --- doeff/_vendor.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/doeff/_vendor.py b/doeff/_vendor.py index 7712c3a6..bf70a1d1 100644 --- a/doeff/_vendor.py +++ b/doeff/_vendor.py @@ -288,8 +288,60 @@ def __hash__(self) -> int: FrozenDict = frozendict +class Result(Generic[T_co]): + """Legacy Result base kept for older imports and persisted annotations.""" + + __slots__ = () + + def is_ok(self) -> bool: + return isinstance(self, Ok) + + def is_err(self) -> bool: + return isinstance(self, Err) + + def ok(self) -> T_co | None: + if isinstance(self, Ok): + return self.value + return None + + def err(self) -> Any | None: + if isinstance(self, Err): + return self.error + return None + + def expect(self, message: str) -> T_co: + if isinstance(self, Ok): + return self.value + if message: + raise RuntimeError(f"{message}: {self.error}") from self.error + raise self.error + + def unwrap(self) -> T_co: + if isinstance(self, Ok): + return self.value + raise self.error + + def unwrap_err(self) -> Any: + if isinstance(self, Err): + return self.error + raise RuntimeError("Called unwrap_err on Ok value") + + def map(self, func: Callable[[T_co], U]) -> "Result[U]": + if isinstance(self, Ok): + return Ok(func(self.value)) + return self + + def map_err(self, func: Callable[[Any], Any]) -> "Result[T_co]": + if isinstance(self, Err): + return Err(func(self.error)) + return self + + def __bool__(self) -> bool: + return self.is_ok() + + @dataclass(frozen=True) -class Ok(Generic[T_co]): +class Ok(Result[T_co], Generic[T_co]): """Legacy Result shim kept for persisted pickles created before the Rust move.""" value: T_co @@ -305,7 +357,7 @@ def __bool__(self) -> bool: @dataclass(frozen=True) -class Err: +class Err(Result[NoReturn]): """Legacy error shim kept for persisted pickles created before the Rust move.""" error: Any @@ -327,6 +379,7 @@ def __bool__(self) -> bool: "Maybe", "Nothing", "Ok", + "Result", "Some", "TraceError", "WGraph",