diff --git a/executables/referenceApp/configuration/include/systems/IEthernetDriverSystem.h b/executables/referenceApp/configuration/include/systems/IEthernetDriverSystem.h index a3bacd4f99b..b15c976d925 100644 --- a/executables/referenceApp/configuration/include/systems/IEthernetDriverSystem.h +++ b/executables/referenceApp/configuration/include/systems/IEthernetDriverSystem.h @@ -25,6 +25,6 @@ class IEthernetDriverSystem virtual void setGroupcastAddressRecognition(::etl::array const mac) const = 0; virtual bool getLinkStatus(size_t port) = 0; virtual bool writeFrame(struct netif* const aNetif, struct pbuf* const buf) = 0; - virtual ::lwiputils::PbufQueue::Receiver getRx() = 0; + virtual ::lwiputils::PbufQueue& getRx() = 0; }; } // namespace ethernet diff --git a/executables/referenceApp/platforms/posix/main/include/systems/TapEthernetSystem.h b/executables/referenceApp/platforms/posix/main/include/systems/TapEthernetSystem.h index 6eda65e5f8b..f77800029a5 100644 --- a/executables/referenceApp/platforms/posix/main/include/systems/TapEthernetSystem.h +++ b/executables/referenceApp/platforms/posix/main/include/systems/TapEthernetSystem.h @@ -41,10 +41,7 @@ class TapEthernetSystem final bool writeFrame(netif* ni, pbuf* pb) override; - ::lwiputils::PbufQueue::Receiver getRx() override - { - return ::lwiputils::PbufQueue::Receiver(_driver._queue); - } + ::lwiputils::PbufQueue& getRx() override { return _driver._queue; } ::async::ContextType _context; ::async::TimeoutType _rxTimeout; diff --git a/executables/referenceApp/platforms/s32k148evb/main/include/systems/S32K148EvbEthernetSystem.h b/executables/referenceApp/platforms/s32k148evb/main/include/systems/S32K148EvbEthernetSystem.h index 56ccdd4942d..670052a00ff 100644 --- a/executables/referenceApp/platforms/s32k148evb/main/include/systems/S32K148EvbEthernetSystem.h +++ b/executables/referenceApp/platforms/s32k148evb/main/include/systems/S32K148EvbEthernetSystem.h @@ -50,10 +50,7 @@ class S32K148EvbEthernetSystem final return _driver.writeFrame(aNetif, buf); } - ::lwiputils::PbufQueue::Receiver getRx() override - { - return ::lwiputils::PbufQueue::Receiver(_driver._rxBuffers._queue); - } + ::lwiputils::PbufQueue& getRx() override { return _driver._rxBuffers._queue; } ::async::ContextType const _context; ::async::TimeoutType _timeout; diff --git a/libs/bsw/lwipSocket/include/lwipSocket/utils/LwipHelper.h b/libs/bsw/lwipSocket/include/lwipSocket/utils/LwipHelper.h index 1cafd61f409..ce7d6b1271a 100644 --- a/libs/bsw/lwipSocket/include/lwipSocket/utils/LwipHelper.h +++ b/libs/bsw/lwipSocket/include/lwipSocket/utils/LwipHelper.h @@ -10,9 +10,9 @@ #pragma once +#include #include #include -#include extern "C" { @@ -33,7 +33,7 @@ struct RxCustomPbuf void* slot; }; -using PbufQueue = ::util::spsc::Queue; +using PbufQueue = ::etl::queue_spsc_atomic; err_t initNetifDriverParameters(::etl::span const macAddr, netif& lwipNetif); ::ip::IPAddress from_lwipIp(ip_addr_t const& lwipIp); @@ -47,7 +47,7 @@ inline ip_addr_t to_lwipIp(::ip::IPAddress const& ip) } bool processPbufQueue( - ::lwiputils::PbufQueue::Receiver receiver, + ::lwiputils::PbufQueue& receiver, ::etl::span lwnetifs, ::etl::span vlanIds); } // namespace lwiputils diff --git a/libs/bsw/lwipSocket/src/lwipSocket/utils/FilterFrame.cpp b/libs/bsw/lwipSocket/src/lwipSocket/utils/FilterFrame.cpp index f1e464de732..5c8c1a3e0b3 100644 --- a/libs/bsw/lwipSocket/src/lwipSocket/utils/FilterFrame.cpp +++ b/libs/bsw/lwipSocket/src/lwipSocket/utils/FilterFrame.cpp @@ -91,7 +91,7 @@ netif* filterETHFrames( * Input all pbufs to their respective interfaces */ bool processPbufQueue( - ::lwiputils::PbufQueue::Receiver receiver, + ::lwiputils::PbufQueue& receiver, ::etl::span const lwnetifs, ::etl::span const vlanIds) { @@ -101,7 +101,11 @@ bool processPbufQueue( auto const queued = receiver.size(); for (size_t i = 0; i < queued; ++i) { - auto* const p = receiver.read(); + pbuf* p = nullptr; + if (!receiver.pop(p)) + { + break; + } auto* const pNetIf = filterETHFrames(p, lwnetifs, vlanIds); if ((pNetIf != nullptr) && (pNetIf->input != nullptr)) { diff --git a/libs/bsw/util/include/util/spsc/Queue.h b/libs/bsw/util/include/util/spsc/Queue.h deleted file mode 100644 index b7ecb0436e1..00000000000 --- a/libs/bsw/util/include/util/spsc/Queue.h +++ /dev/null @@ -1,254 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2024 Accenture - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -#pragma once - -#include -#include - -#include - -namespace util -{ -namespace spsc -{ -namespace internal -{ -template -struct TxData; -} - -/** - * Lock-free implementation of a single-producer, single-consumer queue. - * Provides two nested classes "Sender" and "Receiver" to be used on the sending and receiving - * end. - * Both get passed a reference to the queue at construction time. - * - * Optionally a third template parameter can be provided. - * This parameter is a boolean (defaulting to false) that, if true, creates a Queue that - * will keep track of what elements have been consumed by the receiving end. - * In this case the method Queue::Sender::checkConsumed(F) can be used with F being a callable - * that will be called for each element that is already consumed. - */ -template< - class T, - uint16_t N, // N is intentionally a smaller type than size_t used for storing - // the indices, to ensure (2*N) does never overflow. - bool TrackConsumed = false> -class Queue -{ - // Just make sure, in case someone ever compiles this for a 16bit or 8bit target. - static_assert(sizeof(uint16_t) < sizeof(size_t), ""); - - // The tx.sent and rx.received variables are always in the range [0, 2*N] while the ring - // contains N elements. This way there are two distinct constellations where - // tx.sent and rx.received point two the same element: - // 1) tx.sent == rx.received - // 2) tx.sent == rx.received + N - // Using this trick the ambiguity between the empty and full cases of the queue is resolved - // without the need for an unused element in the queue: Case 1) means "empty" and case 2) means - // "full". - // - using TxData = internal::TxData; - - TxData tx; - - struct RxData - { - ::etl::atomic received; - - RxData() : received(0U) {} - }; - - RxData rx; - -public: - using value_type = T; - - Queue() = default; - - void reset() - { - rx.received.store(0U); - tx.reset(); - } - - size_t capacity() const { return N; } - - class Receiver - { - public: - class Read - { - Read(Read const&) = delete; - - Receiver& _r; - - public: - explicit Read(Receiver& r) : _r(r) {} - - T* operator->() const { return &_r.peek(); } - - T& operator*() const { return _r.peek(); } - - ~Read() { _r.advance(); } - }; - - explicit Receiver(Queue& queue) : _txData(queue.tx), _rxData(queue.rx) {} - - bool empty() const - { - size_t const received = _rxData.received.load(); - size_t const sent = _txData.sent.load(); - return sent == received; - } - - T read() { return *Read(*this); } - - void advance() - { - size_t const received = _rxData.received.load(); - _rxData.received.store((received + 1U) % (2U * static_cast(N))); - } - - T& peek() const { return _txData.data[_rxData.received.load() % N]; } - - size_t size() const - { - size_t const received = _rxData.received.load(); - size_t const sent = _txData.sent.load(); - return (sent + ((sent < received) ? 1U : 0U) * (2U * N)) - received; - } - - void clear() - { - while (!empty()) - { - (void)advance(); - } - } - - private: - TxData& _txData; - RxData& _rxData; - }; - - class Sender - { - public: - class Write - { - Write(Write const&) = delete; - - Sender& _s; - - public: - explicit Write(Sender& s) : _s(s) {} - - T* operator->() const { return &_s.next(); } - - T& operator*() const { return _s.next(); } - - ~Write() { _s.write_next(); } - }; - - explicit Sender(Queue& queue) : _rxData(queue.rx), _txData(queue.tx) {} - - void write(T const& value) { *Write(*this) = value; } - - template - void checkConsumed(F const& onConsumed) - { - _txData.check(_rxData, onConsumed); - } - - T& next() const { return _txData.data[_txData.sent.load() % N]; } - - void write_next() - { - size_t const sent = _txData.sent.load(); - _txData.sent.store((sent + 1U) % (2U * static_cast(N))); - } - - size_t size() const - { - size_t const sent = _txData.sent.load(); - size_t const received = _rxData.received.load(); - return (sent + ((sent < received) ? 1U : 0U) * (2U * N)) - received; - } - - bool full() const { return _txData.full(_rxData); } - - private: - RxData const& _rxData; - TxData& _txData; - }; -}; - -namespace internal -{ -template -struct TxData -{ - ::etl::atomic sent; - ::etl::atomic acked; - ::etl::array data; - - TxData() : sent(0U), acked(0U), data() {} - - void reset() - { - sent.store(0U); - acked.store(0U); - } - - template - void check(RxData const& rxData, OnConsumed const& onConsumed) - { - size_t numAcked = acked.load(); - while (numAcked != rxData.received.load()) - { - onConsumed(data[numAcked % N]); - numAcked = (numAcked + 1U) % (2U * N); - acked.store(numAcked); - } - } - - template - bool full(RxData const& /*rxData*/) const - { - size_t const numSent = sent.load(); - size_t const numAcked = acked.load(); - return (numSent == ((numAcked + N) % (2U * N))); - } -}; - -template -struct TxData -{ - ::etl::atomic sent; - ::etl::array data; - - TxData() : sent(0U), data() {} - - void reset() { sent.store(0U); } - - template - bool full(RxData const& rxData) const - { - size_t const numSent = sent.load(); - size_t const numReceived = rxData.received.load(); - return (numSent == (numReceived + N) % (2U * N)); - } -}; -} // namespace internal - -} // namespace spsc -} // namespace util diff --git a/libs/bsw/util/include/util/spsc/ReadWrite.h b/libs/bsw/util/include/util/spsc/ReadWrite.h deleted file mode 100644 index d1e3e5efee2..00000000000 --- a/libs/bsw/util/include/util/spsc/ReadWrite.h +++ /dev/null @@ -1,241 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2024 Accenture - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -#pragma once - -#include - -namespace util -{ -namespace spsc -{ -/** - * Models read access to an underlying input stream of elements. - * \tparam T Type of elements of underlying input stream. - */ -template -class IReader -{ - IReader& operator=(IReader const&); - -public: - virtual ~IReader() = default; - - /** - * Returns whether there are items to read. - * \note - * Always check that the reader is not empty before reading, - * otherwise behaviour is undefined! - */ - virtual bool empty() const = 0; - - /** - * Returns a read only reference to the next element. - * \note - * If the underlying input is empty, the behaviour is undefined. - */ - virtual T const& peek() const = 0; - - /** - * Advances the underlying input stream by one. - * \note - * No sanity checks are made, thus calling advance() on an empty - * IReader results in undefined behaviour. - */ - virtual void advance() = 0; - - /** - * Returns the current number of items that can be read. - */ - virtual size_t size() const = 0; -}; - -/** - * Reader is input stream abstraction based on a IReader interface. - * \tparam T Type of elements in the input stream. - */ -template -class Reader -{ - IReader& _reader; - -public: - /** - * Constructs a Reader from a given IReader. - */ - Reader(IReader& reader) : _reader(reader) {} - - struct Read - { - explicit Read(IReader& reader) : _reader(reader) {} - - ~Read() { _reader.advance(); } - - T const& operator*() const { return _reader.peek(); } - - T const* operator->() const { return &_reader.peek(); } - - IReader& _reader; - }; - - // This will only work if RVO is done! - Read create() { return Read(_reader); } - - IReader& reader() { return _reader; } - - T read() { return *Read(_reader); } - - template - void read(Consumer const& consumer) - { - consumer(_reader.peek()); - _reader.advance(); - } - - size_t size() const { return _reader.size(); } - - bool empty() const { return _reader.empty(); } -}; - -template -class IWriter -{ - IWriter& operator=(IWriter const&); - -public: - virtual ~IWriter() = default; - - /** - * Returns whether and item can be written. - * - * \note - * Always check that the writer is not full before inserting an element, - * otherwise behaviour is undefined! - */ - virtual bool full() const = 0; - - /** - * Returns a writable reference to the next elements. Call commit after the elements - * has been populated to make it available to the reader. - * - * \note - * The underlying stream must not be full otherwise the behaviour is undefined. - */ - virtual T& next() = 0; - - /** - * Advances the underlying input stream by one. - * - * \note - * No sanity checks are made, thus calling advance() on a full - * IWriter results in undefined behaviour. - */ - virtual void commit() = 0; - - /** - * Writes a given value of type T. - * - * One possible implementation might be: - * \code{.cpp} - * next() = value; - * commit(); - * \endcode - */ - virtual void write(T const& value) = 0; -}; - -template -class Writer -{ - IWriter& _writer; - -public: - Writer(IWriter& writer) : _writer(writer) {} - - struct Write - { - explicit Write(IWriter& writer) : _writer(writer) {} - - ~Write() { _writer.commit(); } - - T& operator*() const { return _writer.next(); } - - T* operator->() const { return &_writer.next(); } - - T* memory() const { return &_writer.next(); } - - IWriter& _writer; - }; - - // This will only work if RVO is done!!! - Write create() { return Write(_writer); } - - ::estd::constructor emplace() { return ::estd::constructor(Write(_writer).memory()); } - - virtual void write(T const& value) - { - _writer.next() = value; - _writer.commit(); - } - - template - void write(Producer const& producer) - { - producer(_writer.next()); - _writer.commit(); - } - - bool full() const { return _writer.full(); } -}; - -template -struct QueueReader : public IReader -{ -public: - using value_type = typename Q::value_type; - - explicit QueueReader(Q& queue) : receiver(queue) {} - - bool empty() const override { return receiver.empty(); } - - value_type const& peek() const override { return receiver.peek(); } - - void advance() override { receiver.advance(); } - - size_t size() const override { return receiver.size(); } - - typename Q::Receiver receiver; -}; - -template -struct QueueWriter : public IWriter -{ -public: - using value_type = typename Q::value_type; - - explicit QueueWriter(Q& queue) : sender(queue) {} - - bool full() const override { return sender.full(); } - - value_type& next() override { return sender.next(); } - - void commit() override { sender.write_next(); } - - void write(value_type const& value) override - { - sender.next() = value; - sender.write_next(); - } - - typename Q::Sender sender; -}; - -} // namespace spsc -} // namespace util diff --git a/libs/bsw/util/test/src/util/spsc/QueueTest.cpp b/libs/bsw/util/test/src/util/spsc/QueueTest.cpp deleted file mode 100644 index c8738c72a0f..00000000000 --- a/libs/bsw/util/test/src/util/spsc/QueueTest.cpp +++ /dev/null @@ -1,800 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2024 Accenture - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -#include "util/spsc/Queue.h" - -#include "util/spsc/ReadWrite.h" - -#include -#include - -namespace -{ -using namespace ::testing; - -// The tested types can be constructed from an integer. This makes it easier to compare them -// especially in typed tests. - -struct X -{ - X() : x(0) {} - - X(int i) : x(i) {} - - int x; -}; - -bool operator==(X const& lhs, X const& rhs) { return lhs.x == rhs.x; } - -struct SpSc : public ::testing::Test -{ - using IntQueue = util::spsc::Queue; - using StructQueue = util::spsc::Queue; - - IntQueue queue; - IntQueue::Sender sender; - IntQueue::Receiver receiver; - - SpSc() : sender(queue), receiver(queue) {} -}; - -template -struct SpScQueueWriterReader : public ::testing::Test -{ - static size_t const QUEUE_SIZE = 6U; - using Queue = util::spsc::Queue; - - Queue queue; - util::spsc::QueueWriter writer; - util::spsc::QueueReader reader; - - SpScQueueWriterReader() : writer(queue), reader(queue) {} -}; - -template -size_t const SpScQueueWriterReader::QUEUE_SIZE; - -template -struct SpScWriterReader : public ::testing::Test -{ - static size_t const QUEUE_SIZE = 6U; - using Queue = util::spsc::Queue; - Queue queue; - - util::spsc::QueueWriter qWriter; - util::spsc::QueueReader qReader; - - util::spsc::Writer writer; - util::spsc::Reader reader; - - SpScWriterReader() : qWriter(queue), qReader(queue), writer(qWriter), reader(qReader) {} - - size_t insertNumberedElementsUntilFull() - { - size_t insertedElements = 0; - while (!this->writer.full()) - { - T const element(insertedElements); - this->writer.write(element); - ++insertedElements; - } - return insertedElements; - } - - size_t readNumberedElementsUntilEmpty() - { - size_t readElements = 0; - while (!this->reader.empty()) - { - T const readElement(readElements); - T const e = this->reader.read(); - EXPECT_EQ(readElement, e); - ++readElements; - } - return readElements; - } -}; - -template -size_t const SpScWriterReader::QUEUE_SIZE; - -using MyTypes = ::testing::Types; - -TYPED_TEST_SUITE(SpScWriterReader, MyTypes); - -/** - * \desc - * Checks that capacity() of Queue returns the correct value. - */ -TYPED_TEST(SpScWriterReader, Capacity) -{ - EXPECT_EQ(SpScWriterReader::QUEUE_SIZE, this->queue.capacity()); -} - -/** - * \desc - * This test verifies that the Writer is not full after being created. - */ -TYPED_TEST(SpScWriterReader, WriterNotFullAfterCreating) { EXPECT_FALSE(this->writer.full()); } - -/** - * \desc - * This test verifies that the Reader is empty after being created. - */ -TYPED_TEST(SpScWriterReader, ReaderEmptyAfterCreating) { EXPECT_TRUE(this->reader.empty()); } - -/** - * \desc - * Verifies that the Reader sees one element after it has been written and checks that its - * content is equal to the inserted. - */ -TYPED_TEST(SpScWriterReader, WriteAndReadOneElement) -{ - TypeParam const element(10); - this->writer.write(element); - EXPECT_FALSE(this->reader.empty()); - EXPECT_EQ(1U, this->reader.size()); - TypeParam readElement = this->reader.read(); - EXPECT_EQ(element, readElement); - EXPECT_TRUE(this->reader.empty()); -} - -/** - * \desc - * Inserts elements until the Writer returns full and checks that capacity elements were added. - */ -TYPED_TEST(SpScWriterReader, WriteUntilFullAndCheckThatCapacityElementsHaveBeenInserted) -{ - // Fill queue with different elements - auto const insertedElements = this->insertNumberedElementsUntilFull(); - EXPECT_EQ(this->queue.capacity(), insertedElements); -} - -/** - * \desc - * Inserts elements until the Writer returns full and checks that the same number of elements - * can be read and their values match the inserted ones. - */ -TYPED_TEST(SpScWriterReader, WriteUntilFullAndReadElementsUntilEmpty) -{ - auto const insertedElements = this->insertNumberedElementsUntilFull(); - auto const readElements = this->readNumberedElementsUntilEmpty(); - - EXPECT_EQ(insertedElements, readElements); -} - -/** - * \desc - * Verifies that the pointer and reference operators point to the same memory. - * Also the memory() function returns the same address. - */ -TYPED_TEST(SpScWriterReader, WriterWriteAccessOperatorsAccessSameElement) -{ - typename util::spsc::Writer::Write w = this->writer.create(); - TypeParam& element = *w; - EXPECT_EQ(&element, w.operator->()); - EXPECT_EQ(&element, w.memory()); -} - -/** - * \desc - * Verifies that the pointer and reference operators point to the same memory. - */ -TYPED_TEST(SpScWriterReader, ReaderReadAccessOperatorsAccessSameElement) -{ - this->writer.write(TypeParam(17)); - typename util::spsc::Reader::Read r = this->reader.create(); - TypeParam const& element = *r; - EXPECT_EQ(&element, r.operator->()); -} - -/** - * \desc - * Verifies that using a Writer::Write object to fill the queue and a Reader::Read to - * empty it works the same way as using Writer and Reader API directly. - */ -TYPED_TEST(SpScWriterReader, WriterWriteUntilFullAndUseReaderReadUntilEmpty) -{ - // Fill queue with different elements - size_t insertedElements = 0; - while (!this->writer.full()) - { - TypeParam const element(insertedElements); - typename util::spsc::Writer::Write w = this->writer.create(); - *w = element; - ++insertedElements; - } - EXPECT_EQ(this->queue.capacity(), insertedElements); - // read them back - size_t readElements = 0; - while (!this->reader.empty()) - { - TypeParam const readElement(readElements); - typename util::spsc::Reader::Read r = this->reader.create(); - EXPECT_EQ(readElement, *r); - ++readElements; - } - EXPECT_EQ(insertedElements, readElements); -} - -/** - * \desc - * Verifies that using emplace() to insert elements works. - */ -TYPED_TEST(SpScWriterReader, EmplaceUntilFullAndReadElementsUntilEmpty) -{ - // Fill queue with different elements - int insertedElements = 0; - while (!this->writer.full()) - { - this->writer.emplace().construct(insertedElements); - ++insertedElements; - } - int readElements = this->readNumberedElementsUntilEmpty(); - - EXPECT_EQ(insertedElements, readElements); -} - -template -struct Producer -{ - T _i; - - Producer(int i) : _i(i) {} - - void operator()(T& value) const { value = _i; } -}; - -/** - * \desc - * Verifies that using a Producer to insert elements works. - */ -TYPED_TEST(SpScWriterReader, ProduceUntilFullAndReadElementsUntilEmpty) -{ - // Fill queue with different elements - int insertedElements = 0; - while (!this->writer.full()) - { - Producer p(insertedElements); - this->writer.write(p); - ++insertedElements; - } - int readElements = this->readNumberedElementsUntilEmpty(); - - EXPECT_EQ(insertedElements, readElements); -} - -template -struct Consumer -{ - T _i; - - Consumer(int i) : _i(i) {} - - void operator()(T const& value) const { EXPECT_EQ(value, _i); } -}; - -/** - * \desc - * Verifies that using a Consumer to read the elements works. - */ -TYPED_TEST(SpScWriterReader, WriteUntilFullAndUseReadWithConsumerUntilEmpty) -{ - int insertedElements = this->insertNumberedElementsUntilFull(); - // read them back - int readElements = 0; - while (!this->reader.empty()) - { - Consumer c(readElements); - this->reader.read(c); - ++readElements; - } - - EXPECT_EQ(insertedElements, readElements); -} - -TYPED_TEST(SpScWriterReader, WriteUntilFullAndUseReaderUntilEmpty) -{ - int insertedElements = this->insertNumberedElementsUntilFull(); - - ::util::spsc::IReader& r = this->reader.reader(); - // read them back - int readElements = 0; - while (!r.empty()) - { - TypeParam const expected(readElements); - TypeParam const& v = r.peek(); - EXPECT_EQ(expected, v); - r.advance(); - ++readElements; - } - - EXPECT_EQ(insertedElements, readElements); -} - -TYPED_TEST_SUITE(SpScQueueWriterReader, MyTypes); - -/** - * \desc - * This test assures that a QueueWriter is not full after the queue has been created. - */ -TYPED_TEST(SpScQueueWriterReader, WriterNotFullAfterCreating) { EXPECT_FALSE(this->writer.full()); } - -/** - * \desc - * This test assures that a QueueReader is empty after creation. - */ -TYPED_TEST(SpScQueueWriterReader, ReaderEmptyAfterCreating) { EXPECT_TRUE(this->reader.empty()); } - -/** - * \desc - * This test assures that capacity() elements are inserted when a previously empty queue becomes - * full using the write() function to insert elements. - */ -TYPED_TEST(SpScQueueWriterReader, WriterWriteUntilFull) -{ - size_t insertedElements = 0; - while (!this->writer.full()) - { - TypeParam const element(insertedElements); - this->writer.write(element); - ++insertedElements; - } - EXPECT_EQ(this->queue.capacity(), insertedElements); -} - -/** - * \desc - * This test assures that capacity() elements are inserted when a previously empty queue becomes - * full using the next() and commit() function to insert elements. - */ -TYPED_TEST(SpScQueueWriterReader, WriterNextCommitUntilFull) -{ - size_t insertedElements = 0; - while (!this->writer.full()) - { - TypeParam const element(insertedElements); - this->writer.next() = element; - this->writer.commit(); - ++insertedElements; - } - EXPECT_EQ(this->queue.capacity(), insertedElements); -} - -/** - * \desc - * This test assures that capacity() elements are inserted when a previously empty queue becomes - * full using IWriter::write() function to insert elements. - */ -TYPED_TEST(SpScQueueWriterReader, IWriterWriteUntilFull) -{ - size_t insertedElements = 0; - while (!this->writer.full()) - { - TypeParam const element(insertedElements); - util::spsc::IWriter& w = this->writer; - w.write(element); - ++insertedElements; - } - EXPECT_EQ(this->queue.capacity(), insertedElements); -} - -/** - * \desc - * This test verifies that calling size() on a QueueReader always returns the number of elements - * that have been written by a QueueWriter. - */ -TYPED_TEST(SpScQueueWriterReader, ReaderSizeIsNumberOfInsertedElements) -{ - size_t insertedElements = 0; - while (!this->writer.full()) - { - TypeParam const element(insertedElements); - this->writer.write(element); - ++insertedElements; - EXPECT_EQ(insertedElements, this->reader.size()); - } -} - -TEST_F(SpSc, is_initially_empty) { ASSERT_TRUE(receiver.empty()); } - -TEST_F(SpSc, can_be_pushed_and_poped_a_single_element) -{ - sender.write(4); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(4, receiver.read()); - ASSERT_TRUE(receiver.empty()); - - sender.write(5); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(5, receiver.read()); - ASSERT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, can_be_pushed_and_poped_two_elements) -{ - sender.write(4); - ASSERT_FALSE(receiver.empty()); - sender.write(5); - ASSERT_FALSE(receiver.empty()); - - ASSERT_EQ(4, receiver.read()); - ASSERT_FALSE(receiver.empty()); - - ASSERT_EQ(5, receiver.read()); - ASSERT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, the_Sender_can_be_created_multiple_times) -{ - // Note that while it is possible to create multiple instances of Sender, - // they all have to be used from the same thread. - - sender.write(4); - ASSERT_FALSE(receiver.empty()); - - IntQueue::Sender sender2(queue); - sender2.write(5); - ASSERT_FALSE(receiver.empty()); - - ASSERT_EQ(4, receiver.read()); - ASSERT_FALSE(receiver.empty()); - - ASSERT_EQ(5, receiver.read()); - ASSERT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, the_Reveiver_can_be_created_multiple_times) -{ - // Note that while it is possible to create multiple instances of Receiver, - // they all have to be used from the same thread. - - sender.write(4); - ASSERT_FALSE(receiver.empty()); - sender.write(5); - ASSERT_FALSE(receiver.empty()); - - ASSERT_EQ(4, receiver.read()); - ASSERT_FALSE(receiver.empty()); - - IntQueue::Receiver receiver2(queue); - ASSERT_EQ(5, receiver2.read()); - ASSERT_TRUE(receiver.empty()); - ASSERT_TRUE(receiver2.empty()); -} - -TEST_F(SpSc, the_Reveiver_can_clear_the_queue) -{ - sender.write(4); - ASSERT_FALSE(receiver.empty()); - sender.write(5); - ASSERT_FALSE(receiver.empty()); - - receiver.clear(); - ASSERT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, two_step_write_avoids_copy) -{ - StructQueue queue; - StructQueue::Sender sender(queue); - StructQueue::Receiver receiver(queue); - - X& x = sender.next(); - x.x = 2; - EXPECT_TRUE(receiver.empty()); - sender.write_next(); - EXPECT_FALSE(receiver.empty()); - - X read = receiver.read(); - EXPECT_TRUE(receiver.empty()); - ASSERT_EQ(2, read.x); -} - -TEST_F(SpSc, scoped_write_avoids_copy) -{ - StructQueue queue; - StructQueue::Sender sender(queue); - StructQueue::Receiver receiver(queue); - - { - StructQueue::Sender::Write write(sender); - write->x = 2; - EXPECT_TRUE(receiver.empty()); - } - EXPECT_FALSE(receiver.empty()); - - X read = receiver.read(); - EXPECT_TRUE(receiver.empty()); - ASSERT_EQ(2, read.x); -} - -TEST_F(SpSc, two_step_read_avoids_copy) -{ - StructQueue queue; - StructQueue::Sender sender(queue); - StructQueue::Receiver receiver(queue); - - { - X x; - x.x = 4; - sender.write(x); - } - - X const& x = receiver.peek(); - EXPECT_EQ(4, x.x); - EXPECT_FALSE(receiver.empty()); - - receiver.advance(); - EXPECT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, scoped_read_avoids_copy) -{ - StructQueue queue; - StructQueue::Sender sender(queue); - StructQueue::Receiver receiver(queue); - - X x; - x.x = 4; - sender.write(x); - - { - StructQueue::Receiver::Read read(receiver); - EXPECT_EQ(4, read->x); - EXPECT_EQ(4, (*read).x); - EXPECT_FALSE(receiver.empty()); - } - EXPECT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, can_be_full) -{ - sender.write(1); - ASSERT_FALSE(sender.full()); - sender.write(2); - ASSERT_FALSE(sender.full()); - sender.write(3); - ASSERT_FALSE(sender.full()); - sender.write(4); - ASSERT_FALSE(sender.full()); - sender.write(5); - ASSERT_FALSE(sender.full()); - sender.write(6); - ASSERT_TRUE(sender.full()); - - receiver.read(); - ASSERT_FALSE(sender.full()); -} - -TEST_F(SpSc, overflow) -{ - // This test, and in particular the seemingly arbitrary constant "10" used below only - // makes sense when knowing about the implementation detail of this queue that - // internally the indices are stored in the range [0; 2*N] - // 10 is chosen, because 10 < 2*N < 10+N - for (size_t i = 0; i < 10; ++i) - { - EXPECT_EQ(0U, receiver.size()) << i; - EXPECT_EQ(0U, sender.size()) << i; - sender.write(uint16_t(i)); - EXPECT_EQ(1U, receiver.size()) << i; - EXPECT_EQ(1U, sender.size()) << i; - ASSERT_EQ(uint16_t(i), receiver.read()); - EXPECT_EQ(0U, receiver.size()) << i; - EXPECT_EQ(0U, sender.size()) << i; - } - ASSERT_FALSE(sender.full()); - ASSERT_TRUE(receiver.empty()); - EXPECT_EQ(0U, receiver.size()); - EXPECT_EQ(0U, sender.size()); - - sender.write(252); - ASSERT_FALSE(receiver.empty()); - ASSERT_FALSE(sender.full()); - EXPECT_EQ(1U, receiver.size()); - EXPECT_EQ(1U, sender.size()); - - sender.write(253); - ASSERT_FALSE(receiver.empty()); - ASSERT_FALSE(sender.full()); - EXPECT_EQ(2U, receiver.size()); - EXPECT_EQ(2U, sender.size()); - - sender.write(254); - ASSERT_FALSE(receiver.empty()); - ASSERT_FALSE(sender.full()); - EXPECT_EQ(3U, receiver.size()); - EXPECT_EQ(3U, sender.size()); - - sender.write(255); - ASSERT_FALSE(receiver.empty()); - ASSERT_FALSE(sender.full()); - EXPECT_EQ(4U, receiver.size()); - EXPECT_EQ(4U, sender.size()); - - sender.write(256); - ASSERT_FALSE(receiver.empty()); - ASSERT_FALSE(sender.full()); - EXPECT_EQ(5U, receiver.size()); - EXPECT_EQ(5U, sender.size()); - - sender.write(257); - ASSERT_TRUE(sender.full()); - ASSERT_FALSE(receiver.empty()); - EXPECT_EQ(6U, receiver.size()); - EXPECT_EQ(6U, sender.size()); - - ASSERT_EQ(252, receiver.read()); - EXPECT_EQ(5U, receiver.size()); - EXPECT_EQ(5U, sender.size()); - ASSERT_EQ(253, receiver.read()); - EXPECT_EQ(4U, receiver.size()); - EXPECT_EQ(4U, sender.size()); - ASSERT_EQ(254, receiver.read()); - EXPECT_EQ(3U, receiver.size()); - EXPECT_EQ(3U, sender.size()); - ASSERT_EQ(255, receiver.read()); - EXPECT_EQ(2U, receiver.size()); - EXPECT_EQ(2U, sender.size()); - ASSERT_EQ(256, receiver.read()); - EXPECT_EQ(1U, receiver.size()); - EXPECT_EQ(1U, sender.size()); - ASSERT_EQ(257, receiver.read()); - EXPECT_EQ(0U, receiver.size()); - EXPECT_EQ(0U, sender.size()); - - ASSERT_FALSE(sender.full()); - ASSERT_TRUE(receiver.empty()); -} - -class OnConsumed -{ -public: - MOCK_CONST_METHOD1(called, void(uint16_t)); - - void operator()(uint16_t& x) const { called(x); } -}; - -TEST_F(SpSc, consumed_notification) -{ - OnConsumed onConsumed; - using IntQueueConsumed = util::spsc::Queue; - - IntQueueConsumed queue; - IntQueueConsumed::Sender sender(queue); - IntQueueConsumed::Receiver receiver(queue); - - sender.write(4); - sender.write(2); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(4, receiver.read()); - ASSERT_EQ(2, receiver.read()); - ASSERT_TRUE(receiver.empty()); - - EXPECT_CALL(onConsumed, called(4)).Times(1); - EXPECT_CALL(onConsumed, called(2)).Times(1); - sender.checkConsumed(onConsumed); - sender.write(5); - EXPECT_CALL(onConsumed, called(_)).Times(0); - sender.checkConsumed(onConsumed); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(5, receiver.read()); - EXPECT_CALL(onConsumed, called(5)).Times(1); - sender.checkConsumed(onConsumed); - ASSERT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, consumed_notification_modify) -{ - OnConsumed onConsumed; - using IntQueueConsumed = util::spsc::Queue; - IntQueueConsumed queue; - IntQueueConsumed::Sender sender(queue); - IntQueueConsumed::Receiver receiver(queue); - sender.write(4); - ASSERT_FALSE(receiver.empty()); - uint16_t& v = receiver.peek(); - ASSERT_EQ(4, v); - v = v * 2; - receiver.advance(); - ASSERT_TRUE(receiver.empty()); - EXPECT_CALL(onConsumed, called(8)).Times(1); - sender.checkConsumed(onConsumed); -} - -TEST_F( - SpSc, a_queue_with_consumed_notification_stays_full_until_the_data_consumption_has_been_acked) -{ - OnConsumed onConsumed; - using IntQueueConsumed = util::spsc::Queue; - - IntQueueConsumed queue; - IntQueueConsumed::Sender sender(queue); - IntQueueConsumed::Receiver receiver(queue); - - sender.write(4); - sender.write(2); - sender.write(2); - sender.write(2); - sender.write(2); - sender.write(2); - ASSERT_TRUE(sender.full()); - receiver.read(); - ASSERT_TRUE(sender.full()); - - sender.checkConsumed(onConsumed); - ASSERT_FALSE(sender.full()); -} - -TEST_F(SpSc, queue_with_track_consumed_can_be_reset) -{ - OnConsumed onConsumed; - using IntQueueConsumed = util::spsc::Queue; - - IntQueueConsumed queue; - IntQueueConsumed::Sender sender(queue); - IntQueueConsumed::Receiver receiver(queue); - - // Apart from these 3 lines this test is a copy of the consumed_notification one - sender.write(8); - sender.write(6); - queue.reset(); - - sender.write(4); - sender.write(2); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(4, receiver.read()); - ASSERT_EQ(2, receiver.read()); - ASSERT_TRUE(receiver.empty()); - - EXPECT_CALL(onConsumed, called(4)).Times(1); - EXPECT_CALL(onConsumed, called(2)).Times(1); - sender.checkConsumed(onConsumed); - sender.write(5); - EXPECT_CALL(onConsumed, called(_)).Times(0); - sender.checkConsumed(onConsumed); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(5, receiver.read()); - EXPECT_CALL(onConsumed, called(5)).Times(1); - sender.checkConsumed(onConsumed); - ASSERT_TRUE(receiver.empty()); -} - -TEST_F(SpSc, can_be_reset) -{ - using IntQueueConsumed = util::spsc::Queue; - - IntQueueConsumed queue; - IntQueueConsumed::Sender sender(queue); - IntQueueConsumed::Receiver receiver(queue); - - // Apart from these 3 lines this test is a copy of the consumed_notification one - sender.write(8); - sender.write(6); - queue.reset(); - - sender.write(4); - sender.write(2); - ASSERT_FALSE(receiver.empty()); - ASSERT_EQ(4, receiver.read()); - ASSERT_EQ(2, receiver.read()); - ASSERT_TRUE(receiver.empty()); -} - -TEST(SpSc_read_write, can_instantiate_queue_reader_writer) -{ - using Q = util::spsc::Queue; - Q q; - util::spsc::QueueReader reader(q); - util::spsc::QueueWriter writer(q); -} - -} // namespace diff --git a/platforms/posix/bsp/tapEthernetDriver/include/TapEthernetDriver.h b/platforms/posix/bsp/tapEthernetDriver/include/TapEthernetDriver.h index 6e2ae4e9cd2..67b0b440fc9 100644 --- a/platforms/posix/bsp/tapEthernetDriver/include/TapEthernetDriver.h +++ b/platforms/posix/bsp/tapEthernetDriver/include/TapEthernetDriver.h @@ -11,9 +11,9 @@ #pragma once #include "etl/array.h" +#include "etl/queue_spsc_atomic.h" #include "lwipSocket/netif/LwipNetworkInterface.h" #include "lwipSocket/utils/LwipHelper.h" -#include "util/spsc/Queue.h" #include diff --git a/platforms/posix/bsp/tapEthernetDriver/src/TapEthernetDriver.cpp b/platforms/posix/bsp/tapEthernetDriver/src/TapEthernetDriver.cpp index 9c84c727cfe..bab64b1fc9c 100644 --- a/platforms/posix/bsp/tapEthernetDriver/src/TapEthernetDriver.cpp +++ b/platforms/posix/bsp/tapEthernetDriver/src/TapEthernetDriver.cpp @@ -100,8 +100,7 @@ void TapEthernetDriver::stop() void TapEthernetDriver::readFrame() { - auto sender = ::lwiputils::PbufQueue::Sender(_queue); - if (sender.full()) + if (_queue.full()) { // TODO: increment queue full stat counter return; @@ -137,7 +136,11 @@ void TapEthernetDriver::readFrame() delete driverPbuf; }; - sender.write(&frameBuf->buf.pbuf); + if (!_queue.push(&frameBuf->buf.pbuf)) + { + delete[] frameData; + delete frameBuf; + } } bool TapEthernetDriver::writeFrame(pbuf* const buf) const diff --git a/platforms/s32k1xx/bsp/bspEthernet/src/ethernet/RxBuffers.cpp b/platforms/s32k1xx/bsp/bspEthernet/src/ethernet/RxBuffers.cpp index 8c5ac17a27f..93ed39bd1be 100644 --- a/platforms/s32k1xx/bsp/bspEthernet/src/ethernet/RxBuffers.cpp +++ b/platforms/s32k1xx/bsp/bspEthernet/src/ethernet/RxBuffers.cpp @@ -103,14 +103,13 @@ void RxBuffers::interrupt() struct pbuf* buf; netif* pNetif; - auto sender = ::lwiputils::PbufQueue::Sender(_queue); do { buf = readFrame(pNetif); if (buf != nullptr) { // frame transfer was successful - if (sender.full()) + if (_queue.full()) { // enqueue failed static bool once = false; @@ -125,8 +124,11 @@ void RxBuffers::interrupt() { // allocation was successful - sender.next() = buf; - sender.write_next(); + if (!_queue.push(buf)) + { + // enqueue failed, release buffer to avoid leaking ownership + (void)pbuf_free(buf); + } // frame has been successfully enqueued // TODO: notify TCPIP task by setting event }