diff --git a/src/util/queues.hpp b/src/util/queues.hpp index 07ca5f9..77c231d 100644 --- a/src/util/queues.hpp +++ b/src/util/queues.hpp @@ -1,47 +1,27 @@ #pragma once -#include -#include #include -#include -#include +#include #include #include -#include +#include namespace goggles::util { -/// @brief Single-producer, single-consumer lock-free ring buffer. +/// @brief Single-producer, single-consumer queue with fixed capacity. /// -/// `capacity` must be a power of two. Construction throws on invalid capacity or allocation -/// failure. +/// Capacity must be > 0. Construction throws `std::invalid_argument` on zero capacity. template class SPSCQueue { public: explicit SPSCQueue(size_t capacity) - : m_capacity(capacity), m_buffer_size(capacity * 2), m_capacity_mask(m_buffer_size - 1), - m_buffer(nullptr) { - // Power-of-2 required for efficient modulo via bitwise AND - if (capacity == 0 || (capacity & (capacity - 1)) != 0) { - throw std::invalid_argument("SPSCQueue capacity must be power of 2"); + : m_capacity(capacity), m_buffer(capacity), m_head(0), m_tail(0), m_size(0) { + if (capacity == 0) { + throw std::invalid_argument("SPSCQueue capacity must be > 0"); } - - m_buffer = static_cast(std::aligned_alloc(alignof(T), sizeof(T) * m_buffer_size)); - if (m_buffer == nullptr) { - throw std::bad_alloc(); - } - - m_head.store(0, std::memory_order_relaxed); - m_tail.store(0, std::memory_order_relaxed); } - ~SPSCQueue() { - while (try_pop()) { - } - if (m_buffer) { - std::free(m_buffer); - } - } + ~SPSCQueue() = default; SPSCQueue(const SPSCQueue&) = delete; SPSCQueue& operator=(const SPSCQueue&) = delete; @@ -49,67 +29,58 @@ class SPSCQueue { SPSCQueue& operator=(SPSCQueue&&) = delete; auto try_push(const T& item) -> bool { - const size_t current_head = m_head.load(std::memory_order_relaxed); - const size_t current_tail = m_tail.load(std::memory_order_acquire); - const size_t current_size = (current_head - current_tail) & m_capacity_mask; - if (current_size >= m_capacity) { + std::lock_guard lock(m_mutex); + if (m_size >= m_capacity) { return false; } - - new (&m_buffer[current_head]) T(item); - const size_t next_head = (current_head + 1) & m_capacity_mask; - m_head.store(next_head, std::memory_order_release); + m_buffer[m_head].emplace(item); + m_head = (m_head + 1) % m_capacity; + ++m_size; return true; } auto try_push(T&& item) -> bool { - const size_t current_head = m_head.load(std::memory_order_relaxed); - const size_t current_tail = m_tail.load(std::memory_order_acquire); - const size_t current_size = (current_head - current_tail) & m_capacity_mask; - if (current_size >= m_capacity) { + std::lock_guard lock(m_mutex); + if (m_size >= m_capacity) { return false; } - - new (&m_buffer[current_head]) T(std::move(item)); - const size_t next_head = (current_head + 1) & m_capacity_mask; - m_head.store(next_head, std::memory_order_release); + m_buffer[m_head].emplace(std::move(item)); + m_head = (m_head + 1) % m_capacity; + ++m_size; return true; } auto try_pop() -> std::optional { - const size_t current_tail = m_tail.load(std::memory_order_relaxed); - if (current_tail == m_head.load(std::memory_order_acquire)) { + std::lock_guard lock(m_mutex); + if (m_size == 0) { return std::nullopt; } - - T item = std::move(m_buffer[current_tail]); - m_buffer[current_tail].~T(); - const size_t next_tail = (current_tail + 1) & m_capacity_mask; - m_tail.store(next_tail, std::memory_order_release); + std::optional item = std::move(m_buffer[m_tail]); + m_buffer[m_tail].reset(); + m_tail = (m_tail + 1) % m_capacity; + --m_size; return item; } [[nodiscard]] auto size() const -> size_t { - const size_t current_head = m_head.load(std::memory_order_acquire); - const size_t current_tail = m_tail.load(std::memory_order_acquire); - return (current_head - current_tail) & m_capacity_mask; + std::lock_guard lock(m_mutex); + return m_size; } [[nodiscard]] auto empty() const -> bool { - const size_t current_tail = m_tail.load(std::memory_order_relaxed); - return current_tail == m_head.load(std::memory_order_acquire); + std::lock_guard lock(m_mutex); + return m_size == 0; } [[nodiscard]] auto capacity() const -> size_t { return m_capacity; } private: - alignas(64) std::atomic m_head; - alignas(64) std::atomic m_tail; - const size_t m_capacity; - const size_t m_buffer_size; - const size_t m_capacity_mask; - T* m_buffer; + std::vector> m_buffer; + size_t m_head; + size_t m_tail; + size_t m_size; + mutable std::mutex m_mutex; }; } // namespace goggles::util diff --git a/tests/util/test_queues.cpp b/tests/util/test_queues.cpp index fe72cc1..7414bc1 100644 --- a/tests/util/test_queues.cpp +++ b/tests/util/test_queues.cpp @@ -7,21 +7,28 @@ using namespace goggles::util; TEST_CASE("SPSCQueue construction and basic properties", "[queues]") { - SECTION("Construct with power-of-2 capacity") { + SECTION("Construct with valid capacity") { SPSCQueue queue(8); REQUIRE(queue.capacity() == 8); REQUIRE(queue.size() == 0); } - SECTION("Construct with non-power-of-2 capacity throws") { - REQUIRE_THROWS_AS(SPSCQueue(7), std::invalid_argument); - REQUIRE_THROWS_AS(SPSCQueue(10), std::invalid_argument); + SECTION("Non-power-of-2 capacity is accepted") { + SPSCQueue queue(7); + REQUIRE(queue.capacity() == 7); + + SPSCQueue queue2(10); + REQUIRE(queue2.capacity() == 10); } SECTION("Minimum capacity of 1 works") { SPSCQueue queue(1); REQUIRE(queue.capacity() == 1); } + + SECTION("Zero capacity is rejected") { + REQUIRE_THROWS_AS(SPSCQueue(0), std::invalid_argument); + } } TEST_CASE("SPSCQueue basic operations", "[queues]") { @@ -158,10 +165,6 @@ TEST_CASE("SPSCQueue FIFO ordering", "[queues]") { } TEST_CASE("SPSCQueue edge cases and boundary conditions", "[queues]") { - SECTION("Zero capacity is rejected") { - REQUIRE_THROWS_AS(SPSCQueue(0), std::invalid_argument); - } - SECTION("Capacity 1 handles full/empty correctly") { SPSCQueue queue(1);