From 3a46b7002ecd2176cb86b2370bf1fc3013f90153 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 22 Dec 2025 04:01:49 +0000 Subject: [PATCH 1/5] Initial plan From 37134529b78516b2941e1983d3450dfe7aa977ee Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 22 Dec 2025 04:10:38 +0000 Subject: [PATCH 2/5] feat(subprocess): Add IPC infrastructure for subprocess streaming - Add IPC protocol definitions (ipc_protocol.h) - Implement cross-platform named pipe IPC (ipc_pipe.h/cpp) - Add subprocess configuration (subprocess_config.h/cpp) - Implement subprocess lifecycle management (subprocess_manager.h/cpp) - Add subprocess module initialization (subprocess.h/cpp) - Update CMake to include subprocess module files Co-authored-by: ShadowLemoon <119576779+ShadowLemoon@users.noreply.github.com> --- cmake/compile_definitions/common.cmake | 9 + src/subprocess/ipc_pipe.cpp | 799 +++++++++++++++++++++++++ src/subprocess/ipc_pipe.h | 224 +++++++ src/subprocess/ipc_protocol.h | 187 ++++++ src/subprocess/subprocess.cpp | 45 ++ src/subprocess/subprocess.h | 43 ++ src/subprocess/subprocess_config.cpp | 42 ++ src/subprocess/subprocess_config.h | 34 ++ src/subprocess/subprocess_manager.cpp | 636 ++++++++++++++++++++ src/subprocess/subprocess_manager.h | 259 ++++++++ 10 files changed, 2278 insertions(+) create mode 100644 src/subprocess/ipc_pipe.cpp create mode 100644 src/subprocess/ipc_pipe.h create mode 100644 src/subprocess/ipc_protocol.h create mode 100644 src/subprocess/subprocess.cpp create mode 100644 src/subprocess/subprocess.h create mode 100644 src/subprocess/subprocess_config.cpp create mode 100644 src/subprocess/subprocess_config.h create mode 100644 src/subprocess/subprocess_manager.cpp create mode 100644 src/subprocess/subprocess_manager.h diff --git a/cmake/compile_definitions/common.cmake b/cmake/compile_definitions/common.cmake index 8662ca362a3..3865526bf58 100644 --- a/cmake/compile_definitions/common.cmake +++ b/cmake/compile_definitions/common.cmake @@ -130,6 +130,15 @@ set(SUNSHINE_TARGET_FILES "${CMAKE_SOURCE_DIR}/src/stat_trackers.cpp" "${CMAKE_SOURCE_DIR}/src/rswrapper.h" "${CMAKE_SOURCE_DIR}/src/rswrapper.c" + "${CMAKE_SOURCE_DIR}/src/subprocess/ipc_protocol.h" + "${CMAKE_SOURCE_DIR}/src/subprocess/ipc_pipe.h" + "${CMAKE_SOURCE_DIR}/src/subprocess/ipc_pipe.cpp" + "${CMAKE_SOURCE_DIR}/src/subprocess/subprocess_config.h" + "${CMAKE_SOURCE_DIR}/src/subprocess/subprocess_config.cpp" + "${CMAKE_SOURCE_DIR}/src/subprocess/subprocess_manager.h" + "${CMAKE_SOURCE_DIR}/src/subprocess/subprocess_manager.cpp" + "${CMAKE_SOURCE_DIR}/src/subprocess/subprocess.h" + "${CMAKE_SOURCE_DIR}/src/subprocess/subprocess.cpp" ${PLATFORM_TARGET_FILES}) if(NOT SUNSHINE_ASSETS_DIR_DEF) diff --git a/src/subprocess/ipc_pipe.cpp b/src/subprocess/ipc_pipe.cpp new file mode 100644 index 00000000000..b8b57805319 --- /dev/null +++ b/src/subprocess/ipc_pipe.cpp @@ -0,0 +1,799 @@ +/** + * @file src/subprocess/ipc_pipe.cpp + * @brief Platform-specific implementation of named pipe IPC. + */ +#include "ipc_pipe.h" + +#include "src/logging.h" + +#ifdef _WIN32 + #include +#else + #include + #include + #include + #include + #include + + #include + #include +#endif + +using namespace std::literals; + +namespace subprocess { + namespace ipc { + + const char * + result_to_string(result_e result) { + switch (result) { + case result_e::success: + return "success"; + case result_e::error_create_pipe: + return "error_create_pipe"; + case result_e::error_connect: + return "error_connect"; + case result_e::error_timeout: + return "error_timeout"; + case result_e::error_disconnected: + return "error_disconnected"; + case result_e::error_invalid_message: + return "error_invalid_message"; + case result_e::error_write: + return "error_write"; + case result_e::error_read: + return "error_read"; + default: + return "unknown"; + } + } + + // ===================================================== + // pipe_server_t implementation + // ===================================================== + + pipe_server_t::pipe_server_t() = default; + + pipe_server_t::~pipe_server_t() { + stop_receive_loop(); + close(); + } + + result_e + pipe_server_t::create(uint32_t session_id) { + pipe_name_ = get_pipe_name(session_id); + +#ifdef _WIN32 + // Create named pipe with overlapped I/O support + HANDLE handle = CreateNamedPipeA( + pipe_name_.c_str(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, + 1, // Max instances + 65536, // Output buffer size + 65536, // Input buffer size + 0, // Default timeout + nullptr // Default security + ); + + if (handle == INVALID_HANDLE_VALUE) { + BOOST_LOG(error) << "Failed to create named pipe: " << GetLastError(); + return result_e::error_create_pipe; + } + + pipe_handle_ = handle; + BOOST_LOG(debug) << "Created named pipe: " << pipe_name_; + return result_e::success; +#else + // On Unix, we use FIFO (named pipes) + // Remove existing pipe if any + unlink(pipe_name_.c_str()); + + if (mkfifo(pipe_name_.c_str(), 0600) != 0) { + BOOST_LOG(error) << "Failed to create FIFO: " << strerror(errno); + return result_e::error_create_pipe; + } + + BOOST_LOG(debug) << "Created FIFO: " << pipe_name_; + return result_e::success; +#endif + } + + result_e + pipe_server_t::wait_for_connection(int timeout_ms) { +#ifdef _WIN32 + if (!pipe_handle_) { + return result_e::error_create_pipe; + } + + HANDLE handle = static_cast(pipe_handle_); + + // Create event for overlapped operation + OVERLAPPED overlapped = {}; + overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); + if (!overlapped.hEvent) { + BOOST_LOG(error) << "Failed to create event: " << GetLastError(); + return result_e::error_create_pipe; + } + + // Start async connect + BOOL connected = ConnectNamedPipe(handle, &overlapped); + DWORD error = GetLastError(); + + if (!connected) { + if (error == ERROR_IO_PENDING) { + // Wait for connection with timeout + DWORD wait_result = WaitForSingleObject(overlapped.hEvent, timeout_ms); + CloseHandle(overlapped.hEvent); + + if (wait_result == WAIT_TIMEOUT) { + CancelIo(handle); + BOOST_LOG(debug) << "Connection timeout"; + return result_e::error_timeout; + } + else if (wait_result != WAIT_OBJECT_0) { + BOOST_LOG(error) << "Wait failed: " << GetLastError(); + return result_e::error_connect; + } + + // Check if connection succeeded + DWORD bytes; + if (!GetOverlappedResult(handle, &overlapped, &bytes, FALSE)) { + error = GetLastError(); + if (error != ERROR_PIPE_CONNECTED) { + BOOST_LOG(error) << "GetOverlappedResult failed: " << error; + return result_e::error_connect; + } + } + } + else if (error != ERROR_PIPE_CONNECTED) { + CloseHandle(overlapped.hEvent); + BOOST_LOG(error) << "ConnectNamedPipe failed: " << error; + return result_e::error_connect; + } + } + else { + CloseHandle(overlapped.hEvent); + } + + connected_ = true; + BOOST_LOG(info) << "Client connected to IPC pipe"; + return result_e::success; +#else + // On Unix, open the FIFO for read+write (non-blocking initially) + int fd = open(pipe_name_.c_str(), O_RDWR | O_NONBLOCK); + if (fd < 0) { + BOOST_LOG(error) << "Failed to open FIFO: " << strerror(errno); + return result_e::error_create_pipe; + } + + pipe_handle_ = reinterpret_cast(static_cast(fd)); + connected_ = true; + BOOST_LOG(info) << "IPC pipe ready"; + return result_e::success; +#endif + } + + result_e + pipe_server_t::send_message(message_type_e type, const void *payload, size_t payload_length) { + if (!connected_) { + return result_e::error_disconnected; + } + + std::lock_guard lock(write_mutex_); + + message_header_t header = make_header(type, static_cast(payload_length), sequence_number_++); + +#ifdef _WIN32 + HANDLE handle = static_cast(pipe_handle_); + + // Write header + DWORD written; + if (!WriteFile(handle, &header, sizeof(header), &written, nullptr) || written != sizeof(header)) { + BOOST_LOG(error) << "Failed to write header: " << GetLastError(); + return result_e::error_write; + } + + // Write payload if any + if (payload && payload_length > 0) { + if (!WriteFile(handle, payload, static_cast(payload_length), &written, nullptr) || + written != payload_length) { + BOOST_LOG(error) << "Failed to write payload: " << GetLastError(); + return result_e::error_write; + } + } + + return result_e::success; +#else + int fd = static_cast(reinterpret_cast(pipe_handle_)); + + // Write header + ssize_t written = write(fd, &header, sizeof(header)); + if (written != sizeof(header)) { + BOOST_LOG(error) << "Failed to write header: " << strerror(errno); + return result_e::error_write; + } + + // Write payload if any + if (payload && payload_length > 0) { + written = write(fd, payload, payload_length); + if (written != static_cast(payload_length)) { + BOOST_LOG(error) << "Failed to write payload: " << strerror(errno); + return result_e::error_write; + } + } + + return result_e::success; +#endif + } + + result_e + pipe_server_t::receive_message(message_header_t &header, std::vector &payload, int timeout_ms) { + if (!connected_) { + return result_e::error_disconnected; + } + +#ifdef _WIN32 + HANDLE handle = static_cast(pipe_handle_); + + // Create event for overlapped read + OVERLAPPED overlapped = {}; + overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); + if (!overlapped.hEvent) { + return result_e::error_read; + } + + auto cleanup = [&]() { + CloseHandle(overlapped.hEvent); + }; + + // Read header + DWORD read_bytes = 0; + BOOL success = ReadFile(handle, &header, sizeof(header), &read_bytes, &overlapped); + + if (!success) { + DWORD error = GetLastError(); + if (error == ERROR_IO_PENDING) { + DWORD wait_result = WaitForSingleObject(overlapped.hEvent, timeout_ms); + if (wait_result == WAIT_TIMEOUT) { + CancelIo(handle); + cleanup(); + return result_e::error_timeout; + } + else if (wait_result != WAIT_OBJECT_0) { + cleanup(); + return result_e::error_read; + } + + if (!GetOverlappedResult(handle, &overlapped, &read_bytes, FALSE)) { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + else { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + + if (read_bytes != sizeof(header)) { + cleanup(); + return result_e::error_read; + } + + // Validate header + if (!validate_header(header)) { + cleanup(); + return result_e::error_invalid_message; + } + + // Read payload if any + if (header.payload_length > 0) { + payload.resize(header.payload_length); + ResetEvent(overlapped.hEvent); + + success = ReadFile(handle, payload.data(), header.payload_length, &read_bytes, &overlapped); + if (!success) { + DWORD error = GetLastError(); + if (error == ERROR_IO_PENDING) { + DWORD wait_result = WaitForSingleObject(overlapped.hEvent, timeout_ms); + if (wait_result == WAIT_TIMEOUT) { + CancelIo(handle); + cleanup(); + return result_e::error_timeout; + } + else if (wait_result != WAIT_OBJECT_0) { + cleanup(); + return result_e::error_read; + } + + if (!GetOverlappedResult(handle, &overlapped, &read_bytes, FALSE)) { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + else { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + + if (read_bytes != header.payload_length) { + cleanup(); + return result_e::error_read; + } + } + else { + payload.clear(); + } + + cleanup(); + return result_e::success; +#else + int fd = static_cast(reinterpret_cast(pipe_handle_)); + + // Poll for data with timeout + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + + int ret = poll(&pfd, 1, timeout_ms); + if (ret == 0) { + return result_e::error_timeout; + } + else if (ret < 0) { + BOOST_LOG(error) << "Poll failed: " << strerror(errno); + return result_e::error_read; + } + + // Read header + ssize_t read_bytes = read(fd, &header, sizeof(header)); + if (read_bytes != sizeof(header)) { + if (read_bytes == 0) { + connected_ = false; + return result_e::error_disconnected; + } + return result_e::error_read; + } + + // Validate header + if (!validate_header(header)) { + return result_e::error_invalid_message; + } + + // Read payload if any + if (header.payload_length > 0) { + payload.resize(header.payload_length); + read_bytes = read(fd, payload.data(), header.payload_length); + if (read_bytes != static_cast(header.payload_length)) { + return result_e::error_read; + } + } + else { + payload.clear(); + } + + return result_e::success; +#endif + } + + void + pipe_server_t::start_receive_loop(message_callback_t callback) { + if (running_) { + return; + } + + running_ = true; + receive_thread_ = std::thread([this, callback]() { + while (running_ && connected_) { + message_header_t header; + std::vector payload; + + auto result = receive_message(header, payload, 1000); + if (result == result_e::error_timeout) { + continue; + } + else if (result != result_e::success) { + BOOST_LOG(warning) << "IPC receive error: " << result_to_string(result); + break; + } + + if (!callback(header, payload)) { + break; + } + } + }); + } + + void + pipe_server_t::stop_receive_loop() { + running_ = false; + if (receive_thread_.joinable()) { + receive_thread_.join(); + } + } + + bool + pipe_server_t::is_connected() const { + return connected_; + } + + void + pipe_server_t::close() { +#ifdef _WIN32 + if (pipe_handle_) { + DisconnectNamedPipe(static_cast(pipe_handle_)); + CloseHandle(static_cast(pipe_handle_)); + pipe_handle_ = nullptr; + } +#else + if (pipe_handle_) { + int fd = static_cast(reinterpret_cast(pipe_handle_)); + ::close(fd); + pipe_handle_ = nullptr; + } + if (!pipe_name_.empty()) { + unlink(pipe_name_.c_str()); + } +#endif + connected_ = false; + } + + // ===================================================== + // pipe_client_t implementation + // ===================================================== + + pipe_client_t::pipe_client_t() = default; + + pipe_client_t::~pipe_client_t() { + stop_receive_loop(); + disconnect(); + } + + result_e + pipe_client_t::connect(uint32_t session_id, int timeout_ms) { + pipe_name_ = get_pipe_name(session_id); + +#ifdef _WIN32 + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < std::chrono::milliseconds(timeout_ms)) { + HANDLE handle = CreateFileA( + pipe_name_.c_str(), + GENERIC_READ | GENERIC_WRITE, + 0, + nullptr, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + nullptr); + + if (handle != INVALID_HANDLE_VALUE) { + // Set pipe to message mode + DWORD mode = PIPE_READMODE_MESSAGE; + if (!SetNamedPipeHandleState(handle, &mode, nullptr, nullptr)) { + CloseHandle(handle); + BOOST_LOG(error) << "Failed to set pipe mode: " << GetLastError(); + return result_e::error_connect; + } + + pipe_handle_ = handle; + connected_ = true; + BOOST_LOG(info) << "Connected to IPC pipe: " << pipe_name_; + return result_e::success; + } + + DWORD error = GetLastError(); + if (error == ERROR_FILE_NOT_FOUND) { + // Pipe not yet created, wait and retry + std::this_thread::sleep_for(100ms); + continue; + } + else if (error == ERROR_PIPE_BUSY) { + // Wait for pipe to become available + if (!WaitNamedPipeA(pipe_name_.c_str(), timeout_ms)) { + break; + } + continue; + } + else { + BOOST_LOG(error) << "Failed to connect to pipe: " << error; + return result_e::error_connect; + } + } + + return result_e::error_timeout; +#else + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < std::chrono::milliseconds(timeout_ms)) { + int fd = open(pipe_name_.c_str(), O_RDWR | O_NONBLOCK); + if (fd >= 0) { + pipe_handle_ = reinterpret_cast(static_cast(fd)); + connected_ = true; + BOOST_LOG(info) << "Connected to IPC pipe: " << pipe_name_; + return result_e::success; + } + + if (errno == ENOENT) { + // FIFO not yet created, wait and retry + std::this_thread::sleep_for(100ms); + continue; + } + + BOOST_LOG(error) << "Failed to open FIFO: " << strerror(errno); + return result_e::error_connect; + } + + return result_e::error_timeout; +#endif + } + + result_e + pipe_client_t::send_message(message_type_e type, const void *payload, size_t payload_length) { + if (!connected_) { + return result_e::error_disconnected; + } + + std::lock_guard lock(write_mutex_); + + message_header_t header = make_header(type, static_cast(payload_length), sequence_number_++); + +#ifdef _WIN32 + HANDLE handle = static_cast(pipe_handle_); + + // Write header + DWORD written; + if (!WriteFile(handle, &header, sizeof(header), &written, nullptr) || written != sizeof(header)) { + BOOST_LOG(error) << "Failed to write header: " << GetLastError(); + return result_e::error_write; + } + + // Write payload if any + if (payload && payload_length > 0) { + if (!WriteFile(handle, payload, static_cast(payload_length), &written, nullptr) || + written != payload_length) { + BOOST_LOG(error) << "Failed to write payload: " << GetLastError(); + return result_e::error_write; + } + } + + return result_e::success; +#else + int fd = static_cast(reinterpret_cast(pipe_handle_)); + + // Write header + ssize_t written = write(fd, &header, sizeof(header)); + if (written != sizeof(header)) { + BOOST_LOG(error) << "Failed to write header: " << strerror(errno); + return result_e::error_write; + } + + // Write payload if any + if (payload && payload_length > 0) { + written = write(fd, payload, payload_length); + if (written != static_cast(payload_length)) { + BOOST_LOG(error) << "Failed to write payload: " << strerror(errno); + return result_e::error_write; + } + } + + return result_e::success; +#endif + } + + result_e + pipe_client_t::receive_message(message_header_t &header, std::vector &payload, int timeout_ms) { + if (!connected_) { + return result_e::error_disconnected; + } + +#ifdef _WIN32 + HANDLE handle = static_cast(pipe_handle_); + + // Create event for overlapped read + OVERLAPPED overlapped = {}; + overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); + if (!overlapped.hEvent) { + return result_e::error_read; + } + + auto cleanup = [&]() { + CloseHandle(overlapped.hEvent); + }; + + // Read header + DWORD read_bytes = 0; + BOOL success = ReadFile(handle, &header, sizeof(header), &read_bytes, &overlapped); + + if (!success) { + DWORD error = GetLastError(); + if (error == ERROR_IO_PENDING) { + DWORD wait_result = WaitForSingleObject(overlapped.hEvent, timeout_ms); + if (wait_result == WAIT_TIMEOUT) { + CancelIo(handle); + cleanup(); + return result_e::error_timeout; + } + else if (wait_result != WAIT_OBJECT_0) { + cleanup(); + return result_e::error_read; + } + + if (!GetOverlappedResult(handle, &overlapped, &read_bytes, FALSE)) { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + else { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + + if (read_bytes != sizeof(header)) { + cleanup(); + return result_e::error_read; + } + + // Validate header + if (!validate_header(header)) { + cleanup(); + return result_e::error_invalid_message; + } + + // Read payload if any + if (header.payload_length > 0) { + payload.resize(header.payload_length); + ResetEvent(overlapped.hEvent); + + success = ReadFile(handle, payload.data(), header.payload_length, &read_bytes, &overlapped); + if (!success) { + DWORD error = GetLastError(); + if (error == ERROR_IO_PENDING) { + DWORD wait_result = WaitForSingleObject(overlapped.hEvent, timeout_ms); + if (wait_result == WAIT_TIMEOUT) { + CancelIo(handle); + cleanup(); + return result_e::error_timeout; + } + else if (wait_result != WAIT_OBJECT_0) { + cleanup(); + return result_e::error_read; + } + + if (!GetOverlappedResult(handle, &overlapped, &read_bytes, FALSE)) { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + else { + cleanup(); + connected_ = false; + return result_e::error_disconnected; + } + } + + if (read_bytes != header.payload_length) { + cleanup(); + return result_e::error_read; + } + } + else { + payload.clear(); + } + + cleanup(); + return result_e::success; +#else + int fd = static_cast(reinterpret_cast(pipe_handle_)); + + // Poll for data with timeout + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLIN; + + int ret = poll(&pfd, 1, timeout_ms); + if (ret == 0) { + return result_e::error_timeout; + } + else if (ret < 0) { + BOOST_LOG(error) << "Poll failed: " << strerror(errno); + return result_e::error_read; + } + + // Read header + ssize_t read_bytes = read(fd, &header, sizeof(header)); + if (read_bytes != sizeof(header)) { + if (read_bytes == 0) { + connected_ = false; + return result_e::error_disconnected; + } + return result_e::error_read; + } + + // Validate header + if (!validate_header(header)) { + return result_e::error_invalid_message; + } + + // Read payload if any + if (header.payload_length > 0) { + payload.resize(header.payload_length); + read_bytes = read(fd, payload.data(), header.payload_length); + if (read_bytes != static_cast(header.payload_length)) { + return result_e::error_read; + } + } + else { + payload.clear(); + } + + return result_e::success; +#endif + } + + void + pipe_client_t::start_receive_loop(message_callback_t callback) { + if (running_) { + return; + } + + running_ = true; + receive_thread_ = std::thread([this, callback]() { + while (running_ && connected_) { + message_header_t header; + std::vector payload; + + auto result = receive_message(header, payload, 1000); + if (result == result_e::error_timeout) { + continue; + } + else if (result != result_e::success) { + BOOST_LOG(warning) << "IPC receive error: " << result_to_string(result); + break; + } + + if (!callback(header, payload)) { + break; + } + } + }); + } + + void + pipe_client_t::stop_receive_loop() { + running_ = false; + if (receive_thread_.joinable()) { + receive_thread_.join(); + } + } + + bool + pipe_client_t::is_connected() const { + return connected_; + } + + void + pipe_client_t::disconnect() { +#ifdef _WIN32 + if (pipe_handle_) { + CloseHandle(static_cast(pipe_handle_)); + pipe_handle_ = nullptr; + } +#else + if (pipe_handle_) { + int fd = static_cast(reinterpret_cast(pipe_handle_)); + ::close(fd); + pipe_handle_ = nullptr; + } +#endif + connected_ = false; + } + + } // namespace ipc +} // namespace subprocess diff --git a/src/subprocess/ipc_pipe.h b/src/subprocess/ipc_pipe.h new file mode 100644 index 00000000000..253a9b512cd --- /dev/null +++ b/src/subprocess/ipc_pipe.h @@ -0,0 +1,224 @@ +/** + * @file src/subprocess/ipc_pipe.h + * @brief Named pipe IPC implementation for subprocess communication. + * + * This file provides cross-platform named pipe abstraction for communication + * between the main process (server) and subprocess (client). + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ipc_protocol.h" + +namespace subprocess { + namespace ipc { + + /** + * @brief Result codes for IPC operations. + */ + enum class result_e { + success, + error_create_pipe, + error_connect, + error_timeout, + error_disconnected, + error_invalid_message, + error_write, + error_read, + }; + + /** + * @brief Convert result code to string. + */ + const char * + result_to_string(result_e result); + + /** + * @brief Callback type for received messages. + * @param header The message header. + * @param payload The message payload data. + * @return true to continue receiving, false to stop. + */ + using message_callback_t = std::function &payload)>; + + /** + * @brief IPC Pipe Server - runs in main process. + * + * Creates a named pipe and waits for subprocess to connect. + */ + class pipe_server_t { + public: + pipe_server_t(); + ~pipe_server_t(); + + // Non-copyable + pipe_server_t(const pipe_server_t &) = delete; + pipe_server_t &operator=(const pipe_server_t &) = delete; + + /** + * @brief Create the named pipe server. + * @param session_id Session ID for pipe name. + * @return Result code. + */ + result_e + create(uint32_t session_id); + + /** + * @brief Wait for client connection. + * @param timeout_ms Timeout in milliseconds. + * @return Result code. + */ + result_e + wait_for_connection(int timeout_ms); + + /** + * @brief Send a message to the client. + * @param type Message type. + * @param payload Payload data (can be nullptr if payload_length is 0). + * @param payload_length Length of payload. + * @return Result code. + */ + result_e + send_message(message_type_e type, const void *payload = nullptr, size_t payload_length = 0); + + /** + * @brief Receive a message from the client. + * @param header Output message header. + * @param payload Output payload buffer. + * @param timeout_ms Timeout in milliseconds. + * @return Result code. + */ + result_e + receive_message(message_header_t &header, std::vector &payload, int timeout_ms); + + /** + * @brief Start async receive loop. + * @param callback Callback for received messages. + */ + void + start_receive_loop(message_callback_t callback); + + /** + * @brief Stop async receive loop. + */ + void + stop_receive_loop(); + + /** + * @brief Check if connected to client. + */ + bool + is_connected() const; + + /** + * @brief Close the pipe. + */ + void + close(); + + /** + * @brief Get the pipe name. + */ + const std::string & + pipe_name() const { + return pipe_name_; + } + + private: + std::string pipe_name_; + void *pipe_handle_ = nullptr; // Platform-specific handle + std::atomic connected_ { false }; + std::atomic running_ { false }; + std::thread receive_thread_; + std::mutex write_mutex_; + uint32_t sequence_number_ = 0; + }; + + /** + * @brief IPC Pipe Client - runs in subprocess. + * + * Connects to the named pipe created by main process. + */ + class pipe_client_t { + public: + pipe_client_t(); + ~pipe_client_t(); + + // Non-copyable + pipe_client_t(const pipe_client_t &) = delete; + pipe_client_t &operator=(const pipe_client_t &) = delete; + + /** + * @brief Connect to the named pipe server. + * @param session_id Session ID for pipe name. + * @param timeout_ms Connection timeout in milliseconds. + * @return Result code. + */ + result_e + connect(uint32_t session_id, int timeout_ms); + + /** + * @brief Send a message to the server. + * @param type Message type. + * @param payload Payload data (can be nullptr if payload_length is 0). + * @param payload_length Length of payload. + * @return Result code. + */ + result_e + send_message(message_type_e type, const void *payload = nullptr, size_t payload_length = 0); + + /** + * @brief Receive a message from the server. + * @param header Output message header. + * @param payload Output payload buffer. + * @param timeout_ms Timeout in milliseconds. + * @return Result code. + */ + result_e + receive_message(message_header_t &header, std::vector &payload, int timeout_ms); + + /** + * @brief Start async receive loop. + * @param callback Callback for received messages. + */ + void + start_receive_loop(message_callback_t callback); + + /** + * @brief Stop async receive loop. + */ + void + stop_receive_loop(); + + /** + * @brief Check if connected to server. + */ + bool + is_connected() const; + + /** + * @brief Disconnect from server. + */ + void + disconnect(); + + private: + std::string pipe_name_; + void *pipe_handle_ = nullptr; // Platform-specific handle + std::atomic connected_ { false }; + std::atomic running_ { false }; + std::thread receive_thread_; + std::mutex write_mutex_; + uint32_t sequence_number_ = 0; + }; + + } // namespace ipc +} // namespace subprocess diff --git a/src/subprocess/ipc_protocol.h b/src/subprocess/ipc_protocol.h new file mode 100644 index 00000000000..4aa809ca953 --- /dev/null +++ b/src/subprocess/ipc_protocol.h @@ -0,0 +1,187 @@ +/** + * @file src/subprocess/ipc_protocol.h + * @brief IPC protocol definitions for subprocess streaming architecture. + * + * This file defines the message formats and types used for communication + * between the main process (control plane) and the subprocess (data plane). + */ +#pragma once + +#include +#include +#include + +namespace subprocess { + namespace ipc { + + /** + * @brief Magic number to identify valid IPC messages. + */ + constexpr uint32_t IPC_MAGIC = 0x53554E53; // "SUNS" + + /** + * @brief IPC protocol version for compatibility checking. + */ + constexpr uint16_t IPC_VERSION = 1; + + /** + * @brief Message types for IPC communication. + */ + enum class message_type_e : uint16_t { + // Control messages (Main -> Subprocess) + INIT_SESSION = 0x0001, ///< Initialize streaming session with config + START_STREAM = 0x0002, ///< Start streaming + STOP_STREAM = 0x0003, ///< Stop streaming + REQUEST_IDR = 0x0004, ///< Request IDR frame + CHANGE_BITRATE = 0x0005, ///< Change encoding bitrate + INVALIDATE_REFS = 0x0006, ///< Invalidate reference frames + SHUTDOWN = 0x0007, ///< Shutdown subprocess + + // Socket handover (Main -> Subprocess) + SOCKET_INFO = 0x0100, ///< Socket information for handover + + // Status messages (Subprocess -> Main) + STATUS_READY = 0x0200, ///< Subprocess is ready + STATUS_STREAMING = 0x0201, ///< Streaming started + STATUS_STOPPED = 0x0202, ///< Streaming stopped + STATUS_ERROR = 0x0203, ///< Error occurred + + // Heartbeat + HEARTBEAT = 0x0300, ///< Keepalive message + HEARTBEAT_ACK = 0x0301, ///< Heartbeat acknowledgment + }; + + /** + * @brief Common message header for all IPC messages. + */ +#pragma pack(push, 1) + struct message_header_t { + uint32_t magic; ///< Magic number (IPC_MAGIC) + uint16_t version; ///< Protocol version + uint16_t type; ///< Message type (message_type_e) + uint32_t payload_length; ///< Length of payload following header + uint32_t sequence_number; ///< Sequence number for tracking + }; + + /** + * @brief Session initialization message payload. + */ + struct init_session_payload_t { + // Video configuration + int32_t width; + int32_t height; + int32_t framerate; + int32_t bitrate; ///< Bitrate in Kbps + int32_t slices_per_frame; + int32_t num_ref_frames; + int32_t encoder_csc_mode; + int32_t video_format; ///< 0=H264, 1=HEVC, 2=AV1 + int32_t dynamic_range; + int32_t chroma_sampling; ///< 0=4:2:0, 1=4:4:4 + int32_t enable_intra_refresh; + + // Audio configuration + int32_t audio_channels; + int32_t audio_mask; + int32_t audio_packet_duration; + uint8_t audio_high_quality; + uint8_t audio_host_audio; + + // Network configuration + int32_t packet_size; + int32_t min_fec_packets; + int32_t fec_percentage; + + // Encryption + uint8_t encryption_flags; + uint8_t gcm_key[16]; + uint8_t iv[16]; + + // Display name length followed by the string + uint16_t display_name_length; + // char display_name[display_name_length]; // Variable length + }; + + /** + * @brief Socket handover information (Windows-specific). + */ + struct socket_info_payload_t { + uint8_t socket_type; ///< 0=video, 1=audio, 2=control + uint16_t local_port; + uint16_t remote_port; + uint8_t address_family; ///< AF_INET=2, AF_INET6=23 + uint8_t remote_addr[16]; ///< IPv4 (4 bytes) or IPv6 (16 bytes) + + // Windows WSAPROTOCOL_INFO size is ~372 bytes + uint16_t protocol_info_length; + // uint8_t protocol_info[protocol_info_length]; // Variable length (WSAPROTOCOL_INFO) + }; + + /** + * @brief Bitrate change message payload. + */ + struct change_bitrate_payload_t { + int32_t new_bitrate_kbps; + }; + + /** + * @brief Reference frame invalidation payload. + */ + struct invalidate_refs_payload_t { + int64_t first_frame; + int64_t last_frame; + }; + + /** + * @brief Error status payload. + */ + struct status_error_payload_t { + int32_t error_code; + uint16_t message_length; + // char message[message_length]; // Variable length + }; + +#pragma pack(pop) + + /** + * @brief Pipe name format for IPC. + * Format: \\.\pipe\sunshine_subprocess_{session_id} + */ + inline std::string + get_pipe_name(uint32_t session_id) { + return "\\\\.\\pipe\\sunshine_subprocess_" + std::to_string(session_id); + } + + /** + * @brief Calculate total message size from header. + */ + inline size_t + get_message_size(const message_header_t &header) { + return sizeof(message_header_t) + header.payload_length; + } + + /** + * @brief Validate message header. + * @return true if header is valid, false otherwise. + */ + inline bool + validate_header(const message_header_t &header) { + return header.magic == IPC_MAGIC && header.version == IPC_VERSION; + } + + /** + * @brief Create a message header. + */ + inline message_header_t + make_header(message_type_e type, uint32_t payload_length, uint32_t sequence) { + message_header_t header {}; + header.magic = IPC_MAGIC; + header.version = IPC_VERSION; + header.type = static_cast(type); + header.payload_length = payload_length; + header.sequence_number = sequence; + return header; + } + + } // namespace ipc +} // namespace subprocess diff --git a/src/subprocess/subprocess.cpp b/src/subprocess/subprocess.cpp new file mode 100644 index 00000000000..c8f4a17f5c7 --- /dev/null +++ b/src/subprocess/subprocess.cpp @@ -0,0 +1,45 @@ +/** + * @file src/subprocess/subprocess.cpp + * @brief Implementation of subprocess module initialization. + */ +#include "subprocess.h" + +#include "src/logging.h" + +namespace subprocess { + + int + init() { + BOOST_LOG(info) << "Initializing subprocess streaming module"; + + // Initialize configuration + init_config(); + + auto &cfg = get_config(); + if (cfg.enabled) { + BOOST_LOG(info) << "Subprocess streaming mode is ENABLED"; + BOOST_LOG(info) << " Heartbeat interval: " << cfg.heartbeat_interval_ms << "ms"; + BOOST_LOG(info) << " Heartbeat timeout: " << cfg.heartbeat_timeout_ms << "ms"; + BOOST_LOG(info) << " Init timeout: " << cfg.init_timeout_ms << "ms"; + } + else { + BOOST_LOG(info) << "Subprocess streaming mode is DISABLED (using traditional streaming)"; + } + + return 0; + } + + void + shutdown() { + BOOST_LOG(info) << "Shutting down subprocess streaming module"; + + // Stop all subprocess workers + subprocess_manager_t::instance().stop_all(); + } + + bool + is_enabled() { + return get_config().enabled; + } + +} // namespace subprocess diff --git a/src/subprocess/subprocess.h b/src/subprocess/subprocess.h new file mode 100644 index 00000000000..bcdda30289e --- /dev/null +++ b/src/subprocess/subprocess.h @@ -0,0 +1,43 @@ +/** + * @file src/subprocess/subprocess.h + * @brief Main include file for subprocess streaming module. + * + * This module implements a separated architecture where: + * - Main process (SYSTEM - Control Plane): Handles RTSP handshake, authentication, control commands + * - Subprocess (User - Data Plane): Handles capture, encoding, sending video/audio data + * + * Benefits: + * - Lower latency by avoiding cross-process memory copies + * - Better WGC/audio compatibility (user session access) + * - Simpler architecture with clear separation of concerns + */ +#pragma once + +#include "ipc_pipe.h" +#include "ipc_protocol.h" +#include "subprocess_config.h" +#include "subprocess_manager.h" + +namespace subprocess { + + /** + * @brief Initialize the subprocess module. + * @return 0 on success, non-zero on failure. + */ + int + init(); + + /** + * @brief Shutdown the subprocess module. + */ + void + shutdown(); + + /** + * @brief Check if subprocess streaming mode is enabled. + * @return true if enabled. + */ + bool + is_enabled(); + +} // namespace subprocess diff --git a/src/subprocess/subprocess_config.cpp b/src/subprocess/subprocess_config.cpp new file mode 100644 index 00000000000..31dc56bf063 --- /dev/null +++ b/src/subprocess/subprocess_config.cpp @@ -0,0 +1,42 @@ +/** + * @file src/subprocess/subprocess_config.cpp + * @brief Implementation of subprocess configuration. + */ +#include "subprocess_config.h" + +#include "src/config.h" +#include "src/logging.h" + +namespace subprocess { + + static config_t g_config; + + config_t & + get_config() { + return g_config; + } + + void + init_config() { + // Initialize from main config + // For now, subprocess mode is disabled by default + // Users can enable it via configuration + g_config.enabled = false; + + // Set default values + g_config.heartbeat_interval_ms = 1000; + g_config.heartbeat_timeout_ms = 5000; + g_config.init_timeout_ms = 10000; + + // Auto-detect sender executable path based on platform +#ifdef _WIN32 + g_config.sender_executable = ""; // Will be set to sunshine-sender.exe in same directory +#else + g_config.sender_executable = ""; // Will be set to sunshine-sender in same directory +#endif + + BOOST_LOG(debug) << "Subprocess streaming mode: " + << (g_config.enabled ? "enabled" : "disabled"); + } + +} // namespace subprocess diff --git a/src/subprocess/subprocess_config.h b/src/subprocess/subprocess_config.h new file mode 100644 index 00000000000..ab5b340435a --- /dev/null +++ b/src/subprocess/subprocess_config.h @@ -0,0 +1,34 @@ +/** + * @file src/subprocess/subprocess_config.h + * @brief Configuration for subprocess streaming mode. + */ +#pragma once + +#include + +namespace subprocess { + + /** + * @brief Subprocess mode configuration. + */ + struct config_t { + bool enabled = false; ///< Enable subprocess streaming mode + std::string sender_executable; ///< Path to sender executable (auto-detect if empty) + int heartbeat_interval_ms = 1000; ///< Heartbeat interval in milliseconds + int heartbeat_timeout_ms = 5000; ///< Heartbeat timeout in milliseconds + int init_timeout_ms = 10000; ///< Subprocess initialization timeout + }; + + /** + * @brief Get the global subprocess configuration. + */ + config_t & + get_config(); + + /** + * @brief Initialize subprocess configuration from main config. + */ + void + init_config(); + +} // namespace subprocess diff --git a/src/subprocess/subprocess_manager.cpp b/src/subprocess/subprocess_manager.cpp new file mode 100644 index 00000000000..745214ef51f --- /dev/null +++ b/src/subprocess/subprocess_manager.cpp @@ -0,0 +1,636 @@ +/** + * @file src/subprocess/subprocess_manager.cpp + * @brief Implementation of subprocess lifecycle management. + */ +#include "subprocess_manager.h" + +#include +#include + +#include "src/logging.h" +#include "src/platform/common.h" + +#ifdef _WIN32 + #include + #include +#else + #include + #include + #include +#endif + +using namespace std::literals; + +namespace subprocess { + + const char * + state_to_string(state_e state) { + switch (state) { + case state_e::stopped: + return "stopped"; + case state_e::starting: + return "starting"; + case state_e::ready: + return "ready"; + case state_e::streaming: + return "streaming"; + case state_e::stopping: + return "stopping"; + case state_e::error: + return "error"; + default: + return "unknown"; + } + } + + // ===================================================== + // subprocess_worker_t implementation + // ===================================================== + + subprocess_worker_t::subprocess_worker_t() = default; + + subprocess_worker_t::~subprocess_worker_t() { + stop(5000); + } + + bool + subprocess_worker_t::start(const session_config_t &config, status_callback_t status_callback) { + std::lock_guard lock(mutex_); + + if (is_running()) { + BOOST_LOG(warning) << "Subprocess worker already running"; + return false; + } + + config_ = config; + status_callback_ = std::move(status_callback); + state_ = state_e::starting; + + BOOST_LOG(info) << "Starting subprocess worker for session " << config.session_id; + + // Create IPC pipe server + ipc_server_ = std::make_unique(); + auto result = ipc_server_->create(config.session_id); + if (result != ipc::result_e::success) { + BOOST_LOG(error) << "Failed to create IPC pipe: " << ipc::result_to_string(result); + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -1, "Failed to create IPC pipe"); + } + return false; + } + + // Launch subprocess + if (!launch_process()) { + BOOST_LOG(error) << "Failed to launch subprocess"; + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -2, "Failed to launch subprocess"); + } + return false; + } + + // Wait for subprocess to connect + auto &sub_config = get_config(); + result = ipc_server_->wait_for_connection(sub_config.init_timeout_ms); + if (result != ipc::result_e::success) { + BOOST_LOG(error) << "Subprocess failed to connect: " << ipc::result_to_string(result); + terminate_process(); + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -3, "Subprocess connection timeout"); + } + return false; + } + + // Send session configuration + std::vector payload; + { + ipc::init_session_payload_t init_payload {}; + init_payload.width = config.width; + init_payload.height = config.height; + init_payload.framerate = config.framerate; + init_payload.bitrate = config.bitrate_kbps; + init_payload.slices_per_frame = config.slices_per_frame; + init_payload.num_ref_frames = config.num_ref_frames; + init_payload.encoder_csc_mode = config.encoder_csc_mode; + init_payload.video_format = config.video_format; + init_payload.dynamic_range = config.dynamic_range; + init_payload.chroma_sampling = config.chroma_sampling; + init_payload.enable_intra_refresh = config.enable_intra_refresh; + + init_payload.audio_channels = config.audio_channels; + init_payload.audio_mask = config.audio_mask; + init_payload.audio_packet_duration = config.audio_packet_duration; + init_payload.audio_high_quality = config.audio_high_quality ? 1 : 0; + init_payload.audio_host_audio = config.audio_host_audio ? 1 : 0; + + init_payload.packet_size = config.packet_size; + init_payload.min_fec_packets = config.min_fec_packets; + init_payload.fec_percentage = config.fec_percentage; + + init_payload.encryption_flags = config.encryption_flags; + std::copy(config.gcm_key.begin(), config.gcm_key.end(), init_payload.gcm_key); + std::copy(config.iv.begin(), config.iv.end(), init_payload.iv); + + init_payload.display_name_length = static_cast(config.display_name.size()); + + // Serialize payload + payload.resize(sizeof(init_payload) + config.display_name.size()); + std::memcpy(payload.data(), &init_payload, sizeof(init_payload)); + if (!config.display_name.empty()) { + std::memcpy(payload.data() + sizeof(init_payload), + config.display_name.data(), config.display_name.size()); + } + } + + result = ipc_server_->send_message(ipc::message_type_e::INIT_SESSION, payload.data(), payload.size()); + if (result != ipc::result_e::success) { + BOOST_LOG(error) << "Failed to send init message: " << ipc::result_to_string(result); + terminate_process(); + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -4, "Failed to send configuration"); + } + return false; + } + + // Wait for ready status + ipc::message_header_t header; + std::vector response; + result = ipc_server_->receive_message(header, response, sub_config.init_timeout_ms); + if (result != ipc::result_e::success) { + BOOST_LOG(error) << "Failed to receive ready status: " << ipc::result_to_string(result); + terminate_process(); + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -5, "Subprocess initialization failed"); + } + return false; + } + + auto msg_type = static_cast(header.type); + if (msg_type == ipc::message_type_e::STATUS_ERROR) { + int error_code = 0; + std::string error_msg = "Unknown error"; + if (response.size() >= sizeof(ipc::status_error_payload_t)) { + auto *err_payload = reinterpret_cast(response.data()); + error_code = err_payload->error_code; + if (err_payload->message_length > 0 && + response.size() >= sizeof(ipc::status_error_payload_t) + err_payload->message_length) { + error_msg = std::string(reinterpret_cast(response.data() + sizeof(ipc::status_error_payload_t)), + err_payload->message_length); + } + } + BOOST_LOG(error) << "Subprocess reported error: " << error_msg << " (code: " << error_code << ")"; + terminate_process(); + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, error_code, error_msg); + } + return false; + } + + if (msg_type != ipc::message_type_e::STATUS_READY) { + BOOST_LOG(error) << "Unexpected message type: " << static_cast(header.type); + terminate_process(); + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -6, "Unexpected response from subprocess"); + } + return false; + } + + state_ = state_e::ready; + BOOST_LOG(info) << "Subprocess worker ready for session " << config.session_id; + + // Start message receive loop + ipc_server_->start_receive_loop([this](const ipc::message_header_t &h, const std::vector &p) { + return handle_message(h, p); + }); + + // Start heartbeat monitoring + heartbeat_running_ = true; + last_heartbeat_ = std::chrono::steady_clock::now(); + heartbeat_thread_ = std::thread(&subprocess_worker_t::heartbeat_thread, this); + + if (status_callback_) { + status_callback_(state_e::ready, 0, ""); + } + + return true; + } + + void + subprocess_worker_t::stop(int wait_timeout_ms) { + std::lock_guard lock(mutex_); + + if (state_ == state_e::stopped) { + return; + } + + BOOST_LOG(info) << "Stopping subprocess worker"; + state_ = state_e::stopping; + + // Stop heartbeat + heartbeat_running_ = false; + if (heartbeat_thread_.joinable()) { + heartbeat_thread_.join(); + } + + // Send shutdown message + if (ipc_server_ && ipc_server_->is_connected()) { + ipc_server_->send_message(ipc::message_type_e::SHUTDOWN); + ipc_server_->stop_receive_loop(); + } + + // Wait for process to exit gracefully + terminate_process(); + + ipc_server_.reset(); + state_ = state_e::stopped; + + BOOST_LOG(info) << "Subprocess worker stopped"; + } + + void + subprocess_worker_t::request_idr_frame() { + if (!is_running() || !ipc_server_ || !ipc_server_->is_connected()) { + return; + } + + auto result = ipc_server_->send_message(ipc::message_type_e::REQUEST_IDR); + if (result != ipc::result_e::success) { + BOOST_LOG(warning) << "Failed to send IDR request: " << ipc::result_to_string(result); + } + } + + void + subprocess_worker_t::change_bitrate(int new_bitrate_kbps) { + if (!is_running() || !ipc_server_ || !ipc_server_->is_connected()) { + return; + } + + ipc::change_bitrate_payload_t payload; + payload.new_bitrate_kbps = new_bitrate_kbps; + + auto result = ipc_server_->send_message(ipc::message_type_e::CHANGE_BITRATE, &payload, sizeof(payload)); + if (result != ipc::result_e::success) { + BOOST_LOG(warning) << "Failed to send bitrate change: " << ipc::result_to_string(result); + } + } + + void + subprocess_worker_t::invalidate_ref_frames(int64_t first_frame, int64_t last_frame) { + if (!is_running() || !ipc_server_ || !ipc_server_->is_connected()) { + return; + } + + ipc::invalidate_refs_payload_t payload; + payload.first_frame = first_frame; + payload.last_frame = last_frame; + + auto result = ipc_server_->send_message(ipc::message_type_e::INVALIDATE_REFS, &payload, sizeof(payload)); + if (result != ipc::result_e::success) { + BOOST_LOG(warning) << "Failed to send ref frame invalidation: " << ipc::result_to_string(result); + } + } + +#ifdef _WIN32 + bool + subprocess_worker_t::transfer_socket(uint8_t socket_type, uintptr_t socket_handle, + const uint8_t *remote_addr, uint8_t addr_family, uint16_t remote_port) { + if (!is_running() || !ipc_server_ || !ipc_server_->is_connected()) { + return false; + } + + // Get subprocess process ID + DWORD process_id = 0; + if (process_handle_) { + process_id = GetProcessId(static_cast(process_handle_)); + } + + if (process_id == 0) { + BOOST_LOG(error) << "Cannot get subprocess process ID for socket transfer"; + return false; + } + + // Duplicate socket for subprocess + WSAPROTOCOL_INFOW protocol_info; + if (WSADuplicateSocketW(static_cast(socket_handle), process_id, &protocol_info) != 0) { + BOOST_LOG(error) << "WSADuplicateSocket failed: " << WSAGetLastError(); + return false; + } + + // Build socket info payload + std::vector payload; + payload.resize(sizeof(ipc::socket_info_payload_t) + sizeof(protocol_info)); + + auto *info = reinterpret_cast(payload.data()); + info->socket_type = socket_type; + info->local_port = 0; // Will be determined by subprocess + info->remote_port = remote_port; + info->address_family = addr_family; + std::memcpy(info->remote_addr, remote_addr, (addr_family == AF_INET6) ? 16 : 4); + info->protocol_info_length = sizeof(protocol_info); + + std::memcpy(payload.data() + sizeof(ipc::socket_info_payload_t), &protocol_info, sizeof(protocol_info)); + + auto result = ipc_server_->send_message(ipc::message_type_e::SOCKET_INFO, payload.data(), payload.size()); + if (result != ipc::result_e::success) { + BOOST_LOG(error) << "Failed to send socket info: " << ipc::result_to_string(result); + return false; + } + + BOOST_LOG(debug) << "Socket transferred to subprocess (type=" << (int) socket_type << ")"; + return true; + } +#endif + + bool + subprocess_worker_t::handle_message(const ipc::message_header_t &header, const std::vector &payload) { + auto msg_type = static_cast(header.type); + + switch (msg_type) { + case ipc::message_type_e::HEARTBEAT_ACK: + last_heartbeat_ = std::chrono::steady_clock::now(); + break; + + case ipc::message_type_e::STATUS_STREAMING: + state_ = state_e::streaming; + if (status_callback_) { + status_callback_(state_e::streaming, 0, ""); + } + break; + + case ipc::message_type_e::STATUS_STOPPED: + state_ = state_e::stopped; + if (status_callback_) { + status_callback_(state_e::stopped, 0, ""); + } + return false; // Stop receive loop + + case ipc::message_type_e::STATUS_ERROR: { + int error_code = 0; + std::string error_msg = "Unknown error"; + if (payload.size() >= sizeof(ipc::status_error_payload_t)) { + auto *err = reinterpret_cast(payload.data()); + error_code = err->error_code; + if (err->message_length > 0 && + payload.size() >= sizeof(ipc::status_error_payload_t) + err->message_length) { + error_msg = std::string(reinterpret_cast(payload.data() + sizeof(ipc::status_error_payload_t)), + err->message_length); + } + } + BOOST_LOG(error) << "Subprocess error: " << error_msg << " (code: " << error_code << ")"; + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, error_code, error_msg); + } + return false; // Stop receive loop + } + + default: + BOOST_LOG(debug) << "Received message type: " << static_cast(header.type); + break; + } + + return true; + } + + void + subprocess_worker_t::heartbeat_thread() { + auto &sub_config = get_config(); + + while (heartbeat_running_) { + std::this_thread::sleep_for(std::chrono::milliseconds(sub_config.heartbeat_interval_ms)); + + if (!heartbeat_running_ || !ipc_server_ || !ipc_server_->is_connected()) { + break; + } + + // Send heartbeat + auto result = ipc_server_->send_message(ipc::message_type_e::HEARTBEAT); + if (result != ipc::result_e::success) { + BOOST_LOG(warning) << "Failed to send heartbeat: " << ipc::result_to_string(result); + continue; + } + + // Check for heartbeat timeout + auto now = std::chrono::steady_clock::now(); + auto last = last_heartbeat_.load(); + if (std::chrono::duration_cast(now - last).count() > sub_config.heartbeat_timeout_ms) { + BOOST_LOG(error) << "Subprocess heartbeat timeout"; + state_ = state_e::error; + if (status_callback_) { + status_callback_(state_e::error, -100, "Heartbeat timeout"); + } + break; + } + } + } + + bool + subprocess_worker_t::launch_process() { +#ifdef _WIN32 + // Find sender executable + auto &sub_config = get_config(); + std::filesystem::path sender_path; + + if (!sub_config.sender_executable.empty()) { + sender_path = sub_config.sender_executable; + } + else { + // Look in same directory as current executable + wchar_t module_path[MAX_PATH]; + GetModuleFileNameW(nullptr, module_path, MAX_PATH); + sender_path = std::filesystem::path(module_path).parent_path() / "sunshine-sender.exe"; + } + + if (!std::filesystem::exists(sender_path)) { + BOOST_LOG(error) << "Sender executable not found: " << sender_path.string(); + return false; + } + + // Build command line + std::wstring cmdline = L"\"" + sender_path.wstring() + L"\" --session-id " + std::to_wstring(config_.session_id); + + STARTUPINFOW si = {}; + si.cb = sizeof(si); + PROCESS_INFORMATION pi = {}; + + // Create process + if (!CreateProcessW( + nullptr, + cmdline.data(), + nullptr, + nullptr, + FALSE, + CREATE_NO_WINDOW, + nullptr, + nullptr, + &si, + &pi)) { + BOOST_LOG(error) << "CreateProcess failed: " << GetLastError(); + return false; + } + + CloseHandle(pi.hThread); + process_handle_ = pi.hProcess; + + BOOST_LOG(info) << "Launched subprocess (PID: " << pi.dwProcessId << ")"; + return true; +#else + // Find sender executable + auto &sub_config = get_config(); + std::filesystem::path sender_path; + + if (!sub_config.sender_executable.empty()) { + sender_path = sub_config.sender_executable; + } + else { + // Look in same directory as current executable + char exe_path[PATH_MAX]; + ssize_t len = readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1); + if (len == -1) { + BOOST_LOG(error) << "Failed to get executable path"; + return false; + } + exe_path[len] = '\0'; + sender_path = std::filesystem::path(exe_path).parent_path() / "sunshine-sender"; + } + + if (!std::filesystem::exists(sender_path)) { + BOOST_LOG(error) << "Sender executable not found: " << sender_path.string(); + return false; + } + + pid_t pid = fork(); + if (pid < 0) { + BOOST_LOG(error) << "fork() failed: " << strerror(errno); + return false; + } + + if (pid == 0) { + // Child process + std::string session_id_str = std::to_string(config_.session_id); + execl(sender_path.c_str(), sender_path.c_str(), + "--session-id", session_id_str.c_str(), + nullptr); + // If execl returns, it failed + _exit(1); + } + + // Parent process + process_handle_ = reinterpret_cast(static_cast(pid)); + BOOST_LOG(info) << "Launched subprocess (PID: " << pid << ")"; + return true; +#endif + } + + void + subprocess_worker_t::terminate_process() { + if (!process_handle_) { + return; + } + +#ifdef _WIN32 + HANDLE handle = static_cast(process_handle_); + + // Wait for process to exit gracefully + if (WaitForSingleObject(handle, 3000) == WAIT_TIMEOUT) { + BOOST_LOG(warning) << "Subprocess did not exit gracefully, terminating"; + TerminateProcess(handle, 1); + WaitForSingleObject(handle, 1000); + } + + CloseHandle(handle); +#else + pid_t pid = static_cast(reinterpret_cast(process_handle_)); + + // Send SIGTERM and wait + kill(pid, SIGTERM); + + int status; + for (int i = 0; i < 30; ++i) { // Wait up to 3 seconds + if (waitpid(pid, &status, WNOHANG) != 0) { + break; + } + std::this_thread::sleep_for(100ms); + } + + // Force kill if still running + if (waitpid(pid, &status, WNOHANG) == 0) { + BOOST_LOG(warning) << "Subprocess did not exit gracefully, killing"; + kill(pid, SIGKILL); + waitpid(pid, &status, 0); + } +#endif + + process_handle_ = nullptr; + } + + // ===================================================== + // subprocess_manager_t implementation + // ===================================================== + + subprocess_manager_t & + subprocess_manager_t::instance() { + static subprocess_manager_t instance; + return instance; + } + + std::shared_ptr + subprocess_manager_t::create_worker(uint32_t session_id) { + std::lock_guard lock(mutex_); + + // Check if worker already exists + auto it = workers_.find(session_id); + if (it != workers_.end()) { + BOOST_LOG(warning) << "Worker already exists for session " << session_id; + return it->second; + } + + auto worker = std::make_shared(); + workers_[session_id] = worker; + + BOOST_LOG(debug) << "Created worker for session " << session_id; + return worker; + } + + std::shared_ptr + subprocess_manager_t::get_worker(uint32_t session_id) { + std::lock_guard lock(mutex_); + + auto it = workers_.find(session_id); + if (it != workers_.end()) { + return it->second; + } + + return nullptr; + } + + void + subprocess_manager_t::remove_worker(uint32_t session_id) { + std::lock_guard lock(mutex_); + + auto it = workers_.find(session_id); + if (it != workers_.end()) { + it->second->stop(); + workers_.erase(it); + BOOST_LOG(debug) << "Removed worker for session " << session_id; + } + } + + void + subprocess_manager_t::stop_all() { + std::lock_guard lock(mutex_); + + BOOST_LOG(info) << "Stopping all subprocess workers"; + for (auto &[id, worker] : workers_) { + worker->stop(); + } + workers_.clear(); + } + +} // namespace subprocess diff --git a/src/subprocess/subprocess_manager.h b/src/subprocess/subprocess_manager.h new file mode 100644 index 00000000000..efe72cc6e38 --- /dev/null +++ b/src/subprocess/subprocess_manager.h @@ -0,0 +1,259 @@ +/** + * @file src/subprocess/subprocess_manager.h + * @brief Subprocess lifecycle management for streaming data plane. + * + * This module manages the lifecycle of subprocess workers that handle + * video/audio capture, encoding, and network transmission. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include "ipc_pipe.h" +#include "ipc_protocol.h" +#include "subprocess_config.h" + +namespace subprocess { + + /** + * @brief Subprocess state. + */ + enum class state_e { + stopped, ///< Not running + starting, ///< Starting up + ready, ///< Ready to stream + streaming, ///< Actively streaming + stopping, ///< Shutting down + error, ///< Error state + }; + + /** + * @brief Convert state to string. + */ + const char * + state_to_string(state_e state); + + /** + * @brief Callback for subprocess status changes. + */ + using status_callback_t = std::function; + + /** + * @brief Session configuration for subprocess initialization. + */ + struct session_config_t { + // Session identification + uint32_t session_id; + std::string client_name; + + // Video configuration + int width; + int height; + int framerate; + int bitrate_kbps; + int slices_per_frame; + int num_ref_frames; + int encoder_csc_mode; + int video_format; // 0=H264, 1=HEVC, 2=AV1 + int dynamic_range; + int chroma_sampling; + int enable_intra_refresh; + + // Audio configuration + int audio_channels; + int audio_mask; + int audio_packet_duration; + bool audio_high_quality; + bool audio_host_audio; + + // Network configuration + int packet_size; + int min_fec_packets; + int fec_percentage; + + // Encryption + uint8_t encryption_flags; + std::array gcm_key; + std::array iv; + + // Display + std::string display_name; + }; + + /** + * @brief Manages a single subprocess worker. + */ + class subprocess_worker_t { + public: + subprocess_worker_t(); + ~subprocess_worker_t(); + + // Non-copyable + subprocess_worker_t(const subprocess_worker_t &) = delete; + subprocess_worker_t &operator=(const subprocess_worker_t &) = delete; + + /** + * @brief Start the subprocess with given configuration. + * @param config Session configuration. + * @param status_callback Callback for status changes. + * @return true on success, false on failure. + */ + bool + start(const session_config_t &config, status_callback_t status_callback); + + /** + * @brief Stop the subprocess. + * @param wait_timeout_ms Timeout to wait for graceful shutdown. + */ + void + stop(int wait_timeout_ms = 5000); + + /** + * @brief Request IDR frame from encoder. + */ + void + request_idr_frame(); + + /** + * @brief Change encoding bitrate. + * @param new_bitrate_kbps New bitrate in Kbps. + */ + void + change_bitrate(int new_bitrate_kbps); + + /** + * @brief Invalidate reference frames. + * @param first_frame First frame to invalidate. + * @param last_frame Last frame to invalidate. + */ + void + invalidate_ref_frames(int64_t first_frame, int64_t last_frame); + + /** + * @brief Get current state. + */ + state_e + get_state() const { + return state_.load(); + } + + /** + * @brief Check if subprocess is running. + */ + bool + is_running() const { + auto s = state_.load(); + return s == state_e::starting || s == state_e::ready || s == state_e::streaming; + } + +#ifdef _WIN32 + /** + * @brief Transfer socket to subprocess (Windows). + * @param socket_type 0=video, 1=audio, 2=control + * @param socket_handle Native socket handle (SOCKET). + * @param remote_addr Remote address. + * @param remote_port Remote port. + * @return true on success. + */ + bool + transfer_socket(uint8_t socket_type, uintptr_t socket_handle, + const uint8_t *remote_addr, uint8_t addr_family, uint16_t remote_port); +#endif + + private: + /** + * @brief Handle messages from subprocess. + */ + bool + handle_message(const ipc::message_header_t &header, const std::vector &payload); + + /** + * @brief Heartbeat monitoring thread. + */ + void + heartbeat_thread(); + + /** + * @brief Launch the subprocess executable. + */ + bool + launch_process(); + + /** + * @brief Terminate the subprocess. + */ + void + terminate_process(); + + // Configuration + session_config_t config_; + status_callback_t status_callback_; + + // State + std::atomic state_ { state_e::stopped }; + + // IPC + std::unique_ptr ipc_server_; + + // Process handle + void *process_handle_ = nullptr; + + // Heartbeat + std::thread heartbeat_thread_; + std::atomic heartbeat_running_ { false }; + std::atomic last_heartbeat_; + + // Mutex for thread safety + mutable std::mutex mutex_; + }; + + /** + * @brief Global subprocess manager for multiple workers. + */ + class subprocess_manager_t { + public: + static subprocess_manager_t & + instance(); + + /** + * @brief Create a new subprocess worker for a session. + * @param session_id Session ID. + * @return Shared pointer to worker, or nullptr on failure. + */ + std::shared_ptr + create_worker(uint32_t session_id); + + /** + * @brief Get an existing worker by session ID. + * @param session_id Session ID. + * @return Shared pointer to worker, or nullptr if not found. + */ + std::shared_ptr + get_worker(uint32_t session_id); + + /** + * @brief Remove a worker by session ID. + * @param session_id Session ID. + */ + void + remove_worker(uint32_t session_id); + + /** + * @brief Stop all workers. + */ + void + stop_all(); + + private: + subprocess_manager_t() = default; + ~subprocess_manager_t() = default; + + std::mutex mutex_; + std::unordered_map> workers_; + }; + +} // namespace subprocess From cb8d5f153b12f9864d2b7766fe4da3b03b3b26d0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 22 Dec 2025 04:13:56 +0000 Subject: [PATCH 3/5] feat(subprocess): Add subprocess sender executable framework - Create sender_main.cpp with IPC message handling - Add CMakeLists.txt for building sunshine-sender - Create logging wrapper for subprocess module compatibility - Integrate sender build into Windows and Linux targets - Implement basic message routing for control commands Co-authored-by: ShadowLemoon <119576779+ShadowLemoon@users.noreply.github.com> --- cmake/targets/linux.cmake | 3 + cmake/targets/windows.cmake | 3 + src/sender/CMakeLists.txt | 41 +++ src/sender/sender_main.cpp | 426 ++++++++++++++++++++++++++++ src/subprocess/ipc_pipe.cpp | 60 ++-- src/subprocess/subprocess_logging.h | 47 +++ 6 files changed, 550 insertions(+), 30 deletions(-) create mode 100644 src/sender/CMakeLists.txt create mode 100644 src/sender/sender_main.cpp create mode 100644 src/subprocess/subprocess_logging.h diff --git a/cmake/targets/linux.cmake b/cmake/targets/linux.cmake index fa1f33c0752..586a87dc425 100644 --- a/cmake/targets/linux.cmake +++ b/cmake/targets/linux.cmake @@ -1 +1,4 @@ # linux specific target definitions + +# Build subprocess sender executable +add_subdirectory(src/sender) diff --git a/cmake/targets/windows.cmake b/cmake/targets/windows.cmake index 26bea92874e..dbddb9f48a2 100644 --- a/cmake/targets/windows.cmake +++ b/cmake/targets/windows.cmake @@ -1,5 +1,8 @@ # windows specific target definitions set_target_properties(sunshine PROPERTIES LINK_SEARCH_START_STATIC 1) + +# Build subprocess sender executable +add_subdirectory(src/sender) set(CMAKE_FIND_LIBRARY_SUFFIXES ".dll") find_library(ZLIB ZLIB1) list(APPEND SUNSHINE_EXTERNAL_LIBRARIES diff --git a/src/sender/CMakeLists.txt b/src/sender/CMakeLists.txt new file mode 100644 index 00000000000..f117cfb607a --- /dev/null +++ b/src/sender/CMakeLists.txt @@ -0,0 +1,41 @@ +# CMakeLists.txt for sunshine-sender subprocess +# This builds the sender executable that handles the data plane + +set(SENDER_TARGET_FILES + "${CMAKE_SOURCE_DIR}/src/sender/sender_main.cpp" + "${CMAKE_SOURCE_DIR}/src/subprocess/ipc_pipe.cpp" + "${CMAKE_SOURCE_DIR}/src/subprocess/ipc_protocol.h" +) + +add_executable(sunshine-sender ${SENDER_TARGET_FILES}) + +target_include_directories(sunshine-sender PRIVATE + "${CMAKE_SOURCE_DIR}" +) + +target_compile_definitions(sunshine-sender PRIVATE + SUBPROCESS_STANDALONE=1 + $<$:WIN32_LEAN_AND_MEAN> + $<$:NOMINMAX> +) + +set_target_properties(sunshine-sender PROPERTIES + CXX_STANDARD 23 +) + +# Platform-specific libraries +if(WIN32) + target_link_libraries(sunshine-sender PRIVATE + ws2_32 + ) +else() + target_link_libraries(sunshine-sender PRIVATE + pthread + ) +endif() + +# Install sender alongside main executable +install(TARGETS sunshine-sender + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + COMPONENT Runtime +) diff --git a/src/sender/sender_main.cpp b/src/sender/sender_main.cpp new file mode 100644 index 00000000000..7cc69447406 --- /dev/null +++ b/src/sender/sender_main.cpp @@ -0,0 +1,426 @@ +/** + * @file src/sender/sender_main.cpp + * @brief Main entry point for the subprocess sender (data plane). + * + * This subprocess handles: + * - Screen capture (WGC on Windows) + * - Audio capture (WASAPI loopback) + * - Hardware encoding (NVENC/AMF/QSV) + * - RTP packet construction and sending + * + * Communication with main process via named pipe IPC. + */ +#include +#include +#include +#include +#include +#include +#include + +#include "src/subprocess/ipc_pipe.h" +#include "src/subprocess/ipc_protocol.h" + +#ifdef _WIN32 + #include + #include +#endif + +using namespace std::literals; +namespace ipc = subprocess::ipc; + +// Global state +static std::atomic g_running { true }; +static std::atomic g_streaming { false }; +static uint32_t g_session_id = 0; + +// Session configuration received from main process +struct session_config_t { + int width = 0; + int height = 0; + int framerate = 0; + int bitrate_kbps = 0; + int slices_per_frame = 1; + int num_ref_frames = 1; + int encoder_csc_mode = 0; + int video_format = 0; // 0=H264, 1=HEVC, 2=AV1 + int dynamic_range = 0; + int chroma_sampling = 0; + int enable_intra_refresh = 0; + + int audio_channels = 2; + int audio_mask = 0; + int audio_packet_duration = 5; + bool audio_high_quality = false; + bool audio_host_audio = false; + + int packet_size = 1024; + int min_fec_packets = 0; + int fec_percentage = 20; + + uint8_t encryption_flags = 0; + uint8_t gcm_key[16] = {}; + uint8_t iv[16] = {}; + + std::string display_name; +}; + +static session_config_t g_config; +static std::unique_ptr g_ipc_client; + +// Forward declarations +void +signal_handler(int sig); +bool +parse_args(int argc, char *argv[]); +bool +connect_to_main_process(); +bool +process_ipc_message(const ipc::message_header_t &header, const std::vector &payload); +void +send_status(ipc::message_type_e status, int error_code = 0, const std::string &error_msg = ""); +void +start_streaming(); +void +stop_streaming(); + +/** + * @brief Signal handler for graceful shutdown. + */ +void +signal_handler(int sig) { + std::cerr << "[Sender] Received signal " << sig << ", shutting down..." << std::endl; + g_running = false; + g_streaming = false; +} + +/** + * @brief Parse command line arguments. + */ +bool +parse_args(int argc, char *argv[]) { + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + + if (arg == "--session-id" && i + 1 < argc) { + g_session_id = static_cast(std::stoul(argv[++i])); + } + else if (arg == "--help" || arg == "-h") { + std::cout << "Usage: " << argv[0] << " --session-id " << std::endl; + std::cout << " --session-id Session ID for IPC connection" << std::endl; + return false; + } + } + + if (g_session_id == 0) { + std::cerr << "[Sender] Error: --session-id is required" << std::endl; + return false; + } + + return true; +} + +/** + * @brief Connect to the main process via IPC. + */ +bool +connect_to_main_process() { + std::cerr << "[Sender] Connecting to main process (session " << g_session_id << ")..." << std::endl; + + g_ipc_client = std::make_unique(); + + auto result = g_ipc_client->connect(g_session_id, 10000); // 10 second timeout + if (result != ipc::result_e::success) { + std::cerr << "[Sender] Failed to connect: " << ipc::result_to_string(result) << std::endl; + return false; + } + + std::cerr << "[Sender] Connected to main process" << std::endl; + return true; +} + +/** + * @brief Process an IPC message from main process. + */ +bool +process_ipc_message(const ipc::message_header_t &header, const std::vector &payload) { + auto msg_type = static_cast(header.type); + + switch (msg_type) { + case ipc::message_type_e::INIT_SESSION: { + std::cerr << "[Sender] Received INIT_SESSION" << std::endl; + + if (payload.size() < sizeof(ipc::init_session_payload_t)) { + send_status(ipc::message_type_e::STATUS_ERROR, -1, "Invalid INIT_SESSION payload size"); + return false; + } + + auto *init = reinterpret_cast(payload.data()); + + // Parse configuration + g_config.width = init->width; + g_config.height = init->height; + g_config.framerate = init->framerate; + g_config.bitrate_kbps = init->bitrate; + g_config.slices_per_frame = init->slices_per_frame; + g_config.num_ref_frames = init->num_ref_frames; + g_config.encoder_csc_mode = init->encoder_csc_mode; + g_config.video_format = init->video_format; + g_config.dynamic_range = init->dynamic_range; + g_config.chroma_sampling = init->chroma_sampling; + g_config.enable_intra_refresh = init->enable_intra_refresh; + + g_config.audio_channels = init->audio_channels; + g_config.audio_mask = init->audio_mask; + g_config.audio_packet_duration = init->audio_packet_duration; + g_config.audio_high_quality = init->audio_high_quality != 0; + g_config.audio_host_audio = init->audio_host_audio != 0; + + g_config.packet_size = init->packet_size; + g_config.min_fec_packets = init->min_fec_packets; + g_config.fec_percentage = init->fec_percentage; + + g_config.encryption_flags = init->encryption_flags; + std::copy(std::begin(init->gcm_key), std::end(init->gcm_key), g_config.gcm_key); + std::copy(std::begin(init->iv), std::end(init->iv), g_config.iv); + + // Parse display name + if (init->display_name_length > 0 && + payload.size() >= sizeof(ipc::init_session_payload_t) + init->display_name_length) { + g_config.display_name = std::string( + reinterpret_cast(payload.data() + sizeof(ipc::init_session_payload_t)), + init->display_name_length); + } + + std::cerr << "[Sender] Config: " << g_config.width << "x" << g_config.height + << "@" << g_config.framerate << "fps, " << g_config.bitrate_kbps << "Kbps" + << ", format=" << g_config.video_format + << ", display=" << g_config.display_name << std::endl; + + // TODO: Initialize capture and encoder here + + // Send ready status + send_status(ipc::message_type_e::STATUS_READY); + break; + } + + case ipc::message_type_e::SOCKET_INFO: { + std::cerr << "[Sender] Received SOCKET_INFO" << std::endl; + + if (payload.size() < sizeof(ipc::socket_info_payload_t)) { + std::cerr << "[Sender] Invalid SOCKET_INFO payload size" << std::endl; + break; + } + + auto *info = reinterpret_cast(payload.data()); + std::cerr << "[Sender] Socket type=" << (int) info->socket_type + << ", remote_port=" << info->remote_port << std::endl; + +#ifdef _WIN32 + // Recreate socket from WSAPROTOCOL_INFO + if (info->protocol_info_length > 0 && + payload.size() >= sizeof(ipc::socket_info_payload_t) + info->protocol_info_length) { + auto *protocol_info = reinterpret_cast( + payload.data() + sizeof(ipc::socket_info_payload_t)); + + SOCKET sock = WSASocketW(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, + const_cast(protocol_info), + 0, WSA_FLAG_OVERLAPPED); + if (sock != INVALID_SOCKET) { + std::cerr << "[Sender] Successfully received socket (type=" << (int) info->socket_type << ")" << std::endl; + // TODO: Store socket and use for streaming + } + else { + std::cerr << "[Sender] Failed to create socket from protocol info: " << WSAGetLastError() << std::endl; + } + } +#else + // On Unix, we'll use different socket passing mechanism (SCM_RIGHTS) + std::cerr << "[Sender] Socket passing not yet implemented on this platform" << std::endl; +#endif + break; + } + + case ipc::message_type_e::START_STREAM: { + std::cerr << "[Sender] Received START_STREAM" << std::endl; + start_streaming(); + break; + } + + case ipc::message_type_e::STOP_STREAM: { + std::cerr << "[Sender] Received STOP_STREAM" << std::endl; + stop_streaming(); + break; + } + + case ipc::message_type_e::REQUEST_IDR: { + std::cerr << "[Sender] Received REQUEST_IDR" << std::endl; + // TODO: Request IDR frame from encoder + break; + } + + case ipc::message_type_e::CHANGE_BITRATE: { + if (payload.size() >= sizeof(ipc::change_bitrate_payload_t)) { + auto *br = reinterpret_cast(payload.data()); + std::cerr << "[Sender] Received CHANGE_BITRATE: " << br->new_bitrate_kbps << " Kbps" << std::endl; + g_config.bitrate_kbps = br->new_bitrate_kbps; + // TODO: Update encoder bitrate + } + break; + } + + case ipc::message_type_e::INVALIDATE_REFS: { + if (payload.size() >= sizeof(ipc::invalidate_refs_payload_t)) { + auto *refs = reinterpret_cast(payload.data()); + std::cerr << "[Sender] Received INVALIDATE_REFS: frames " << refs->first_frame + << " to " << refs->last_frame << std::endl; + // TODO: Invalidate reference frames in encoder + } + break; + } + + case ipc::message_type_e::HEARTBEAT: { + // Respond with heartbeat ACK + g_ipc_client->send_message(ipc::message_type_e::HEARTBEAT_ACK); + break; + } + + case ipc::message_type_e::SHUTDOWN: { + std::cerr << "[Sender] Received SHUTDOWN" << std::endl; + g_running = false; + return false; + } + + default: + std::cerr << "[Sender] Unknown message type: " << header.type << std::endl; + break; + } + + return true; +} + +/** + * @brief Send status message to main process. + */ +void +send_status(ipc::message_type_e status, int error_code, const std::string &error_msg) { + if (!g_ipc_client || !g_ipc_client->is_connected()) { + return; + } + + if (status == ipc::message_type_e::STATUS_ERROR) { + std::vector payload(sizeof(ipc::status_error_payload_t) + error_msg.size()); + auto *err = reinterpret_cast(payload.data()); + err->error_code = error_code; + err->message_length = static_cast(error_msg.size()); + if (!error_msg.empty()) { + std::memcpy(payload.data() + sizeof(ipc::status_error_payload_t), error_msg.data(), error_msg.size()); + } + g_ipc_client->send_message(status, payload.data(), payload.size()); + } + else { + g_ipc_client->send_message(status); + } +} + +/** + * @brief Start streaming (capture + encode + send). + */ +void +start_streaming() { + if (g_streaming) { + return; + } + + std::cerr << "[Sender] Starting streaming..." << std::endl; + g_streaming = true; + send_status(ipc::message_type_e::STATUS_STREAMING); + + // TODO: Implement actual streaming loop + // 1. Initialize display capture (WGC) + // 2. Initialize hardware encoder (NVENC/AMF/QSV) + // 3. Initialize audio capture (WASAPI) + // 4. Main loop: capture -> encode -> RTP packetize -> send +} + +/** + * @brief Stop streaming. + */ +void +stop_streaming() { + if (!g_streaming) { + return; + } + + std::cerr << "[Sender] Stopping streaming..." << std::endl; + g_streaming = false; + + // TODO: Cleanup capture and encoder + + send_status(ipc::message_type_e::STATUS_STOPPED); +} + +/** + * @brief Main entry point. + */ +int +main(int argc, char *argv[]) { + std::cerr << "[Sender] Subprocess sender starting..." << std::endl; + +#ifdef _WIN32 + // Initialize Winsock + WSADATA wsa_data; + if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0) { + std::cerr << "[Sender] WSAStartup failed" << std::endl; + return 1; + } +#endif + + // Setup signal handlers + std::signal(SIGINT, signal_handler); + std::signal(SIGTERM, signal_handler); + + // Parse command line arguments + if (!parse_args(argc, argv)) { + return 1; + } + + // Connect to main process + if (!connect_to_main_process()) { + return 1; + } + + // Main IPC message loop + while (g_running) { + ipc::message_header_t header; + std::vector payload; + + auto result = g_ipc_client->receive_message(header, payload, 1000); + if (result == ipc::result_e::error_timeout) { + continue; + } + else if (result == ipc::result_e::error_disconnected) { + std::cerr << "[Sender] Disconnected from main process" << std::endl; + break; + } + else if (result != ipc::result_e::success) { + std::cerr << "[Sender] IPC error: " << ipc::result_to_string(result) << std::endl; + continue; + } + + if (!process_ipc_message(header, payload)) { + break; + } + } + + // Cleanup + stop_streaming(); + g_ipc_client.reset(); + +#ifdef _WIN32 + WSACleanup(); +#endif + + std::cerr << "[Sender] Subprocess sender exiting" << std::endl; + return 0; +} diff --git a/src/subprocess/ipc_pipe.cpp b/src/subprocess/ipc_pipe.cpp index b8b57805319..b4e1d28e0be 100644 --- a/src/subprocess/ipc_pipe.cpp +++ b/src/subprocess/ipc_pipe.cpp @@ -4,7 +4,7 @@ */ #include "ipc_pipe.h" -#include "src/logging.h" +#include "subprocess_logging.h" #ifdef _WIN32 #include @@ -77,12 +77,12 @@ namespace subprocess { ); if (handle == INVALID_HANDLE_VALUE) { - BOOST_LOG(error) << "Failed to create named pipe: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to create named pipe: " << GetLastError(); return result_e::error_create_pipe; } pipe_handle_ = handle; - BOOST_LOG(debug) << "Created named pipe: " << pipe_name_; + SUBPROCESS_LOG(debug) << "Created named pipe: " << pipe_name_; return result_e::success; #else // On Unix, we use FIFO (named pipes) @@ -90,11 +90,11 @@ namespace subprocess { unlink(pipe_name_.c_str()); if (mkfifo(pipe_name_.c_str(), 0600) != 0) { - BOOST_LOG(error) << "Failed to create FIFO: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to create FIFO: " << strerror(errno); return result_e::error_create_pipe; } - BOOST_LOG(debug) << "Created FIFO: " << pipe_name_; + SUBPROCESS_LOG(debug) << "Created FIFO: " << pipe_name_; return result_e::success; #endif } @@ -112,7 +112,7 @@ namespace subprocess { OVERLAPPED overlapped = {}; overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); if (!overlapped.hEvent) { - BOOST_LOG(error) << "Failed to create event: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to create event: " << GetLastError(); return result_e::error_create_pipe; } @@ -128,11 +128,11 @@ namespace subprocess { if (wait_result == WAIT_TIMEOUT) { CancelIo(handle); - BOOST_LOG(debug) << "Connection timeout"; + SUBPROCESS_LOG(debug) << "Connection timeout"; return result_e::error_timeout; } else if (wait_result != WAIT_OBJECT_0) { - BOOST_LOG(error) << "Wait failed: " << GetLastError(); + SUBPROCESS_LOG(error) << "Wait failed: " << GetLastError(); return result_e::error_connect; } @@ -141,14 +141,14 @@ namespace subprocess { if (!GetOverlappedResult(handle, &overlapped, &bytes, FALSE)) { error = GetLastError(); if (error != ERROR_PIPE_CONNECTED) { - BOOST_LOG(error) << "GetOverlappedResult failed: " << error; + SUBPROCESS_LOG(error) << "GetOverlappedResult failed: " << error; return result_e::error_connect; } } } else if (error != ERROR_PIPE_CONNECTED) { CloseHandle(overlapped.hEvent); - BOOST_LOG(error) << "ConnectNamedPipe failed: " << error; + SUBPROCESS_LOG(error) << "ConnectNamedPipe failed: " << error; return result_e::error_connect; } } @@ -157,19 +157,19 @@ namespace subprocess { } connected_ = true; - BOOST_LOG(info) << "Client connected to IPC pipe"; + SUBPROCESS_LOG(info) << "Client connected to IPC pipe"; return result_e::success; #else // On Unix, open the FIFO for read+write (non-blocking initially) int fd = open(pipe_name_.c_str(), O_RDWR | O_NONBLOCK); if (fd < 0) { - BOOST_LOG(error) << "Failed to open FIFO: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to open FIFO: " << strerror(errno); return result_e::error_create_pipe; } pipe_handle_ = reinterpret_cast(static_cast(fd)); connected_ = true; - BOOST_LOG(info) << "IPC pipe ready"; + SUBPROCESS_LOG(info) << "IPC pipe ready"; return result_e::success; #endif } @@ -190,7 +190,7 @@ namespace subprocess { // Write header DWORD written; if (!WriteFile(handle, &header, sizeof(header), &written, nullptr) || written != sizeof(header)) { - BOOST_LOG(error) << "Failed to write header: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to write header: " << GetLastError(); return result_e::error_write; } @@ -198,7 +198,7 @@ namespace subprocess { if (payload && payload_length > 0) { if (!WriteFile(handle, payload, static_cast(payload_length), &written, nullptr) || written != payload_length) { - BOOST_LOG(error) << "Failed to write payload: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to write payload: " << GetLastError(); return result_e::error_write; } } @@ -210,7 +210,7 @@ namespace subprocess { // Write header ssize_t written = write(fd, &header, sizeof(header)); if (written != sizeof(header)) { - BOOST_LOG(error) << "Failed to write header: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to write header: " << strerror(errno); return result_e::error_write; } @@ -218,7 +218,7 @@ namespace subprocess { if (payload && payload_length > 0) { written = write(fd, payload, payload_length); if (written != static_cast(payload_length)) { - BOOST_LOG(error) << "Failed to write payload: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to write payload: " << strerror(errno); return result_e::error_write; } } @@ -346,7 +346,7 @@ namespace subprocess { return result_e::error_timeout; } else if (ret < 0) { - BOOST_LOG(error) << "Poll failed: " << strerror(errno); + SUBPROCESS_LOG(error) << "Poll failed: " << strerror(errno); return result_e::error_read; } @@ -398,7 +398,7 @@ namespace subprocess { continue; } else if (result != result_e::success) { - BOOST_LOG(warning) << "IPC receive error: " << result_to_string(result); + SUBPROCESS_LOG(warning) << "IPC receive error: " << result_to_string(result); break; } @@ -475,13 +475,13 @@ namespace subprocess { DWORD mode = PIPE_READMODE_MESSAGE; if (!SetNamedPipeHandleState(handle, &mode, nullptr, nullptr)) { CloseHandle(handle); - BOOST_LOG(error) << "Failed to set pipe mode: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to set pipe mode: " << GetLastError(); return result_e::error_connect; } pipe_handle_ = handle; connected_ = true; - BOOST_LOG(info) << "Connected to IPC pipe: " << pipe_name_; + SUBPROCESS_LOG(info) << "Connected to IPC pipe: " << pipe_name_; return result_e::success; } @@ -499,7 +499,7 @@ namespace subprocess { continue; } else { - BOOST_LOG(error) << "Failed to connect to pipe: " << error; + SUBPROCESS_LOG(error) << "Failed to connect to pipe: " << error; return result_e::error_connect; } } @@ -512,7 +512,7 @@ namespace subprocess { if (fd >= 0) { pipe_handle_ = reinterpret_cast(static_cast(fd)); connected_ = true; - BOOST_LOG(info) << "Connected to IPC pipe: " << pipe_name_; + SUBPROCESS_LOG(info) << "Connected to IPC pipe: " << pipe_name_; return result_e::success; } @@ -522,7 +522,7 @@ namespace subprocess { continue; } - BOOST_LOG(error) << "Failed to open FIFO: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to open FIFO: " << strerror(errno); return result_e::error_connect; } @@ -546,7 +546,7 @@ namespace subprocess { // Write header DWORD written; if (!WriteFile(handle, &header, sizeof(header), &written, nullptr) || written != sizeof(header)) { - BOOST_LOG(error) << "Failed to write header: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to write header: " << GetLastError(); return result_e::error_write; } @@ -554,7 +554,7 @@ namespace subprocess { if (payload && payload_length > 0) { if (!WriteFile(handle, payload, static_cast(payload_length), &written, nullptr) || written != payload_length) { - BOOST_LOG(error) << "Failed to write payload: " << GetLastError(); + SUBPROCESS_LOG(error) << "Failed to write payload: " << GetLastError(); return result_e::error_write; } } @@ -566,7 +566,7 @@ namespace subprocess { // Write header ssize_t written = write(fd, &header, sizeof(header)); if (written != sizeof(header)) { - BOOST_LOG(error) << "Failed to write header: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to write header: " << strerror(errno); return result_e::error_write; } @@ -574,7 +574,7 @@ namespace subprocess { if (payload && payload_length > 0) { written = write(fd, payload, payload_length); if (written != static_cast(payload_length)) { - BOOST_LOG(error) << "Failed to write payload: " << strerror(errno); + SUBPROCESS_LOG(error) << "Failed to write payload: " << strerror(errno); return result_e::error_write; } } @@ -702,7 +702,7 @@ namespace subprocess { return result_e::error_timeout; } else if (ret < 0) { - BOOST_LOG(error) << "Poll failed: " << strerror(errno); + SUBPROCESS_LOG(error) << "Poll failed: " << strerror(errno); return result_e::error_read; } @@ -754,7 +754,7 @@ namespace subprocess { continue; } else if (result != result_e::success) { - BOOST_LOG(warning) << "IPC receive error: " << result_to_string(result); + SUBPROCESS_LOG(warning) << "IPC receive error: " << result_to_string(result); break; } diff --git a/src/subprocess/subprocess_logging.h b/src/subprocess/subprocess_logging.h new file mode 100644 index 00000000000..decbe57fd31 --- /dev/null +++ b/src/subprocess/subprocess_logging.h @@ -0,0 +1,47 @@ +/** + * @file src/subprocess/subprocess_logging.h + * @brief Simple logging wrapper for subprocess module. + * + * This header provides a unified logging interface that works in both: + * - Main Sunshine process (uses full Boost.Log infrastructure) + * - Subprocess sender (uses simple stderr logging) + */ +#pragma once + +#ifdef SUBPROCESS_STANDALONE + // Standalone subprocess - use simple stderr logging + #include + #include + + #define SUBPROCESS_LOG(level) subprocess::log_stream(#level) + +namespace subprocess { + class log_stream { + public: + explicit log_stream(const char *level): + level_(level) {} + + ~log_stream() { + std::cerr << "[" << level_ << "] " << stream_.str() << std::endl; + } + + template + log_stream & + operator<<(const T &value) { + stream_ << value; + return *this; + } + + private: + const char *level_; + std::ostringstream stream_; + }; +} // namespace subprocess + +#else + // Main process - use Boost.Log + #include "src/logging.h" + + #define SUBPROCESS_LOG(level) BOOST_LOG(level) + +#endif From 865c41bc1ebc3e20a2092ea0e771c908741858a9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 22 Dec 2025 04:16:00 +0000 Subject: [PATCH 4/5] fix: Address code review feedback - Add missing include to subprocess_manager.h - Use raw string literal for Windows pipe name - Fix Unix pipe path to use /tmp directory - Change 'using namespace std::literals' to 'using namespace std::chrono_literals' Co-authored-by: ShadowLemoon <119576779+ShadowLemoon@users.noreply.github.com> --- _codeql_detected_source_root | 1 + src/sender/sender_main.cpp | 3 ++- src/subprocess/ipc_pipe.cpp | 4 ++-- src/subprocess/ipc_protocol.h | 6 +++++- src/subprocess/subprocess_manager.h | 1 + 5 files changed, 11 insertions(+), 4 deletions(-) create mode 120000 _codeql_detected_source_root diff --git a/_codeql_detected_source_root b/_codeql_detected_source_root new file mode 120000 index 00000000000..945c9b46d68 --- /dev/null +++ b/_codeql_detected_source_root @@ -0,0 +1 @@ +. \ No newline at end of file diff --git a/src/sender/sender_main.cpp b/src/sender/sender_main.cpp index 7cc69447406..832b5c6055c 100644 --- a/src/sender/sender_main.cpp +++ b/src/sender/sender_main.cpp @@ -26,7 +26,8 @@ #include #endif -using namespace std::literals; +// Use chrono_literals for time durations +using namespace std::chrono_literals; namespace ipc = subprocess::ipc; // Global state diff --git a/src/subprocess/ipc_pipe.cpp b/src/subprocess/ipc_pipe.cpp index b4e1d28e0be..d33f7bfc568 100644 --- a/src/subprocess/ipc_pipe.cpp +++ b/src/subprocess/ipc_pipe.cpp @@ -19,10 +19,10 @@ #include #endif -using namespace std::literals; - namespace subprocess { namespace ipc { + // Use chrono_literals only in the implementation + using namespace std::chrono_literals; const char * result_to_string(result_e result) { diff --git a/src/subprocess/ipc_protocol.h b/src/subprocess/ipc_protocol.h index 4aa809ca953..4324c748a17 100644 --- a/src/subprocess/ipc_protocol.h +++ b/src/subprocess/ipc_protocol.h @@ -149,7 +149,11 @@ namespace subprocess { */ inline std::string get_pipe_name(uint32_t session_id) { - return "\\\\.\\pipe\\sunshine_subprocess_" + std::to_string(session_id); +#ifdef _WIN32 + return R"(\\.\pipe\sunshine_subprocess_)" + std::to_string(session_id); +#else + return "/tmp/sunshine_subprocess_" + std::to_string(session_id); +#endif } /** diff --git a/src/subprocess/subprocess_manager.h b/src/subprocess/subprocess_manager.h index efe72cc6e38..6dde9aee418 100644 --- a/src/subprocess/subprocess_manager.h +++ b/src/subprocess/subprocess_manager.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "ipc_pipe.h" #include "ipc_protocol.h" From 7b8afce8bafa5e51264e2c4d44eda716d4ed3ed9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 22 Dec 2025 04:16:34 +0000 Subject: [PATCH 5/5] chore: Remove codeql artifact and update gitignore Co-authored-by: ShadowLemoon <119576779+ShadowLemoon@users.noreply.github.com> --- .gitignore | 5 ++++- _codeql_detected_source_root | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) delete mode 120000 _codeql_detected_source_root diff --git a/.gitignore b/.gitignore index 6f4f5e3c776..df56ef38391 100644 --- a/.gitignore +++ b/.gitignore @@ -57,4 +57,7 @@ package-lock.json venv/ # Generated files -src_assets/common/assets/web/welcome.html \ No newline at end of file +src_assets/common/assets/web/welcome.html + +# CodeQL +_codeql_detected_source_root \ No newline at end of file diff --git a/_codeql_detected_source_root b/_codeql_detected_source_root deleted file mode 120000 index 945c9b46d68..00000000000 --- a/_codeql_detected_source_root +++ /dev/null @@ -1 +0,0 @@ -. \ No newline at end of file