From 807ad3dd0c60e1892dae71706411eac219bbfb7c Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Tue, 31 Mar 2026 15:11:18 +0100 Subject: [PATCH] Make Linux and Posix eventfd functions configurable --- lib_eio_linux/eio_linux.ml | 10 +++++----- lib_eio_linux/eio_linux.mli | 5 ++++- lib_eio_linux/sched.ml | 8 ++++---- lib_eio_posix/domain_mgr.ml | 18 ++++++++++-------- lib_eio_posix/eio_posix.ml | 4 ++-- lib_eio_posix/eio_posix.mli | 6 ++++-- lib_eio_posix/sched.ml | 4 ++-- lib_eio_posix/sched.mli | 6 ++++-- 8 files changed, 35 insertions(+), 26 deletions(-) diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 79678c99e..a9ceb5612 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -505,8 +505,8 @@ let stdenv ~run_event_loop = method backend_id = "linux" end -let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a = - Sched.with_sched ?fallback config @@ fun st -> +let run_event_loop (type a) ?fallback ?eventfd config (main : _ -> a) arg : a = + Sched.with_sched ?fallback ?eventfd config @@ fun st -> let open Effect.Deep in let extra_effects : _ effect_handler = { effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option -> @@ -561,9 +561,9 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a = } in Sched.run ~extra_effects st main arg -let run ?queue_depth ?n_blocks ?block_size ?polling_timeout ?fallback main = +let run ?queue_depth ?n_blocks ?block_size ?polling_timeout ?fallback ?eventfd main = let config = Sched.config ?queue_depth ?n_blocks ?block_size ?polling_timeout () in - let stdenv = stdenv ~run_event_loop:(run_event_loop ?fallback:None config) in + let stdenv = stdenv ~run_event_loop:(run_event_loop ?fallback:None ?eventfd config) in (* SIGPIPE makes no sense in a modern application. *) Sys.(set_signal sigpipe Signal_ignore); - run_event_loop ?fallback config main stdenv + run_event_loop ?fallback ?eventfd config main stdenv diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index 6159bc29d..8c9a332f0 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -31,6 +31,7 @@ val run : ?block_size:int -> ?polling_timeout:int -> ?fallback:([`Msg of string] -> 'a) -> + ?eventfd:(int -> Unix.file_descr) -> (stdenv -> 'a) -> 'a (** Run an event loop using io_uring. @@ -45,6 +46,8 @@ val run : @param fallback Call this instead if io_uring is not available for some reason. The argument is a message describing the problem (for logging). - The default simply raises an exception. *) + The default simply raises an exception. + @param eventfd Call this instead of a binding to eventfd(2) to create an eventfd + for the scheduler. *) module Low_level = Low_level diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index 3abd4be37..6d54a483b 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -498,8 +498,8 @@ external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" let no_fallback (`Msg msg) = failwith msg -let with_eventfd fn = - let eventfd = Eio_unix.Private.Rcfd.make (eio_eventfd 0) in +let with_eventfd ?(eventfd = eio_eventfd) fn = + let eventfd = Eio_unix.Private.Rcfd.make (eventfd 0) in let close () = if not (Eio_unix.Private.Rcfd.close eventfd) then failwith "eventfd already closed!" in @@ -510,7 +510,7 @@ let with_eventfd fn = close (); Printexc.raise_with_backtrace ex bt -let with_sched ?(fallback=no_fallback) config fn = +let with_sched ?(fallback=no_fallback) ?eventfd config fn = let { queue_depth; n_blocks; block_size; polling_timeout } = config in match Uring.create ~queue_depth ?polling_timeout () with | exception Unix.Unix_error(ENOSYS, _, _) -> fallback (`Msg "io_uring is not available on this system") @@ -540,7 +540,7 @@ let with_sched ?(fallback=no_fallback) config fn = let sleep_q = Zzz.create () in let io_q = Queue.create () in let mem_q = Lwt_dllist.create () in - with_eventfd @@ fun eventfd -> + with_eventfd ?eventfd @@ fun eventfd -> let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool } with diff --git a/lib_eio_posix/domain_mgr.ml b/lib_eio_posix/domain_mgr.ml index f4bc98b9b..83d8602d2 100644 --- a/lib_eio_posix/domain_mgr.ml +++ b/lib_eio_posix/domain_mgr.ml @@ -36,8 +36,8 @@ let socketpair k ~sw ~domain ~ty ~protocol wrap_a wrap_b = discontinue k (Err.wrap code name arg) (* Run an event loop in the current domain, using [fn x] as the root fiber. *) -let run_event_loop fn x = - Sched.with_sched @@ fun sched -> +let run_event_loop ?pipe fn x = + Sched.with_sched ?pipe @@ fun sched -> let open Effect.Deep in let extra_effects : _ effect_handler = { effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option -> @@ -94,14 +94,16 @@ let unwrap_backtrace = function | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt module Impl = struct - type t = unit + type t = { + pipe : (cloexec:bool -> Unix.file_descr * Unix.file_descr) option; + } let domain_spawn ctx enqueue fn = Domain.spawn @@ fun () -> Trace.domain_spawn ~parent:(Eio.Private.Fiber_context.tid ctx); Fun.protect fn ~finally:(fun () -> enqueue (Ok ())) - let run_raw () fn = + let run_raw _t fn = let domain = ref None in Eio.Private.Suspend.enter "run-domain" (fun ctx enqueue -> domain := Some (domain_spawn ctx enqueue (wrap_backtrace fn)) @@ -109,19 +111,19 @@ module Impl = struct Trace.with_span "Domain.join" @@ fun () -> unwrap_backtrace (Domain.join (Option.get !domain)) - let run () fn = + let run t fn = let domain = ref None in Eio.Private.Suspend.enter "run-domain" (fun ctx enqueue -> let cancelled, set_cancelled = Promise.create () in Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled); domain := Some (domain_spawn ctx enqueue (fun () -> - run_event_loop (wrap_backtrace (fun () -> fn ~cancelled)) () + run_event_loop ?pipe:t.pipe (wrap_backtrace (fun () -> fn ~cancelled)) () )) ); Trace.with_span "Domain.join" @@ fun () -> unwrap_backtrace (Domain.join (Option.get !domain)) end -let v = +let v ?pipe () = let handler = Eio.Domain_manager.Pi.mgr (module Impl) in - Eio.Resource.T ((), handler) + Eio.Resource.T (Impl.{ pipe }, handler) diff --git a/lib_eio_posix/eio_posix.ml b/lib_eio_posix/eio_posix.ml index 9a838db98..350d7d6a2 100644 --- a/lib_eio_posix/eio_posix.ml +++ b/lib_eio_posix/eio_posix.ml @@ -18,7 +18,7 @@ module Low_level = Low_level type stdenv = Eio_unix.Stdenv.base -let run main = +let run ?pipe main = (* SIGPIPE makes no sense in a modern application. *) Sys.(set_signal sigpipe Signal_ignore); Eio_unix.Process.install_sigchld_handler (); @@ -34,7 +34,7 @@ let run main = method mono_clock = Time.mono_clock method net = Net.v method process_mgr = Process.mgr - method domain_mgr = Domain_mgr.v + method domain_mgr = Domain_mgr.v ?pipe () method cwd = ((Fs.cwd, "") :> Eio.Fs.dir_ty Eio.Path.t) method fs = ((Fs.fs, "") :> Eio.Fs.dir_ty Eio.Path.t) method secure_random = Flow.secure_random diff --git a/lib_eio_posix/eio_posix.mli b/lib_eio_posix/eio_posix.mli index a8ca1a138..6f2eddf08 100644 --- a/lib_eio_posix/eio_posix.mli +++ b/lib_eio_posix/eio_posix.mli @@ -3,10 +3,12 @@ type stdenv = Eio_unix.Stdenv.base (** The type of the standard set of resources available on POSIX systems. *) -val run : (stdenv -> 'a) -> 'a +val run : ?pipe:(cloexec:bool -> Unix.file_descr * Unix.file_descr) -> (stdenv -> 'a) -> 'a (** [run main] runs an event loop and calls [main stdenv] inside it. - For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *) + For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. + + @param pipe Configure how to create a pipe for the underlying scheduler (defaults to {! Unix.pipe}). *) module Low_level = Low_level (** Low-level API for making POSIX calls directly. *) diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 23dae62a2..849d75f48 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -233,11 +233,11 @@ let rec next t : [`Exit_scheduler] = next t ) -let with_sched fn = +let with_sched ?(pipe=(fun ~cloexec -> Unix.pipe ~cloexec ())) fn = let run_q = Lf_queue.create () in Lf_queue.push run_q IO; let sleep_q = Zzz.create () in - let eventfd_r, eventfd_w = Unix.pipe ~cloexec:true () in + let eventfd_r, eventfd_w = pipe ~cloexec:true in Unix.set_nonblock eventfd_r; Unix.set_nonblock eventfd_w; let eventfd = Rcfd.make eventfd_w in diff --git a/lib_eio_posix/sched.mli b/lib_eio_posix/sched.mli index 22cf07d1c..5ed2a3331 100644 --- a/lib_eio_posix/sched.mli +++ b/lib_eio_posix/sched.mli @@ -10,10 +10,12 @@ type exit and so does not return until the whole event loop is finished. Such functions should normally be called in tail position. *) -val with_sched : (t -> 'a) -> 'a +val with_sched : ?pipe:(cloexec:bool -> Unix.file_descr * Unix.file_descr) -> (t -> 'a) -> 'a (** [with_sched fn] sets up a scheduler and calls [fn t]. Typically [fn] will call {!run}. - When [fn] returns, the scheduler's resources are freed. *) + When [fn] returns, the scheduler's resources are freed. + + @param pipe Can be used to change the function that provides a pipe for the scheduler. *) val run : extra_effects:exit Effect.Deep.effect_handler ->