-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreadpool.cpp
More file actions
186 lines (158 loc) · 4.75 KB
/
threadpool.cpp
File metadata and controls
186 lines (158 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
// Custom ThreadPool implementation (C++ has no standard thread pool yet)
class ThreadPool {
public:
// Constructor: creates worker threads
ThreadPool(size_t numThreads) : stop(false)
{
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(queueMutex);
// Wait until there's a task or stop signal
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return; // Exit thread
}
task = move(tasks.front());
tasks.pop();
}
task(); // Execute task outside the lock
}
});
}
}
// Submit a task to the pool and get a future for the result
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> future<typename result_of<F(Args...)>::type>
{
using return_type = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<return_type()>>(
bind(forward<F>(f), forward<Args>(args)...));
future<return_type> res = task->get_future();
{
unique_lock<mutex> lock(queueMutex);
if (stop) {
throw runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
// Destructor: waits for all tasks to complete
~ThreadPool()
{
{
unique_lock<mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (thread& worker : workers) {
worker.join();
}
}
private:
vector<thread> workers; // Worker threads
queue<function<void()>> tasks; // Task queue
mutex queueMutex; // Protects task queue
condition_variable condition; // For thread synchronization
bool stop; // Stop flag
};
// Example usage
int main()
{
// Create a thread pool with 4 worker threads
ThreadPool pool(4);
// Example 1: Simple tasks without return values
cout << "=== Example 1: Simple Tasks ===" << endl;
vector<future<void>> results1;
for (int i = 0; i < 8; ++i) {
results1.emplace_back(pool.enqueue([i] {
cout << "Task " << i << " executing on thread " << this_thread::get_id()
<< endl;
this_thread::sleep_for(chrono::milliseconds(100));
}));
}
// Wait for all tasks
for (auto& result : results1) {
result.get();
}
cout << "\n=== Example 2: Tasks with Return Values ===" << endl;
// Example 2: Tasks that return values
vector<future<int>> results2;
for (int i = 0; i < 5; ++i) {
results2.emplace_back(pool.enqueue([i] {
int result = i * i;
cout << "Computing " << i << "^2 = " << result << endl;
return result;
}));
}
// Get results
for (auto& result : results2) {
cout << "Result: " << result.get() << endl;
}
cout << "\n=== Example 3: Tasks with Parameters ===" << endl;
// Example 3: Functions with parameters
auto multiply = [](int a, int b) {
cout << a << " * " << b << " = " << (a * b) << endl;
return a * b;
};
vector<future<int>> results3;
for (int i = 1; i <= 5; ++i) {
results3.emplace_back(pool.enqueue(multiply, i, 10));
}
for (auto& result : results3) {
result.get();
}
return 0;
}
/*
STANDARD C++ THREAD POOL STATUS:
================================
❌ C++11/14/17: No standard thread pool
❌ C++20: Still no standard thread pool (added std::jthread but not a pool)
❌ C++23: No standard thread pool yet
⏳ C++26: Proposed but not yet standardized
ALTERNATIVES:
=============
1. Custom Implementation (like above) ✅
- Full control
- Lightweight
- No dependencies
2. Boost.Asio thread pool:
#include <boost/asio/thread_pool.hpp>
boost::asio::thread_pool pool(4);
boost::asio::post(pool, task);
3. Intel TBB (Threading Building Blocks):
#include <tbb/task_group.h>
tbb::task_group tg;
tg.run(task);
4. std::async (C++11) - Not a pool but simpler:
auto future = std::async(std::launch::async, function);
KEY CONCEPTS:
============
1. Worker Threads: Fixed number of threads waiting for work
2. Task Queue: Thread-safe queue of tasks to execute
3. Mutex: Protects the task queue from race conditions
4. Condition Variable: Efficiently wakes threads when work arrives
5. Future/Promise: Allows returning values from async tasks
BENEFITS:
=========
✓ Avoid thread creation overhead
✓ Control resource usage (limit thread count)
✓ Better than creating threads on-demand
✓ Efficient task distribution
*/