Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cmake/netlib_features.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,21 @@ if(NETLIB_HAS_STD_EXECUTION)
else()
message(STATUS "netlib: NETLIB_HAS_STD_EXECUTION=OFF (fallback backend)")
endif()

check_cxx_source_compiles(
"
#include <functional>
int main() {
std::move_only_function<void()> f;
(void)f;
return 0;
}
"
NETLIB_HAS_MOVE_ONLY_FUNCTION
)

if(NETLIB_HAS_MOVE_ONLY_FUNCTION)
message(STATUS "netlib: std::move_only_function доступен")
else()
message(STATUS "netlib: std::move_only_function недоступен — polyfill через -include")
endif()
4 changes: 4 additions & 0 deletions cmake/netlib_target.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ target_include_directories(netlib INTERFACE

target_compile_features(netlib INTERFACE cxx_std_${NETLIB_CXX_STANDARD})

if(NOT NETLIB_HAS_MOVE_ONLY_FUNCTION)
target_compile_definitions(netlib INTERFACE NETLIB_POLYFILL_MOVE_ONLY_FUNCTION=1)
endif()

if(NETLIB_ENABLE_COROUTINES)
target_compile_definitions(netlib INTERFACE NETLIB_ENABLE_COROUTINES=1)
message(STATUS "netlib: NETLIB_ENABLE_COROUTINES=ON")
Expand Down
77 changes: 77 additions & 0 deletions include/netlib/detail/move_only_function.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#pragma once

// Polyfill std::move_only_function when the standard library does not provide it
// (e.g. libc++ on Apple Clang in CI). Included from headers that use std::move_only_function.

#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include <version>

#if !defined(NETLIB_POLYFILL_MOVE_ONLY_FUNCTION) && defined(__cpp_lib_move_only_function) \
&& (__cpp_lib_move_only_function >= 202110L)
// std::move_only_function is available.
#else
Comment thread
cursor[bot] marked this conversation as resolved.

namespace rrmode::netlib::detail {

template<typename>
class move_only_function;

template<typename R, typename... Args>
class move_only_function<R(Args...)> {
struct impl_base {
virtual ~impl_base() = default;
virtual R call(Args... args) = 0;
};

template<typename F>
struct impl final : impl_base {
F fn;

explicit impl(F&& f) : fn(std::move(f)) {}

R call(Args... args) override {
if constexpr (std::is_void_v<R>) {
fn(std::forward<Args>(args)...);
} else {
return fn(std::forward<Args>(args)...);
}
}
};

std::unique_ptr<impl_base> impl_{};

public:
move_only_function() noexcept = default;

move_only_function(std::nullptr_t) noexcept {}

move_only_function(move_only_function&& other) noexcept = default;
move_only_function& operator=(move_only_function&& other) noexcept = default;

move_only_function(move_only_function const&) = delete;
move_only_function& operator=(move_only_function const&) = delete;

template<typename F>
requires std::is_invocable_r_v<R, F&, Args...> && (!std::is_same_v<std::decay_t<F>, move_only_function>)
move_only_function(F&& f) : impl_{std::make_unique<impl<std::decay_t<F>>>(std::forward<F>(f))} {}

explicit operator bool() const noexcept { return static_cast<bool>(impl_); }

R operator()(Args... args) const {
if (!impl_) {
throw std::bad_function_call{};
}
return impl_->call(std::forward<Args>(args)...);
}
};

} // namespace rrmode::netlib::detail

namespace std {
using rrmode::netlib::detail::move_only_function;
} // namespace std

#endif
9 changes: 8 additions & 1 deletion include/netlib/execution/detail/task_promise.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <netlib/execution/error.hpp>
#include <netlib/execution/scheduler.hpp>

#include <condition_variable>
Expand Down Expand Up @@ -51,7 +52,13 @@ inline void resume_on_scheduler(scheduler_holder& holder, std::coroutine_handle<
}
if (cont) {
if (holder.sched != nullptr) {
holder.sched->schedule([cont]() mutable { cont.resume(); });
try {
holder.sched->schedule([cont]() mutable { cont.resume(); });
} catch (execution_error const&) {
// thread_pool остановлен: продолжение нельзя отложить, но цепочку co_await
// нужно завершить синхронно, иначе родитель зависнет на final_suspend.
cont.resume();
}
Comment thread
cursor[bot] marked this conversation as resolved.
} else {
cont.resume();
}
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/execution/scheduler.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include <netlib/execution/executor.hpp>
#include <netlib/detail/move_only_function.hpp>

#include <functional>
#include <memory>

namespace rrmode::netlib::execution {
Expand Down
2 changes: 2 additions & 0 deletions include/netlib/execution/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class [[nodiscard]] task {
template<>
class [[nodiscard]] task<void> {
public:
using value_type = void;

struct promise_type : detail::task_promise_storage<void> {
using storage = detail::task_promise_storage<void>;

Expand Down
5 changes: 5 additions & 0 deletions include/netlib/execution/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class thread_pool : public executor {
queue_.push(std::move(fn));
}

/// После shutdown: true, если post() отклонит задачу.
[[nodiscard]] bool is_stopped() const noexcept {
return stopped_.load(std::memory_order_acquire);
}

void request_stop() override { shutdown(); }

/// Остановить пул: закрыть очередь, дождаться потоков.
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/detail/reactor_backend.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#pragma once

#include <netlib/net/detail/poll_event.hpp>
#include <netlib/detail/move_only_function.hpp>

#include <chrono>
#include <functional>

namespace rrmode::netlib::net::detail {

Expand Down
4 changes: 2 additions & 2 deletions include/netlib/net/detail/win_socket_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ class win_socket_backend final : public socket_backend {
}

private:
static SOCKET to_socket(int fd) { return reinterpret_cast<SOCKET>(static_cast<intptr_t>(fd)); }
static SOCKET to_socket(int fd) { return static_cast<SOCKET>(static_cast<UINT_PTR>(fd)); }

static int to_fd(SOCKET s) { return static_cast<int>(reinterpret_cast<intptr_t>(s)); }
static int to_fd(SOCKET s) { return static_cast<int>(static_cast<UINT_PTR>(s)); }

[[noreturn]] static void throw_wsa(char const* context) {
throw net_error(std::string{context} + ": WSA error " + std::to_string(::WSAGetLastError()));
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/medium/tcp_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <netlib/net/medium/tcp_socket.hpp>
#include <netlib/net/tcp_acceptor.hpp>

#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <utility>

namespace rrmode::netlib::net::medium {
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/medium/tcp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <netlib/net/medium/socket_options.hpp>
#include <netlib/net/tcp_socket.hpp>

#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <span>
#include <vector>

Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/simple/tcp_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <netlib/net/tcp_socket.hpp>

#include <cstddef>
#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <memory>
#include <span>
#include <thread>
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/simple/write_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <netlib/net/simple/writable_chunk.hpp>

#include <cstddef>
#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <optional>
#include <vector>

Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/tcp_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

#include <atomic>
#include <cstdint>
#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <memory>
#include <mutex>
#include <utility>
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/tcp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <netlib/net/event_loop.hpp>

#include <cstring>
#include <functional> // std::move_only_function
#include <netlib/detail/move_only_function.hpp>
#include <memory>
#include <span>
#include <utility>
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/udp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <netlib/net/event_loop.hpp>

#include <cstdint>
#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <memory>
#include <span>
#include <utility>
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/unix_stream_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <netlib/net/unix_stream_socket.hpp>

#include <atomic>
#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <memory>
#include <mutex>
#include <string>
Expand Down
2 changes: 1 addition & 1 deletion include/netlib/net/unix_stream_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <netlib/net/unix_endpoint.hpp>

#include <cstring>
#include <functional>
#include <netlib/detail/move_only_function.hpp>
#include <memory>
#include <span>
#include <utility>
Expand Down
6 changes: 4 additions & 2 deletions tests/fakes/fake_socket_backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <netlib/net/error.hpp>
#include <netlib/net/unix_endpoint.hpp>

#include <algorithm>

#include <deque>
#include <optional>
#include <span>
Expand Down Expand Up @@ -152,7 +154,7 @@ class fake_socket_backend final : public rrmode::netlib::net::detail::socket_bac
auto dg = std::move(s.datagram_rx.front());
s.datagram_rx.pop_front();
out = dg.remote;
std::size_t const n = std::min(buf.size(), dg.payload.size());
std::size_t const n = (std::min)(buf.size(), dg.payload.size());
for (std::size_t i = 0; i < n; ++i) {
buf[i] = dg.payload[i];
}
Expand Down Expand Up @@ -194,7 +196,7 @@ class fake_socket_backend final : public rrmode::netlib::net::detail::socket_bac
}
return std::size_t{0};
}
std::size_t n = std::min(buf.size(), s.rx.size());
std::size_t n = (std::min)(buf.size(), s.rx.size());
for (std::size_t i = 0; i < n; ++i) {
buf[i] = s.rx.front();
s.rx.pop_front();
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/udp_loopback_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ TEST_CASE("UDP loopback ping-pong") {
REQUIRE(client_msg == "pong");
}

} // namespace

#else

TEST_CASE("UDP loopback ping-pong") { REQUIRE(true); }

#endif

} // namespace
2 changes: 1 addition & 1 deletion tests/integration/unix_echo_coro_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
#include <catch2/catch_test_macros.hpp>
#include <string>
#include <thread>
#include <unistd.h>
#include <vector>

#if defined(__linux__) || defined(__APPLE__)
#include <unistd.h>

using namespace std::chrono_literals;

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/unix_server_coro_shutdown_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include <catch2/catch_test_macros.hpp>
#include <string>
#include <thread>
#include <unistd.h>
#include <vector>

#if defined(__linux__) || defined(__APPLE__)
#include <unistd.h>

using namespace std::chrono_literals;

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/execution/delay_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ TEST_CASE("delay_async scheduler: параллельные паузы") {
sync_wait(sched, parallel_delays(sched));
auto const elapsed = std::chrono::steady_clock::now() - t0;

pool.shutdown();
REQUIRE(elapsed >= std::chrono::milliseconds{35});
REQUIRE(elapsed < std::chrono::milliseconds{120});
REQUIRE(elapsed < std::chrono::milliseconds{300});
pool.shutdown();
}

TEST_CASE("delay_async scheduler: нулевая длительность") {
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/execution/task_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#include <netlib/execution/coroutine.hpp>
#include <netlib/execution/error.hpp>
#include <netlib/execution/executor.hpp>
#include <netlib/execution/thread_pool.hpp>

#include <catch2/catch_test_macros.hpp>
#include <functional>
#include <stdexcept>

using rrmode::netlib::execution::execution_error;
using rrmode::netlib::execution::executor;
using rrmode::netlib::execution::scheduler;
using rrmode::netlib::execution::sync_wait;
using rrmode::netlib::execution::task;
Expand Down Expand Up @@ -51,6 +56,36 @@ TEST_CASE("task void завершается без значения") {
pool.shutdown();
}

TEST_CASE("task: execution_error от schedule возобновляет continuation") {
struct stopped_executor final : executor {
void post(std::function<void()> /*fn*/) override {
throw execution_error("post в остановленный thread_pool");
}
};

stopped_executor ex;
scheduler sched{ex};
bool parent_done = false;
bool outer_done = false;

auto child = []() -> task<void> { co_return; };
auto parent = [&]() -> task<void> {
co_await child();
parent_done = true;
};
auto outer = [&]() -> task<void> {
co_await parent();
outer_done = true;
};

auto h = outer().release();
h.promise().set_scheduler(sched);
h.resume();

REQUIRE(parent_done);
REQUIRE(outer_done);
}

TEST_CASE("task пробрасывает исключение") {
thread_pool pool{1};
scheduler sched{pool};
Expand Down
Loading
Loading