Apollo  6.0
Open-source self-driving car software
prediction_thread_pool.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2019 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
21 #pragma once
22 
23 #include <future>
24 #include <memory>
25 #include <utility>
26 #include <vector>
27 
29 #include "cyber/common/log.h"
30 
31 namespace apollo {
32 namespace prediction {
33 
35  public:
36  BaseThreadPool(int thread_num, int next_thread_pool_level);
37 
38  void Stop();
39 
41 
42  template <typename InputIter, typename F>
43  void ForEach(InputIter begin, InputIter end, F f) {
44  std::vector<std::future<void>> futures;
45  for (auto iter = begin; iter != end; ++iter) {
46  auto& elem = *iter;
47  futures.emplace_back(this->Post([&] { f(elem); }));
48  }
49  for (auto& future : futures) {
50  if (future.valid()) {
51  future.get();
52  } else {
53  AERROR << "Future is invalid.";
54  }
55  }
56  }
57 
58  template <typename FuncType>
59  std::future<typename std::result_of<FuncType()>::type> Post(FuncType&& func) {
60  typedef typename std::result_of<FuncType()>::type ReturnType;
61  typedef typename std::packaged_task<ReturnType()> TaskType;
62  // Post requires that the functions in it are copy-constructible.
63  // We used a shared pointer for the packaged_task,
64  // Since it's only movable and non-copyable
65  std::shared_ptr<TaskType> task =
66  std::make_shared<TaskType>(std::move(func));
67  std::future<ReturnType> returned_future = task->get_future();
68 
69  // Note: variables eg. `task` must be copied here because of the lifetime
70  if (stopped_) {
71  return std::future<ReturnType>();
72  }
73  task_queue_.Enqueue([task]() { (*task)(); });
74  return returned_future;
75  }
76 
77  static std::vector<int> THREAD_POOL_CAPACITY;
78 
79  private:
80  std::vector<std::thread> workers_;
82  std::atomic_bool stopped_;
83 };
84 
85 template <int LEVEL>
87  public:
89  static LevelThreadPool<LEVEL> pool;
90  return &pool;
91  }
92 
93  private:
94  LevelThreadPool() : BaseThreadPool(THREAD_POOL_CAPACITY[LEVEL], LEVEL + 1) {
95  ADEBUG << "Level = " << LEVEL
96  << "; thread pool capacity = " << THREAD_POOL_CAPACITY[LEVEL];
97  }
98 };
99 
101  public:
102  static BaseThreadPool* Instance();
103 
104  static thread_local int s_thread_pool_level;
105 
106  template <typename InputIter, typename F>
107  static void ForEach(InputIter begin, InputIter end, F f) {
108  Instance()->ForEach(begin, end, f);
109  }
110 };
111 
112 } // namespace prediction
113 } // namespace apollo
BaseThreadPool(int thread_num, int next_thread_pool_level)
Definition: prediction_thread_pool.h:86
void(* func)(void *)
Definition: routine_context.h:41
static thread_local int s_thread_pool_level
Definition: prediction_thread_pool.h:104
std::future< typename std::result_of< FuncType()>::type > Post(FuncType &&func)
Definition: prediction_thread_pool.h:59
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
static void ForEach(InputIter begin, InputIter end, F f)
Definition: prediction_thread_pool.h:107
Definition: prediction_thread_pool.h:34
bool Enqueue(const T &element)
Definition: bounded_queue.h:110
#define ADEBUG
Definition: log.h:41
Definition: prediction_thread_pool.h:100
Definition: bounded_queue.h:37
void ForEach(InputIter begin, InputIter end, F f)
Definition: prediction_thread_pool.h:43
static std::vector< int > THREAD_POOL_CAPACITY
Definition: prediction_thread_pool.h:77
static LevelThreadPool * Instance()
Definition: prediction_thread_pool.h:88
#define AERROR
Definition: log.h:44