diff --git a/settings/default/network.lua b/settings/default/network.lua index 055e73b9d2b..81573fda465 100644 --- a/settings/default/network.lua +++ b/settings/default/network.lua @@ -37,9 +37,10 @@ xi.settings.network = HTTP_HOST = 'localhost', HTTP_PORT = 8088, - -- Central message server settings (ensure these are the same on both all map servers and the central (lobby) server - ZMQ_IP = '127.0.0.1', - ZMQ_PORT = 54003, + -- Central message server settings + ZMQ_TRANSPORT = 'tcp', + ZMQ_IP = '127.0.0.1', + ZMQ_PORT = 54003, -- =========================== -- NOTE: The settings that follow will not necessarily need to be modified diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index b4636189de2..4efb62c2ba9 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -1,4 +1,13 @@ set(COMMON_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/channel.h + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/endpoint.h + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/endpoint.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/router_endpoint.h + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/router_endpoint.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/dealer_endpoint.h + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/dealer_endpoint.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/zmq_service.h + ${CMAKE_CURRENT_SOURCE_DIR}/zmq/zmq_service.cpp ${CMAKE_CURRENT_SOURCE_DIR}/application.cpp ${CMAKE_CURRENT_SOURCE_DIR}/application.h ${CMAKE_CURRENT_SOURCE_DIR}/arguments.cpp @@ -19,6 +28,7 @@ set(COMMON_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/engine.h ${CMAKE_CURRENT_SOURCE_DIR}/filewatcher.cpp ${CMAKE_CURRENT_SOURCE_DIR}/filewatcher.h + ${CMAKE_CURRENT_SOURCE_DIR}/ipp_message.h ${CMAKE_CURRENT_SOURCE_DIR}/ipp.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ipp.h ${CMAKE_CURRENT_SOURCE_DIR}/logging.cpp diff --git a/src/common/application.cpp b/src/common/application.cpp index 91ecaa35578..d3ef0061798 100644 --- a/src/common/application.cpp +++ b/src/common/application.cpp @@ -295,6 +295,11 @@ auto Application::scheduler() -> Scheduler& return scheduler_; } +auto Application::zmqService() -> ZMQService& +{ + return zmqService_; +} + auto Application::args() const -> Arguments& { return *args_; diff --git a/src/common/application.h b/src/common/application.h index f8cb82eedb8..316cb3230c2 100644 --- a/src/common/application.h +++ b/src/common/application.h @@ -24,6 +24,7 @@ #include "arguments.h" #include "common/engine.h" #include "common/scheduler.h" +#include "common/zmq/zmq_service.h" #include // for signal_set @@ -98,13 +99,17 @@ class Application // auto scheduler() -> Scheduler&; + auto zmqService() -> ZMQService&; auto args() const -> Arguments&; auto console() const -> ConsoleService&; protected: std::chrono::steady_clock::time_point startTime_{ std::chrono::steady_clock::now() }; - Scheduler scheduler_; + Scheduler scheduler_; + + ZMQService zmqService_; + asio::signal_set signals_; std::string serverName_; diff --git a/src/common/ipp_message.h b/src/common/ipp_message.h new file mode 100644 index 00000000000..6716569cf18 --- /dev/null +++ b/src/common/ipp_message.h @@ -0,0 +1,33 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include + +#include + +// An IP+Port-addressed message: a routing id plus an opaque payload. +struct IPPMessage +{ + IPP ipp; + std::vector payload; +}; diff --git a/src/common/zmq/channel.h b/src/common/zmq/channel.h new file mode 100644 index 00000000000..a23c65976b0 --- /dev/null +++ b/src/common/zmq/channel.h @@ -0,0 +1,67 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include + +#include +#include +#include + +#include + +namespace ipc +{ + +// +// ipc::Channel +// +// A non-owning handle to one IPC endpoint's message queues. +// +template +class Channel +{ +public: + Channel(moodycamel::ConcurrentQueue& incoming, moodycamel::ConcurrentQueue& outgoing) + : incoming_(incoming) + , outgoing_(outgoing) + { + } + + // Dequeue one inbound message. Returns false when none are available. + [[nodiscard]] auto tryReceive(MessageT& out) -> bool + { + return incoming_.try_dequeue(out); + } + + // Queue a message for the I/O thread to send. + auto send(MessageT message) -> void + { + outgoing_.enqueue(std::move(message)); + } + +private: + moodycamel::ConcurrentQueue& incoming_; + moodycamel::ConcurrentQueue& outgoing_; +}; + +} // namespace ipc diff --git a/src/common/zmq/dealer_endpoint.cpp b/src/common/zmq/dealer_endpoint.cpp new file mode 100644 index 00000000000..70bfaeabc56 --- /dev/null +++ b/src/common/zmq/dealer_endpoint.cpp @@ -0,0 +1,84 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#include + +#include + +#include + +DealerEndpoint::DealerEndpoint(std::string endpoint, uint64 routingId) +: endpoint_(std::move(endpoint)) +, routingId_(routingId) +{ +} + +auto DealerEndpoint::open(zmq::context_t& ctx) -> bool +{ + socket_ = zmq::socket_t(ctx, zmq::socket_type::dealer); + socket_.set(zmq::sockopt::routing_id, zmq::const_buffer(&routingId_, sizeof(uint64))); + try + { + socket_.connect(endpoint_); + opened_.store(true, std::memory_order_release); + } + catch (const zmq::error_t& err) + { + ShowError(fmt::format("ZMQService: unable to connect dealer socket '{}': {}", endpoint_, err.what())); + opened_.store(false, std::memory_order_release); + } + return opened_.load(std::memory_order_acquire); +} + +auto DealerEndpoint::close() -> void +{ + if (socket_) + { + socket_.close(); + } +} + +auto DealerEndpoint::socketHandle() -> void* +{ + return socket_.handle(); +} + +auto DealerEndpoint::onReadable() -> void +{ + while (true) + { + zmq::message_t msg; + if (!socket_.recv(msg, zmq::recv_flags::dontwait)) + { + break; + } + incomingQueue_.enqueue(std::move(msg)); + } +} + +auto DealerEndpoint::flushOutbound() -> void +{ + zmq::message_t out; + while (outgoingQueue_.try_dequeue(out)) + { + socket_.send(out, zmq::send_flags::dontwait); + } +} diff --git a/src/common/zmq/dealer_endpoint.h b/src/common/zmq/dealer_endpoint.h new file mode 100644 index 00000000000..05a8fc24bcb --- /dev/null +++ b/src/common/zmq/dealer_endpoint.h @@ -0,0 +1,50 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include +#include + +#include + +#include +#include + +class DealerEndpoint final : public ZmqEndpoint +{ +public: + DealerEndpoint(std::string endpoint, uint64 routingId); + + auto open(zmq::context_t& ctx) -> bool override; + auto close() -> void override; + auto socketHandle() -> void* override; + auto onReadable() -> void override; + auto flushOutbound() -> void override; + + moodycamel::ConcurrentQueue incomingQueue_; + moodycamel::ConcurrentQueue outgoingQueue_; + +private: + std::string endpoint_; + uint64 routingId_; + zmq::socket_t socket_; +}; diff --git a/src/common/zmq/endpoint.cpp b/src/common/zmq/endpoint.cpp new file mode 100644 index 00000000000..48d8456acd4 --- /dev/null +++ b/src/common/zmq/endpoint.cpp @@ -0,0 +1,29 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#include + +ZmqEndpoint::~ZmqEndpoint() = default; + +auto ZmqEndpoint::opened() const -> bool +{ + return opened_.load(std::memory_order_acquire); +} diff --git a/src/common/zmq/endpoint.h b/src/common/zmq/endpoint.h new file mode 100644 index 00000000000..7c917219ac3 --- /dev/null +++ b/src/common/zmq/endpoint.h @@ -0,0 +1,58 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include + +#include + +class ZmqEndpoint +{ +public: + ZmqEndpoint() = default; + virtual ~ZmqEndpoint(); + + ZmqEndpoint(const ZmqEndpoint&) = delete; + ZmqEndpoint& operator=(const ZmqEndpoint&) = delete; + ZmqEndpoint(ZmqEndpoint&&) = delete; + ZmqEndpoint& operator=(ZmqEndpoint&&) = delete; + + [[nodiscard]] auto opened() const -> bool; + + // Create the socket from ctx, set options, and bind/connect. + virtual auto open(zmq::context_t& ctx) -> bool = 0; + + // Close the socket. + virtual auto close() -> void = 0; + + // Raw 0MQ socket handle for zmq_pollitem_t. + virtual auto socketHandle() -> void* = 0; + + // Drain all currently-available inbound messages into the incoming queue. + virtual auto onReadable() -> void = 0; + + // Send everything queued for outbound. + virtual auto flushOutbound() -> void = 0; + +protected: + std::atomic opened_{ false }; +}; diff --git a/src/common/zmq/router_endpoint.cpp b/src/common/zmq/router_endpoint.cpp new file mode 100644 index 00000000000..011fe54caf9 --- /dev/null +++ b/src/common/zmq/router_endpoint.cpp @@ -0,0 +1,97 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#include + +#include +#include + +#include +#include +#include + +#include + +RouterEndpoint::RouterEndpoint(std::string endpoint) +: endpoint_(std::move(endpoint)) +{ +} + +auto RouterEndpoint::open(zmq::context_t& ctx) -> bool +{ + socket_ = zmq::socket_t(ctx, zmq::socket_type::router); + try + { + socket_.bind(endpoint_); + opened_.store(true, std::memory_order_release); + } + catch (const zmq::error_t& err) + { + ShowError(fmt::format( + "IPC router could not bind '{}': {}. Is another xi_world or xi_test already bound to this " + "endpoint? IPC will not function in this process.", + endpoint_, + err.what())); + opened_.store(false, std::memory_order_release); + } + return opened_.load(std::memory_order_acquire); +} + +auto RouterEndpoint::close() -> void +{ + if (socket_) + { + socket_.close(); + } +} + +auto RouterEndpoint::socketHandle() -> void* +{ + return socket_.handle(); +} + +auto RouterEndpoint::onReadable() -> void +{ + while (true) + { + std::array msgs; + if (!zmq::recv_multipart_n(socket_, msgs.data(), msgs.size(), zmq::recv_flags::dontwait)) + { + break; + } + + auto ipp = IPP(msgs[0]); + auto payload = std::vector(msgs[1].data(), msgs[1].data() + msgs[1].size()); + incomingQueue_.enqueue(IPPMessage{ ipp, std::move(payload) }); + } +} + +auto RouterEndpoint::flushOutbound() -> void +{ + IPPMessage out; + while (outgoingQueue_.try_dequeue(out)) + { + std::array msgs; + msgs[0] = out.ipp.toZMQMessage(); + msgs[1] = zmq::message_t(out.payload); + zmq::send_multipart(socket_, msgs, zmq::send_flags::dontwait); + } +} diff --git a/src/common/zmq/router_endpoint.h b/src/common/zmq/router_endpoint.h new file mode 100644 index 00000000000..6a631e32c42 --- /dev/null +++ b/src/common/zmq/router_endpoint.h @@ -0,0 +1,49 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include +#include + +#include + +#include +#include + +class RouterEndpoint final : public ZmqEndpoint +{ +public: + explicit RouterEndpoint(std::string endpoint); + + auto open(zmq::context_t& ctx) -> bool override; + auto close() -> void override; + auto socketHandle() -> void* override; + auto onReadable() -> void override; + auto flushOutbound() -> void override; + + moodycamel::ConcurrentQueue incomingQueue_; + moodycamel::ConcurrentQueue outgoingQueue_; + +private: + std::string endpoint_; + zmq::socket_t socket_; +}; diff --git a/src/common/zmq/zmq_service.cpp b/src/common/zmq/zmq_service.cpp new file mode 100644 index 00000000000..04df391aee9 --- /dev/null +++ b/src/common/zmq/zmq_service.cpp @@ -0,0 +1,195 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#include + +#include +#include +#include +#include + +#include + +namespace +{ + +constexpr long kPollTimeoutMs = 10; + +} // namespace + +ZMQService::ZMQService(std::string threadName) +: threadName_(std::move(threadName)) +, thread_( + [this]() + { + run(); + }) +{ +} + +ZMQService::~ZMQService() +{ + stop(); +} + +auto ZMQService::stop() noexcept -> void +{ + stop_.store(true, std::memory_order_relaxed); + if (thread_.joinable()) + { + thread_.join(); + } +} + +auto ZMQService::registerRouter(const std::string& endpoint) -> ipc::Channel +{ + auto owned = std::make_unique(endpoint); + + RouterEndpoint& ep = *owned; + + // Fire-and-forget: the I/O thread opens it. The channel below is usable immediately (it references + // queues that exist now); if bind fails the endpoint logs it and nothing flows. + pendingRegistrations_.enqueue(std::move(owned)); + + return ipc::Channel{ ep.incomingQueue_, ep.outgoingQueue_ }; +} + +auto ZMQService::registerDealer(const std::string& endpoint, uint64 routingId) -> ipc::Channel +{ + auto owned = std::make_unique(endpoint, routingId); + + DealerEndpoint& ep = *owned; + + pendingRegistrations_.enqueue(std::move(owned)); + + return ipc::Channel{ ep.incomingQueue_, ep.outgoingQueue_ }; +} + +auto ZMQService::drainPending(std::vector>& endpoints) -> bool +{ + bool added = false; + + std::unique_ptr endpoint; + while (pendingRegistrations_.try_dequeue(endpoint)) + { + endpoint->open(context_); + endpoints.push_back(std::move(endpoint)); + added = true; + } + + return added; +} + +auto ZMQService::run() -> void +{ + TracySetThreadName(threadName_.c_str()); + + std::vector> endpoints; + + // Poll set, rebuilt only when an endpoint is added (registrations are rare). zmq_poll refreshes + // each item's revents every call, so the cached items/polled can be reused between rebuilds. + std::vector items; + std::vector polled; + + bool rebuild = false; + + try + { + while (!stop_.load(std::memory_order_relaxed)) + { + // Apply pending registrations: open sockets on this thread. + if (drainPending(endpoints)) + { + rebuild = true; + } + + if (stop_.load(std::memory_order_relaxed)) + { + break; + } + + if (rebuild) + { + items.clear(); + polled.clear(); + for (auto& ep : endpoints) // poll only the endpoints that opened successfully + { + if (ep->opened()) + { + items.push_back(zmq_pollitem_t{ ep->socketHandle(), 0, ZMQ_POLLIN, 0 }); + polled.push_back(ep.get()); + } + } + rebuild = false; + } + + zmq_pollitem_t* pollItems = nullptr; + if (!items.empty()) + { + pollItems = items.data(); + } + + const int rc = zmq_poll(pollItems, static_cast(items.size()), kPollTimeoutMs); + if (rc < 0) + { + if (zmq_errno() == ETERM) + { + break; + } + ShowError(fmt::format("ZMQService zmq_poll error: {}", zmq_strerror(zmq_errno()))); + continue; + } + + for (std::size_t i = 0; i < polled.size(); ++i) + { + try + { + if (items[i].revents & ZMQ_POLLIN) + { + polled[i]->onReadable(); + } + + polled[i]->flushOutbound(); + } + catch (const zmq::error_t& e) + { + if (e.num() == ETERM) + { + stop_.store(true, std::memory_order_relaxed); + break; + } + ShowError(fmt::format("ZMQService endpoint error: {}", e.what())); + } + } + } + } + catch (const std::exception& e) + { + // Never let an exception escape the I/O thread + ShowError(fmt::format("ZMQService I/O thread error: {}", e.what())); + } + + // Close every socket on this (the I/O) thread before the context is torn down. + for (auto& ep : endpoints) + { + ep->close(); + } +} diff --git a/src/common/zmq/zmq_service.h b/src/common/zmq/zmq_service.h new file mode 100644 index 00000000000..50484b4cf5c --- /dev/null +++ b/src/common/zmq/zmq_service.h @@ -0,0 +1,76 @@ +/* +=========================================================================== + + Copyright (c) 2026 LandSandBoat Dev Teams + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/ + +=========================================================================== +*/ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +// +// ZMQService +// +// Owns a single zmq::context_t and a single I/O thread running ONE zmq_poll loop over every +// registered socket. Endpoints are created, bound/connected, polled, and closed exclusively on the +// I/O thread (ZMQ sockets are not thread-safe). Producers/consumers exchange messages with the I/O +// thread through each endpoint's lock-free queues, through a non-owning ipc::Channel. +// +class ZMQService final +{ +public: + explicit ZMQService(std::string threadName = "ZMQ I/O"); + ~ZMQService(); + + ZMQService(const ZMQService&) = delete; + ZMQService& operator=(const ZMQService&) = delete; + ZMQService(ZMQService&&) = delete; + ZMQService& operator=(ZMQService&&) = delete; + + [[nodiscard]] auto registerRouter(const std::string& endpoint) -> ipc::Channel; + + [[nodiscard]] auto registerDealer(const std::string& endpoint, uint64 routingId) -> ipc::Channel; + + auto stop() noexcept -> void; + +private: + // I/O thread body + auto run() -> void; + + auto drainPending(std::vector>& endpoints) -> bool; + + std::string threadName_; + zmq::context_t context_{ 1 }; + std::atomic stop_{ false }; + + moodycamel::ConcurrentQueue> pendingRegistrations_; + + std::jthread thread_; +}; diff --git a/src/common/zmq_dealer_wrapper.h b/src/common/zmq_dealer_wrapper.h deleted file mode 100644 index 59794535c2a..00000000000 --- a/src/common/zmq_dealer_wrapper.h +++ /dev/null @@ -1,140 +0,0 @@ -/* -=========================================================================== - - Copyright (c) 2025 LandSandBoat Dev Teams - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see http://www.gnu.org/licenses/ - -=========================================================================== -*/ - -#pragma once - -#include -#include - -#include -#include -#include - -#include -#include -#include - -class ZMQDealerWrapper final -{ - class ZMQWorker final - { - public: - ZMQWorker(std::atomic& requestExit, - moodycamel::ConcurrentQueue& incomingQueue, - moodycamel::ConcurrentQueue& outgoingQueue, - const std::string& endpoint, - uint64_t routingId) - : requestExit_(requestExit) - , incomingQueue_(incomingQueue) - , outgoingQueue_(outgoingQueue) - , zmqContext_(1) - , zmqSocket_(zmqContext_, zmq::socket_type::dealer) - { - zmqSocket_.set(zmq::sockopt::routing_id, zmq::const_buffer(&routingId, sizeof(uint64))); - zmqSocket_.set(zmq::sockopt::rcvtimeo, 200); - - try - { - zmqSocket_.connect(endpoint); - } - catch (zmq::error_t& err) - { - throw std::runtime_error(fmt::format("Unable to bind chat socket: {}", err.what())); - } - - listen(); - } - - ~ZMQWorker() - { - zmqSocket_.close(); - zmqContext_.close(); - } - - private: - void listen() - { - while (!requestExit_) - { - zmq::message_t msg; - try - { - if (!zmqSocket_.recv(msg, zmq::recv_flags::none)) - { - while (outgoingQueue_.try_dequeue(msg)) - { - zmqSocket_.send(msg, zmq::send_flags::none); - } - continue; - } - - incomingQueue_.enqueue(std::move(msg)); - } - catch (zmq::error_t& e) - { - // Context was terminated (ETERM = 156384765) - // Exit loop - if (e.num() == 156384765) - { - return; - } - - ShowError(fmt::format("Message: {}", e.what())); - continue; - } - } - } - - private: - std::atomic& requestExit_; - - moodycamel::ConcurrentQueue& incomingQueue_; - moodycamel::ConcurrentQueue& outgoingQueue_; - - zmq::context_t zmqContext_; - zmq::socket_t zmqSocket_; - }; - -public: - ZMQDealerWrapper(const std::string& endpoint, uint64 routingId) - : requestExit_(false) - , thread_( - [this, endpoint, routingId]() - { - TracySetThreadName("ZMQ Dealer"); - ZMQWorker worker(requestExit_, incomingQueue_, outgoingQueue_, endpoint, routingId); - }) - { - } - - ~ZMQDealerWrapper() - { - requestExit_ = true; - thread_.join(); - } - - moodycamel::ConcurrentQueue incomingQueue_; - moodycamel::ConcurrentQueue outgoingQueue_; - -private: - std::atomic requestExit_; - std::jthread thread_; -}; diff --git a/src/common/zmq_router_wrapper.h b/src/common/zmq_router_wrapper.h deleted file mode 100644 index 0b859c85800..00000000000 --- a/src/common/zmq_router_wrapper.h +++ /dev/null @@ -1,159 +0,0 @@ -/* -=========================================================================== - - Copyright (c) 2025 LandSandBoat Dev Teams - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see http://www.gnu.org/licenses/ - -=========================================================================== -*/ - -#pragma once - -#include -#include - -#include -#include -#include - -#include -#include -#include - -struct IPPMessage -{ - IPP ipp; - std::vector payload; -}; - -class ZMQRouterWrapper final -{ - class ZMQWorker final - { - public: - ZMQWorker(std::atomic& requestExit, - moodycamel::ConcurrentQueue& incomingQueue, - moodycamel::ConcurrentQueue& outgoingQueue, - const std::string& endpoint) - : requestExit_(requestExit) - , incomingQueue_(incomingQueue) - , outgoingQueue_(outgoingQueue) - , zmqContext_(1) - , zmqSocket_(zmqContext_, zmq::socket_type::router) - { - zmqSocket_.set(zmq::sockopt::rcvtimeo, 200); - - try - { - zmqSocket_.bind(endpoint); - } - catch (zmq::error_t& err) - { - throw std::runtime_error(fmt::format("Unable to bind chat socket: {}", err.what())); - } - - listen(); - } - - ~ZMQWorker() - { - zmqSocket_.close(); - zmqContext_.close(); - } - - private: - void listen() - { - while (!requestExit_) - { - // Since we are a zmq::socket_type::router, we expect a multipart message: - // [routing id (IPP), message] - std::array msgs; - try - { - if (!zmq::recv_multipart_n(zmqSocket_, msgs.data(), msgs.size(), zmq::recv_flags::none)) - { - IPPMessage msg; - while (outgoingQueue_.try_dequeue(msg)) - { - // We send the same way as we receive: [routing id (IPP), message] - msgs[0] = msg.ipp.toZMQMessage(); - msgs[1] = zmq::message_t(msg.payload); - zmq::send_multipart(zmqSocket_, msgs, zmq::send_flags::dontwait); - } - continue; - } - } - catch (zmq::error_t& e) - { - // Context was terminated - // Exit loop - if (e.num() == 156384765) // ETERM - { - return; - } - - ShowError(fmt::format("Message: {}", e.what())); - continue; - } - - // Handle incoming message - auto& from = msgs[0]; - auto& data = msgs[1]; - - // TODO: Reduce copies here - - auto ipp = IPP(from); - auto payload = std::vector(data.data(), data.data() + data.size()); - - incomingQueue_.enqueue(IPPMessage{ ipp, payload }); - } - } - - private: - std::atomic& requestExit_; - - moodycamel::ConcurrentQueue& incomingQueue_; - moodycamel::ConcurrentQueue& outgoingQueue_; - - zmq::context_t zmqContext_; - zmq::socket_t zmqSocket_; - }; - -public: - ZMQRouterWrapper(const std::string& endpoint) - : requestExit_(false) - , thread_( - [this, endpoint]() - { - TracySetThreadName("ZMQ Router"); - ZMQWorker worker(requestExit_, incomingQueue_, outgoingQueue_, endpoint); - }) - { - } - - ~ZMQRouterWrapper() - { - requestExit_ = true; - thread_.join(); - } - - moodycamel::ConcurrentQueue incomingQueue_; - moodycamel::ConcurrentQueue outgoingQueue_; - -private: - std::atomic requestExit_; - std::jthread thread_; -}; diff --git a/src/login/auth_session.cpp b/src/login/auth_session.cpp index 681f67bc0fc..5144c7ccbd0 100644 --- a/src/login/auth_session.cpp +++ b/src/login/auth_session.cpp @@ -249,7 +249,7 @@ void auth_session::read_func() .accountId = accountID, }); - zmqDealerWrapper_.outgoingQueue_.enqueue(zmq::message_t(payload.data(), payload.size())); + dealerChannel_.send(zmq::message_t(payload.data(), payload.size())); // set Satchel to the same size as inventory on all chars on their account if character has OTP // Note: Upgrades happen in-game with gobbiebag diff --git a/src/login/auth_session.h b/src/login/auth_session.h index 5f8c73727ee..fc87797e846 100644 --- a/src/login/auth_session.h +++ b/src/login/auth_session.h @@ -28,7 +28,9 @@ #include "handler_session.h" #include "login_helpers.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/channel.h" + +#include enum class login_cmd : uint8_t { @@ -106,9 +108,9 @@ DECLARE_FORMAT_AS_UNDERLYING(ACCOUNT_PRIVILEGE_CODE); class auth_session : public handler_session { public: - auth_session(asio::ssl::stream socket, ZMQDealerWrapper& zmqDealerWrapper) + auth_session(asio::ssl::stream socket, ipc::Channel dealerChannel) : handler_session(std::move(socket)) - , zmqDealerWrapper_(zmqDealerWrapper) + , dealerChannel_(dealerChannel) { DebugSockets(fmt::format("auth_session from {}", ipAddress)); } @@ -135,7 +137,7 @@ class auth_session : public handler_session void do_write(std::size_t length); private: - ZMQDealerWrapper& zmqDealerWrapper_; + ipc::Channel dealerChannel_; Maybe> validatePassword(std::string username, std::string password); }; diff --git a/src/login/connect_application.cpp b/src/login/connect_application.cpp index 2236c7ffd91..ecbc68c79fd 100644 --- a/src/login/connect_application.cpp +++ b/src/login/connect_application.cpp @@ -48,7 +48,7 @@ ConnectApplication::~ConnectApplication() = default; auto ConnectApplication::createEngine() -> std::unique_ptr { certificateHelpers::generateSelfSignedCert(); - return std::make_unique(scheduler_); + return std::make_unique(scheduler_, zmqService_); } void ConnectApplication::registerCommands(ConsoleService& console) diff --git a/src/login/connect_engine.cpp b/src/login/connect_engine.cpp index 4d116e3e3ad..3603df0373c 100644 --- a/src/login/connect_engine.cpp +++ b/src/login/connect_engine.cpp @@ -28,7 +28,11 @@ namespace auto getZMQEndpointString() -> std::string { - return fmt::format("tcp://{}:{}", settings::get("network.ZMQ_IP"), settings::get("network.ZMQ_PORT")); + return fmt::format( + "{}://{}:{}", + settings::get("network.ZMQ_TRANSPORT"), + settings::get("network.ZMQ_IP"), + settings::get("network.ZMQ_PORT")); } auto getZMQRoutingId() -> uint64 @@ -45,12 +49,12 @@ constexpr auto kSessionCleanTime = 15min; } // namespace -ConnectEngine::ConnectEngine(Scheduler& scheduler) +ConnectEngine::ConnectEngine(Scheduler& scheduler, ZMQService& zmqService) : scheduler_(scheduler) -, zmqDealerWrapper_(getZMQEndpointString(), getZMQRoutingId()) -, m_authHandler(scheduler_, settings::get("network.LOGIN_AUTH_PORT"), zmqDealerWrapper_) -, m_dataHandler(scheduler_, settings::get("network.LOGIN_DATA_PORT"), zmqDealerWrapper_) -, m_viewHandler(scheduler_, settings::get("network.LOGIN_VIEW_PORT"), zmqDealerWrapper_) +, dealerChannel_(zmqService.registerDealer(getZMQEndpointString(), getZMQRoutingId())) +, m_authHandler(scheduler_, settings::get("network.LOGIN_AUTH_PORT"), dealerChannel_) +, m_dataHandler(scheduler_, settings::get("network.LOGIN_DATA_PORT"), dealerChannel_) +, m_viewHandler(scheduler_, settings::get("network.LOGIN_VIEW_PORT"), dealerChannel_) { periodicCleanupToken_ = scheduler.intervalOnMainThread( kSessionCleanTime, diff --git a/src/login/connect_engine.h b/src/login/connect_engine.h index 044c8c533ea..e565272d14a 100644 --- a/src/login/connect_engine.h +++ b/src/login/connect_engine.h @@ -22,7 +22,7 @@ #pragma once #include "common/application.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/zmq_service.h" #ifndef _WIN32 #include @@ -38,7 +38,7 @@ class ConnectEngine final : public Engine { public: - ConnectEngine(Scheduler& scheduler); + ConnectEngine(Scheduler& scheduler, ZMQService& zmqService); ~ConnectEngine() override; private: @@ -52,7 +52,7 @@ class ConnectEngine final : public Engine Scheduler& scheduler_; - ZMQDealerWrapper zmqDealerWrapper_; + ipc::Channel dealerChannel_; handler m_authHandler; handler m_dataHandler; diff --git a/src/login/data_session.cpp b/src/login/data_session.cpp index 8cfcb5f12e0..b721bffc6c5 100644 --- a/src/login/data_session.cpp +++ b/src/login/data_session.cpp @@ -68,7 +68,7 @@ void data_session::read_func() session_t& session = loginHelpers::get_authenticated_session(ipAddress, sessionHash); if (!session.data_session) { - session.data_session = std::make_shared(std::forward>(socket_), zmqDealerWrapper_); + session.data_session = std::make_shared(std::forward>(socket_), dealerChannel_); session.data_session->sessionHash = sessionHash; } @@ -524,7 +524,7 @@ void data_session::read_func() db::preparedStmt("UPDATE char_flags SET disconnecting = 0 WHERE charid = ?", charid); db::preparedStmt("UPDATE char_stats SET zoning = 2 WHERE charid = ?", charid); - zmqDealerWrapper_.outgoingQueue_.enqueue(zmq::message_t(payload.data(), payload.size())); + dealerChannel_.send(zmq::message_t(payload.data(), payload.size())); } if (settings::get("login.LOG_USER_IP")) diff --git a/src/login/data_session.h b/src/login/data_session.h index c9657788672..5a9720e1809 100644 --- a/src/login/data_session.h +++ b/src/login/data_session.h @@ -30,15 +30,17 @@ #include "login_packets.h" #include "common/ipp.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/channel.h" + +#include // port 54230 class data_session : public handler_session { public: - data_session(asio::ssl::stream socket, ZMQDealerWrapper& zmqDealerWrapper) + data_session(asio::ssl::stream socket, ipc::Channel dealerChannel) : handler_session(std::move(socket)) - , zmqDealerWrapper_(zmqDealerWrapper) + , dealerChannel_(dealerChannel) { DebugSockets("data_session from IP %s", ipAddress); } @@ -57,7 +59,8 @@ class data_session : public handler_session void handle_error(std::error_code ec, std::shared_ptr self) override; private: - ZMQDealerWrapper& zmqDealerWrapper_; - lpkt_chr_info2 characterInfoResponse = {}; // Store this for char deletion/creation client behavior. We need to skip slots instead of "flatten" them. - bool generatedCharInfo = false; + ipc::Channel dealerChannel_; + + lpkt_chr_info2 characterInfoResponse = {}; // Store this for char deletion/creation client behavior. We need to skip slots instead of "flatten" them. + bool generatedCharInfo = false; }; diff --git a/src/login/handler.h b/src/login/handler.h index 07473c1cfda..a33149b77e6 100644 --- a/src/login/handler.h +++ b/src/login/handler.h @@ -30,17 +30,19 @@ #include "view_session.h" #include "common/scheduler.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/channel.h" + +#include template class handler { public: - handler(Scheduler& scheduler, unsigned int port, ZMQDealerWrapper& zmqDealerWrapper) + handler(Scheduler& scheduler, unsigned int port, ipc::Channel dealerChannel) : scheduler_(scheduler) , acceptor_(scheduler_.mainContext(), asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) , sslContext_(asio::ssl::context::tls_server) - , zmqDealerWrapper_(zmqDealerWrapper) + , dealerChannel_(dealerChannel) { acceptor_.set_option(asio::socket_base::reuse_address(true)); @@ -62,7 +64,7 @@ class handler if (!ec) { - const auto sessionHandler = std::make_shared(asio::ssl::stream(std::move(socket), sslContext_), zmqDealerWrapper_); + const auto sessionHandler = std::make_shared(asio::ssl::stream(std::move(socket), sslContext_), dealerChannel_); scheduler_.postToWorkerThread( [sessionHandler] { @@ -80,5 +82,5 @@ class handler asio::ip::tcp::acceptor acceptor_; asio::ssl::context sslContext_; - ZMQDealerWrapper& zmqDealerWrapper_; + ipc::Channel dealerChannel_; }; diff --git a/src/login/view_session.cpp b/src/login/view_session.cpp index d70e4ac260a..42530ae259d 100644 --- a/src/login/view_session.cpp +++ b/src/login/view_session.cpp @@ -43,7 +43,7 @@ void view_session::read_func() session_t& session = loginHelpers::get_authenticated_session(ipAddress, sessionHash); if (!session.view_session) { - session.view_session = std::make_shared(std::forward>(socket_), zmqDealerWrapper_); + session.view_session = std::make_shared(std::forward>(socket_), dealerChannel_); } session.view_session->sessionHash = sessionHash; diff --git a/src/login/view_session.h b/src/login/view_session.h index 657b24ea8b0..b17cd4a74c6 100644 --- a/src/login/view_session.h +++ b/src/login/view_session.h @@ -27,7 +27,9 @@ #include "handler_session.h" #include "login_helpers.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/channel.h" + +#include // Main menu (Lobby), port 54001 // A comment on the packets below, defined as macros. @@ -40,9 +42,9 @@ class view_session : public handler_session { public: - view_session(asio::ssl::stream socket, ZMQDealerWrapper& zmqDealerWrapper) + view_session(asio::ssl::stream socket, ipc::Channel dealerChannel) : handler_session(std::move(socket)) - , zmqDealerWrapper_(zmqDealerWrapper) + , dealerChannel_(dealerChannel) { DebugSockets("view_session from IP %s", ipAddress); } @@ -58,5 +60,5 @@ class view_session : public handler_session void handle_error(std::error_code ec, std::shared_ptr self) override; private: - ZMQDealerWrapper& zmqDealerWrapper_; + ipc::Channel dealerChannel_; }; diff --git a/src/map/ipc_client.cpp b/src/map/ipc_client.cpp index 71f4acc9410..52f2c9d5264 100644 --- a/src/map/ipc_client.cpp +++ b/src/map/ipc_client.cpp @@ -54,33 +54,44 @@ #include "utils/serverutils.h" #include "utils/zoneutils.h" -// TODO: Don't do this -std::unique_ptr ipcClient_; +namespace +{ + +IPCClient* sClient = nullptr; + +} // namespace -void message::init(MapNetworking& networking) +void message::init(IPCClient& client) { - TracyZoneScoped; + sClient = &client; +} - ipcClient_ = std::make_unique(networking); +auto message::detail::client() -> IPCClient& +{ + return *sClient; } void message::handle_incoming() { TracyZoneScoped; - ipcClient_->handleIncomingMessages(); + sClient->handleIncomingMessages(); } -IPCClient::IPCClient(MapNetworking& networking) +IPCClient::IPCClient(MapNetworking& networking, ZMQService& zmqService) : networking_(networking) -, zmqDealerWrapper_(getZMQEndpointString(), getZMQRoutingId()) +, channel_(zmqService.registerDealer(getZMQEndpointString(), getZMQRoutingId())) { TracyZoneScoped; } auto IPCClient::getZMQEndpointString() -> std::string { - return fmt::format("tcp://{}:{}", settings::get("network.ZMQ_IP"), settings::get("network.ZMQ_PORT")); + return fmt::format( + "{}://{}:{}", + settings::get("network.ZMQ_TRANSPORT"), + settings::get("network.ZMQ_IP"), + settings::get("network.ZMQ_PORT")); } auto IPCClient::getZMQRoutingId() -> uint64 @@ -115,7 +126,7 @@ void IPCClient::handleIncomingMessages() // TODO: Can we stop more messages appearing on the queue while we're processing? zmq::message_t out; - while (zmqDealerWrapper_.incomingQueue_.try_dequeue(out)) + while (channel_.tryReceive(out)) { const auto firstByte = out.data()[0]; const auto msgType = ipc::toString(static_cast(firstByte)); diff --git a/src/map/ipc_client.h b/src/map/ipc_client.h index ed23aaebf3b..5f39b13e65d 100644 --- a/src/map/ipc_client.h +++ b/src/map/ipc_client.h @@ -25,7 +25,7 @@ #include "common/ipc.h" #include "common/lua.h" #include "common/mmo.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/zmq_service.h" #include #include @@ -38,7 +38,7 @@ class MapNetworking; class IPCClient final : public ipc::IPCMessageHandlerBase { public: - IPCClient(MapNetworking& networking); + IPCClient(MapNetworking& networking, ZMQService& zmqService); auto getZMQEndpointString() -> std::string; auto getZMQRoutingId() -> uint64; @@ -93,8 +93,9 @@ class IPCClient final : public ipc::IPCMessageHandlerBase void handleUnknownMessage(const IPP& ipp, const std::span message); private: - MapNetworking& networking_; - ZMQDealerWrapper zmqDealerWrapper_; + MapNetworking& networking_; + + ipc::Channel channel_; }; // @@ -111,25 +112,31 @@ void IPCClient::sendMessage(const T& message) DebugIPCFmt("Sending message: {}", ipc::toStringV); const auto bytes = ipc::toBytesWithHeader(message); - zmqDealerWrapper_.outgoingQueue_.enqueue(zmq::message_t(bytes)); + channel_.send(zmq::message_t(bytes)); } // // Convenience namespace // -// TODO: Don't do this -extern std::unique_ptr ipcClient_; - namespace message { -void init(MapNetworking& networking); +// TODO: For convenience, we bind the IPCClient to this global namespace, but we should +// pipe things properly through to the places that want them. +void init(IPCClient& client); + +namespace detail +{ + +auto client() -> IPCClient&; + +} // namespace detail template void send(const T& message) { - ipcClient_->sendMessage(message); + detail::client().sendMessage(message); } void handle_incoming(); diff --git a/src/map/map_engine.cpp b/src/map/map_engine.cpp index dc36265e0b1..12273e67d1b 100644 --- a/src/map/map_engine.cpp +++ b/src/map/map_engine.cpp @@ -157,7 +157,8 @@ auto MapEngine::init() -> Task zlib_init(); ShowInfo("do_init: starting ZMQ thread"); - message::init(networking()); + ipcClient_ = std::make_unique(networking(), application_.zmqService()); + message::init(*ipcClient_); ShowInfo("do_init: loading items"); itemutils::Initialize(); diff --git a/src/map/map_engine.h b/src/map/map_engine.h index 85513a51fac..3075ea0253e 100644 --- a/src/map/map_engine.h +++ b/src/map/map_engine.h @@ -35,6 +35,7 @@ // class IPP; +class IPCClient; class MapNetworking; class MapStatistics; class CZone; @@ -100,6 +101,7 @@ class MapEngine final : public Engine std::unique_ptr mapStatistics_; std::unique_ptr networking_; + std::unique_ptr ipcClient_; std::atomic watchdogLastUpdate_; MapConfig& config_; }; diff --git a/src/test/test_application.cpp b/src/test/test_application.cpp index 2148f77b56e..4b514b586ee 100644 --- a/src/test/test_application.cpp +++ b/src/test/test_application.cpp @@ -22,8 +22,12 @@ #include "test_application.h" #include "test_engine.h" +#include "common/settings.h" + #include +#include + namespace { @@ -108,15 +112,22 @@ void TestApplication::run() scheduler_.postToMainThread( [&]() -> Task { + // The test harness embeds both the world and map servers in this one process. Route their + // IPC over inproc:// (a shared, in-process transport) instead of a TCP port. + settings::set("network.ZMQ_TRANSPORT", std::string("inproc")); + // - // Prepare MapEngine + // Prepare WorldEngine // - // Without a world server actively pumping the queues, - // the embedded map server deadlocks on exit + auto worldEngine = std::make_unique(scheduler_, zmqService_, WorldEngine::EnableHTTPServer::No); + + worldEngine->onInitialize(); + + // + // Prepare MapEngine // - // We will need this to work to support multiprocess tests and validating systems that rely on world server. - // However, that requires deeper rework to the IPP logic so we can smartly route messages during tests. + MapConfig mapConfig{ .isTestServer = true, .lazyZones = true, @@ -130,14 +141,6 @@ void TestApplication::run() mapEngine->onInitialize(); - // - // Prepare WorldEngine - // - - auto worldEngine = std::make_unique(scheduler_, WorldEngine::EnableHTTPServer::No); - - worldEngine->onInitialize(); - // // Prepare TestEngine with MapEngine and WorldEngine // diff --git a/src/world/ipc_server.cpp b/src/world/ipc_server.cpp index a0a3ff5db72..51de14e773b 100644 --- a/src/world/ipc_server.cpp +++ b/src/world/ipc_server.cpp @@ -38,14 +38,18 @@ namespace auto getZMQEndpointString() -> std::string { - return fmt::format("tcp://{}:{}", settings::get("network.ZMQ_IP"), settings::get("network.ZMQ_PORT")); + return fmt::format( + "{}://{}:{}", + settings::get("network.ZMQ_TRANSPORT"), + settings::get("network.ZMQ_IP"), + settings::get("network.ZMQ_PORT")); } } // namespace -IPCServer::IPCServer(WorldEngine& worldServer) +IPCServer::IPCServer(WorldEngine& worldServer, ZMQService& zmqService) : worldServer_(worldServer) -, zmqRouterWrapper_(getZMQEndpointString()) +, channel_(zmqService.registerRouter(getZMQEndpointString())) { TracyZoneScoped; } @@ -365,9 +369,9 @@ void IPCServer::handleIncomingMessages() { TracyZoneScoped; - // TODO: Can we stop more messages appearing on the queue while we're processing? + // TODO: Should we stop more messages appearing on the queue while we're processing? IPPMessage message; - while (zmqRouterWrapper_.incomingQueue_.try_dequeue(message)) + while (channel_.tryReceive(message)) { const auto firstByte = message.payload[0]; const auto msgType = ipc::toString(static_cast(firstByte)); diff --git a/src/world/ipc_server.h b/src/world/ipc_server.h index 599f1259011..000b9d7ff1e 100644 --- a/src/world/ipc_server.h +++ b/src/world/ipc_server.h @@ -24,7 +24,7 @@ #include "common/ipc.h" #include "common/ipp.h" #include "common/mmo.h" -#include "common/zmq_dealer_wrapper.h" +#include "common/zmq/zmq_service.h" #include "character_cache.h" #include "world_engine.h" @@ -39,7 +39,7 @@ class IPCServer final : public ipc::IPCMessageHandlerBase { public: - IPCServer(WorldEngine& worldServer); + IPCServer(WorldEngine& worldServer, ZMQService& zmqService); void handleIncomingMessages(); @@ -126,9 +126,10 @@ class IPCServer final : public ipc::IPCMessageHandlerBase private: WorldEngine& worldServer_; - CharacterCache characterCache_; - ZoneSettings zoneSettings_; - ZMQRouterWrapper zmqRouterWrapper_; + CharacterCache characterCache_; + ZoneSettings zoneSettings_; + + ipc::Channel channel_; }; // @@ -143,9 +144,7 @@ void IPCServer::sendMessage(const IPP& ipp, const T& message) DebugIPCFmt("Sending {} message to {}", ipc::toStringV, ipp.toString()); const auto bytes = ipc::toBytesWithHeader(message); - const auto out = IPPMessage{ ipp, std::vector{ bytes.begin(), bytes.end() } }; - - zmqRouterWrapper_.outgoingQueue_.enqueue(std::move(out)); + channel_.send(IPPMessage{ ipp, std::vector{ bytes.begin(), bytes.end() } }); } template @@ -158,7 +157,6 @@ void IPCServer::broadcastMessage(const T& message) for (const auto& ipp : zoneSettings_.mapEndpoints_) { const auto bytes = ipc::toBytesWithHeader(message); - const auto out = IPPMessage{ ipp, std::vector{ bytes.begin(), bytes.end() } }; - zmqRouterWrapper_.outgoingQueue_.enqueue(std::move(out)); + channel_.send(IPPMessage{ ipp, std::vector{ bytes.begin(), bytes.end() } }); } } diff --git a/src/world/world_application.cpp b/src/world/world_application.cpp index da4edb46d4c..0bd6688eb34 100644 --- a/src/world/world_application.cpp +++ b/src/world/world_application.cpp @@ -50,5 +50,5 @@ WorldApplication::~WorldApplication() = default; auto WorldApplication::createEngine() -> std::unique_ptr { const auto httpEnabled = settings::get("network.ENABLE_HTTP"); - return std::make_unique(scheduler_, WorldEngine::EnableHTTPServer{ httpEnabled }); + return std::make_unique(scheduler_, zmqService_, WorldEngine::EnableHTTPServer{ httpEnabled }); } diff --git a/src/world/world_engine.cpp b/src/world/world_engine.cpp index 61109cea6fd..b9e24d3179b 100644 --- a/src/world/world_engine.cpp +++ b/src/world/world_engine.cpp @@ -35,9 +35,9 @@ #include "party_system.h" #include "time_server.h" -WorldEngine::WorldEngine(Scheduler& scheduler, EnableHTTPServer enableHTTPServer) +WorldEngine::WorldEngine(Scheduler& scheduler, ZMQService& zmqService, EnableHTTPServer enableHTTPServer) : scheduler_(scheduler) -, ipcServer_(std::make_unique(*this)) +, ipcServer_(std::make_unique(*this, zmqService)) , partySystem_(std::make_unique(*this)) , conquestSystem_(std::make_unique(*this)) , besiegedSystem_(std::make_unique(*this)) diff --git a/src/world/world_engine.h b/src/world/world_engine.h index fa7e1d0da40..b4c0bdcc31a 100644 --- a/src/world/world_engine.h +++ b/src/world/world_engine.h @@ -22,9 +22,10 @@ #pragma once #include +#include #include #include -#include +#include #include "http_server.h" @@ -44,7 +45,7 @@ class WorldEngine final : public Engine public: using EnableHTTPServer = xi::Flag; - WorldEngine(Scheduler& scheduler, EnableHTTPServer enableHTTPServer); + WorldEngine(Scheduler& scheduler, ZMQService& zmqService, EnableHTTPServer enableHTTPServer); ~WorldEngine() override; // TODO: Make all of these members private