Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion doeff/_types_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
This module contains the foundational types with zero internal dependencies.
"""


import json
import traceback
import warnings
Expand Down Expand Up @@ -693,6 +692,12 @@ def error(self) -> BaseException: ...
@property
def traceback_data(self) -> Any | None: ...

@property
def last_active_chain(self) -> Any: ...

@property
def early_terminated(self) -> bool: ...

def is_ok(self) -> bool: ...

def is_err(self) -> bool: ...
Expand Down
68 changes: 68 additions & 0 deletions doeff/rust_vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,68 @@ def _print_doeff_trace(doeff_tb: Any | None) -> None:
return


_EARLY_TERMINATION_MESSAGE = "Program terminated early before the root program completed."


def _early_termination_exception(run_result: Any) -> BaseException:
try:
value = run_result.value
if isinstance(value, BaseException):
return value
try:
is_err = value.is_err
except AttributeError:
is_err = None
if callable(is_err) and is_err():
try:
error = value.error
except AttributeError:
error = None
if isinstance(error, BaseException):
return error
except Exception:
pass
return RuntimeError(_EARLY_TERMINATION_MESSAGE)


def _warn_early_termination_if_present(run_result: Any) -> None:
try:
early_terminated = run_result.early_terminated
except AttributeError:
return
if early_terminated is not True:
return

try:
active_chain = run_result.last_active_chain
except AttributeError:
active_chain = ()
if not isinstance(active_chain, (list, tuple)):
active_chain = ()

try:
from doeff.traceback import build_doeff_traceback

doeff_tb = build_doeff_traceback(
_early_termination_exception(run_result),
(),
active_chain,
allow_active=True,
)
except Exception as exc:
warnings.warn(f"Failed to build early termination warning: {exc}", stacklevel=2)
doeff_tb = None

try:
import sys

print(_EARLY_TERMINATION_MESSAGE, file=sys.stderr)
except Exception as exc:
warnings.warn(f"Failed to print early termination warning: {exc}", stacklevel=2)

_print_doeff_trace(doeff_tb)


def _run_call_kwargs(
run_fn: Any,
*,
Expand Down Expand Up @@ -532,6 +594,7 @@ def run(
store: dict[str, Any] | None = None,
trace: bool = False,
print_doeff_trace: bool = False,
warn_early_termination: bool = True,
) -> Any:
vm = _vm()
try:
Expand All @@ -551,6 +614,8 @@ def run(
doeff_tb = _build_doeff_traceback_if_present(result)
if print_doeff_trace:
_print_doeff_trace(doeff_tb)
if warn_early_termination:
_warn_early_termination_if_present(result)
return _raise_unhandled_effect_if_present(result, raise_unhandled=raise_unhandled)


Expand All @@ -561,6 +626,7 @@ async def async_run(
store: dict[str, Any] | None = None,
trace: bool = False,
print_doeff_trace: bool = False,
warn_early_termination: bool = True,
) -> Any:
vm = _vm()
try:
Expand All @@ -580,6 +646,8 @@ async def async_run(
doeff_tb = _build_doeff_traceback_if_present(result)
if print_doeff_trace:
_print_doeff_trace(doeff_tb)
if warn_early_termination:
_warn_early_termination_if_present(result)
return _raise_unhandled_effect_if_present(result, raise_unhandled=raise_unhandled)


Expand Down
53 changes: 53 additions & 0 deletions packages/doeff-vm-core/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ pub struct VM {
pub current_segment: Option<SegmentId>,
pub(crate) debug: DebugState,
pub(crate) trace_state: TraceState,
pub(crate) last_active_chain: Vec<ActiveChainEntry>,
pub(crate) root_program_stream: Option<IRStreamRef>,
pub(crate) root_program_completed: bool,
pub(crate) early_terminated: bool,
pub continuation_registry: HashMap<ContId, Continuation>,
pub active_run_token: Option<u64>,
}
Expand All @@ -294,6 +298,10 @@ impl VM {
current_segment: None,
debug: DebugState::new(DebugConfig::default()),
trace_state: TraceState::default(),
last_active_chain: Vec::new(),
root_program_stream: None,
root_program_completed: false,
early_terminated: false,
continuation_registry: HashMap::new(),
active_run_token: None,
}
Expand All @@ -313,11 +321,56 @@ impl VM {
let token = NEXT_RUN_TOKEN.fetch_add(1, Ordering::Relaxed);
self.active_run_token = Some(token);
self.trace_state.clear();
self.last_active_chain.clear();
self.root_program_stream = None;
self.root_program_completed = false;
self.early_terminated = false;
self.interceptor_state.clear_for_run();
self.run_handlers.clear();
token
}

pub(crate) fn register_root_program_stream(
&mut self,
stream: &IRStreamRef,
handler_kind: Option<HandlerKind>,
) {
if handler_kind.is_some() || self.root_program_stream.is_some() {
return;
}
self.root_program_stream = Some(stream.clone());
}

pub(crate) fn maybe_mark_root_program_completed(&mut self, stream: &IRStreamRef) {
if self.root_program_completed {
return;
}
if self
.root_program_stream
.as_ref()
.is_some_and(|root_stream| Arc::ptr_eq(root_stream, stream))
{
self.root_program_completed = true;
self.root_program_stream = None;
}
}

pub fn has_root_program_stream(&self) -> bool {
self.root_program_stream.is_some()
}

pub fn root_program_completed(&self) -> bool {
self.root_program_completed
}

pub fn early_terminated(&self) -> bool {
self.early_terminated
}

pub(crate) fn mark_early_terminated(&mut self) {
self.early_terminated = true;
}

pub fn current_run_token(&self) -> Option<u64> {
self.active_run_token
}
Expand Down
3 changes: 2 additions & 1 deletion packages/doeff-vm-core/src/vm/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1377,11 +1377,12 @@ impl VM {

if let Some((_dispatch_id, original_exception, terminal)) = error_dispatch {
if terminal {
let active_chain = self
let active_chain: Vec<ActiveChainEntry> = self
.assemble_active_chain(Some(&original_exception))
.into_iter()
.filter(|entry| !matches!(entry, ActiveChainEntry::ContextEntry { .. }))
.collect();
self.set_last_active_chain(&active_chain);
let enriched_exception = match TraceState::enrich_original_exception_with_context(
original_exception,
value,
Expand Down
48 changes: 32 additions & 16 deletions packages/doeff-vm-core/src/vm/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ impl VM {
StepEvent::Continue
}

pub(super) fn throw_handler_protocol_error(&mut self, message: impl Into<String>) -> StepEvent {
if self.current_segment.is_none() {
return StepEvent::Error(VMError::internal(
"throw_handler_protocol_error called without current segment",
));
}
self.set_contextual_throw(PyException::handler_protocol_error(message));
StepEvent::Continue
}

pub(super) fn contextual_throw_mode(&mut self, exception: PyException) -> Mode {
self.mode_after_generror(GenErrorSite::VmRaisedUser, exception, false)
}
Expand Down Expand Up @@ -188,6 +178,7 @@ impl VM {
self.finalize_active_dispatches_as_threw(&exc);
let trace = self.assemble_traceback_entries(&exc);
let active_chain = self.assemble_active_chain(Some(&exc));
self.set_last_active_chain(&active_chain);
self.segments.reparent_children(seg_id, None);
self.segments.free(seg_id);
self.current_segment = None;
Expand Down Expand Up @@ -440,11 +431,15 @@ impl VM {
.contains_key(&continuation.cont_id)
&& !self.is_one_shot_consumed(continuation.cont_id)
{
self.mark_one_shot_consumed(continuation.cont_id);
return self.throw_handler_protocol_error(format!(
let exception = PyException::handler_protocol_error(format!(
"handler returned without consuming continuation {}; use Resume(k, v), Transfer(k, v), Discontinue(k, exn), or Pass()",
continuation.cont_id.raw(),
));
self.mark_early_terminated();
self.mark_one_shot_consumed(continuation.cont_id);
self.emit_handler_threw_for_dispatch(dispatch_id, &exception);
self.current_seg_mut().mode = self.contextual_throw_mode(exception);
return StepEvent::Continue;
}

if let Err(err) = self
Expand Down Expand Up @@ -500,11 +495,12 @@ impl VM {
match mode {
Mode::Deliver(value) => {
if let Some(original) = k_origin.pending_error_context {
let active_chain = self
let active_chain: Vec<ActiveChainEntry> = self
.assemble_active_chain(Some(&original))
.into_iter()
.filter(|entry| !matches!(entry, ActiveChainEntry::ContextEntry { .. }))
.collect();
self.set_last_active_chain(&active_chain);
self.current_seg_mut().mode =
match TraceState::enrich_original_exception_with_context(
original,
Expand All @@ -527,11 +523,15 @@ impl VM {
unreachable!("dispatch origin frame received HandleYield mode: {yielded:?}")
}
Mode::Return(value) => {
return self.throw_handler_protocol_error(format!(
let exception = PyException::handler_protocol_error(format!(
"handler returned without consuming continuation before dispatch {} completed: {:?}",
dispatch_id.raw(),
value,
))
));
self.mark_early_terminated();
self.emit_handler_threw_for_dispatch(dispatch_id, &exception);
self.current_seg_mut().mode = self.contextual_throw_mode(exception);
StepEvent::Continue
}
}
}
Expand Down Expand Up @@ -810,6 +810,7 @@ impl VM {
self.handle_stream_yield(yielded, stream, metadata, handler_kind)
}
IRStreamStep::Return(value) => {
self.maybe_mark_root_program_completed(&stream);
if let Some(ref m) = metadata {
self.emit_frame_exited(m);
}
Expand Down Expand Up @@ -1751,6 +1752,7 @@ impl VM {
metadata: Option<CallMetadata>,
handler_kind: Option<HandlerKind>,
) -> StepEvent {
self.register_root_program_stream(&stream, handler_kind);
if let Some(ref m) = metadata {
self.emit_frame_entered(m, handler_kind);
}
Expand Down Expand Up @@ -2444,12 +2446,26 @@ impl VM {
let _ = self.handle_stream_yield(yielded, stream, metadata, handler_kind);
}
PyCallOutcome::GenReturn(value) => {
self.maybe_mark_root_program_completed(&stream);
if let Some(ref m) = metadata {
self.emit_frame_exited(m);
}
if handler_kind == Some(HandlerKind::Python) {
if let Some(exception) = Self::returned_control_primitive_exception(&value) {
self.set_contextual_throw(exception);
if let Some(dispatch_id) =
self.current_active_handler_dispatch_id().or_else(|| {
let dispatch_id = self.current_segment_dispatch_id_any()?;
if self.current_segment_is_active_handler_for_dispatch(dispatch_id)
{
Some(dispatch_id)
} else {
None
}
})
{
self.emit_handler_threw_for_dispatch(dispatch_id, &exception);
}
self.current_seg_mut().mode = self.contextual_throw_mode(exception);
return;
}
}
Expand Down
25 changes: 25 additions & 0 deletions packages/doeff-vm-core/src/vm/vm_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ impl VM {
return Mode::Throw(exception);
}

self.capture_last_active_chain(Some(&exception));

if exception.is_materialized_synthetic_vm_error() && !active_handler_supports_conversion {
return Mode::Throw(exception);
}
Expand Down Expand Up @@ -430,6 +432,29 @@ impl VM {
)
}

fn snapshot_active_chain_without_context(
active_chain: &[ActiveChainEntry],
) -> Vec<ActiveChainEntry> {
active_chain
.iter()
.filter(|entry| !matches!(entry, ActiveChainEntry::ContextEntry { .. }))
.cloned()
.collect()
}

pub(crate) fn capture_last_active_chain(&mut self, exception: Option<&PyException>) {
let active_chain = self.assemble_active_chain(exception);
self.set_last_active_chain(&active_chain);
}

pub(crate) fn set_last_active_chain(&mut self, active_chain: &[ActiveChainEntry]) {
self.last_active_chain = Self::snapshot_active_chain_without_context(active_chain);
}

pub fn last_active_chain(&self) -> &[ActiveChainEntry] {
&self.last_active_chain
}

fn should_attach_active_chain_for_dispatch(&self, dispatch_id: DispatchId) -> bool {
let Some(origin) = self.dispatch_origin_for_dispatch_id(dispatch_id) else {
return false;
Expand Down
2 changes: 2 additions & 0 deletions packages/doeff-vm/doeff_vm/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ RunResultValue: TypeAlias = Ok[_T] | Err

class RunResult(Generic[_T]):
traceback_data: DoeffTracebackData | None
last_active_chain: Any
early_terminated: bool
@property
def value(self) -> _T: ...
@property
Expand Down
Loading
Loading