Apollo 6.0
Open-source self-driving car software.
bounded_queue.h
Go to the documentation of this file.
/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/
16 
#ifndef CYBER_BASE_BOUNDED_QUEUE_H_
#define CYBER_BASE_BOUNDED_QUEUE_H_

#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <utility>

#include "cyber/base/macros.h"
#include "cyber/base/wait_strategy.h"
31 
32 namespace apollo {
33 namespace cyber {
34 namespace base {
35 
36 template <typename T>
37 class BoundedQueue {
38  public:
39  using value_type = T;
40  using size_type = uint64_t;
41 
42  public:
44  BoundedQueue& operator=(const BoundedQueue& other) = delete;
45  BoundedQueue(const BoundedQueue& other) = delete;
46  ~BoundedQueue();
47  bool Init(uint64_t size);
48  bool Init(uint64_t size, WaitStrategy* strategy);
49  bool Enqueue(const T& element);
50  bool Enqueue(T&& element);
51  bool WaitEnqueue(const T& element);
52  bool WaitEnqueue(T&& element);
53  bool Dequeue(T* element);
54  bool WaitDequeue(T* element);
55  uint64_t Size();
56  bool Empty();
58  void BreakAllWait();
59  uint64_t Head() { return head_.load(); }
60  uint64_t Tail() { return tail_.load(); }
61  uint64_t Commit() { return commit_.load(); }
62 
63  private:
64  uint64_t GetIndex(uint64_t num);
65 
66  alignas(CACHELINE_SIZE) std::atomic<uint64_t> head_ = {0};
67  alignas(CACHELINE_SIZE) std::atomic<uint64_t> tail_ = {1};
68  alignas(CACHELINE_SIZE) std::atomic<uint64_t> commit_ = {1};
69  // alignas(CACHELINE_SIZE) std::atomic<uint64_t> size_ = {0};
70  uint64_t pool_size_ = 0;
71  T* pool_ = nullptr;
72  std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr;
73  volatile bool break_all_wait_ = false;
74 };
75 
76 template <typename T>
78  if (wait_strategy_) {
79  BreakAllWait();
80  }
81  if (pool_) {
82  for (uint64_t i = 0; i < pool_size_; ++i) {
83  pool_[i].~T();
84  }
85  std::free(pool_);
86  }
87 }
88 
89 template <typename T>
90 inline bool BoundedQueue<T>::Init(uint64_t size) {
91  return Init(size, new SleepWaitStrategy());
92 }
93 
94 template <typename T>
95 bool BoundedQueue<T>::Init(uint64_t size, WaitStrategy* strategy) {
96  // Head and tail each occupy a space
97  pool_size_ = size + 2;
98  pool_ = reinterpret_cast<T*>(std::calloc(pool_size_, sizeof(T)));
99  if (pool_ == nullptr) {
100  return false;
101  }
102  for (uint64_t i = 0; i < pool_size_; ++i) {
103  new (&(pool_[i])) T();
104  }
105  wait_strategy_.reset(strategy);
106  return true;
107 }
108 
109 template <typename T>
110 bool BoundedQueue<T>::Enqueue(const T& element) {
111  uint64_t new_tail = 0;
112  uint64_t old_commit = 0;
113  uint64_t old_tail = tail_.load(std::memory_order_acquire);
114  do {
115  new_tail = old_tail + 1;
116  if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
117  return false;
118  }
119  } while (!tail_.compare_exchange_weak(old_tail, new_tail,
120  std::memory_order_acq_rel,
121  std::memory_order_relaxed));
122  pool_[GetIndex(old_tail)] = element;
123  do {
124  old_commit = old_tail;
125  } while (cyber_unlikely(!commit_.compare_exchange_weak(
126  old_commit, new_tail, std::memory_order_acq_rel,
127  std::memory_order_relaxed)));
128  wait_strategy_->NotifyOne();
129  return true;
130 }
131 
132 template <typename T>
133 bool BoundedQueue<T>::Enqueue(T&& element) {
134  uint64_t new_tail = 0;
135  uint64_t old_commit = 0;
136  uint64_t old_tail = tail_.load(std::memory_order_acquire);
137  do {
138  new_tail = old_tail + 1;
139  if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
140  return false;
141  }
142  } while (!tail_.compare_exchange_weak(old_tail, new_tail,
143  std::memory_order_acq_rel,
144  std::memory_order_relaxed));
145  pool_[GetIndex(old_tail)] = std::move(element);
146  do {
147  old_commit = old_tail;
148  } while (cyber_unlikely(!commit_.compare_exchange_weak(
149  old_commit, new_tail, std::memory_order_acq_rel,
150  std::memory_order_relaxed)));
151  wait_strategy_->NotifyOne();
152  return true;
153 }
154 
155 template <typename T>
156 bool BoundedQueue<T>::Dequeue(T* element) {
157  uint64_t new_head = 0;
158  uint64_t old_head = head_.load(std::memory_order_acquire);
159  do {
160  new_head = old_head + 1;
161  if (new_head == commit_.load(std::memory_order_acquire)) {
162  return false;
163  }
164  *element = pool_[GetIndex(new_head)];
165  } while (!head_.compare_exchange_weak(old_head, new_head,
166  std::memory_order_acq_rel,
167  std::memory_order_relaxed));
168  return true;
169 }
170 
171 template <typename T>
172 bool BoundedQueue<T>::WaitEnqueue(const T& element) {
173  while (!break_all_wait_) {
174  if (Enqueue(element)) {
175  return true;
176  }
177  if (wait_strategy_->EmptyWait()) {
178  continue;
179  }
180  // wait timeout
181  break;
182  }
183 
184  return false;
185 }
186 
187 template <typename T>
188 bool BoundedQueue<T>::WaitEnqueue(T&& element) {
189  while (!break_all_wait_) {
190  if (Enqueue(std::move(element))) {
191  return true;
192  }
193  if (wait_strategy_->EmptyWait()) {
194  continue;
195  }
196  // wait timeout
197  break;
198  }
199 
200  return false;
201 }
202 
203 template <typename T>
205  while (!break_all_wait_) {
206  if (Dequeue(element)) {
207  return true;
208  }
209  if (wait_strategy_->EmptyWait()) {
210  continue;
211  }
212  // wait timeout
213  break;
214  }
215 
216  return false;
217 }
218 
219 template <typename T>
220 inline uint64_t BoundedQueue<T>::Size() {
221  return tail_ - head_ - 1;
222 }
223 
224 template <typename T>
225 inline bool BoundedQueue<T>::Empty() {
226  return Size() == 0;
227 }
228 
229 template <typename T>
230 inline uint64_t BoundedQueue<T>::GetIndex(uint64_t num) {
231  return num - (num / pool_size_) * pool_size_; // faster than %
232 }
233 
234 template <typename T>
236  wait_strategy_.reset(strategy);
237 }
238 
239 template <typename T>
241  break_all_wait_ = true;
242  wait_strategy_->BreakAllWait();
243 }
244 
245 } // namespace base
246 } // namespace cyber
247 } // namespace apollo
248 
249 #endif // CYBER_BASE_BOUNDED_QUEUE_H_
bool Init(uint64_t size)
Definition: bounded_queue.h:90
uint64_t size_type
Definition: bounded_queue.h:40
uint64_t Tail()
Definition: bounded_queue.h:60
uint64_t Head()
Definition: bounded_queue.h:59
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: wait_strategy.h:30
~BoundedQueue()
Definition: bounded_queue.h:77
uint64_t Commit()
Definition: bounded_queue.h:61
Definition: wait_strategy.h:56
#define CACHELINE_SIZE
Definition: macros.h:31
bool Enqueue(const T &element)
Definition: bounded_queue.h:110
BoundedQueue & operator=(const BoundedQueue &other)=delete
Definition: bounded_queue.h:37
void BreakAllWait()
Definition: bounded_queue.h:240
bool WaitEnqueue(const T &element)
Definition: bounded_queue.h:172
uint64_t Size()
Definition: bounded_queue.h:220
void SetWaitStrategy(WaitStrategy *WaitStrategy)
Definition: bounded_queue.h:235
bool WaitDequeue(T *element)
Definition: bounded_queue.h:204
bool Empty()
Definition: bounded_queue.h:225
std::function< void()> value_type
Definition: bounded_queue.h:39
bool Dequeue(T *element)
Definition: bounded_queue.h:156
BoundedQueue()
Definition: bounded_queue.h:43
#define cyber_unlikely(x)
Definition: macros.h:28