// Apollo 6.0 — open source self driving car software.
// File: concurrent_object_pool.h (recovered from generated documentation).
/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/
#pragma once

#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

#include "modules/perception/base/object_pool.h"

27 #define PERCEPTION_BASE_DISABLE_POOL
28 namespace apollo {
29 namespace perception {
30 namespace base {
31 
32 static const size_t kPoolDefaultExtendNum = 10;
33 static const size_t kPoolDefaultSize = 100;
34 
35 // @brief default initializer used in concurrent object pool
36 template <class T>
38  void operator()(T* t) const {}
39 };
40 // @brief concurrent object pool with dynamic size
41 template <class ObjectType, size_t N = kPoolDefaultSize,
42  class Initializer = ObjectPoolDefaultInitializer<ObjectType>>
43 class ConcurrentObjectPool : public BaseObjectPool<ObjectType> {
44  public:
45  // using ObjectTypePtr = typename BaseObjectPool<ObjectType>::ObjectTypePtr;
47  // @brief Only allow accessing from global instance
49  static ConcurrentObjectPool pool(N);
50  return pool;
51  }
52  // @brief overrided function to get object smart pointer
53  std::shared_ptr<ObjectType> Get() override {
54 // TODO(All): remove conditional build
55 #ifndef PERCEPTION_BASE_DISABLE_POOL
56  ObjectType* ptr = nullptr;
57  {
58  std::lock_guard<std::mutex> lock(mutex_);
59  if (queue_.empty()) {
60  Add(1 + kPoolDefaultExtendNum);
61  }
62  ptr = queue_.front();
63  queue_.pop();
64  }
65  // For efficiency consideration, initialization should be invoked
66  // after releasing the mutex
67  kInitializer(ptr);
68  return std::shared_ptr<ObjectType>(ptr, [&](ObjectType* obj_ptr) {
69  std::lock_guard<std::mutex> lock(mutex_);
70  queue_.push(obj_ptr);
71  });
72 #else
73  return std::shared_ptr<ObjectType>(new ObjectType);
74 #endif
75  }
76  // @brief overrided function to get batch of smart pointers
77  // @params[IN] num: batch number
78  // @params[OUT] data: vector container to store the pointers
79  void BatchGet(size_t num,
80  std::vector<std::shared_ptr<ObjectType>>* data) override {
81 #ifndef PERCEPTION_BASE_DISABLE_POOL
82  std::vector<ObjectType*> buffer(num, nullptr);
83  {
84  std::lock_guard<std::mutex> lock(mutex_);
85  if (queue_.size() < num) {
86  Add(num - queue_.size() + kPoolDefaultExtendNum);
87  }
88  for (size_t i = 0; i < num; ++i) {
89  buffer[i] = queue_.front();
90  queue_.pop();
91  }
92  }
93  // For efficiency consideration, initialization should be invoked
94  // after releasing the mutex
95  for (size_t i = 0; i < num; ++i) {
96  kInitializer(buffer[i]);
97  data->emplace_back(
98  std::shared_ptr<ObjectType>(buffer[i], [&](ObjectType* obj_ptr) {
99  std::lock_guard<std::mutex> lock(mutex_);
100  queue_.push(obj_ptr);
101  }));
102  }
103 #else
104  for (size_t i = 0; i < num; ++i) {
105  data->emplace_back(new ObjectType);
106  }
107 #endif
108  }
109  // @brief overrided function to get batch of smart pointers
110  // @params[IN] num: batch number
111  // @params[IN] is_front: indicating insert to front or back of the list
112  // @params[OUT] data: list container to store the pointers
113  void BatchGet(size_t num, bool is_front,
114  std::list<std::shared_ptr<ObjectType>>* data) override {
115 #ifndef PERCEPTION_BASE_DISABLE_POOL
116  std::vector<ObjectType*> buffer(num, nullptr);
117  {
118  std::lock_guard<std::mutex> lock(mutex_);
119  if (queue_.size() < num) {
120  Add(num - queue_.size() + kPoolDefaultExtendNum);
121  }
122  for (size_t i = 0; i < num; ++i) {
123  buffer[i] = queue_.front();
124  queue_.pop();
125  }
126  }
127  // For efficiency consideration, initialization should be invoked
128  // after releasing the mutex
129  for (size_t i = 0; i < num; ++i) {
130  kInitializer(buffer[i]);
131  is_front ? data->emplace_front(std::shared_ptr<ObjectType>(
132  buffer[i],
133  [&](ObjectType* obj_ptr) {
134  std::lock_guard<std::mutex> lock(mutex_);
135  queue_.push(obj_ptr);
136  }))
137  : data->emplace_back(std::shared_ptr<ObjectType>(
138  buffer[i], [&](ObjectType* obj_ptr) {
139  std::lock_guard<std::mutex> lock(mutex_);
140  queue_.push(obj_ptr);
141  }));
142  }
143 #else
144  for (size_t i = 0; i < num; ++i) {
145  is_front ? data->emplace_front(new ObjectType)
146  : data->emplace_back(new ObjectType);
147  }
148 #endif
149  }
150  // @brief overrided function to get batch of smart pointers
151  // @params[IN] num: batch number
152  // @params[IN] is_front: indicating insert to front or back of the deque
153  // @params[OUT] data: deque container to store the pointers
154  void BatchGet(size_t num, bool is_front,
155  std::deque<std::shared_ptr<ObjectType>>* data) override {
156 #ifndef PERCEPTION_BASE_DISABLE_POOL
157  std::vector<ObjectType*> buffer(num, nullptr);
158  {
159  std::lock_guard<std::mutex> lock(mutex_);
160  if (queue_.size() < num) {
161  Add(num - queue_.size() + kPoolDefaultExtendNum);
162  }
163  for (size_t i = 0; i < num; ++i) {
164  buffer[i] = queue_.front();
165  queue_.pop();
166  }
167  }
168  for (size_t i = 0; i < num; ++i) {
169  kInitializer(buffer[i]);
170  is_front ? data->emplace_front(std::shared_ptr<ObjectType>(
171  buffer[i],
172  [&](ObjectType* obj_ptr) {
173  std::lock_guard<std::mutex> lock(mutex_);
174  queue_.push(obj_ptr);
175  }))
176  : data->emplace_back(std::shared_ptr<ObjectType>(
177  buffer[i], [&](ObjectType* obj_ptr) {
178  std::lock_guard<std::mutex> lock(mutex_);
179  queue_.push(obj_ptr);
180  }));
181  }
182 #else
183  for (size_t i = 0; i < num; ++i) {
184  is_front ? data->emplace_front(new ObjectType)
185  : data->emplace_back(new ObjectType);
186  }
187 #endif
188  }
189 #ifndef PERCEPTION_BASE_DISABLE_POOL
190  // @brief overrided function to set capacity
191  void set_capacity(size_t capacity) override {
192  std::lock_guard<std::mutex> lock(mutex_);
193  if (capacity_ < capacity) {
194  Add(capacity - capacity_);
195  }
196  }
197  // @brief get remained object number
198  size_t RemainedNum() override { return queue_.size(); }
199 #endif
200  // @brief destructor to release the cached memory
202  if (cache_) {
203  delete[] cache_;
204  cache_ = nullptr;
205  }
206  for (auto& ptr : extended_cache_) {
207  delete ptr;
208  }
209  extended_cache_.clear();
210  }
211 
212  protected:
213 // @brief add num objects, should add lock before invoke this function
214 #ifndef PERCEPTION_BASE_DISABLE_POOL
215  void Add(size_t num) {
216  for (size_t i = 0; i < num; ++i) {
217  ObjectType* ptr = new ObjectType;
218  extended_cache_.push_back(ptr);
219  queue_.push(ptr);
220  }
221  capacity_ = kDefaultCacheSize + extended_cache_.size();
222  }
223 #endif
224  // @brief default constructor
225  explicit ConcurrentObjectPool(const size_t default_size)
226  : kDefaultCacheSize(default_size) {
227 #ifndef PERCEPTION_BASE_DISABLE_POOL
228  cache_ = new ObjectType[kDefaultCacheSize];
229  for (size_t i = 0; i < kDefaultCacheSize; ++i) {
230  queue_.push(&cache_[i]);
231  }
232  capacity_ = kDefaultCacheSize;
233 #endif
234  }
235  std::mutex mutex_;
236  std::queue<ObjectType*> queue_;
237  // @brief point to a continuous memory of default pool size
238  ObjectType* cache_ = nullptr;
239  const size_t kDefaultCacheSize;
240  // @brief list to store extended memory, not as efficient
241  std::list<ObjectType*> extended_cache_;
242  static const Initializer kInitializer;
243 };

}  // namespace base
}  // namespace perception
}  // namespace apollo
/* NOTE(review): the lines below are cross-reference residue from the Doxygen
   page this header was extracted from. They are not C++; they are preserved
   here, commented out, only for traceability.

~ConcurrentObjectPool() override
Definition: concurrent_object_pool.h:201
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
ConcurrentObjectPool(const size_t default_size)
Definition: concurrent_object_pool.h:225
Definition: concurrent_object_pool.h:37
static ConcurrentObjectPool & Instance()
Definition: concurrent_object_pool.h:48
void BatchGet(size_t num, bool is_front, std::deque< std::shared_ptr< ObjectType >> *data) override
Definition: concurrent_object_pool.h:154
static const Initializer kInitializer
Definition: concurrent_object_pool.h:242
std::queue< ObjectType * > queue_
Definition: concurrent_object_pool.h:236
std::mutex mutex_
Definition: concurrent_object_pool.h:235
const size_t kDefaultCacheSize
Definition: concurrent_object_pool.h:239
Definition: concurrent_object_pool.h:43
void BatchGet(size_t num, bool is_front, std::list< std::shared_ptr< ObjectType >> *data) override
Definition: concurrent_object_pool.h:113
Definition: object_pool.h:28
std::list< ObjectType * > extended_cache_
Definition: concurrent_object_pool.h:241
void operator()(T *t) const
Definition: concurrent_object_pool.h:38
std::shared_ptr< ObjectType > Get() override
Definition: concurrent_object_pool.h:53
ObjectType
Definition: object_types.h:26
void BatchGet(size_t num, std::vector< std::shared_ptr< ObjectType >> *data) override
Definition: concurrent_object_pool.h:79
*/