Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion doeff/effects/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

import doeff_vm

from doeff.handlers.spawn_handler import spawn_intercept_handler
from doeff.handlers.spawn_handler import (
spawn_intercept_handler,
sync_spawn_intercept_handler,
)

from ._program_types import ProgramLike
from ._validators import ensure_dict_str_any, ensure_program_like
Expand Down Expand Up @@ -267,5 +270,6 @@ def _validate_priority(priority: int) -> int:
"promise_id_of",
"spawn",
"spawn_intercept_handler",
"sync_spawn_intercept_handler",
"task_id_of",
]
41 changes: 30 additions & 11 deletions doeff/handlers/await_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

from doeff.do import do
from doeff.effects.base import Effect
from doeff.effects.external_promise import ExternalPromise
from doeff.effects.external_promise import CreateExternalPromise
from doeff.effects.wait import Wait
from doeff.effects.wait import wait

PythonAsyncioAwaitEffect = doeff_vm.PythonAsyncioAwaitEffect
sync_await_handler = doeff_vm.sync_await_handler

_loop_lock = threading.Lock()
_loop_thread: threading.Thread | None = None
Expand Down Expand Up @@ -125,16 +127,33 @@ def _on_done(completed: Any) -> None:
future.add_done_callback(_on_done)


@do
def sync_await_handler(effect: Effect, k: Any):
"""Handle Await effects via background-loop bridge for sync execution."""
if isinstance(effect, PythonAsyncioAwaitEffect):
promise = yield CreateExternalPromise()
_submit_awaitable(effect.awaitable, promise)
value = yield Wait(promise.future)
return (yield doeff_vm.Resume(k, value))
def _external_promise_from_handle(handle: Any) -> ExternalPromise[Any]:
if isinstance(handle, ExternalPromise):
return handle

yield doeff_vm.Pass()
if not isinstance(handle, dict) or handle.get("type") != "ExternalPromise":
raise TypeError(
"Expected ExternalPromise handle dict or ExternalPromise instance, "
f"got {type(handle).__name__}"
)

promise_id = handle.get("promise_id")
if not isinstance(promise_id, int):
raise TypeError("ExternalPromise handle missing integer promise_id")

completion_queue = handle.get("completion_queue")
if completion_queue is None:
raise TypeError("ExternalPromise handle missing completion_queue")

return ExternalPromise(
_handle=handle,
_completion_queue=completion_queue,
_id=promise_id,
)


def _submit_awaitable_handle(awaitable: Awaitable[Any], handle: Any) -> None:
_submit_awaitable(awaitable, _external_promise_from_handle(handle))


@do
Expand All @@ -154,7 +173,7 @@ async def _kickoff() -> None:
asyncio.get_running_loop().create_task(_run_and_complete())

_ = yield doeff_vm.PythonAsyncSyntaxEscape(action=_kickoff)
value = yield Wait(promise.future)
value = yield wait(promise.future)
return (yield doeff_vm.Resume(k, value))

yield doeff_vm.Pass()
Expand Down
12 changes: 11 additions & 1 deletion doeff/handlers/spawn_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ def spawn_intercept_handler(effect: Effect, k: Any):
yield doeff_vm.Pass()


__all__ = ["spawn_intercept_handler"]
@do
def sync_spawn_intercept_handler(effect: Effect, k: Any):
from doeff.effects.spawn import SpawnEffect, coerce_task_handle

if isinstance(effect, SpawnEffect):
raw = yield doeff_vm.Delegate()
return (yield doeff_vm.Transfer(k, coerce_task_handle(raw)))
yield doeff_vm.Pass()


__all__ = ["spawn_intercept_handler", "sync_spawn_intercept_handler"]
6 changes: 4 additions & 2 deletions doeff/rust_vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,11 @@ def default_handlers() -> list[Any]:
"""
vm = _vm()
from doeff.handlers.await_handlers import sync_await_handler
from doeff.handlers.spawn_handler import spawn_intercept_handler
from doeff.handlers.spawn_handler import sync_spawn_intercept_handler

return [
*_core_handler_sentinels(vm),
spawn_intercept_handler,
sync_spawn_intercept_handler,
sync_await_handler,
]

Expand Down Expand Up @@ -646,6 +646,7 @@ def WithIntercept(
"result_safe",
"lazy_ask",
"await_handler",
"sync_await_handler",
}


Expand Down Expand Up @@ -693,6 +694,7 @@ def __getattr__(name: str) -> Any:
"WithIntercept",
"async_run",
"await_handler",
"sync_await_handler",
"default_async_handlers",
"default_handlers",
"lazy_ask",
Expand Down
Loading