Skip to content

fix: allow async cancellation#9705

Draft
dmadisetti wants to merge 2 commits into
mainfrom
dm/nonblocking-runner
Draft

fix: allow async cancellation#9705
dmadisetti wants to merge 2 commits into
mainfrom
dm/nonblocking-runner

Conversation

@dmadisetti
Copy link
Copy Markdown
Collaborator

📝 Summary

Promotes scheduler to long lived singleton on kernel context for look-up of running cells. This in turn allows task cancellations on async cells, which were previously un-cancelable. Example of cancelation:

image

@vercel
Copy link
Copy Markdown

vercel Bot commented May 27, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
marimo-docs Ready Ready Preview, Comment May 28, 2026 1:11am

Request Review

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 issues found across 12 files

Architecture diagram
sequenceDiagram
    participant Client as Browser Client
    participant Kernel as Kernel (main thread)
    participant SIGINT as Signal Handler
    participant Context as KernelRuntimeContext
    participant Scheduler as SequentialScheduler
    participant CellRunner as CellRunner
    participant Evaluator as CellEvaluator
    participant Task as asyncio Task

    Note over Client,Task: Cell Execution with Cancellation Support

    Client->>Kernel: Request cell execution
    Kernel->>Scheduler: Create SequentialScheduler(cells_to_run, graph)
    Scheduler->>Scheduler: Initialize _active={}, _interrupted=False

    Note over Scheduler,CellRunner: async with Scheduler publishes on context

    Scheduler->>Context: __aenter__: set _active_scheduler=self
    Context-->>Scheduler: OK

    loop For each cell batch
        Scheduler->>Scheduler: batch(cell_ids) - yield one cell at a time
        Scheduler->>CellRunner: Run cell_id
        CellRunner->>CellRunner: Check self.cancelled(cell_id)
        alt Cell is cancelled
            CellRunner->>CellRunner: Mark cell as "cancelled"
        else Cell not cancelled
            alt Cell is coroutine (async)
                CellRunner->>Task: asyncio.ensure_future(evaluate(cell, glbls))
                CellRunner->>Scheduler: register_task(cell_id, task)
                Note over CellRunner,Task: Task registered so SIGINT can find it
                CellRunner->>Task: await task
                alt Cell completes normally
                    Task-->>CellRunner: RunResult
                else Cell cancelled externally
                    Task-->>CellRunner: asyncio.CancelledError
                end
                CellRunner->>Scheduler: unregister_task(cell_id)
            else Cell is sync
                CellRunner->>Evaluator: evaluate(cell, glbls)
                Evaluator-->>CellRunner: RunResult
            end
            CellRunner->>CellRunner: _finalize_run_result()
            Note over CellRunner: CancelledError → MarimoInterrupt
            CellRunner-->>Scheduler: Completed cell
        end
    end

    Scheduler->>Context: __aexit__: clear _active_scheduler
    Context-->>Scheduler: OK

    Note over Client,Task: SIGINT/Cancellation Flow

    Client->>Kernel: Send interrupt signal
    Kernel->>SIGINT: SIGINT received
    SIGINT->>SIGINT: construct_interrupt_handler()
    SIGINT->>Context: safe_get_context()
    Context-->>SIGINT: KernelRuntimeContext
    SIGINT->>Context: Check execution_context
    alt No execution context
        SIGINT->>SIGINT: return (no-op)
    else Execution context exists
        SIGINT->>SIGINT: Broadcast InterruptedNotification()
        SIGINT->>Context: Check duckdb_connection
        alt DuckDB connection present
            SIGINT->>SIGINT: interrupt duckdb connection
        end
        SIGINT->>Context: Check active_scheduler
        alt Scheduler has active tasks
            SIGINT->>Scheduler: cancel_all()
            Scheduler->>Scheduler: Set _interrupted=true
            Scheduler->>Task: call_soon_threadsafe(task.cancel())
            Task-->>Scheduler: Task cancelled
            SIGINT->>SIGINT: return (no MarimoInterrupt raise)
        else Scheduler exists but no active tasks
            SIGINT->>Scheduler: cancel_all()
            SIGINT->>SIGINT: raise MarimoInterrupt
        else No scheduler
            SIGINT->>SIGINT: raise MarimoInterrupt
        end
    end
Loading

Reply with feedback, questions, or to request a fix.

Re-trigger cubic

Comment thread marimo/_runtime/runner/cell_runner.py Outdated
Comment thread marimo/_runtime/runner/scheduler.py
@dmadisetti dmadisetti force-pushed the dm/nonblocking-runner branch from eda4d02 to 50fa1b4 Compare May 28, 2026 01:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant