From ed267be64a792ed3e917916ad8b76adbfe735059 Mon Sep 17 00:00:00 2001 From: Djordje Nedic Date: Sun, 22 Feb 2026 22:59:21 +0100 Subject: [PATCH 1/2] performance(mpmc queue): Make the MPMC Queue use a single slot access counter This moves to a design that encodes the push and pop count of a slot to one counter, increasing performance. The fullness of the slot is encoded in the parity, while the "our turn" mechanic still exists through the divison by 2. --- lockfree/mpmc/queue.hpp | 16 ++++++++++++---- lockfree/mpmc/queue_impl.hpp | 32 ++++++++++++++++---------------- 2 files changed, 28 insertions(+), 20 deletions(-) mode change 100755 => 100644 lockfree/mpmc/queue.hpp diff --git a/lockfree/mpmc/queue.hpp b/lockfree/mpmc/queue.hpp old mode 100755 new mode 100644 index 10e91c6..3a94af3 --- a/lockfree/mpmc/queue.hpp +++ b/lockfree/mpmc/queue.hpp @@ -86,10 +86,18 @@ template class Queue { private: struct Slot { T val; - std::atomic_size_t pop_count; - std::atomic_size_t push_count; - - Slot() : pop_count(0U), push_count(0U) {} + /** + * Counts all pushes and pops performed on this slot. + * Parity encodes the current state, while the value encodes the + * revolution: + * Even (2*R) - equal pushes and pops (R each) - EMPTY, + * ready for the R-th push. + * Odd (2*R+1) - one more push than pop - FULL, ready for the + * R-th pop. + */ + std::atomic_size_t access_count; + + Slot() : access_count(0U) {} }; /********************** PRIVATE MEMBERS ***********************/ diff --git a/lockfree/mpmc/queue_impl.hpp b/lockfree/mpmc/queue_impl.hpp index 324a548..874b7ef 100644 --- a/lockfree/mpmc/queue_impl.hpp +++ b/lockfree/mpmc/queue_impl.hpp @@ -49,25 +49,25 @@ template bool Queue::Push(const T &element) { while (true) { const size_t index = w_count % size; - const size_t push_count = - _data[index].push_count.load(std::memory_order_acquire); - const size_t pop_count = - _data[index].pop_count.load(std::memory_order_relaxed); + const size_t access_count = + _data[index].access_count.load(std::memory_order_acquire); - if (push_count != pop_count) { + /* Odd access_count means one more push than pop — the slot is full. */ + if (access_count % 2 != 0) { return false; } const size_t revolution_count = w_count / size; - const bool our_turn = revolution_count == push_count; + const bool our_turn = access_count / 2 == revolution_count; if (our_turn) { /* Try to acquire the slot by bumping the monotonic write counter */ if (_w_count.compare_exchange_weak(w_count, w_count + 1U, std::memory_order_relaxed)) { _data[index].val = element; - _data[index].push_count.store(push_count + 1U, - std::memory_order_release); + /* Advance to the next odd value — marks the slot as full */ + _data[index].access_count.store(access_count + 1U, + std::memory_order_release); return true; } } else { @@ -82,25 +82,25 @@ template bool Queue::Pop(T &element) { while (true) { const size_t index = r_count % size; - const size_t pop_count = - _data[index].pop_count.load(std::memory_order_acquire); - const size_t push_count = - _data[index].push_count.load(std::memory_order_relaxed); + const size_t access_count = + _data[index].access_count.load(std::memory_order_acquire); - if (pop_count == push_count) { + /* Even access_count means equal pushes and pops — the slot is empty. */ + if (access_count % 2 == 0) { return false; } const size_t revolution_count = r_count / size; - const bool our_turn = revolution_count == pop_count; + const bool our_turn = access_count / 2 == revolution_count; if (our_turn) { /* Try to acquire the slot by bumping the monotonic read counter. */ if (_r_count.compare_exchange_weak(r_count, r_count + 1U, std::memory_order_relaxed)) { element = _data[index].val; - _data[index].pop_count.store(pop_count + 1U, - std::memory_order_release); + /* Advance to the next even value — marks the slot as empty */ + _data[index].access_count.store(access_count + 1U, + std::memory_order_release); return true; } } else { From a9dce4069fa59230e1f7f97d26af19b1ce523042 Mon Sep 17 00:00:00 2001 From: Djordje Nedic Date: Mon, 23 Feb 2026 22:56:45 +0100 Subject: [PATCH 2/2] fix(mpmc queue): Fix the our_turn check when the monotonic indexes wrap This makes the access count and the revolution count in the same range, ensuring index wrapping does not cause a deadlock for slot acquisition. A consequence of this is that buffer sizes now must be powers of 2. --- lockfree/mpmc/queue.hpp | 5 +++++ lockfree/mpmc/queue_impl.hpp | 6 ++++-- tests/mpmc/priority_queue.cpp | 8 ++++---- tests/mpmc/queue.cpp | 11 +++++------ 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lockfree/mpmc/queue.hpp b/lockfree/mpmc/queue.hpp index 3a94af3..f4d0bbb 100644 --- a/lockfree/mpmc/queue.hpp +++ b/lockfree/mpmc/queue.hpp @@ -55,6 +55,7 @@ namespace mpmc { template class Queue { static_assert(std::is_trivial::value, "The type T must be trivial"); static_assert(size > 2, "Buffer size must be bigger than 2"); + static_assert((size & (size - 1)) == 0, "Buffer size must be a power of 2"); /********************** PUBLIC METHODS ************************/ public: @@ -102,6 +103,10 @@ template class Queue { /********************** PRIVATE MEMBERS ***********************/ private: + /* Clips access_count / 2 to the same range as revolution_count, + * keeping the our_turn check correct through counter wrap-around. */ + static constexpr size_t _revolution_mask = ~size_t(0) / size; + Slot _data[size]; /**< Data array */ #if LOCKFREE_CACHE_COHERENT alignas(LOCKFREE_CACHELINE_LENGTH) diff --git a/lockfree/mpmc/queue_impl.hpp b/lockfree/mpmc/queue_impl.hpp index 874b7ef..439d64e 100644 --- a/lockfree/mpmc/queue_impl.hpp +++ b/lockfree/mpmc/queue_impl.hpp @@ -58,7 +58,8 @@ template bool Queue::Push(const T &element) { } const size_t revolution_count = w_count / size; - const bool our_turn = access_count / 2 == revolution_count; + const size_t slot_revolution = (access_count / 2) & _revolution_mask; + const bool our_turn = slot_revolution == revolution_count; if (our_turn) { /* Try to acquire the slot by bumping the monotonic write counter */ @@ -91,7 +92,8 @@ template bool Queue::Pop(T &element) { } const size_t revolution_count = r_count / size; - const bool our_turn = access_count / 2 == revolution_count; + const size_t slot_revolution = (access_count / 2) & _revolution_mask; + const bool our_turn = slot_revolution == revolution_count; if (our_turn) { /* Try to acquire the slot by bumping the monotonic read counter. */ diff --git a/tests/mpmc/priority_queue.cpp b/tests/mpmc/priority_queue.cpp index 7f2c7f0..ab76ce0 100644 --- a/tests/mpmc/priority_queue.cpp +++ b/tests/mpmc/priority_queue.cpp @@ -7,7 +7,7 @@ TEST_CASE("mpmc::PriorityQueue - Write to empty, lowest priority and read back", "[mpmc_pq_write_empty_lowest]") { - lockfree::mpmc::PriorityQueue queue; + lockfree::mpmc::PriorityQueue queue; bool const push_success = queue.Push(-1024, 0); REQUIRE(push_success); @@ -21,7 +21,7 @@ TEST_CASE("mpmc::PriorityQueue - Write to empty, lowest priority and read back", TEST_CASE( "mpmc::PriorityQueue - Write to empty, highest priority and read back", "[mpmc_pq_write_empty_highest]") { - lockfree::mpmc::PriorityQueue queue; + lockfree::mpmc::PriorityQueue queue; bool const push_success = queue.Push(-1024, 2); REQUIRE(push_success); @@ -36,7 +36,7 @@ TEST_CASE("mpmc::PriorityQueue - Write multiple with different priority and " "read back ensuring " "proper sequence", "[mpmc_pq_write_multiple_read_multiple]") { - lockfree::mpmc::PriorityQueue queue; + lockfree::mpmc::PriorityQueue queue; bool push_success = queue.Push(256, 2); REQUIRE(push_success); @@ -69,7 +69,7 @@ TEST_CASE("mpmc::PriorityQueue - Write multiple with different priority and " } TEST_CASE("mpmc::PriorityQueue - Optional API", "[mpmc_pq_optional_api]") { - lockfree::mpmc::PriorityQueue queue; + lockfree::mpmc::PriorityQueue queue; bool const push_success = queue.Push(-1024, 0); REQUIRE(push_success); diff --git a/tests/mpmc/queue.cpp b/tests/mpmc/queue.cpp index 3c9399a..5b9338b 100644 --- a/tests/mpmc/queue.cpp +++ b/tests/mpmc/queue.cpp @@ -7,7 +7,7 @@ TEST_CASE("mpmc::Queue - Write to empty and read back", "[mpmc_q_write_empty]") { - lockfree::mpmc::Queue queue; + lockfree::mpmc::Queue queue; bool const push_success = queue.Push(-1024); REQUIRE(push_success); @@ -19,7 +19,7 @@ TEST_CASE("mpmc::Queue - Write to empty and read back", } TEST_CASE("mpmc::Queue - Read empty", "[mpmc_q_read_empty]") { - lockfree::mpmc::Queue queue; + lockfree::mpmc::Queue queue; uint8_t read = 0; bool const pop_success = queue.Pop(read); @@ -27,20 +27,19 @@ TEST_CASE("mpmc::Queue - Read empty", "[mpmc_q_read_empty]") { } TEST_CASE("mpmc::Queue - Write full", "[mpmc_q_write_full]") { - lockfree::mpmc::Queue queue; + lockfree::mpmc::Queue queue; bool push_success = queue.Push(1U); push_success = queue.Push(1U); push_success = queue.Push(2U); push_success = queue.Push(3U); - push_success = queue.Push(5U); push_success = queue.Push(6U); REQUIRE(!push_success); } TEST_CASE("mpmc::Queue - Write multiple to empty and read back", "[mpmc_q_write_empty_multiple]") { - lockfree::mpmc::Queue queue; + lockfree::mpmc::Queue queue; bool push_success = queue.Push(2.7183F); REQUIRE(push_success); @@ -83,7 +82,7 @@ TEST_CASE("mpmc::Queue - Write with overflow and read back from start", } TEST_CASE("mpmc::Queue - Optional API", "[mpmc_q_optional_api]") { - lockfree::mpmc::Queue queue; + lockfree::mpmc::Queue queue; REQUIRE(!queue.PopOptional()); queue.Push(-1024);