diff --git a/kernel/boot/boot.cpp b/kernel/boot/boot.cpp
index eb0a8bf5..18a1f246 100644
--- a/kernel/boot/boot.cpp
+++ b/kernel/boot/boot.cpp
@@ -27,6 +27,7 @@
 #include "drivers/input/input.h"
 #include "net/net.h"
 #include "random/random.h"
+#include "sync/futex.h"
 
 #ifdef STLX_UNIT_TESTS_ENABLED
 #include "runner.h"
@@ -107,6 +108,8 @@ extern "C" __PRIVILEGED_CODE void stlx_init() {
         log::fatal("rc::reaper::init failed");
     }
 
+    sync::futex_init();
+
     if (fs::init() != fs::OK) {
         log::fatal("fs::init failed");
     }
diff --git a/kernel/sync/futex.cpp b/kernel/sync/futex.cpp
new file mode 100644
index 00000000..05008a61
--- /dev/null
+++ b/kernel/sync/futex.cpp
@@ -0,0 +1,193 @@
+#include "sync/futex.h"
+#include "sync/spinlock.h"
+#include "sched/sched.h"
+#include "sched/task.h"
+#include "mm/uaccess.h"
+#include "mm/vma.h"
+#include "common/hash.h"
+#include "common/string.h"
+#include "clock/clock.h"
+#include "timer/timer.h"
+
+namespace sync {
+
+// Max waiters dequeued per bucket-lock hold; bounds lock hold time.
+constexpr uint32_t WAKE_BATCH_SIZE = 16;
+
+__PRIVILEGED_BSS static futex_bucket g_futex_table[FUTEX_BUCKET_COUNT];
+
+__PRIVILEGED_CODE void futex_init() {
+    for (uint32_t i = 0; i < FUTEX_BUCKET_COUNT; i++) {
+        g_futex_table[i].lock = SPINLOCK_INIT;
+        g_futex_table[i].waiters.init();
+    }
+}
+
+// Hash (mm, address) to a bucket index; keying on mm keeps distinct
+// address spaces from colliding on the same user address.
+__PRIVILEGED_CODE static uint32_t futex_hash(mm::mm_context* mm, uintptr_t addr) {
+    uint64_t h = hash::combine(hash::ptr(mm), hash::u64(addr));
+    return static_cast<uint32_t>(h) & FUTEX_BUCKET_MASK;
+}
+
+__PRIVILEGED_CODE int32_t futex_wait(uintptr_t uaddr, uint32_t expected,
+                                     uint64_t timeout_ns) {
+    sched::task* self = sched::current();
+    mm::mm_context* mm = self->exec.mm_ctx;
+    if (uaddr & 0x3) return -22; // EINVAL
+
+    // Read the value before taking the bucket lock. copy_from_user
+    // acquires mm_ctx->lock (a sleeping mutex) so it must not be called
+    // under a spinlock. This also faults in the page so the re-read
+    // under the spinlock below is safe.
+    uint32_t pre_val;
+    if (mm) {
+        int32_t rc = mm::uaccess::copy_from_user(
+            &pre_val, reinterpret_cast<const void*>(uaddr), sizeof(uint32_t));
+        if (rc != 0) return -14; // EFAULT
+    } else {
+        string::memcpy(&pre_val, reinterpret_cast<const void*>(uaddr),
+                       sizeof(uint32_t));
+    }
+
+    // Early exit: if the value already changed, no need to lock the bucket.
+    if (pre_val != expected) return -11; // EAGAIN
+
+    uint32_t idx = futex_hash(mm, uaddr);
+    futex_bucket* bucket = &g_futex_table[idx];
+
+    futex_waiter waiter;
+    waiter.task = self;
+    waiter.mm = mm;
+    waiter.addr = uaddr;
+    waiter.link = {};
+
+    irq_state irq = spin_lock_irqsave(bucket->lock);
+
+    // Re-read the futex word under the bucket lock. The page is already
+    // validated/faulted by the copy_from_user above, so a direct read
+    // is safe here. This atomic check-and-enqueue prevents lost wakeups.
+    uint32_t current_val;
+    string::memcpy(&current_val, reinterpret_cast<const void*>(uaddr),
+                   sizeof(uint32_t));
+
+    if (current_val != expected) {
+        spin_unlock_irqrestore(bucket->lock, irq);
+        return -11; // EAGAIN
+    }
+
+    self->state = sched::TASK_STATE_BLOCKED;
+    bucket->waiters.push_back(&waiter);
+
+    if (timeout_ns > 0) {
+        uint64_t deadline = clock::now_ns() + timeout_ns;
+        timer::schedule_sleep(self, deadline);
+    }
+
+    spin_unlock_irqrestore(bucket->lock, irq);
+    sched::yield();
+
+    // Cancel any outstanding timer to prevent spurious wakes of future
+    // blocking operations if we were woken by futex_wake before timeout.
+    timer::cancel_sleep(self);
+
+    // Remove self from bucket if still linked (timeout or kill wakeup).
+    bool was_linked = false;
+    irq = spin_lock_irqsave(bucket->lock);
+    if (waiter.link.is_linked()) {
+        bucket->waiters.remove(&waiter);
+        was_linked = true;
+    }
+    spin_unlock_irqrestore(bucket->lock, irq);
+
+    if (sched::is_kill_pending()) return -4; // EINTR
+    if (was_linked) return -110; // ETIMEDOUT
+    return 0;
+}
+
+__PRIVILEGED_CODE int32_t futex_wake(uintptr_t uaddr, uint32_t count) {
+    sched::task* self = sched::current();
+    mm::mm_context* mm = self->exec.mm_ctx;
+    if (uaddr & 0x3) return -22; // EINVAL
+    if (count == 0) return 0;
+
+    uint32_t idx = futex_hash(mm, uaddr);
+    futex_bucket* bucket = &g_futex_table[idx];
+
+    uint32_t total_woken = 0;
+
+    for (;;) {
+        sched::task* batch[WAKE_BATCH_SIZE];
+        uint32_t n = 0;
+        bool done = false;
+
+        irq_state irq = spin_lock_irqsave(bucket->lock);
+
+        auto it = bucket->waiters.begin();
+        auto end = bucket->waiters.end();
+        while (it != end && n < WAKE_BATCH_SIZE) {
+            futex_waiter& w = *it;
+            ++it; // advance before removal
+            if (w.mm == mm && w.addr == uaddr) {
+                bucket->waiters.remove(&w);
+                batch[n++] = w.task;
+                if (total_woken + n >= count) {
+                    done = true;
+                    break;
+                }
+            }
+        }
+
+        if (n == 0) done = true;
+        spin_unlock_irqrestore(bucket->lock, irq);
+
+        // Wake outside the spinlock: sched::wake may reschedule.
+        for (uint32_t i = 0; i < n; i++) {
+            sched::wake(batch[i]);
+        }
+        total_woken += n;
+
+        if (done) break;
+    }
+
+    return static_cast<int32_t>(total_woken);
+}
+
+__PRIVILEGED_CODE int32_t futex_wake_all(uintptr_t uaddr) {
+    sched::task* self = sched::current();
+    mm::mm_context* mm = self->exec.mm_ctx;
+    if (uaddr & 0x3) return -22; // EINVAL
+
+    uint32_t idx = futex_hash(mm, uaddr);
+    futex_bucket* bucket = &g_futex_table[idx];
+
+    uint32_t total_woken = 0;
+
+    for (;;) {
+        sched::task* batch[WAKE_BATCH_SIZE];
+        uint32_t n = 0;
+
+        irq_state irq = spin_lock_irqsave(bucket->lock);
+
+        auto it = bucket->waiters.begin();
+        auto end = bucket->waiters.end();
+        while (it != end && n < WAKE_BATCH_SIZE) {
+            futex_waiter& w = *it;
+            ++it;
+            if (w.mm == mm && w.addr == uaddr) {
+                bucket->waiters.remove(&w);
+                batch[n++] = w.task;
+            }
+        }
+
+        bool drained = (n == 0);
+        spin_unlock_irqrestore(bucket->lock, irq);
+
+        for (uint32_t i = 0; i < n; i++) {
+            sched::wake(batch[i]);
+        }
+        total_woken += n;
+
+        if (drained) break;
+    }
+
+    return static_cast<int32_t>(total_woken);
+}
+
+} // namespace sync
diff --git a/kernel/sync/futex.h b/kernel/sync/futex.h
new file mode 100644
index 00000000..3b8f873b
--- /dev/null
+++ b/kernel/sync/futex.h
@@ -0,0 +1,56 @@
+#ifndef STELLUX_SYNC_FUTEX_H
+#define STELLUX_SYNC_FUTEX_H
+
+#include "common/types.h"
+#include "common/list.h"
+#include "sync/spinlock.h"
+
+namespace sched { struct task; }
+namespace mm { struct mm_context; }
+
+namespace sync {
+
+struct futex_waiter {
+    sched::task* task;
+    mm::mm_context* mm;
+    uintptr_t addr;
+    list::node link;
+};
+
+struct futex_bucket {
+    spinlock lock;
+    list::head waiters;
+};
+
+constexpr uint32_t FUTEX_BUCKET_COUNT = 256;
+constexpr uint32_t FUTEX_BUCKET_MASK = FUTEX_BUCKET_COUNT - 1;
+
+/**
+ * Initialize the futex hash table. Call once during boot after sched::init().
+ * @note Privilege: **required**
+ */
+__PRIVILEGED_CODE void futex_init();
+
+/**
+ * Block if *uaddr == expected. timeout_ns=0 means wait indefinitely.
+ * Returns 0 on wake, -EAGAIN on mismatch, -ETIMEDOUT, -EINTR, -EFAULT.
+ * @note Privilege: **required**
+ */
+__PRIVILEGED_CODE int32_t futex_wait(uintptr_t uaddr, uint32_t expected,
+                                     uint64_t timeout_ns);
+
+/**
+ * Wake up to count threads waiting on uaddr. Returns number woken.
+ * @note Privilege: **required**
+ */
+__PRIVILEGED_CODE int32_t futex_wake(uintptr_t uaddr, uint32_t count);
+
+/**
+ * Wake all threads waiting on uaddr. Returns number woken.
+ * @note Privilege: **required**
+ */
+__PRIVILEGED_CODE int32_t futex_wake_all(uintptr_t uaddr);
+
+} // namespace sync
+
+#endif // STELLUX_SYNC_FUTEX_H
diff --git a/kernel/syscall/handlers/sys_futex.cpp b/kernel/syscall/handlers/sys_futex.cpp
new file mode 100644
index 00000000..7472e36a
--- /dev/null
+++ b/kernel/syscall/handlers/sys_futex.cpp
@@ -0,0 +1,19 @@
+#include "syscall/handlers/sys_futex.h"
+#include "sync/futex.h"
+
+DEFINE_SYSCALL3(futex_wait, uaddr, expected, timeout_ns) {
+    return sync::futex_wait(
+        static_cast<uintptr_t>(uaddr),
+        static_cast<uint32_t>(expected),
+        timeout_ns);
+}
+
+DEFINE_SYSCALL2(futex_wake, uaddr, count) {
+    return sync::futex_wake(
+        static_cast<uintptr_t>(uaddr),
+        static_cast<uint32_t>(count));
+}
+
+DEFINE_SYSCALL1(futex_wake_all, uaddr) {
+    return sync::futex_wake_all(static_cast<uintptr_t>(uaddr));
+}
diff --git a/kernel/syscall/handlers/sys_futex.h b/kernel/syscall/handlers/sys_futex.h
new file mode 100644
index 00000000..9a456c69
--- /dev/null
+++ b/kernel/syscall/handlers/sys_futex.h
@@ -0,0 +1,10 @@
+#ifndef STELLUX_SYSCALL_HANDLERS_SYS_FUTEX_H
+#define STELLUX_SYSCALL_HANDLERS_SYS_FUTEX_H
+
+#include "syscall/syscall_table.h"
+
+DECLARE_SYSCALL(futex_wait);
+DECLARE_SYSCALL(futex_wake);
+DECLARE_SYSCALL(futex_wake_all);
+
+#endif // STELLUX_SYSCALL_HANDLERS_SYS_FUTEX_H
diff --git a/kernel/syscall/syscall.h b/kernel/syscall/syscall.h
index 663b6087..f59a10e7 100644
--- a/kernel/syscall/syscall.h
+++ b/kernel/syscall/syscall.h
@@ -26,6 +26,11 @@ constexpr uint64_t SYS_PROC_KILL_TID = 1018;
 // PTY
 constexpr uint64_t SYS_PTY_CREATE = 1020;
 
