diff --git a/lib_eio_posix/.claude/settings.local.json b/lib_eio_posix/.claude/settings.local.json new file mode 100644 index 00000000..8805bcdb --- /dev/null +++ b/lib_eio_posix/.claude/settings.local.json @@ -0,0 +1,9 @@ +{ + "permissions": { + "allow": [ + "Bash(timeout 3 sh:*)", + "Bash(timeout 5 sh:*)", + "Bash(dune build:*)" + ] + } +} diff --git a/lib_eio_posix/posix_eventfd.ml b/lib_eio_posix/posix_eventfd.ml new file mode 100644 index 00000000..4f119eaf --- /dev/null +++ b/lib_eio_posix/posix_eventfd.ml @@ -0,0 +1,66 @@ +(* + * Copyright (C) 2025 Aaron Eline + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) +module Rcfd = Eio_unix.Private.Rcfd + +module Writer = struct + type t = Rcfd.t + let wake_buffer = Bytes.of_string "!" + let wakeup t = + Rcfd.use t + ~if_closed:ignore + (fun fd -> + try + ignore (Unix.single_write fd wake_buffer 0 1) + with + (* If the pipe is full then a wake up is pending anyway. *) + | Unix.Unix_error ((Unix.EACCES | EWOULDBLOCK), _, _) -> () + (* We're shutting down; the event has already been processed. *) + | Unix.Unix_error (Unix.EPIPE, _, _) -> () + ) +end + +module Owner = struct + type t = { + read : Unix.file_descr; + write : Rcfd.t; + } + + let is_reader fd t = fd = t.read + + let reader_fd t = t.read + + let create_writer t = t.write + + let clear t = + let buf = Bytes.create 8 in + let got = Unix.read t.read buf 0 (Bytes.length buf) in + assert (got > 0) + + let create () = + let read, write = Unix.pipe ~cloexec:true () in + Unix.set_nonblock read; + Unix.set_nonblock write; + let write = Rcfd.make write in + { + read; write + } + + let cleanup t = + Unix.close t.read; + let was_open = Rcfd.close t.write in + assert was_open + +end \ No newline at end of file diff --git a/lib_eio_posix/posix_eventfd.mli b/lib_eio_posix/posix_eventfd.mli new file mode 100644 index 00000000..4c7c8075 --- /dev/null +++ b/lib_eio_posix/posix_eventfd.mli @@ -0,0 +1,61 @@ +(* + * Copyright (C) 2025 Aaron Eline + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +(** + This module implements a thread safe re-usable abstraction for POSIX compliant eventfds. + Eventfds are backed by a pipe. +*) + +(** + The [Writer] module is the writing end of the eventfd. + Having a [Writer.t] allows you to signal to the holder of the owner to wake up + It is safe to share between threads +*) +module Writer : + sig type t + + (** Signal to the owning thread to wake up *) + val wakeup : t -> unit +end + +(** + The [Owner] module is the reading end of the event fd. + It is not safe to share between threads. + Anyone who holds a [Writer.t] can send wake up signals to an [Owner.t] +*) +module Owner : + sig + type t + + (** Check if a given fd is the read end of the event fd *) + val is_reader : Unix.file_descr -> t -> bool + + (** @return the reading fd of the eventfd. This is not safe to share between threads. *) + val reader_fd : t -> Unix.file_descr + + (** Get a view into the write end of the event fd. + @return the writing end. Safe to share between threads. *) + val create_writer : t -> Writer.t + + (** Clear pending events from the event fd *) + val clear : t -> unit + + (** @return a new eventfd owner *) + val create : unit -> t + + (** Close the file descriptors associated with the eventfd *) + val cleanup : t -> unit + end diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 23dae62a..230d74d8 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -21,21 +21,11 @@ module Fiber_context = Eio.Private.Fiber_context module Trace = Eio.Private.Trace module Rcfd = Eio_unix.Private.Rcfd module Poll = Iomux.Poll +module Eventfd = Posix_eventfd +open Sched_common type exit = [`Exit_scheduler] -(* The type of items in the run queue. *) -type runnable = - | IO : runnable (* Reminder to check for IO *) - | Thread : 'a Suspended.t * 'a -> runnable (* Resume a fiber with a result value *) - | Failed_thread : 'a Suspended.t * exn -> runnable (* Resume a fiber with an exception *) - -(* For each FD we track which fibers are waiting for it to become readable/writeable. *) -type fd_event_waiters = { - read : unit Suspended.t Lwt_dllist.t; - write : unit Suspended.t Lwt_dllist.t; -} - type t = { (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) run_q : runnable Lf_queue.t; @@ -46,10 +36,9 @@ type t = { (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. In that case, [need_wakeup = true] and you must signal using [eventfd]. *) - eventfd : Rcfd.t; (* For sending events. *) - eventfd_r : Unix.file_descr; (* For reading events. *) + eventfd : Eventfd.Owner.t; - mutable active_ops : int; (* Exit when this is zero and [run_q] and [sleep_q] are empty. *) + active_ops : int Atomic.t; (* Exit when this is zero and [run_q] and [sleep_q] are empty. *) (* If [false], the main thread will check [run_q] before sleeping again (possibly because an event has been or will be sent to [eventfd]). @@ -60,29 +49,17 @@ type t = { sleep_q: Zzz.t; (* Fibers waiting for timers. *) + select_thread : Select.t Lazy.t; + thread_pool : Eio_unix.Private.Thread_pool.t; } -(* The message to send to [eventfd] (any character would do). *) -let wake_buffer = Bytes.of_string "!" (* This can be called from any systhread (including ones not running Eio), and also from signal handlers or GC finalizers. It must not take any locks. *) let wakeup t = Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) - Rcfd.use t.eventfd - ~if_closed:ignore (* Domain has shut down (presumably after handling the event) *) - (fun fd -> - try - ignore (Unix.single_write fd wake_buffer 0 1 : int) - with - | Unix.Unix_error ((Unix.EAGAIN | EWOULDBLOCK), _, _) -> - (* If the pipe is full then a wake up is pending anyway. *) - () - | Unix.Unix_error (Unix.EPIPE, _, _) -> - (* We're shutting down; the event has already been processed. *) - () - ) + Eventfd.Writer.wakeup (Eventfd.Owner.create_writer t.eventfd) (* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) let enqueue_thread t k x = @@ -106,11 +83,22 @@ let get_waiters t fd = Hashtbl.add t.fd_map fd x; x -(* The OS told us that the event pipe is ready. Remove events. *) -let clear_event_fd t = - let buf = Bytes.create 8 in (* Read up to 8 events at a time *) - let got = Unix.read t.eventfd_r buf 0 (Bytes.length buf) in - assert (got > 0) + +let remove_fd t fd index = + Poll.invalidate_index t.poll index; + (* Try to find the new maxi, go back on index until we find the next + used slot, -1 means none in use. *) + let rec lower_maxi = function + | -1 -> t.poll_maxi <- -1 + | index -> + if Poll.((get_fd t.poll index) <> invalid_fd) then + t.poll_maxi <- index + else + lower_maxi (pred index) + in + if index = t.poll_maxi then + lower_maxi (pred index); + Hashtbl.remove t.fd_map fd (* Update [t.poll]'s entry for [fd] to match [waiters]. *) let update t waiters fd = @@ -124,20 +112,7 @@ let update t waiters fd = | true, true -> Poll.Flags.(pollin + pollout) in if flags = Poll.Flags.empty then ( - Poll.invalidate_index t.poll fdi; - (* Try to find the new maxi, go back on index until we find the next - used slot, -1 means none in use. *) - let rec lower_maxi = function - | -1 -> t.poll_maxi <- -1 - | index -> - if Poll.((get_fd t.poll index) <> invalid_fd) then - t.poll_maxi <- index - else - lower_maxi (pred index) - in - if fdi = t.poll_maxi then - lower_maxi (pred fdi); - Hashtbl.remove t.fd_map fd + remove_fd t fd fdi ) else ( Poll.set_index t.poll fdi fd flags; if fdi > t.poll_maxi then @@ -145,28 +120,46 @@ let update t waiters fd = ) let resume t node = - t.active_ops <- t.active_ops - 1; + Atomic.decr t.active_ops; let k : unit Suspended.t = Lwt_dllist.get node in Fiber_context.clear_cancel_fn k.fiber; enqueue_thread t k () (* Called when poll indicates that an event we requested for [fd] is ready. *) -let ready t _index fd revents = - assert (not Poll.Flags.(mem revents pollnval)); - if fd == t.eventfd_r then ( - clear_event_fd t +let ready t index fd revents = + if Eventfd.Owner.is_reader fd t.eventfd then ( + Eventfd.Owner.clear t.eventfd (* The scheduler will now look at the run queue again and notice any new items. *) ) else ( + (* Waiters is the set of computations waiting on this fd, split into readers and writers *) let waiters = Hashtbl.find t.fd_map fd in + (* Pending *will* contain all the computations we want to wake up *) let pending = Lwt_dllist.create () in - if Poll.Flags.(mem revents (pollout + pollhup + pollerr)) then - Lwt_dllist.transfer_l waiters.write pending; - if Poll.Flags.(mem revents (pollin + pollhup + pollerr)) then - Lwt_dllist.transfer_l waiters.read pending; - (* If pending has things, it means we modified the waiters, refresh our view *) - if not (Lwt_dllist.is_empty pending) then - update t waiters fd; - Lwt_dllist.iter_node_r (resume t) pending + (* Waking Procedure: + 1. If POLLOUT is set, wake the writers + 2. If POLLIN is set, wake the readers + 3. If any of POLLHUP, POLLERR, or POLLNVAL is set, wake both readers & writers. + On macOS, poll() returns POLLNVAL for fds it can't poll on, such as /dev/null. + This results in us blocking on block devices, which isn't a problem for /dev/null + But we may want to revisit this. + + Move any readers/writers into pending + *) + + if Poll.Flags.(mem revents pollnval) then begin + (* we need to remove this fd from the poll, and send it to the select watcher *) + Select.add_fd fd waiters (Lazy.force t.select_thread); + remove_fd t fd index; + Hashtbl.remove t.fd_map fd + end else begin + if Poll.Flags.(mem revents (pollout + pollhup + pollerr)) then + Lwt_dllist.transfer_l waiters.write pending; + if Poll.Flags.(mem revents (pollin + pollhup + pollerr)) then + Lwt_dllist.transfer_l waiters.read pending; + if not (Lwt_dllist.is_empty pending) then + update t waiters fd; + Lwt_dllist.iter_node_r (resume t) pending + end ) (* Switch control to the next ready continuation. @@ -203,7 +196,7 @@ let rec next t : [`Exit_scheduler] = Poll.Nanoseconds diff_ns | `Nothing -> Poll.Infinite in - if timeout = Infinite && t.active_ops = 0 && Lf_queue.is_empty t.run_q then ( + if timeout = Infinite && (Atomic.get t.active_ops) = 0 && Lf_queue.is_empty t.run_q then ( (* Nothing further can happen at this point. *) Lf_queue.close t.run_q; (* Just to catch bugs if something tries to enqueue later *) `Exit_scheduler @@ -237,24 +230,24 @@ let with_sched fn = let run_q = Lf_queue.create () in Lf_queue.push run_q IO; let sleep_q = Zzz.create () in - let eventfd_r, eventfd_w = Unix.pipe ~cloexec:true () in - Unix.set_nonblock eventfd_r; - Unix.set_nonblock eventfd_w; - let eventfd = Rcfd.make eventfd_w in - let cleanup () = - Unix.close eventfd_r; - let was_open = Rcfd.close eventfd in - assert was_open - in + let eventfd = Eventfd.Owner.create () in let poll = Poll.create () in let fd_map = Hashtbl.create 10 in let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in - let t = { run_q; poll; poll_maxi = (-1); fd_map; eventfd; eventfd_r; - active_ops = 0; need_wakeup = Atomic.make false; sleep_q; thread_pool } in + let active_ops = Atomic.make 0 in + let select_thread = Lazy.from_fun (fun () -> Select.init run_q (Eventfd.Owner.create_writer eventfd) active_ops) in + let t = { run_q; poll; poll_maxi = (-1); fd_map; eventfd; + active_ops; need_wakeup = Atomic.make false; sleep_q; thread_pool; select_thread } in + let eventfd_r = Eventfd.Owner.reader_fd eventfd in let eventfd_ri = Iomux.Util.fd_of_unix eventfd_r in Poll.set_index t.poll eventfd_ri eventfd_r Poll.Flags.pollin; if eventfd_ri > t.poll_maxi then t.poll_maxi <- eventfd_ri; + let cleanup () = + if Lazy.is_val select_thread then + Select.cleanup (Lazy.force select_thread); + Eventfd.Owner.cleanup eventfd + in match fn t with | x -> cleanup (); x | exception ex -> @@ -266,7 +259,7 @@ let await_readable t (k : unit Suspended.t) fd = match Fiber_context.get_error k.fiber with | Some e -> Suspended.discontinue k e | None -> - t.active_ops <- t.active_ops + 1; + Atomic.incr t.active_ops; let waiters = get_waiters t fd in let was_empty = Lwt_dllist.is_empty waiters.read in let node = Lwt_dllist.add_l k waiters.read in @@ -275,7 +268,7 @@ let await_readable t (k : unit Suspended.t) fd = Lwt_dllist.remove node; if Lwt_dllist.is_empty waiters.read then update t waiters fd; - t.active_ops <- t.active_ops - 1; + Atomic.decr t.active_ops; enqueue_failed_thread t k ex ); next t @@ -284,7 +277,7 @@ let await_writable t (k : unit Suspended.t) fd = match Fiber_context.get_error k.fiber with | Some e -> Suspended.discontinue k e | None -> - t.active_ops <- t.active_ops + 1; + Atomic.incr t.active_ops; let waiters = get_waiters t fd in let was_empty = Lwt_dllist.is_empty waiters.write in let node = Lwt_dllist.add_l k waiters.write in @@ -293,7 +286,7 @@ let await_writable t (k : unit Suspended.t) fd = Lwt_dllist.remove node; if Lwt_dllist.is_empty waiters.write then update t waiters fd; - t.active_ops <- t.active_ops - 1; + Atomic.decr t.active_ops; enqueue_failed_thread t k ex ); next t @@ -314,13 +307,13 @@ let await_timeout t (k : unit Suspended.t) time = next t let with_op t fn x = - t.active_ops <- t.active_ops + 1; + Atomic.incr t.active_ops; match fn x with | r -> - t.active_ops <- t.active_ops - 1; + Atomic.decr t.active_ops; r | exception ex -> - t.active_ops <- t.active_ops - 1; + Atomic.decr t.active_ops; raise ex [@@@alert "-unstable"] diff --git a/lib_eio_posix/sched_common.ml b/lib_eio_posix/sched_common.ml new file mode 100644 index 00000000..789208a7 --- /dev/null +++ b/lib_eio_posix/sched_common.ml @@ -0,0 +1,30 @@ +(* + * Copyright (C) 2025 Aaron Eline + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) +module Suspended = Eio_utils.Suspended +module Lf_queue = Eio_utils.Lf_queue +module Rcfd = Eio_unix.Private.Rcfd + +(* The type of items in the run queue. *) +type runnable = + | IO : runnable (* Reminder to check for IO *) + | Thread : 'a Suspended.t * 'a -> runnable (* Resume a fiber with a result value *) + | Failed_thread : 'a Suspended.t * exn -> runnable (* Resume a fiber with an exception *) + +(* For each FD we track which fibers are waiting for it to become readable/writeable. *) +type fd_event_waiters = { + read : unit Suspended.t Lwt_dllist.t; + write : unit Suspended.t Lwt_dllist.t; +} \ No newline at end of file diff --git a/lib_eio_posix/select.ml b/lib_eio_posix/select.ml new file mode 100644 index 00000000..07104b32 --- /dev/null +++ b/lib_eio_posix/select.ml @@ -0,0 +1,146 @@ +(* + * Copyright (C) 2025 Aaron Eline + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) +module Suspended = Eio_utils.Suspended +module Lf_queue = Eio_utils.Lf_queue +module Rcfd = Eio_unix.Private.Rcfd +module Eventfd = Posix_eventfd +open Sched_common + +type internal_state = { + (* The (write handle) queue of runnable computations maintained by the main schedular thread *) + main_q : runnable Lf_queue.t; + (* The (read handle) of incoming files we need to track *) + incoming_files : (Unix.file_descr * fd_event_waiters) Lf_queue.t ; + (* Map from file descriptors to tasks waiting on data from that descriptor *) + map : (Unix.file_descr, fd_event_waiters) Hashtbl.t; + (* Set of file descriptors we pass to select() to read from *) + read_set : Unix.file_descr list ref; + (* Set of file descriptors we pass to select() to write to *) + write_set : Unix.file_descr list ref; + (* atomic global count of in-flight IO requets *) + active_ops : int Atomic.t; + (* event fds *) + eventfd : Eventfd.Owner.t; + main_eventfd : Eventfd.Writer.t; + (* atomic bool that tracks if we should shutdown *) + shutdown : bool Atomic.t; +} + +(* The public (opaque) handle to a select thread *) +type t = { + state : internal_state; + thread : Thread.t; +} + +let queue t = t.state.incoming_files + +let eventfd t = Eventfd.Owner.create_writer t.state.eventfd + +let add_fd fd waiters t = + Lf_queue.push (queue t) (fd, waiters); + Eventfd.Writer.wakeup (eventfd t) + +(** Read from our incoming queue and track requests, + mutates the fd sets and the fd -> waiters mapping +*) +let queue_events t = + let more_events = ref true in + while !more_events do + match Lf_queue.pop t.incoming_files with + | None -> more_events := false + | Some (fd, waiters) -> + if not (Lwt_dllist.is_empty waiters.write) then + t.write_set := fd :: !(t.write_set); + if not (Lwt_dllist.is_empty waiters.read) then + t.read_set := fd :: !(t.read_set); + Hashtbl.add t.map fd waiters + done + +(** + Process the returned fds from select() and update either the read or write fdsets + @param returned_fds The fdset returned by select() + @param sent_fds The input fdset we're going to modify + @param access function for access the relevant part of fd_event_waiters (either read or write) + @param state the internal state for the select thread + @return bool indicating if we resumed any computations +*) +let process_fd_set returned_fds sent_fds access state = + (* tracks the fds that we removed from the input sets *) + let removed = Dynarray.create () in + (* tracks the suspended computations we need to queue *) + let to_queue = Lwt_dllist.create () in + List.iter ( + fun fd -> + if not (Eventfd.Owner.is_reader fd state.eventfd) then begin + let waiters = Hashtbl.find state.map fd in + let lst = access waiters in + Lwt_dllist.transfer_l lst to_queue; + Dynarray.add_last removed fd; + end + ) returned_fds; + (* retain all fds that we did not process *) + sent_fds := List.filter (fun fd -> not (Dynarray.mem fd removed)) !sent_fds; + (* for each computation we need to resume, + 1) Decrease our count of in-flight ops + 2) Push the computation to the schedulars list of active threads *) + Lwt_dllist.iter_l (fun k -> + Atomic.decr state.active_ops; + Lf_queue.push state.main_q (Thread (k, ())) + ) to_queue; + not (Dynarray.is_empty removed) + +(* Main loop for the select thread *) +let rec select_loop t = + if Atomic.get t.shutdown then + Eventfd.Owner.cleanup t.eventfd + else begin + (* Block on select, we will be woken up by one of two things + 1) the eventfd being written to, which signals we need to from our queue + 2) one of the fds were tracking being read/written to, which means we need to resume a computation *) + let read_fd = Eventfd.Owner.reader_fd t.eventfd in + let read_set = read_fd :: !(t.read_set) in + let (r, w, _) = Unix.select read_set !(t.write_set) [] (-1.0) in + if List.mem read_fd r then begin + Eventfd.Owner.clear t.eventfd; (* clear events *) + queue_events t + end; + let at_least_one_reader = process_fd_set r t.read_set (fun waiters -> waiters.read) t in + let at_least_one_writer = process_fd_set w t.write_set (fun waiters -> waiters.write) t in + if at_least_one_reader || at_least_one_writer then + Eventfd.Writer.wakeup t.main_eventfd; + select_loop t + end + +let cleanup t = + Atomic.set t.state.shutdown true; + Eventfd.Writer.wakeup (Eventfd.Owner.create_writer t.state.eventfd); + Thread.join t.thread + +let init main_q main_eventfd active_ops = + let eventfd = Eventfd.Owner.create () in + let state = { + main_q; + incoming_files = Lf_queue.create (); + main_eventfd; + active_ops; + map = Hashtbl.create 10; + read_set = ref []; + write_set = ref []; + eventfd; + shutdown = Atomic.make false; + } in + let thread = Thread.create (fun () -> select_loop state) () in + { state; thread } diff --git a/lib_eio_posix/select.mli b/lib_eio_posix/select.mli new file mode 100644 index 00000000..019fa579 --- /dev/null +++ b/lib_eio_posix/select.mli @@ -0,0 +1,33 @@ +(* + * Copyright (C) 2025 Aaron Eline + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) +module Suspended = Sched_common.Suspended +module Lf_queue = Sched_common.Lf_queue +module Rcfd = Sched_common.Rcfd +module Eventfd = Posix_eventfd + +(** Handle to the select thread for cleanup *) +type t + +(** Initialize the select thread *) +val init : + Sched_common.runnable Sched_common.Lf_queue.t -> + Eventfd.Writer.t -> int Atomic.t -> t + +(** Request that a file descriptor be tracked by the select thread *) +val add_fd : Unix.file_descr -> Sched_common.fd_event_waiters -> t -> unit + +(** Clean up the select thread (signals shutdown and joins the thread) *) +val cleanup : t -> unit \ No newline at end of file