Apollo  6.0
Open source self driving car software
ctpl.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2019 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 /*********************************************************
18  *
19  * Copyright (C) 2014 by Vitaliy Vitsentiy
20  *
21  * Licensed under the Apache License, Version 2.0 (the "License");
22  * you may not use this file except in compliance with the License.
23  * You may obtain a copy of the License at
24  *
25  * http://www.apache.org/licenses/LICENSE-2.0
26  *
27  * Unless required by applicable law or agreed to in writing, software
28  * distributed under the License is distributed on an "AS IS" BASIS,
29  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30  * See the License for the specific language governing permissions and
31  * limitations under the License.
32  *
33  *********************************************************/
34 
35 #ifndef __ctpl_thread_pool_H__
36 #define __ctpl_thread_pool_H__
37 
38 #include <atomic>
39 #include <exception>
40 #include <functional>
41 #include <future>
42 #include <memory>
43 #include <mutex>
44 #include <thread>
45 #include <utility>
46 #include <vector>
47 
48 #include "boost/lockfree/queue.hpp"
49 
50 #ifndef _ctplThreadPoolLength_
51 #define _ctplThreadPoolLength_ 100
52 #endif
53 
54 // thread pool to run user's functors with signature
55 // ret func(int id, other_params)
56 // where id is the index of the thread that runs the functor
57 // ret is some return type
58 
59 namespace ctpl {
60 
61 class thread_pool {
62  public:
63  thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
64  explicit thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_)
65  : q(queueSize) {
66  this->init();
67  this->resize(nThreads);
68  }
69 
70  // the destructor waits for all the functions in the queue to be finished
71  ~thread_pool() { this->stop(true); }
72 
73  // mock the construct function, from zyvitski's comments
74  void start(int nThreads) {
75  this->init();
76  this->resize(nThreads);
77  }
78 
79  // get the number of running threads in the pool
80  int size() { return static_cast<int>(this->threads.size()); }
81 
82  // number of idle threads
83  int n_idle() { return this->nWaiting; }
84  std::thread &get_thread(int i) { return *this->threads[i]; }
85 
86  // change the number of threads in the pool
87  // should be called from one thread, otherwise be careful to not interleave,
88  // also with this->stop()
89  // nThreads must be >= 0
90  void resize(int nThreads) {
91  if (!this->isStop && !this->isDone) {
92  int oldNThreads = static_cast<int>(this->threads.size());
93  if (oldNThreads <= nThreads) { // if the number of threads is increased
94  this->threads.resize(nThreads);
95  this->flags.resize(nThreads);
96 
97  for (int i = oldNThreads; i < nThreads; ++i) {
98  this->flags[i] = std::make_shared<std::atomic<bool>>(false);
99  this->set_thread(i);
100  }
101  } else { // the number of threads is decreased
102  for (int i = oldNThreads - 1; i >= nThreads; --i) {
103  *this->flags[i] = true; // this thread will finish
104  this->threads[i]->detach();
105  }
106  {
107  // stop the detached threads that were waiting
108  std::unique_lock<std::mutex> lock(this->mutex);
109  this->cv.notify_all();
110  }
111  this->threads.resize(
112  nThreads); // safe to delete because the threads are detached
113  this->flags.resize(nThreads); // safe to delete because the threads
114  // have copies of shared_ptr of the
115  // flags, not originals
116  }
117  }
118  }
119 
120  // empty the queue
121  void clear_queue() {
122  std::function<void(int id)> *_f;
123  while (this->q.pop(_f)) delete _f; // empty the queue
124  }
125 
126  // pops a functional wraper to the original function
127  std::function<void(int)> pop() {
128  std::function<void(int id)> *_f = nullptr;
129  this->q.pop(_f);
130  std::unique_ptr<std::function<void(int id)>> func(
131  _f); // at return, delete the function even if an exception occurred
132 
133  std::function<void(int)> f;
134  if (_f) f = *_f;
135  return f;
136  }
137 
138  // wait for all computing threads to finish and stop all threads
139  // may be called asyncronously to not pause the calling thread while waiting
140  // if isWait == true, all the functions in the queue are run, otherwise the
141  // queue is cleared without running the functions
142  void stop(bool isWait = false) {
143  if (!isWait) {
144  if (this->isStop) return;
145  this->isStop = true;
146  for (int i = 0, n = this->size(); i < n; ++i) {
147  *this->flags[i] = true; // command the threads to stop
148  }
149  this->clear_queue(); // empty the queue
150  } else {
151  if (this->isDone || this->isStop) return;
152  this->isDone = true; // give the waiting threads a command to finish
153  }
154  {
155  std::unique_lock<std::mutex> lock(this->mutex);
156  this->cv.notify_all(); // stop all waiting threads
157  }
158  for (int i = 0; i < static_cast<int>(this->threads.size());
159  ++i) { // wait for the computing threads to finish
160  if (this->threads[i]->joinable()) this->threads[i]->join();
161  }
162  // if there were no threads in the pool but some functors in the queue, the
163  // functors are not deleted by the threads
164  // therefore delete them here
165  this->clear_queue();
166  this->threads.clear();
167  this->flags.clear();
168  }
169 
170  template <typename F, typename... Rest>
171  auto push(F &&f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
172  auto pck =
173  std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
174  std::bind(std::forward<F>(f), std::placeholders::_1,
175  std::forward<Rest>(rest)...));
176 
177  auto _f = new std::function<void(int id)>([pck](int id) { (*pck)(id); });
178  this->q.push(_f);
179 
180  std::unique_lock<std::mutex> lock(this->mutex);
181  this->cv.notify_one();
182 
183  return pck->get_future();
184  }
185 
186  // run the user's function that excepts argument int - id of the running
187  // thread. returned value is templatized
188  // operator returns std::future, where the user can get the result and rethrow
189  // the catched exceptins
190  template <typename F>
191  auto push(F &&f) -> std::future<decltype(f(0))> {
192  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
193  std::forward<F>(f));
194 
195  auto _f = new std::function<void(int id)>([pck](int id) { (*pck)(id); });
196  this->q.push(_f);
197 
198  std::unique_lock<std::mutex> lock(this->mutex);
199  this->cv.notify_one();
200 
201  return pck->get_future();
202  }
203 
204  private:
205  // deleted
206  thread_pool(const thread_pool &); // = delete;
207  thread_pool(thread_pool &&); // = delete;
208  thread_pool &operator=(const thread_pool &); // = delete;
209  thread_pool &operator=(thread_pool &&); // = delete;
210 
211  void set_thread(int i) {
212  std::shared_ptr<std::atomic<bool>> flag(
213  this->flags[i]); // a copy of the shared ptr to the flag
214  auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
215  std::atomic<bool> &_flag = *flag;
216  std::function<void(int id)> *_f;
217  bool isPop = this->q.pop(_f);
218  while (true) {
219  while (isPop) { // if there is anything in the queue
220  std::unique_ptr<std::function<void(int id)>> func(
221  _f); // at return, delete the function even if an exception
222  // occurred
223  (*_f)(i);
224 
225  if (_flag)
226  return; // the thread is wanted to stop, return even if the queue
227  // is not empty yet
228  else
229  isPop = this->q.pop(_f);
230  }
231 
232  // the queue is empty here, wait for the next command
233  std::unique_lock<std::mutex> lock(this->mutex);
234  ++this->nWaiting;
235  this->cv.wait(lock, [this, &_f, &isPop, &_flag]() {
236  isPop = this->q.pop(_f);
237  return isPop || this->isDone || _flag;
238  });
239  --this->nWaiting;
240 
241  if (!isPop)
242  return; // if the queue is empty and this->isDone == true or *flag
243  // then return
244  }
245  };
246  this->threads[i].reset(
247  new std::thread(f)); // compiler may not support std::make_unique()
248  }
249 
250  void init() {
251  this->nWaiting = 0;
252  this->isStop = false;
253  this->isDone = false;
254  }
255 
256  std::vector<std::unique_ptr<std::thread>> threads;
257  std::vector<std::shared_ptr<std::atomic<bool>>> flags;
258  mutable boost::lockfree::queue<std::function<void(int id)> *> q;
259  std::atomic<bool> isDone;
260  std::atomic<bool> isStop;
261  std::atomic<int> nWaiting; // how many threads are waiting
262 
263  std::mutex mutex;
264  std::condition_variable cv;
265 };
266 } // namespace ctpl
267 
268 #endif
#define _ctplThreadPoolLength_
Definition: ctpl.h:51
int n_idle()
Definition: ctpl.h:83
void(* func)(void *)
Definition: routine_context.h:41
void start(int nThreads)
Definition: ctpl.h:74
Definition: ctpl.h:59
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
Definition: ctpl.h:171
void resize(int nThreads)
Definition: ctpl.h:90
void clear_queue()
Definition: ctpl.h:121
thread_pool(int nThreads, int queueSize=_ctplThreadPoolLength_)
Definition: ctpl.h:64
std::function< void(int)> pop()
Definition: ctpl.h:127
thread_pool()
Definition: ctpl.h:63
std::thread & get_thread(int i)
Definition: ctpl.h:84
void stop(bool isWait=false)
Definition: ctpl.h:142
int size()
Definition: ctpl.h:80
Definition: ctpl.h:61
auto push(F &&f) -> std::future< decltype(f(0))>
Definition: ctpl.h:191
~thread_pool()
Definition: ctpl.h:71