Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion marimo/_pyodide/pyodide_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def _launch_pyodide_kernel(
)

if is_edit_mode:
signal.signal(signal.SIGINT, handlers.construct_interrupt_handler(ctx))
signal.signal(signal.SIGINT, handlers.construct_interrupt_handler())

async def listen_completion() -> None:
while True:
Expand Down
9 changes: 9 additions & 0 deletions marimo/_runtime/context/kernel_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from marimo._ast.app import InternalApp
from marimo._messaging.types import KernelStreams
from marimo._runtime.runner.scheduler import Scheduler
from marimo._runtime.runtime import Kernel
from marimo._runtime.state import State
from marimo._runtime.virtual_file import VirtualFileStorageType
Expand All @@ -41,6 +42,14 @@ class KernelRuntimeContext(RuntimeContext):
_app: InternalApp | None = None
_id_provider: IDProvider | None = None
_execution_context: ExecutionContext | None = None
# Set while a Scheduler's `async with` is open. Lookup goes through
# the currently-installed context — not one captured at install
# time — so embedded-app child contexts route SIGINT correctly.
_active_scheduler: Scheduler | None = None

@property
def active_scheduler(self) -> Scheduler | None:
return self._active_scheduler

@property
def graph(self) -> DirectedGraph:
Expand Down
67 changes: 2 additions & 65 deletions marimo/_runtime/executor/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,18 @@

from __future__ import annotations

import asyncio
import contextlib
import functools
import signal
import threading
from dataclasses import replace
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING

from marimo import _loggers
from marimo._entrypoints.registry import EntryPointRegistry
from marimo._runtime.control_flow import MarimoInterrupt
from marimo._runtime.executor.executor import DefaultExecutor, Executor
from marimo._runtime.executor.lifecycles import ExecutionLifecycle, Skip
from marimo._runtime.runner.result import RunResult
from marimo._types.globals import MutableGlobals

if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from collections.abc import Callable

from marimo._ast.cell import CellImpl

Expand Down Expand Up @@ -92,18 +86,6 @@ def evaluate_sync(

return self._teardown_chain(cell, glbls, completed, result)

async def evaluate_interruptible(
self, cell: CellImpl, glbls: MutableGlobals
) -> RunResult:
"""Await `evaluate` with SIGINT capture for coroutine cells."""
if not cell.is_coroutine():
return await self.evaluate(cell, glbls)
future = asyncio.ensure_future(self.evaluate(cell, glbls))
if threading.current_thread() is threading.main_thread():
with _cancel_on_sigint(future):
return await future
return await future

def _setup_chain(
self, cell: CellImpl, glbls: MutableGlobals
) -> tuple[list[ExecutionLifecycle], Skip | None, BaseException | None]:
Expand Down Expand Up @@ -196,48 +178,3 @@ def resolve_executor() -> Executor:
e,
)
return DefaultExecutor()


# Adapted from
# https://github.com/ipython/ipykernel/blob/eddd3e666a82ebec287168b0da7cfa03639a3772/ipykernel/ipkernel.py#L312
@contextlib.contextmanager
def _cancel_on_sigint(future: asyncio.Future[Any]) -> Iterator[None]:
"""Cancel `future` if a SIGINT arrives during evaluation."""
sigint_future: asyncio.Future[int] = asyncio.Future()

def cancel_unless_done(f: asyncio.Future[Any], _: Any) -> None:
if f.cancelled() or f.done():
return
f.cancel()

sigint_future.add_done_callback(
functools.partial(cancel_unless_done, future)
)
future.add_done_callback(
functools.partial(cancel_unless_done, sigint_future)
)

# Capture the previously-installed SIGINT handler *before* we install
# ours so `handle_sigint` can invoke it for its side effects
# (kernel broadcast, duckdb interrupt). For async cells the actual
# halt comes from cancelling the future, not from a raised
# `MarimoInterrupt` — so we swallow that here.
prior_sigint = signal.getsignal(signal.SIGINT)

def handle_sigint(signum: int, frame: Any) -> None:
if sigint_future.cancelled() or sigint_future.done():
return
sigint_future.set_result(1)
if callable(prior_sigint):
try:
prior_sigint(signum, frame)
except MarimoInterrupt:
# The kernel's handler raises MarimoInterrupt for sync
# halt; we cancel the future instead.
pass

save_sigint = signal.signal(signal.SIGINT, handle_sigint)
try:
yield
finally:
signal.signal(signal.SIGINT, save_sigint)
65 changes: 42 additions & 23 deletions marimo/_runtime/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from marimo._messaging.notification_utils import broadcast_notification
from marimo._runtime.context import get_context
from marimo._runtime.context.kernel_context import KernelRuntimeContext
from marimo._runtime.context.types import safe_get_context
from marimo._runtime.control_flow import MarimoInterrupt

LOGGER = _loggers.marimo_logger()
Expand All @@ -20,35 +21,53 @@
from marimo._runtime.runtime import Kernel


def construct_interrupt_handler(
context: KernelRuntimeContext,
) -> Callable[[int, Any], None]:
def construct_interrupt_handler() -> Callable[[int, Any], None]:
def interrupt_handler(signum: int, frame: Any) -> None:
"""Tries to interrupt the kernel."""
del signum
del frame

# Resolve the *currently installed* context, not one captured at
# install time — embedded apps swap in their own child context.
ctx = safe_get_context()
if not isinstance(ctx, KernelRuntimeContext):
return

# `execution_context` is a per-task ContextVar — unreadable from
# this thread while user tasks are suspended in `select()`. The
# scheduler publication is the authoritative "is a run in flight"
# signal; `execution_context` is opportunistic (only used for
# the duckdb hook below).
sched = ctx.active_scheduler
exec_ctx = ctx.execution_context
if sched is None and exec_ctx is None:
return

LOGGER.info("Interrupt request received")
# TODO(akshayka): if kernel is in `run` but not executing,
# it won't be interrupted, which isn't right ... but the
# probability of that happening is low.
if context.execution_context is not None:
broadcast_notification(InterruptedNotification())
# DuckDB connections are sometimes left in an inconsistent
# state when interrupted by a SIGINT. Manually interrupting
# duckdb through its own API seems to be safer.
if context.execution_context.duckdb_connection is not None:
try:
context.execution_context.duckdb_connection.interrupt()
except Exception as e:
# Coarse try/except; let's not kill the kernel if something
# goes wrong.
LOGGER.warning(
"Failed to interrupt running duckdb connection. This "
"may be a bug in duckdb or marimo. %s",
e,
)
raise MarimoInterrupt
broadcast_notification(InterruptedNotification())

# DuckDB connections are sometimes left in an inconsistent state
# when interrupted by a SIGINT; route through duckdb's own API.
if exec_ctx is not None and exec_ctx.duckdb_connection is not None:
try:
exec_ctx.duckdb_connection.interrupt()
except Exception as e:
LOGGER.warning(
"Failed to interrupt running duckdb connection. This "
"may be a bug in duckdb or marimo. %s",
e,
)

if sched is not None and sched.has_active_tasks():
# Async cell in flight: cancel via the loop. Raising from a
# signal handler escapes into asyncio internals and surfaces
# as an internal-error empty RunResult.
sched.cancel_all()
return

if sched is not None:
sched.cancel_all()
raise MarimoInterrupt

return interrupt_handler

Expand Down
Loading
Loading