-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_queue.h
More file actions
92 lines (84 loc) · 3.2 KB
/
async_queue.h
File metadata and controls
92 lines (84 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#pragma once
/*
async_queue.h - Implementation of a C++20 async awaitable queue.
Copyright (c) 2023 Dirk O. Kaar. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "task_completion_source.h"
#include "task.h"
#include "lfllist.h"
namespace ghostl
{
template<typename T = void>
struct async_queue : private lfllist<T>
{
using lfllist_type = lfllist<T>;
async_queue()
{
cur_tcs.store(tcs_queue.emplace_front(task_completion_source<>()));
}
async_queue(const async_queue&) = delete;
async_queue(async_queue&&) = delete;
auto operator =(const async_queue&)->async_queue & = delete;
auto operator =(async_queue&&)->async_queue & = delete;
[[nodiscard]] auto push(T&& val) -> bool
{
if (auto node = lfllist_type::emplace_front(std::move(val)); node != nullptr)
{
if (auto _tcs_node = tcs_queue.emplace_front(task_completion_source<>()); _tcs_node != nullptr)
{
if (auto _cur_tcs = cur_tcs.exchange(_tcs_node); _cur_tcs != nullptr)
{
task_completion_source<> tcs = _cur_tcs->item;
tcs.set_value();
return true;
}
// keep new cur_tcs, this fixes previous failure to set it (OOM)
}
lfllist_type::erase(node);
}
return false;
}
inline auto push(const T& val) -> bool ALWAYS_INLINE_ATTR
{
T v(val);
return push(std::move(v));
}
auto flush() -> void
{
while (tcs_queue.back())
{
if (task_completion_source<> item; tcs_queue.try_pop(item)) {}
}
cur_tcs.store(tcs_queue.emplace_front(task_completion_source<>()));
while (lfllist_type::back())
{
if (T item; lfllist_type::try_pop(item)) {}
}
}
auto pop() -> ghostl::task<T>
{
auto tcs = tcs_queue.back();
auto token = tcs->item.token();
co_await token;
tcs_queue.erase(tcs);
decltype(lfllist_type::back()) node = lfllist_type::back();
T item = std::move(node->item);
lfllist_type::erase(node);
co_return item;
}
private:
ghostl::lfllist<task_completion_source<>> tcs_queue;
std::atomic<typename decltype(tcs_queue)::node_type*> cur_tcs;
};
}