35 #ifndef __ctpl_thread_pool_H__ 36 #define __ctpl_thread_pool_H__ 48 #include "boost/lockfree/queue.hpp" 50 #ifndef _ctplThreadPoolLength_ 51 #define _ctplThreadPoolLength_ 100 80 int size() {
// size(): reports how many entries the `threads` vector currently holds,
// narrowed from the container's size type to int.
return static_cast<int>(this->threads.size()); }
83 int n_idle() {
return this->nWaiting; }
84 std::thread &
get_thread(
int i) {
return *this->threads[i]; }
91 if (!this->isStop && !this->isDone) {
92 int oldNThreads =
static_cast<int>(this->threads.size());
93 if (oldNThreads <= nThreads) {
94 this->threads.resize(nThreads);
95 this->flags.resize(nThreads);
97 for (
int i = oldNThreads; i < nThreads; ++i) {
98 this->flags[i] = std::make_shared<std::atomic<bool>>(
false);
102 for (
int i = oldNThreads - 1; i >= nThreads; --i) {
103 *this->flags[i] =
true;
104 this->threads[i]->detach();
108 std::unique_lock<std::mutex> lock(this->mutex);
109 this->cv.notify_all();
111 this->threads.resize(
113 this->flags.resize(nThreads);
122 std::function<void(int id)> *_f;
123 while (this->q.pop(_f))
delete _f;
127 std::function<void(int)>
pop() {
128 std::function<void(int id)> *_f =
nullptr;
130 std::unique_ptr<std::function<void(int id)>>
func(
133 std::function<void(int)> f;
142 void stop(
bool isWait =
false) {
144 if (this->isStop)
return;
146 for (
int i = 0, n = this->
size(); i < n; ++i) {
147 *this->flags[i] =
true;
151 if (this->isDone || this->isStop)
return;
155 std::unique_lock<std::mutex> lock(this->mutex);
156 this->cv.notify_all();
158 for (
int i = 0; i < static_cast<int>(this->threads.size());
160 if (this->threads[i]->joinable()) this->threads[i]->join();
166 this->threads.clear();
170 template <
typename F,
typename... Rest>
171 auto push(F &&f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
173 std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
174 std::bind(std::forward<F>(f), std::placeholders::_1,
175 std::forward<Rest>(rest)...));
177 auto _f =
new std::function<void(int id)>([pck](
int id) { (*pck)(id); });
180 std::unique_lock<std::mutex> lock(this->mutex);
181 this->cv.notify_one();
183 return pck->get_future();
190 template <
typename F>
191 auto push(F &&f) -> std::future<decltype(f(0))> {
192 auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
195 auto _f =
new std::function<void(int id)>([pck](
int id) { (*pck)(id); });
198 std::unique_lock<std::mutex> lock(this->mutex);
199 this->cv.notify_one();
201 return pck->get_future();
211 void set_thread(
int i) {
212 std::shared_ptr<std::atomic<bool>> flag(
214 auto f = [
this, i, flag ]() {
215 std::atomic<bool> &_flag = *flag;
216 std::function<void(int id)> *_f;
217 bool isPop = this->q.pop(_f);
220 std::unique_ptr<std::function<void(int id)>>
func(
229 isPop = this->q.pop(_f);
233 std::unique_lock<std::mutex> lock(this->mutex);
235 this->cv.wait(lock, [
this, &_f, &isPop, &_flag]() {
236 isPop = this->q.pop(_f);
237 return isPop || this->isDone || _flag;
246 this->threads[i].reset(
252 this->isStop =
false;
253 this->isDone =
false;
// Owned worker thread objects; size() reports this vector's length and
// get_thread(i) dereferences slot i.
256 std::vector<std::unique_ptr<std::thread>> threads;
// One shared atomic flag per thread slot. Flags are created as false and set
// to true when the pool shrinks past that slot or when stop() runs; the
// worker's wait predicate also wakes on its flag.
257 std::vector<std::shared_ptr<std::atomic<bool>>> flags;
// Queue of raw pointers to heap-allocated std::function objects (created
// with `new` in push, released with `delete` when the queue is cleared).
// Declared mutable, so it can be modified from const member functions.
258 mutable boost::lockfree::queue<std::function<void(int id)> *> q;
// Pool-wide state flags: both are checked before resizing and in stop(),
// and both are reset to false during initialisation. isDone is also part
// of the worker's wait predicate.
259 std::atomic<bool> isDone;
260 std::atomic<bool> isStop;
// Counter returned by n_idle().
261 std::atomic<int> nWaiting;
// Condition variable the workers wait on; notified from push (notify_one)
// and from resize/stop (notify_all) while holding `mutex`.
264 std::condition_variable cv;
#define _ctplThreadPoolLength_
Definition: ctpl.h:51
int n_idle()
Definition: ctpl.h:83
void(* func)(void *)
Definition: routine_context.h:41
void start(int nThreads)
Definition: ctpl.h:74
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
Definition: ctpl.h:171
void resize(int nThreads)
Definition: ctpl.h:90
void clear_queue()
Definition: ctpl.h:121
thread_pool(int nThreads, int queueSize=_ctplThreadPoolLength_)
Definition: ctpl.h:64
std::function< void(int)> pop()
Definition: ctpl.h:127
thread_pool()
Definition: ctpl.h:63
std::thread & get_thread(int i)
Definition: ctpl.h:84
void stop(bool isWait=false)
Definition: ctpl.h:142
int size()
Definition: ctpl.h:80
auto push(F &&f) -> std::future< decltype(f(0))>
Definition: ctpl.h:191
~thread_pool()
Definition: ctpl.h:71