+// Futex
+constexpr uint64_t SYS_FUTEX_WAIT = 1030;
+constexpr uint64_t SYS_FUTEX_WAKE = 1031;
+constexpr uint64_t SYS_FUTEX_WAKE_ALL = 1032;
+
 /**
  * Architecture-specific syscall initialization (MSRs on x86, etc.)
  * @note Privilege: **required**
diff --git a/kernel/syscall/syscall_table.cpp b/kernel/syscall/syscall_table.cpp
index cb3d3897..bd5531e2 100644
--- a/kernel/syscall/syscall_table.cpp
+++ b/kernel/syscall/syscall_table.cpp
@@ -19,6 +19,7 @@
 #include "syscall/handlers/sys_select.h"
 #include "syscall/handlers/sys_pipe.h"
 #include "syscall/handlers/sys_uname.h"
+#include "syscall/handlers/sys_futex.h"
 
 namespace syscall {
 
@@ -112,6 +113,10 @@ __PRIVILEGED_CODE void init_syscall_table() {
 
     REGISTER_SYSCALL(SYS_PTY_CREATE, pty_create);
 
+    REGISTER_SYSCALL(SYS_FUTEX_WAIT, futex_wait);
+    REGISTER_SYSCALL(SYS_FUTEX_WAKE, futex_wake);
+    REGISTER_SYSCALL(SYS_FUTEX_WAKE_ALL, futex_wake_all);
+
     register_arch_syscalls();
 }
diff --git a/kernel/syscall/syscall_table.h b/kernel/syscall/syscall_table.h
index 79abee74..ee1fcab5 100644
--- a/kernel/syscall/syscall_table.h
+++ b/kernel/syscall/syscall_table.h
@@ -42,6 +42,7 @@ constexpr int64_t EAFNOSUPPORT = -97;
 constexpr int64_t EADDRINUSE = -98;
 constexpr int64_t EISCONN = -106;
 constexpr int64_t ENOTCONN = -107;
+constexpr int64_t ETIMEDOUT = -110;
 constexpr int64_t ECONNREFUSED = -111;
 
 extern handler_t g_syscall_table[MAX_SYSCALL_NUM];
diff --git a/kernel/tests/sync/futex.test.cpp b/kernel/tests/sync/futex.test.cpp
new file mode 100644
index 00000000..af410415
--- /dev/null
+++ b/kernel/tests/sync/futex.test.cpp
@@ -0,0 +1,331 @@
+#define STLX_TEST_TIER TIER_SCHED
+
+#include "stlx_unit_test.h"
+#include "helpers.h"
+#include "sched/sched.h"
+#include "sched/task.h"
+#include "sync/futex.h"
+#include "clock/clock.h"
+#include "dynpriv/dynpriv.h"
+
+using test_helpers::spin_wait;
+using test_helpers::spin_wait_ge;
+using test_helpers::brief_delay;
+
+TEST_SUITE(futex);
+
+// --- wait returns EAGAIN on value mismatch ---
+
+static volatile uint32_t g_mismatch_val = 42;
+
+TEST(futex, wait_eagain_on_mismatch) {
+    g_mismatch_val = 42;
+    int32_t rc = 0;
+    RUN_ELEVATED({
+        rc = sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_mismatch_val), 99,
+            0);
+    });
+    EXPECT_EQ(rc, static_cast<int32_t>(-11)); // EAGAIN
+}
+
+// --- wake with no waiters returns 0 ---
+
+static volatile uint32_t g_nowait_val = 0;
+
+TEST(futex, wake_no_waiters) {
+    g_nowait_val = 0;
+    int32_t rc = 0;
+    RUN_ELEVATED({
+        rc = sync::futex_wake(
+            reinterpret_cast<uintptr_t>(&g_nowait_val), 1);
+    });
+    EXPECT_EQ(rc, static_cast<int32_t>(0));
+}
+
+// --- basic wait and wake ---
+
+static volatile uint32_t g_basic_val = 0;
+static volatile uint32_t g_basic_waiting = 0;
+static volatile uint32_t g_basic_woken = 0;
+
+static void basic_waiter_fn(void*) {
+    __atomic_store_n(&g_basic_waiting, 1, __ATOMIC_RELEASE);
+    RUN_ELEVATED({
+        sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_basic_val), 0, 0);
+    });
+    __atomic_store_n(&g_basic_woken, 1, __ATOMIC_RELEASE);
+    sched::exit(0);
+}
+
+TEST(futex, basic_wait_and_wake) {
+    g_basic_val = 0;
+    g_basic_waiting = 0;
+    g_basic_woken = 0;
+
+    RUN_ELEVATED({
+        sched::task* t = sched::create_kernel_task(
+            basic_waiter_fn, nullptr, "ftx_basic");
+        ASSERT_NOT_NULL(t);
+        sched::enqueue(t);
+    });
+
+    ASSERT_TRUE(spin_wait(&g_basic_waiting));
+    brief_delay();
+
+    __atomic_store_n(&g_basic_val, 1, __ATOMIC_RELEASE);
+    RUN_ELEVATED({
+        sync::futex_wake(
+            reinterpret_cast<uintptr_t>(&g_basic_val), 1);
+    });
+
+    EXPECT_TRUE(spin_wait(&g_basic_woken));
+}
+
+// --- wake respects count ---
+
+constexpr uint32_t WAKE_N_TASKS = 4;
+static volatile uint32_t g_wn_val = 0;
+static volatile uint32_t g_wn_ready = 0;
+static volatile uint32_t g_wn_woken = 0;
+
+static void wake_n_waiter_fn(void*) {
+    __atomic_fetch_add(&g_wn_ready, 1, __ATOMIC_ACQ_REL);
+    RUN_ELEVATED({
+        sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_wn_val), 0, 0);
+    });
+    __atomic_fetch_add(&g_wn_woken, 1, __ATOMIC_ACQ_REL);
+    sched::exit(0);
+}
+
+TEST(futex, wake_count_respected) {
+    g_wn_val = 0;
+    g_wn_ready = 0;
+    g_wn_woken = 0;
+
+    RUN_ELEVATED({
+        for (uint32_t i = 0; i < WAKE_N_TASKS; i++) {
+            sched::task* t = sched::create_kernel_task(
+                wake_n_waiter_fn, nullptr, "ftx_wn");
+            ASSERT_NOT_NULL(t);
+            sched::enqueue(t);
+        }
+    });
+
+    ASSERT_TRUE(spin_wait_ge(&g_wn_ready, WAKE_N_TASKS));
+    brief_delay();
+
+    int32_t woken = 0;
+    RUN_ELEVATED({
+        woken = sync::futex_wake(
+            reinterpret_cast<uintptr_t>(&g_wn_val), 2);
+    });
+    EXPECT_EQ(woken, static_cast<int32_t>(2));
+
+    ASSERT_TRUE(spin_wait_ge(&g_wn_woken, 2));
+    brief_delay();
+    EXPECT_EQ(__atomic_load_n(&g_wn_woken, __ATOMIC_ACQUIRE), 2u);
+
+    int32_t rest = 0;
+    RUN_ELEVATED({
+        rest = sync::futex_wake_all(
+            reinterpret_cast<uintptr_t>(&g_wn_val));
+    });
+    EXPECT_EQ(rest, static_cast<int32_t>(2));
+
+    ASSERT_TRUE(spin_wait_ge(&g_wn_woken, WAKE_N_TASKS));
+}
+
+// --- wait with timeout ---
+
+static volatile uint32_t g_timeout_val = 0;
+static volatile int32_t g_timeout_rc = 0;
+static volatile uint64_t g_timeout_elapsed = 0;
+static volatile uint32_t g_timeout_done = 0;
+
+static void timeout_waiter_fn(void*) {
+    uint64_t before = clock::now_ns();
+    RUN_ELEVATED({
+        g_timeout_rc = sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_timeout_val), 0,
+            50000000ULL); // 50ms
+    });
+    __atomic_store_n(&g_timeout_elapsed,
+                     clock::now_ns() - before, __ATOMIC_RELEASE);
+    __atomic_store_n(&g_timeout_done, 1, __ATOMIC_RELEASE);
+    sched::exit(0);
+}
+
+TEST(futex, wait_timeout) {
+    g_timeout_val = 0;
+    g_timeout_rc = 0;
+    g_timeout_elapsed = 0;
+    g_timeout_done = 0;
+
+    RUN_ELEVATED({
+        sched::task* t = sched::create_kernel_task(
+            timeout_waiter_fn, nullptr, "ftx_tmo");
+        ASSERT_NOT_NULL(t);
+        sched::enqueue(t);
+    });
+
+    ASSERT_TRUE(spin_wait(&g_timeout_done));
+    EXPECT_EQ(static_cast<int32_t>(g_timeout_rc),
+              static_cast<int32_t>(-110)); // ETIMEDOUT
+    EXPECT_GE(g_timeout_elapsed, 10000000ULL);
+}
+
+// --- killed thread unblocks ---
+
+static volatile uint32_t g_kill_val = 0;
+static volatile uint32_t g_kill_entered = 0;
+static sched::task* g_kill_task = nullptr;
+
+static void kill_waiter_fn(void*) {
+    __atomic_store_n(&g_kill_entered, 1, __ATOMIC_RELEASE);
+    RUN_ELEVATED({
+        sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_kill_val), 0, 0);
+    });
+    sched::exit(0);
+}
+
+TEST(futex, killed_thread_unblocks) {
+    g_kill_val = 0;
+    g_kill_entered = 0;
+    g_kill_task = nullptr;
+
+    RUN_ELEVATED({
+        g_kill_task = sched::create_kernel_task(
+            kill_waiter_fn, nullptr, "ftx_kill");
+        ASSERT_NOT_NULL(g_kill_task);
+        sched::enqueue(g_kill_task);
+    });
+
+    ASSERT_TRUE(spin_wait(&g_kill_entered));
+    brief_delay();
+
+    RUN_ELEVATED({
+        sched::force_wake_for_kill(g_kill_task);
+    });
+
+    // Allow time for the task to die and be reaped
+    brief_delay();
+    brief_delay();
+    EXPECT_TRUE(true);
+}
+
+// --- wake_all wakes everyone ---
+
+constexpr uint32_t WALL_TASKS = 8;
+static volatile uint32_t g_wall_val = 0;
+static volatile uint32_t g_wall_ready = 0;
+static volatile uint32_t g_wall_woken = 0;
+
+static void wake_all_waiter_fn(void*) {
+    __atomic_fetch_add(&g_wall_ready, 1, __ATOMIC_ACQ_REL);
+    RUN_ELEVATED({
+        sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_wall_val), 0, 0);
+    });
+    __atomic_fetch_add(&g_wall_woken, 1, __ATOMIC_ACQ_REL);
+    sched::exit(0);
+}
+
+TEST(futex, wake_all_wakes_everyone) {
+    g_wall_val = 0;
+    g_wall_ready = 0;
+    g_wall_woken = 0;
+
+    RUN_ELEVATED({
+        for (uint32_t i = 0; i < WALL_TASKS; i++) {
+            sched::task* t = sched::create_kernel_task(
+                wake_all_waiter_fn, nullptr, "ftx_wall");
+            ASSERT_NOT_NULL(t);
+            sched::enqueue(t);
+        }
+    });
+
+    ASSERT_TRUE(spin_wait_ge(&g_wall_ready, WALL_TASKS));
+    brief_delay();
+
+    int32_t woken = 0;
+    RUN_ELEVATED({
+        woken = sync::futex_wake_all(
+            reinterpret_cast<uintptr_t>(&g_wall_val));
+    });
+    EXPECT_EQ(woken, static_cast<int32_t>(WALL_TASKS));
+    EXPECT_TRUE(spin_wait_ge(&g_wall_woken, WALL_TASKS));
+}
+
+// --- different addresses are independent ---
+
+static volatile uint32_t g_ind_val_a = 0;
+static volatile uint32_t g_ind_val_b = 0;
+static volatile uint32_t g_ind_ready_a = 0;
+static volatile uint32_t g_ind_ready_b = 0;
+static volatile uint32_t g_ind_woken_a = 0;
+static volatile uint32_t g_ind_woken_b = 0;
+
+static void ind_waiter_a_fn(void*) {
+    __atomic_store_n(&g_ind_ready_a, 1, __ATOMIC_RELEASE);
+    RUN_ELEVATED({
+        sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_ind_val_a), 0, 0);
+    });
+    __atomic_store_n(&g_ind_woken_a, 1, __ATOMIC_RELEASE);
+    sched::exit(0);
+}
+
+static void ind_waiter_b_fn(void*) {
+    __atomic_store_n(&g_ind_ready_b, 1, __ATOMIC_RELEASE);
+    RUN_ELEVATED({
+        sync::futex_wait(
+            reinterpret_cast<uintptr_t>(&g_ind_val_b), 0, 0);
+    });
+    __atomic_store_n(&g_ind_woken_b, 1, __ATOMIC_RELEASE);
+    sched::exit(0);
+}
+
+TEST(futex, different_addresses_independent) {
+    g_ind_val_a = 0;
+    g_ind_val_b = 0;
+    g_ind_ready_a = 0;
+    g_ind_ready_b = 0;
+    g_ind_woken_a = 0;
+    g_ind_woken_b = 0;
+
+    RUN_ELEVATED({
+        sched::task* ta = sched::create_kernel_task(
+            ind_waiter_a_fn, nullptr, "ftx_ind_a");
+        sched::task* tb = sched::create_kernel_task(
+            ind_waiter_b_fn, nullptr, "ftx_ind_b");
+        ASSERT_NOT_NULL(ta);
+        ASSERT_NOT_NULL(tb);
+        sched::enqueue(ta);
+        sched::enqueue(tb);
+    });
+
+    ASSERT_TRUE(spin_wait(&g_ind_ready_a));
+    ASSERT_TRUE(spin_wait(&g_ind_ready_b));
+    brief_delay();
+
+    // Wake only address A
+    RUN_ELEVATED({
+        sync::futex_wake(
+            reinterpret_cast<uintptr_t>(&g_ind_val_a), 1);
+    });
+
+    ASSERT_TRUE(spin_wait(&g_ind_woken_a));
+    brief_delay();
+    EXPECT_EQ(__atomic_load_n(&g_ind_woken_b, __ATOMIC_ACQUIRE), 0u);
+
+    // Now wake B
+    RUN_ELEVATED({
+        sync::futex_wake(
+            reinterpret_cast<uintptr_t>(&g_ind_val_b), 1);
+    });
+
+    EXPECT_TRUE(spin_wait(&g_ind_woken_b));
+}