-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathqueue.h
58 lines (49 loc) · 1.22 KB
/
queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
/// Number of threads a work_queue uses when it runs (the calling thread
/// counts as one of them).
///
/// std::thread::hardware_concurrency() is only a hint and is allowed to
/// return 0 when the value cannot be determined, so the result is clamped
/// to at least 1 to keep "N_THREADS - 1" non-negative for users of this
/// constant.
static const int N_THREADS = [] {
    const unsigned int detected = std::thread::hardware_concurrency();
    return detected == 0 ? 1 : static_cast<int>(detected);
}();
/// A batch of work items that is filled first and then drained by
/// N_THREADS threads in a single call to run().
///
/// Typical use: call add() for every task, then call run() once. Each
/// thread is handed its own ThreadStorage element from `stores`, which is
/// passed by reference to every task that thread executes.
///
/// @tparam ThreadStorage per-thread state; must be default-constructible
///         because run() builds a std::vector<ThreadStorage> of a given size.
template<typename ThreadStorage>
struct work_queue {
    /// A task: receives the storage of the thread that executes it.
    using work_item = std::function<void(ThreadStorage&)>;

    std::vector<ThreadStorage> stores;  // one element per thread, (re)built by run()
    std::vector<work_item> vec;         // every task that has been added
    std::mutex mutex;                   // guards vec, i and running
    unsigned int i = 0;                 // index of the next task to hand out
    bool running = false;               // set by run(); add() refuses once true

    /// Appends a task to the queue.
    ///
    /// @param f the task to enqueue.
    /// @throws const char* if run() has already been called.
    void add(work_item f) {
        // lock_guard releases the mutex even when we throw below; a manual
        // lock()/unlock() pair would leave it locked forever on that path.
        std::lock_guard<std::mutex> lock(mutex);
        if (running) {
            throw "Cannot add after the work queue has started.";
        }
        vec.push_back(std::move(f));
    }

    /// Hands out the next task that has not been claimed yet.
    ///
    /// @param out receives a copy of the task when one is available;
    ///            left untouched otherwise.
    /// @return true if a task was written to *out, false if none are left.
    bool get_task(work_item *out) {
        std::lock_guard<std::mutex> lock(mutex);
        if (i >= vec.size()) {
            return false;
        }
        *out = vec[i++];
        return true;
    }

    /// Executes tasks one after another, passing `store` to each, until
    /// get_task() reports that the queue is empty.
    void thread_worker(ThreadStorage &store) {
        work_item task;
        while (get_task(&task)) {
            task(store);
        }
    }

    /// Marks the queue as started, then runs all queued tasks on N_THREADS
    /// threads: N_THREADS - 1 spawned threads plus the calling thread.
    /// Returns after every spawned thread has been joined.
    void run() {
        {
            // add() reads `running` under the mutex, so write it under the mutex too.
            std::lock_guard<std::mutex> lock(mutex);
            running = true;
        }

        // Defensive: never go below one thread, so the index arithmetic
        // below cannot underflow.
        const std::size_t thread_count =
            N_THREADS > 0 ? static_cast<std::size_t>(N_THREADS) : 1;
        const std::size_t spawned_count = thread_count - 1;

        stores = std::vector<ThreadStorage>(thread_count);

        // A std::vector replaces the former runtime-sized C array, which is
        // a non-standard extension and invalid for a size of zero.
        std::vector<std::thread> threads;
        threads.reserve(spawned_count);
        for (std::size_t t = 0; t < spawned_count; ++t) {
            threads.emplace_back(&work_queue::thread_worker, this, std::ref(stores[t]));
        }

        // The calling thread acts as the last worker and uses the last store.
        thread_worker(stores[thread_count - 1]);

        for (std::thread &worker : threads) {
            worker.join();
        }
    }
};