Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions coroio/http/httpd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ class TResponse {
};

struct IRouter {
virtual TFuture<void> HandleRequest(const TRequest& request, TResponse& response) = 0;
virtual TFuture<void> HandleRequest(TRequest& request, TResponse& response) = 0;
};

class THelloWorldRouter : public IRouter {
public:
TFuture<void> HandleRequest(const TRequest& request, TResponse& response) override {
TFuture<void> HandleRequest(TRequest& request, TResponse& response) override {
if (request.Uri().Path() == "/") {
response.SetStatus(200);
response.SetHeader("Content-Type", "text/plain");
Expand Down
27 changes: 26 additions & 1 deletion coroio/pipe/pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace NNet {

namespace {

[[maybe_unused]] void SetCloseOnExec(int fd) {
void SetCloseOnExec(int fd) {
int flags = fcntl(fd, F_GETFD);
if (flags == -1) {
throw std::runtime_error("fcntl(F_GETFD) failed");
Expand Down Expand Up @@ -70,8 +70,18 @@ void TPipe::TPipeLow::Fork() {
close(pipeStdin[1]);
throw std::system_error(code, std::generic_category(), "pipe() failed for stdout");
}
int pipeStderr[2];
if (pipe(pipeStderr) == -1) {
auto code = errno;
close(pipeStdin[0]);
close(pipeStdin[1]);
close(pipeStdout[0]);
close(pipeStdout[1]);
throw std::system_error(code, std::generic_category(), "pipe() failed for stderr");
}
SetCloseOnExec(pipeStdin[0]); SetCloseOnExec(pipeStdin[1]);
SetCloseOnExec(pipeStdout[0]); SetCloseOnExec(pipeStdout[1]);
SetCloseOnExec(pipeStderr[0]); SetCloseOnExec(pipeStderr[1]);

auto pid = fork();
if (pid == -1) {
Expand All @@ -80,13 +90,16 @@ void TPipe::TPipeLow::Fork() {
close(pipeStdin[1]);
close(pipeStdout[0]);
close(pipeStdout[1]);
close(pipeStderr[0]);
close(pipeStderr[1]);
throw std::system_error(code, std::generic_category(), "fork() failed");
}

if (pid == 0) {
// Child process
close(pipeStdin[1]);
close(pipeStdout[0]);
close(pipeStderr[0]);

if (dup2(pipeStdin[0], STDIN_FILENO) == -1) {
std::cerr << "dup2() failed for stdin: " << strerror(errno) << std::endl;
Expand All @@ -96,9 +109,14 @@ void TPipe::TPipeLow::Fork() {
std::cerr << "dup2() failed for stdout: " << strerror(errno) << std::endl;
_exit(1);
}
if (dup2(pipeStderr[1], STDERR_FILENO) == -1) {
std::cerr << "dup2() failed for stderr: " << strerror(errno) << std::endl;
_exit(1);
}

close(pipeStdin[0]);
close(pipeStdout[1]);
close(pipeStderr[1]);

std::vector<char*> cargs;
cargs.reserve(Args.size() + 2);
Expand All @@ -115,19 +133,26 @@ void TPipe::TPipeLow::Fork() {
// Parent process
close(pipeStdin[0]);
close(pipeStdout[1]);
close(pipeStderr[1]);

ChildPid = pid;
WriteFd = pipeStdin[1];
ReadFd = pipeStdout[0];
ErrFd = pipeStderr[0];
SetNonBlocking(ReadFd);
SetNonBlocking(WriteFd);
SetNonBlocking(ErrFd);
}
}

TFuture<ssize_t> TPipe::ReadSome(void* buffer, size_t size) {
co_return co_await ReadHandle->ReadSome(buffer, size);
}

TFuture<ssize_t> TPipe::ReadSomeErr(void* buffer, size_t size) {
co_return co_await ErrHandle->ReadSome(buffer, size);
}

TFuture<ssize_t> TPipe::WriteSome(const void* buffer, size_t size) {
co_return co_await WriteHandle->WriteSome(buffer, size);
}
Expand Down
18 changes: 18 additions & 0 deletions coroio/pipe/pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,25 @@ class TPipe {

ReadHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.ReadFd, poller);
WriteHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.WriteFd, poller);
ErrHandle = std::make_unique<TPipeFileHandle<TPoller>>(PipeLow.ErrFd, poller);
}

void CloseRead() {
ReadHandle.reset();
}

void CloseWrite() {
WriteHandle.reset();
}

void CloseErr() {
ErrHandle.reset();
}

int Wait();

TFuture<ssize_t> ReadSome(void* buffer, size_t size);
TFuture<ssize_t> ReadSomeErr(void* buffer, size_t size);
TFuture<ssize_t> WriteSome(const void* buffer, size_t size);

private:
Expand All @@ -33,6 +49,7 @@ class TPipe {

int ReadFd = -1;
int WriteFd = -1;
int ErrFd = -1;
int ChildPid = -1;
};

Expand Down Expand Up @@ -61,6 +78,7 @@ class TPipe {
TPipeLow PipeLow;
std::unique_ptr<TTypelessFileHandle> ReadHandle;
std::unique_ptr<TTypelessFileHandle> WriteHandle;
std::unique_ptr<TTypelessFileHandle> ErrHandle;
};

#endif // _WIN32
Expand Down
27 changes: 27 additions & 0 deletions tests/test_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,32 @@ void test_pipe_basic_read_write(void**) {
assert_true(bytesRead == sizeof(testData));
assert_true(memcmp(buffer, testData, sizeof(testData)) == 0);
}

void test_pipe_read_stderr(void**) {
TLoop<TDefaultPoller> loop;
auto& poller = loop.Poller();
std::string bashPath = "/bin/bash";
TPipe pipe(poller, bashPath, {"-c", "echo 'error message' 1>&2"});

char buffer[64] = {};
size_t bytesRead = sizeof(buffer);
auto reader = [](TPipe& pipe, char* buffer, size_t& size) -> TFuture<void> {
try {
size = co_await pipe.ReadSomeErr(buffer, size);
} catch (const std::exception& ex) {
std::cerr << "Read error: " << ex.what() << "\n";
}
}(pipe, buffer, bytesRead);

while (!reader.done()) {
loop.Step();
}

const char expected[] = "error message\n";
assert_true(bytesRead == sizeof(expected)-1);
assert_true(memcmp(buffer, expected, sizeof(expected)-1) == 0);
}

#endif // _WIN32

int main(int argc, char** argv) {
Expand All @@ -68,6 +94,7 @@ int main(int argc, char** argv) {

#ifndef _WIN32
ADD_TEST(cmocka_unit_test, test_pipe_basic_read_write);
ADD_TEST(cmocka_unit_test, test_pipe_read_stderr);
#endif

return _cmocka_run_group_tests("test_pipe", tests.data(), tests.size(), NULL, NULL);
Expand Down
Loading