Apollo  6.0
Open source self driving car software
thread_pool.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_BASE_THREAD_POOL_H_
18 #define CYBER_BASE_THREAD_POOL_H_
19 
20 #include <atomic>
21 #include <functional>
22 #include <future>
23 #include <memory>
24 #include <queue>
25 #include <stdexcept>
26 #include <thread>
27 #include <utility>
28 #include <vector>
29 
31 
32 namespace apollo {
33 namespace cyber {
34 namespace base {
35 
36 class ThreadPool {
37  public:
38  explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);
39 
40  template <typename F, typename... Args>
41  auto Enqueue(F&& f, Args&&... args)
42  -> std::future<typename std::result_of<F(Args...)>::type>;
43 
44  ~ThreadPool();
45 
46  private:
47  std::vector<std::thread> workers_;
49  std::atomic_bool stop_;
50 };
51 
52 inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
53  : stop_(false) {
54  if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
55  throw std::runtime_error("Task queue init failed.");
56  }
57  workers_.reserve(threads);
58  for (size_t i = 0; i < threads; ++i) {
59  workers_.emplace_back([this] {
60  while (!stop_) {
61  std::function<void()> task;
62  if (task_queue_.WaitDequeue(&task)) {
63  task();
64  }
65  }
66  });
67  }
68 }
69 
70 // before using the return value, you should check value.valid()
71 template <typename F, typename... Args>
72 auto ThreadPool::Enqueue(F&& f, Args&&... args)
73  -> std::future<typename std::result_of<F(Args...)>::type> {
74  using return_type = typename std::result_of<F(Args...)>::type;
75 
76  auto task = std::make_shared<std::packaged_task<return_type()>>(
77  std::bind(std::forward<F>(f), std::forward<Args>(args)...));
78 
79  std::future<return_type> res = task->get_future();
80 
81  // don't allow enqueueing after stopping the pool
82  if (stop_) {
83  return std::future<return_type>();
84  }
85  task_queue_.Enqueue([task]() { (*task)(); });
86  return res;
87 };
88 
89 // the destructor joins all threads
91  if (stop_.exchange(true)) {
92  return;
93  }
94  task_queue_.BreakAllWait();
95  for (std::thread& worker : workers_) {
96  worker.join();
97  }
98 }
99 
100 } // namespace base
101 } // namespace cyber
102 } // namespace apollo
103 
104 #endif // CYBER_BASE_THREAD_POOL_H_
bool Init(uint64_t size)
Definition: bounded_queue.h:90
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: thread_pool.h:36
bool Enqueue(const T &element)
Definition: bounded_queue.h:110
Definition: bounded_queue.h:37
void BreakAllWait()
Definition: bounded_queue.h:240
auto Enqueue(F &&f, Args &&... args) -> std::future< typename std::result_of< F(Args...)>::type >
Definition: thread_pool.h:72
ThreadPool(std::size_t thread_num, std::size_t max_task_num=1000)
Definition: thread_pool.h:52
bool WaitDequeue(T *element)
Definition: bounded_queue.h:204
~ThreadPool()
Definition: thread_pool.h:90
Definition: wait_strategy.h:38