From bcdef5eeb80d6c6a7d5d4c0e3195b4792662d694 Mon Sep 17 00:00:00 2001 From: Metehan Gezer Date: Mon, 11 Aug 2025 17:07:47 +0300 Subject: [PATCH 1/5] buffer changes and outbound queue --- znet/include/znet/backends/tcp.h | 12 +++ znet/include/znet/buffer.h | 163 ++++++++++++++++++++++--------- znet/include/znet/transport.h | 2 + znet/src/backend/tcp.cc | 34 ++++--- znet/src/codec.cc | 3 +- znet/src/p2p/dialer.cc | 3 - znet/src/p2p/locator.cc | 4 +- znet/src/peer_session.cc | 1 + 8 files changed, 161 insertions(+), 61 deletions(-) diff --git a/znet/include/znet/backends/tcp.h b/znet/include/znet/backends/tcp.h index 1ccd8e8..ced79a2 100644 --- a/znet/include/znet/backends/tcp.h +++ b/znet/include/znet/backends/tcp.h @@ -19,6 +19,8 @@ #include "znet/peer_session.h" #include "znet/precompiled.h" +#include + namespace znet { namespace backends { @@ -34,9 +36,18 @@ class TCPTransportLayer : public TransportLayer { bool IsClosed() override { return is_closed_; } + void Update() override; + private: std::shared_ptr ReadBuffer(); + struct QueuedPacket { + std::shared_ptr buffer; + SendOptions options; + }; + + bool SendInternal(std::shared_ptr buffer, SendOptions options); + char data_[ZNET_MAX_BUFFER_SIZE]{}; int read_offset_ = 0; ssize_t data_size_ = 0; @@ -44,6 +55,7 @@ class TCPTransportLayer : public TransportLayer { SocketHandle socket_; bool has_more_; bool is_closed_ = false; + std::deque outbound_; }; diff --git a/znet/include/znet/buffer.h b/znet/include/znet/buffer.h index 77268ed..a01155b 100644 --- a/znet/include/znet/buffer.h +++ b/znet/include/znet/buffer.h @@ -19,10 +19,35 @@ namespace znet { +enum class BufferError { + None, + WriteAfterSeal, + CannotAllocate, + ReadOutOfBounds, + CorruptedFormat, +}; + +inline std::string GetBufferErrorString(BufferError error) { + switch (error) { + case BufferError::None: + return "NoError"; + case BufferError::WriteAfterSeal: + return "WriteAfterSeal"; + case BufferError::CannotAllocate: + return "CannotAllocate"; + case BufferError::ReadOutOfBounds: + return "ReadOutOfBounds"; + case BufferError::CorruptedFormat: + return "CorruptedFormat"; + default: + return "Unknown"; + } +} + class Buffer { public: explicit Buffer(Endianness endianness = Endianness::LittleEndian) { - failed_to_read_ = false; + last_error_ = BufferError::None; endianness_ = endianness; read_cursor_ = 0; write_cursor_ = 0; @@ -35,7 +60,7 @@ class Buffer { Buffer(const char* data, int data_size, Endianness endianness = Endianness::LittleEndian) { - failed_to_read_ = false; + last_error_ = BufferError::None; endianness_ = endianness; read_cursor_ = 0; write_cursor_ = data_size; @@ -45,7 +70,7 @@ class Buffer { #endif data_ = new (std::nothrow) char[allocated_size_]; if (!data_) { - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; allocated_size_ = 0; return; } @@ -65,14 +90,14 @@ class Buffer { allocated_size_ = buffer.allocated_size_; write_cursor_ = 0; read_cursor_ = 0; - failed_to_read_ = false; + last_error_ = BufferError::None; #ifdef ZNET_BUFFER_COUNT_MEMORY_ALLOCATIONS mem_allocations_ = 0; #endif // todo safe errors data_ = new (std::nothrow) char[allocated_size_]; if (!data_) { - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; allocated_size_ = 0; return; } @@ -83,13 +108,13 @@ class Buffer { template::value && (sizeof(T) <= 8), int>::type = 0> void Read(T* arr, size_t size) { if (size > std::numeric_limits::max() / sizeof(T)) { - failed_to_read_ = true; + last_error_ = BufferError::CorruptedFormat; return; } char* pt = reinterpret_cast(arr); size_t calculated_size = sizeof(T) * size; if (!CheckReadableBytes(size)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return; } std::memcpy(pt, data_ + read_cursor_, calculated_size); @@ -116,12 +141,11 @@ class Buffer { size_t size = sizeof(T); std::unique_ptr data(new (std::nothrow) char[size]); if (!data) { - failed_to_read_ = true; - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; return 0; } if (!CheckReadableBytes(size)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return 0; } if (GetSystemEndianness() == endianness_) { @@ -166,7 +190,7 @@ class Buffer { } ZNET_LOG_WARN("Invalid internet protocol version {}!", ver); // unknown version - failed_to_read_ = true; + last_error_ = BufferError::CorruptedFormat; return nullptr; } @@ -193,18 +217,17 @@ class Buffer { uint8_t size = sizeof(T); char* data = new (std::nothrow) char[size]; if (!data) { - failed_to_alloc_ = true; - failed_to_read_ = true; + last_error_ = BufferError::CannotAllocate; return 0; } std::memset(data, 0, size); if (!CheckReadableBytes(1)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return 0; } uint8_t actual_size = ReadChar(); if (!CheckReadableBytes(actual_size)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return 0; } if (GetSystemEndianness() == endianness_) { @@ -225,13 +248,12 @@ class Buffer { std::string ReadString() { size_t size = ReadVarInt(); if (!CheckReadableBytes(size)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return ""; } char* data = new (std::nothrow) char[size]; if (!data) { - failed_to_alloc_ = true; - failed_to_read_ = true; + last_error_ = BufferError::CannotAllocate; return ""; } for (size_t i = 0; i < size; i++) { @@ -268,12 +290,12 @@ class Buffer { size_t size = ReadVarInt(); size_t size_bytes = size * sizeof(T); if (!CheckReadableBytes(size_bytes)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return nullptr; } T* ptr = new T[size]; if (!ptr) { - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; return nullptr; } std::unique_ptr array(ptr); @@ -289,12 +311,12 @@ class Buffer { if (size_r != size) { ZNET_LOG_ERROR("Array size mismatch. Expected: {}, Actual: {}", size, size_r); - failed_to_read_ = true; + last_error_ = BufferError::CorruptedFormat; return {}; } size_t size_bytes = size * sizeof(T); if (!CheckReadableBytes(size_bytes)) { - failed_to_read_ = true; + last_error_ = BufferError::ReadOutOfBounds; return {}; } std::array array; @@ -305,6 +327,9 @@ class Buffer { } void WriteString(const std::string& str) { + if (!CheckSeal()) { + return; + } size_t size = str.size(); const char* data = str.data(); ReserveIncremental(size + sizeof(size)); @@ -331,6 +356,9 @@ class Buffer { template::value && (sizeof(T) <= 16), int>::type = 0> void WriteNumber(T c) { + if (!CheckSeal()) { + return; + } char* pt = reinterpret_cast(&c); size_t size = sizeof(c); ReserveIncremental(size); @@ -348,11 +376,16 @@ class Buffer { template> T WriteCustom() { - return T::Write(); + if (!CheckSeal()) { + return; + } + return T::Write(*this); } - void WriteInetAddress(InetAddress& address) { + if (!CheckSeal()) { + return; + } if (address.ipv() == InetProtocolVersion::IPv4) { WriteInt(4); // raw IPv4 (network‐order) + port (network‐order) @@ -365,6 +398,8 @@ class Buffer { // raw IPv6 (16 bytes) + port Write(addr->sin6_addr.s6_addr, 16); Write(reinterpret_cast(&addr->sin6_port), 1); + } else { + WriteInt(0); } } @@ -376,6 +411,9 @@ class Buffer { // write a std::bitset (little‑endian bit order) template void WriteBitset(const std::bitset& bs) { + if (!CheckSeal()) { + return; + } constexpr size_t BYTES = (N + 7) / 8; uint8_t data[BYTES] = {}; for (size_t i = 0; i < N; ++i) { @@ -388,6 +426,9 @@ class Buffer { template::value && (sizeof(T) <= 8), int>::type = 0> void Write(T* arr, size_t size) { + if (!CheckSeal()) { + return; + } auto* pt = reinterpret_cast(arr); size_t calculated_size = sizeof(T) * size; ReserveIncremental(calculated_size); @@ -397,6 +438,9 @@ class Buffer { template::value && (sizeof(T) <= 8), int>::type = 0> void WriteVarInt(T c) { + if (!CheckSeal()) { + return; + } char* pt = reinterpret_cast(&c); uint8_t size = sizeof(c); // assume 1 byte for the size uint8_t actual_size = 0; @@ -421,6 +465,9 @@ class Buffer { template void WriteMap(Map& map, KeyFunc key_func, ValueFunc value_func) { + if (!CheckSeal()) { + return; + } WriteInt(map.size()); for (auto& kv : map) { (this->*key_func)(kv.first); @@ -430,6 +477,9 @@ class Buffer { template void WriteVector(std::vector& v, ValueFunc value_func) { + if (!CheckSeal()) { + return; + } size_t size = v.size(); WriteInt(size); for (auto& value : v) { @@ -439,6 +489,9 @@ class Buffer { template void WriteArray(T (&v)[size], ValueFunc value_func) { + if (!CheckSeal()) { + return; + } WriteInt(size); for (int i = 0; i < size; i++) { auto& value = v[i]; @@ -448,6 +501,9 @@ class Buffer { template void WriteArray(T* v, size_t size, ValueFunc value_func) { + if (!CheckSeal()) { + return; + } if (!v) { WriteInt(0); return; @@ -461,6 +517,9 @@ class Buffer { template void WriteArray(std::shared_ptr& v, size_t size, ValueFunc value_func) { + if (!CheckSeal()) { + return; + } WriteInt(size); for (int i = 0; i < size; i++) { auto& value = v[i]; @@ -487,9 +546,12 @@ class Buffer { if (write_cursor_ == allocated_size_) { return; } + if (!CheckSeal()) { + return; + } char* new_data = new (std::nothrow) char[write_cursor_]; if (!new_data) { - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; return; } std::memcpy(new_data, data_, write_cursor_); @@ -499,9 +561,12 @@ class Buffer { } void Reset(bool deallocate = false) { + if (!CheckSeal()) { + return; + } write_cursor_ = 0; read_cursor_ = 0; - failed_to_read_ = false; + last_error_ = BufferError::None; if (deallocate) { allocated_size_ = 0; delete[] data_; @@ -554,26 +619,21 @@ class Buffer { } void SkipWrite(size_t size) { + if (sealed_.load(std::memory_order_acquire)) { + last_error_ = BufferError::WriteAfterSeal; + return; + } ReserveIncremental(size); write_cursor_ += size; } /** - * @return true if previous read call was failed and clears the value. - */ - ZNET_NODISCARD bool IsFailedToRead() { - bool failed_to_read = failed_to_read_; - failed_to_read_ = false; - return failed_to_read; - } - - /** - * @return true if previous allocation was failed and clears the value + * @return Returns the previous error and clears it */ - ZNET_NODISCARD bool IsFailedToAlloc() { - bool failed_to_alloc = failed_to_alloc_; - failed_to_alloc_ = false; - return failed_to_alloc; + ZNET_NODISCARD BufferError GetAndClearLastError() { + BufferError error = last_error_; + last_error_ = BufferError::None; + return error; } void ReserveIncremental(size_t additional_bytes) { @@ -583,6 +643,9 @@ class Buffer { void ReserveExact(size_t size) { Reserve(size, true); } void Reserve(size_t size, bool exact = false) { + if (!CheckSeal()) { + return; + } if (!data_) { size_t target_size; if (exact) { @@ -592,7 +655,7 @@ class Buffer { } data_ = new (std::nothrow) char[target_size]; if (!data_) { - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; return; } allocated_size_ = target_size; @@ -607,7 +670,7 @@ class Buffer { size_t target_size_ = size * 2; char* tmp_data = new (std::nothrow) char[target_size_]; if (!tmp_data) { - failed_to_alloc_ = true; + last_error_ = BufferError::CannotAllocate; return; } allocated_size_ = target_size_; @@ -619,6 +682,10 @@ class Buffer { #endif } + void Seal() { + sealed_.store(true, std::memory_order_release); + } + private: ZNET_NODISCARD bool CheckReadableBytes(size_t required) const { #if defined(DEBUG) && !defined(DISABLE_ASSERT_READABLE_BYTES) @@ -626,6 +693,14 @@ class Buffer { #endif return std::min(write_cursor_, read_limit_) >= read_cursor_ + required; } + ZNET_NODISCARD inline bool CheckSeal() { + if (sealed_.load(std::memory_order_acquire)) { + ZNET_LOG_DEBUG("Tried to write to a sealed buffer."); + last_error_ = BufferError::WriteAfterSeal; + return false; + } + return true; + } Endianness endianness_; size_t allocated_size_; @@ -633,8 +708,8 @@ class Buffer { size_t read_cursor_; size_t read_limit_ = std::numeric_limits::max(); char* data_; - bool failed_to_read_; - bool failed_to_alloc_; + BufferError last_error_; + std::atomic_bool sealed_{false}; #ifdef ZNET_BUFFER_COUNT_MEMORY_ALLOCATIONS size_t mem_allocations_; #endif diff --git a/znet/include/znet/transport.h b/znet/include/znet/transport.h index 29404c5..7b0363b 100644 --- a/znet/include/znet/transport.h +++ b/znet/include/znet/transport.h @@ -31,6 +31,8 @@ class TransportLayer { virtual bool IsClosed() = 0; + virtual void Update() = 0; + }; } diff --git a/znet/src/backend/tcp.cc b/znet/src/backend/tcp.cc index 7c48ae7..fe4068c 100644 --- a/znet/src/backend/tcp.cc +++ b/znet/src/backend/tcp.cc @@ -22,7 +22,6 @@ namespace backends { TCPTransportLayer::TCPTransportLayer(SocketHandle socket) : socket_(socket) { - } TCPTransportLayer::~TCPTransportLayer() { @@ -93,11 +92,10 @@ std::shared_ptr TCPTransportLayer::Receive() { std::shared_ptr TCPTransportLayer::ReadBuffer() { if (buffer_ && buffer_->readable_bytes() > 0) { size_t cursor = buffer_->read_cursor(); - auto size = buffer_->ReadVarInt(); + size_t size = buffer_->ReadVarInt(); if (buffer_->readable_bytes() < size) { if (!has_more_) { - ZNET_LOG_ERROR("Received malformed frame, closing connection!"); - Close(); + ZNET_LOG_ERROR("Received malformed frame, dropping buffer!"); return nullptr; } buffer_->set_read_cursor(cursor); @@ -113,6 +111,17 @@ std::shared_ptr TCPTransportLayer::ReadBuffer() { return nullptr; } +bool TCPTransportLayer::SendInternal(std::shared_ptr buffer, SendOptions options) { + while (send(socket_, buffer->data(), buffer->size(), 0) < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + continue; + } + ZNET_LOG_ERROR("Error sending packet to the server: {}", GetLastErrorInfo()); + return false; + } + return true; +} + bool TCPTransportLayer::Send(std::shared_ptr buffer, SendOptions options) { if (IsClosed()) { ZNET_LOG_WARN("Tried to send a packet to a closed connection, dropping packet!"); @@ -136,17 +145,18 @@ bool TCPTransportLayer::Send(std::shared_ptr buffer, SendOptions options new_buffer->WriteVarInt(buffer->size()); new_buffer->Write(buffer->data() + buffer->read_cursor(), buffer->size()); - // todo check - while (send(socket_, new_buffer->data(), new_buffer->size(), 0) < 0) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { - continue; - } - ZNET_LOG_ERROR("Error sending packet to the server: {}", GetLastErrorInfo()); - return false; - } + outbound_.push_back(QueuedPacket{new_buffer, options}); return true; } +void TCPTransportLayer::Update() { + while (!outbound_.empty()) { + QueuedPacket& queued = outbound_.front(); + SendInternal(queued.buffer, queued.options); + outbound_.pop_front(); + } +} + Result TCPTransportLayer::Close(CloseOptions options) { if (is_closed_) { return Result::AlreadyDisconnected; diff --git a/znet/src/codec.cc b/znet/src/codec.cc index 961e4df..74757d9 100644 --- a/znet/src/codec.cc +++ b/znet/src/codec.cc @@ -20,7 +20,8 @@ void Codec::Deserialize(std::shared_ptr buffer, PacketHandlerBase& handl while (buffer->readable_bytes() > 0) { auto packet_id = buffer->ReadVarInt(); auto size = buffer->ReadInt(); - if (buffer->IsFailedToRead()) { + BufferError error = buffer->GetAndClearLastError(); + if (error != BufferError::None) { ZNET_LOG_DEBUG("Reading packet header failed, dropping buffer!"); break; } diff --git a/znet/src/p2p/dialer.cc b/znet/src/p2p/dialer.cc index c7be4bc..0c1164d 100644 --- a/znet/src/p2p/dialer.cc +++ b/znet/src/p2p/dialer.cc @@ -150,9 +150,6 @@ std::shared_ptr PunchSync(const std::shared_ptr& local } } } - - *out_result = Result::Failure; - return nullptr; } } diff --git a/znet/src/p2p/locator.cc b/znet/src/p2p/locator.cc index cf305b2..1cdb49a 100644 --- a/znet/src/p2p/locator.cc +++ b/znet/src/p2p/locator.cc @@ -82,8 +82,10 @@ Result PeerLocator::Connect() { if (result == Result::Success) { PeerConnectedEvent event{session, punch_id_, peer_name_, target_peer_name_}; event_callback_(event); - return; + } else { + ZNET_LOG_ERROR("Punch failed with reason: {}", GetResultString(result)); } + return; } PeerLocatorCloseEvent event; event_callback_(event); diff --git a/znet/src/peer_session.cc b/znet/src/peer_session.cc index 2968955..583b733 100644 --- a/znet/src/peer_session.cc +++ b/znet/src/peer_session.cc @@ -51,6 +51,7 @@ void PeerSession::Process() { Close(); return; } + transport_layer_->Update(); std::shared_ptr buffer; if ((buffer = transport_layer_->Receive())) { buffer = compr::HandleInDynamic(buffer); From c0f10b3a879b0228c767d491a7347490a4988b71 Mon Sep 17 00:00:00 2001 From: Metehan Gezer Date: Wed, 13 Aug 2025 01:11:42 +0300 Subject: [PATCH 2/5] added GetAnyBindAddress and GetLocalAddress --- rendezvous-server/main.cc | 10 ++++++---- znet/include/znet/base/inet_addr.h | 3 +++ znet/src/inet_addr.cc | 19 +++++++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/rendezvous-server/main.cc b/rendezvous-server/main.cc index 1c5f93a..716bf87 100644 --- a/rendezvous-server/main.cc +++ b/rendezvous-server/main.cc @@ -197,15 +197,17 @@ int main(int argc, char* argv[]) { // handle ipv6 instead of hard coding the endpoint // allow lan connection if possible response->target_peer_ = other_data->peer_name_; - response->target_endpoint_ = other_data->session_->remote_address(); - response->bind_endpoint_ = znet::InetAddress::from("0.0.0.0", session->remote_address()->port()); + auto other_address = other_data->session_->remote_address(); + auto local_address = session->remote_address(); + response->target_endpoint_ = other_address; + response->bind_endpoint_ = znet::InetAddress::from(znet::GetAnyBindAddress(local_address->ipv()), local_address->port()); response->punch_id_ = punch_id; session->SendPacket(response); response = std::make_shared(); response->target_peer_ = data->peer_name_; - response->target_endpoint_ = session->remote_address(); - response->bind_endpoint_ = znet::InetAddress::from("0.0.0.0", other_data->session_->remote_address()->port()); + response->target_endpoint_ = local_address; + response->bind_endpoint_ = znet::InetAddress::from(znet::GetAnyBindAddress(other_address->ipv()), other_address->port()); response->punch_id_ = punch_id; other_data->session_->SendPacket(response); } diff --git a/znet/include/znet/base/inet_addr.h b/znet/include/znet/base/inet_addr.h index 998ee35..f8c8b06 100644 --- a/znet/include/znet/base/inet_addr.h +++ b/znet/include/znet/base/inet_addr.h @@ -24,6 +24,9 @@ IPv6Address ParseIPv6(const std::string& ip_str); int GetDomainByInetProtocolVersion(InetProtocolVersion version); +std::string GetAnyBindAddress(InetProtocolVersion version); +std::string GetLocalAddress(InetProtocolVersion version); + bool IsIPv4(const std::string& ip); bool IsIPv6(const std::string& ip); diff --git a/znet/src/inet_addr.cc b/znet/src/inet_addr.cc index 7c07eea..252137c 100644 --- a/znet/src/inet_addr.cc +++ b/znet/src/inet_addr.cc @@ -41,6 +41,25 @@ int GetDomainByInetProtocolVersion(InetProtocolVersion version) { return 0; } +std::string GetAnyBindAddress(InetProtocolVersion version) { + switch (version) { + case InetProtocolVersion::IPv4: + return "0.0.0.0"; + case InetProtocolVersion::IPv6: + return "::"; + } +} + + +std::string GetLocalAddress(InetProtocolVersion version) { + switch (version) { + case InetProtocolVersion::IPv4: + return "127.0.0.1"; + case InetProtocolVersion::IPv6: + return "::1"; + } +} + bool IsIPv4(const std::string& ip) { sockaddr_in sa; return inet_pton(AF_INET, ip.c_str(), &(sa.sin_addr)) == 1; From 78f5b3943e5e28321091fe0d636b9c921ee8bad5 Mon Sep 17 00:00:00 2001 From: Metehan Gezer Date: Wed, 13 Aug 2025 01:49:50 +0300 Subject: [PATCH 3/5] added atomic include --- rendezvous-server/main.cc | 1 - znet/include/znet/precompiled.h | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/rendezvous-server/main.cc b/rendezvous-server/main.cc index 716bf87..3b5db26 100644 --- a/rendezvous-server/main.cc +++ b/rendezvous-server/main.cc @@ -194,7 +194,6 @@ int main(int argc, char* argv[]) { auto response = std::make_shared(); // add start_at - // handle ipv6 instead of hard coding the endpoint // allow lan connection if possible response->target_peer_ = other_data->peer_name_; auto other_address = other_data->session_->remote_address(); diff --git a/znet/include/znet/precompiled.h b/znet/include/znet/precompiled.h index 63d307c..f5cc464 100644 --- a/znet/include/znet/precompiled.h +++ b/znet/include/znet/precompiled.h @@ -26,6 +26,7 @@ #include #include #include +#include #include From 7c93156064073e047d23c2424bba26d8b559a18c Mon Sep 17 00:00:00 2001 From: Metehan Gezer Date: Wed, 13 Aug 2025 02:31:00 +0300 Subject: [PATCH 4/5] enable zstd --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d715fce..7ef8329 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.16.3) project(znet-parent) add_subdirectory(vendor/fmt) -#add_subdirectory(vendor/zstd/build/cmake) +add_subdirectory(vendor/zstd/build/cmake) add_subdirectory(znet) # rendezvous server From b2128148dfcf1c4a3aa6fb0aa7431c73e8626374 Mon Sep 17 00:00:00 2001 From: Metehan Gezer Date: Wed, 13 Aug 2025 04:49:11 +0300 Subject: [PATCH 5/5] tidy backend --- rendezvous-server/main.cc | 4 ++- znet/CMakeLists.txt | 1 + znet/include/znet/base/types.h | 5 ++-- znet/include/znet/p2p/dialer.h | 1 + znet/include/znet/p2p/locator.h | 2 ++ znet/include/znet/p2p/rendezvous.h | 1 + znet/include/znet/peer_session.h | 15 ++++++++++-- znet/src/backend/backend.cc | 39 ++++++++++++++++++++++++++++++ znet/src/backend/tcp.cc | 21 ++-------------- znet/src/p2p/dialer.cc | 15 +++++++++++- znet/src/p2p/locator.cc | 4 ++- znet/src/peer_session.cc | 5 +++- 12 files changed, 85 insertions(+), 28 deletions(-) create mode 100644 znet/src/backend/backend.cc diff --git a/rendezvous-server/main.cc b/rendezvous-server/main.cc index 3b5db26..60f8efe 100644 --- a/rendezvous-server/main.cc +++ b/rendezvous-server/main.cc @@ -124,7 +124,7 @@ int main(int argc, char* argv[]) { std::string target = result["target"].as(); ZNET_LOG_INFO("Starting relay on {}:{}...", target, port); - znet::ServerConfig config{target, port}; + znet::ServerConfig config{target, port,std::chrono::seconds(5), znet::ConnectionType::TCP}; znet::Server relay{config}; relay.SetEventCallback(ZNET_BIND_GLOBAL_FN(OnEvent)); @@ -201,6 +201,7 @@ int main(int argc, char* argv[]) { response->target_endpoint_ = other_address; response->bind_endpoint_ = znet::InetAddress::from(znet::GetAnyBindAddress(local_address->ipv()), local_address->port()); response->punch_id_ = punch_id; + response->connection_type_ = znet::ConnectionType::TCP; session->SendPacket(response); response = std::make_shared(); @@ -208,6 +209,7 @@ int main(int argc, char* argv[]) { response->target_endpoint_ = local_address; response->bind_endpoint_ = znet::InetAddress::from(znet::GetAnyBindAddress(other_address->ipv()), other_address->port()); response->punch_id_ = punch_id; + response->connection_type_ = znet::ConnectionType::TCP; other_data->session_->SendPacket(response); } } diff --git a/znet/CMakeLists.txt b/znet/CMakeLists.txt index 279fedc..93ac157 100644 --- a/znet/CMakeLists.txt +++ b/znet/CMakeLists.txt @@ -28,6 +28,7 @@ set(ZNET_SOURCES src/pch.cc src/init.cc src/backend/tcp.cc + src/backend/backend.cc src/p2p/locator.cc src/p2p/dialer.cc ) diff --git a/znet/include/znet/base/types.h b/znet/include/znet/base/types.h index e5c970e..c40c3ab 100644 --- a/znet/include/znet/base/types.h +++ b/znet/include/znet/base/types.h @@ -122,10 +122,9 @@ inline std::string GetResultString(Result result) { enum class ConnectionType { TCP, - RUDP, - //RakNet, + //RUDP, //ENet, - //WebSocket, + //QUIC }; #if defined(TARGET_APPLE) || defined(TARGET_WEB) || defined(TARGET_LINUX) diff --git a/znet/include/znet/p2p/dialer.h b/znet/include/znet/p2p/dialer.h index 5b1360d..a3ceeb4 100644 --- a/znet/include/znet/p2p/dialer.h +++ b/znet/include/znet/p2p/dialer.h @@ -50,6 +50,7 @@ std::shared_ptr PunchSync(const std::shared_ptr& local const std::shared_ptr& peer, Result* out_result, bool is_initiator, + ConnectionType connection_type = ConnectionType::TCP, int timeout_ms = 5000); } // namespace p2p diff --git a/znet/include/znet/p2p/locator.h b/znet/include/znet/p2p/locator.h index 94780d1..bc03625 100644 --- a/znet/include/znet/p2p/locator.h +++ b/znet/include/znet/p2p/locator.h @@ -26,6 +26,7 @@ namespace p2p { struct PeerLocatorConfig { std::string server_ip; PortNumber server_port; + ConnectionType connection_type = ConnectionType::TCP; }; class PeerLocatorReadyEvent : public Event { @@ -126,6 +127,7 @@ class PeerLocator { std::shared_ptr bind_endpoint_; std::shared_ptr target_endpoint_; + ConnectionType connection_type_; std::string target_peer_name_; uint64_t punch_id_ = ~0; diff --git a/znet/include/znet/p2p/rendezvous.h b/znet/include/znet/p2p/rendezvous.h index d1a6b38..50a0d39 100644 --- a/znet/include/znet/p2p/rendezvous.h +++ b/znet/include/znet/p2p/rendezvous.h @@ -72,6 +72,7 @@ class StartPunchRequestPacket : public Packet { std::string target_peer_; std::shared_ptr bind_endpoint_; std::shared_ptr target_endpoint_; + ConnectionType connection_type_; uint64_t punch_id_; }; diff --git a/znet/include/znet/peer_session.h b/znet/include/znet/peer_session.h index 5b8c993..36ae134 100644 --- a/znet/include/znet/peer_session.h +++ b/znet/include/znet/peer_session.h @@ -36,7 +36,9 @@ class PeerSession { public: PeerSession(std::shared_ptr local_address, std::shared_ptr remote_address, - std::unique_ptr transport_layer, bool is_initiator = false, + std::unique_ptr transport_layer, + ConnectionType connection_type, + bool is_initiator = false, bool self_managed = false); PeerSession(const PeerSession&) = delete; PeerSession(PeerSession&&) = delete; @@ -120,7 +122,7 @@ class PeerSession { time_since_connect()).count(); } - CompressionType out_compression_type() const { + ZNET_NODISCARD CompressionType out_compression_type() const { return out_compression_type_; } @@ -129,6 +131,14 @@ class PeerSession { ZNET_LOG_INFO("Set out compression to {} for {}", GetCompressionTypeName(type), id_); } + ZNET_NODISCARD bool is_initiator() const { + return is_initiator_; + } + + ZNET_NODISCARD ConnectionType connection_type() const { + return connection_type_; + } + protected: friend class EncryptionLayer; @@ -146,6 +156,7 @@ class PeerSession { PortNumber local_port_; std::shared_ptr remote_address_; PortNumber remote_port_; + ConnectionType connection_type_; std::shared_ptr codec_; std::shared_ptr handler_; diff --git a/znet/src/backend/backend.cc b/znet/src/backend/backend.cc new file mode 100644 index 0000000..5f71cc2 --- /dev/null +++ b/znet/src/backend/backend.cc @@ -0,0 +1,39 @@ +// +// Copyright 2025 Metehan Gezer +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +// +// Created by Metehan Gezer on 13/08/2025. +// + +#include "znet/backends/backend.h" +#include "znet/backends/tcp.h" + +namespace znet { +namespace backends { + +std::unique_ptr CreateClientFromType(ConnectionType type, + std::shared_ptr server_address) { + if (type == ConnectionType::TCP) { + return std::make_unique(server_address); + } + return nullptr; +} + +std::unique_ptr CreateServerFromType(ConnectionType type, + std::shared_ptr bind_address) { + if (type == ConnectionType::TCP) { + return std::make_unique(bind_address); + } + return nullptr; +} + + +} +} \ No newline at end of file diff --git a/znet/src/backend/tcp.cc b/znet/src/backend/tcp.cc index fe4068c..6c25d91 100644 --- a/znet/src/backend/tcp.cc +++ b/znet/src/backend/tcp.cc @@ -262,7 +262,7 @@ Result TCPClientBackend::Connect() { client_session_ = std::make_shared(local_address_, server_address_, - std::make_unique(client_socket_), true); + std::make_unique(client_socket_), ConnectionType::TCP, true); return Result::Success; } @@ -408,7 +408,7 @@ std::shared_ptr TCPServerBackend::Accept() { return nullptr; } return std::make_shared(bind_address_, remote_address, - std::make_unique(client_socket)); + std::make_unique(client_socket), ConnectionType::TCP); } void TCPServerBackend::AcceptAndReject() { @@ -421,22 +421,5 @@ void TCPServerBackend::AcceptAndReject() { bool TCPServerBackend::IsAlive() { return is_listening_; } - -std::unique_ptr CreateClientFromType(ConnectionType type, - std::shared_ptr server_address) { - if (type == ConnectionType::TCP) { - return std::make_unique(server_address); - } - return nullptr; -} - -std::unique_ptr CreateServerFromType(ConnectionType type, - std::shared_ptr bind_address) { - if (type == ConnectionType::TCP) { - return std::make_unique(bind_address); - } - return nullptr; -} - } } diff --git a/znet/src/p2p/dialer.cc b/znet/src/p2p/dialer.cc index 0c1164d..354f12f 100644 --- a/znet/src/p2p/dialer.cc +++ b/znet/src/p2p/dialer.cc @@ -35,7 +35,7 @@ bool WouldBlock(int e) { #endif } -std::shared_ptr PunchSync(const std::shared_ptr& local, +std::shared_ptr PunchSyncTCP(const std::shared_ptr& local, const std::shared_ptr& peer, Result* out_result, bool is_initiator, @@ -141,6 +141,7 @@ std::shared_ptr PunchSync(const std::shared_ptr& local *out_result = Result::Success; return std::make_shared(local, peer, std::make_unique(socket_handle), + ConnectionType::TCP, is_initiator, true); } else { @@ -152,5 +153,17 @@ std::shared_ptr PunchSync(const std::shared_ptr& local } } +std::shared_ptr PunchSync(const std::shared_ptr& local, + const std::shared_ptr& peer, + Result* out_result, + bool is_initiator, + ConnectionType connection_type, + int timeout_ms) { + if (connection_type == ConnectionType::TCP) { + return PunchSyncTCP(local, peer, out_result, is_initiator, timeout_ms); + } + return nullptr; +} + } } \ No newline at end of file diff --git a/znet/src/p2p/locator.cc b/znet/src/p2p/locator.cc index 1cdb49a..ae71d82 100644 --- a/znet/src/p2p/locator.cc +++ b/znet/src/p2p/locator.cc @@ -34,6 +34,7 @@ class LocatorPacketHandler : public PacketHandler(true); locator_.client_.Disconnect(options); @@ -77,7 +78,8 @@ Result PeerLocator::Connect() { bind_endpoint_, target_endpoint_, &result, - IsInitiator(punch_id_, peer_name_, target_peer_name_) + IsInitiator(punch_id_, peer_name_, target_peer_name_), + connection_type_ ); if (result == Result::Success) { PeerConnectedEvent event{session, punch_id_, peer_name_, target_peer_name_}; diff --git a/znet/src/peer_session.cc b/znet/src/peer_session.cc index 583b733..cec3b70 100644 --- a/znet/src/peer_session.cc +++ b/znet/src/peer_session.cc @@ -19,11 +19,14 @@ namespace znet { PeerSession::PeerSession(std::shared_ptr local_address, std::shared_ptr remote_address, std::unique_ptr transport_layer, + ConnectionType connection_type, bool is_initiator, bool self_managed) : local_address_(std::move(local_address)), remote_address_(std::move(remote_address)), - transport_layer_(std::move(transport_layer)), is_initiator_(is_initiator), + transport_layer_(std::move(transport_layer)), + connection_type_(connection_type), + is_initiator_(is_initiator), encryption_layer_(*this), connect_time_(std::chrono::steady_clock::now()) { static SessionId sIdCount = 0;