Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions lockfree/mpmc/queue.hpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ namespace mpmc {
template <typename T, size_t size> class Queue {
static_assert(std::is_trivial<T>::value, "The type T must be trivial");
static_assert(size > 2, "Buffer size must be bigger than 2");
static_assert((size & (size - 1)) == 0, "Buffer size must be a power of 2");

/********************** PUBLIC METHODS ************************/
public:
Expand Down Expand Up @@ -86,14 +87,26 @@ template <typename T, size_t size> class Queue {
private:
struct Slot {
T val;
std::atomic_size_t pop_count;
std::atomic_size_t push_count;

Slot() : pop_count(0U), push_count(0U) {}
/**
* Counts all pushes and pops performed on this slot.
* Parity encodes the current state, while the value encodes the
* revolution:
* Even (2*R) - equal pushes and pops (R each) - EMPTY,
* ready for the R-th push.
* Odd (2*R+1) - one more push than pop - FULL, ready for the
* R-th pop.
*/
std::atomic_size_t access_count;

Slot() : access_count(0U) {}
};

/********************** PRIVATE MEMBERS ***********************/
private:
/* Clips access_count / 2 to the same range as revolution_count,
* keeping the our_turn check correct through counter wrap-around. */
static constexpr size_t _revolution_mask = ~size_t(0) / size;

Slot _data[size]; /**< Data array */
#if LOCKFREE_CACHE_COHERENT
alignas(LOCKFREE_CACHELINE_LENGTH)
Expand Down
34 changes: 18 additions & 16 deletions lockfree/mpmc/queue_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,26 @@ template <typename T, size_t size> bool Queue<T, size>::Push(const T &element) {
while (true) {
const size_t index = w_count % size;

const size_t push_count =
_data[index].push_count.load(std::memory_order_acquire);
const size_t pop_count =
_data[index].pop_count.load(std::memory_order_relaxed);
const size_t access_count =
_data[index].access_count.load(std::memory_order_acquire);

if (push_count != pop_count) {
/* Odd access_count means one more push than pop — the slot is full. */
if (access_count % 2 != 0) {
return false;
}

const size_t revolution_count = w_count / size;
const bool our_turn = revolution_count == push_count;
const size_t slot_revolution = (access_count / 2) & _revolution_mask;
const bool our_turn = slot_revolution == revolution_count;

if (our_turn) {
/* Try to acquire the slot by bumping the monotonic write counter */
if (_w_count.compare_exchange_weak(w_count, w_count + 1U,
std::memory_order_relaxed)) {
_data[index].val = element;
_data[index].push_count.store(push_count + 1U,
std::memory_order_release);
/* Advance to the next odd value — marks the slot as full */
_data[index].access_count.store(access_count + 1U,
std::memory_order_release);
return true;
}
} else {
Expand All @@ -82,25 +83,26 @@ template <typename T, size_t size> bool Queue<T, size>::Pop(T &element) {
while (true) {
const size_t index = r_count % size;

const size_t pop_count =
_data[index].pop_count.load(std::memory_order_acquire);
const size_t push_count =
_data[index].push_count.load(std::memory_order_relaxed);
const size_t access_count =
_data[index].access_count.load(std::memory_order_acquire);

if (pop_count == push_count) {
/* Even access_count means equal pushes and pops — the slot is empty. */
if (access_count % 2 == 0) {
return false;
}

const size_t revolution_count = r_count / size;
const bool our_turn = revolution_count == pop_count;
const size_t slot_revolution = (access_count / 2) & _revolution_mask;
const bool our_turn = slot_revolution == revolution_count;

if (our_turn) {
/* Try to acquire the slot by bumping the monotonic read counter. */
if (_r_count.compare_exchange_weak(r_count, r_count + 1U,
std::memory_order_relaxed)) {
element = _data[index].val;
_data[index].pop_count.store(pop_count + 1U,
std::memory_order_release);
/* Advance to the next even value — marks the slot as empty */
_data[index].access_count.store(access_count + 1U,
std::memory_order_release);
return true;
}
} else {
Expand Down
8 changes: 4 additions & 4 deletions tests/mpmc/priority_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

TEST_CASE("mpmc::PriorityQueue - Write to empty, lowest priority and read back",
"[mpmc_pq_write_empty_lowest]") {
lockfree::mpmc::PriorityQueue<int16_t, 20, 3> queue;
lockfree::mpmc::PriorityQueue<int16_t, 32, 3> queue;

bool const push_success = queue.Push(-1024, 0);
REQUIRE(push_success);
Expand All @@ -21,7 +21,7 @@ TEST_CASE("mpmc::PriorityQueue - Write to empty, lowest priority and read back",
TEST_CASE(
"mpmc::PriorityQueue - Write to empty, highest priority and read back",
"[mpmc_pq_write_empty_highest]") {
lockfree::mpmc::PriorityQueue<int16_t, 20, 3> queue;
lockfree::mpmc::PriorityQueue<int16_t, 32, 3> queue;

bool const push_success = queue.Push(-1024, 2);
REQUIRE(push_success);
Expand All @@ -36,7 +36,7 @@ TEST_CASE("mpmc::PriorityQueue - Write multiple with different priority and "
"read back ensuring "
"proper sequence",
"[mpmc_pq_write_multiple_read_multiple]") {
lockfree::mpmc::PriorityQueue<uint64_t, 10, 4> queue;
lockfree::mpmc::PriorityQueue<uint64_t, 16, 4> queue;

bool push_success = queue.Push(256, 2);
REQUIRE(push_success);
Expand Down Expand Up @@ -69,7 +69,7 @@ TEST_CASE("mpmc::PriorityQueue - Write multiple with different priority and "
}

TEST_CASE("mpmc::PriorityQueue - Optional API", "[mpmc_pq_optional_api]") {
lockfree::mpmc::PriorityQueue<int16_t, 20, 3> queue;
lockfree::mpmc::PriorityQueue<int16_t, 32, 3> queue;

bool const push_success = queue.Push(-1024, 0);
REQUIRE(push_success);
Expand Down
11 changes: 5 additions & 6 deletions tests/mpmc/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

TEST_CASE("mpmc::Queue - Write to empty and read back",
"[mpmc_q_write_empty]") {
lockfree::mpmc::Queue<int16_t, 20> queue;
lockfree::mpmc::Queue<int16_t, 32> queue;

bool const push_success = queue.Push(-1024);
REQUIRE(push_success);
Expand All @@ -19,28 +19,27 @@ TEST_CASE("mpmc::Queue - Write to empty and read back",
}

TEST_CASE("mpmc::Queue - Read empty", "[mpmc_q_read_empty]") {
lockfree::mpmc::Queue<uint8_t, 20> queue;
lockfree::mpmc::Queue<uint8_t, 16> queue;

uint8_t read = 0;
bool const pop_success = queue.Pop(read);
REQUIRE(!pop_success);
}

TEST_CASE("mpmc::Queue - Write full", "[mpmc_q_write_full]") {
lockfree::mpmc::Queue<uint8_t, 5> queue;
lockfree::mpmc::Queue<uint8_t, 4> queue;

bool push_success = queue.Push(1U);
push_success = queue.Push(1U);
push_success = queue.Push(2U);
push_success = queue.Push(3U);
push_success = queue.Push(5U);
push_success = queue.Push(6U);
REQUIRE(!push_success);
}

TEST_CASE("mpmc::Queue - Write multiple to empty and read back",
"[mpmc_q_write_empty_multiple]") {
lockfree::mpmc::Queue<float, 20> queue;
lockfree::mpmc::Queue<float, 32> queue;

bool push_success = queue.Push(2.7183F);
REQUIRE(push_success);
Expand Down Expand Up @@ -83,7 +82,7 @@ TEST_CASE("mpmc::Queue - Write with overflow and read back from start",
}

TEST_CASE("mpmc::Queue - Optional API", "[mpmc_q_optional_api]") {
lockfree::mpmc::Queue<uint64_t, 20> queue;
lockfree::mpmc::Queue<uint64_t, 32> queue;

REQUIRE(!queue.PopOptional());
queue.Push(-1024);
Expand Down