// Include guard plus the Init(size) declaration (this file is a garbled text
// extraction; the embedded numbers are the original source line numbers).
17 #ifndef CYBER_BASE_BOUNDED_QUEUE_H_ 18 #define CYBER_BASE_BOUNDED_QUEUE_H_ 47 bool Init(uint64_t size);
// Cursor accessors: each returns a snapshot of one atomic cursor of the ring
// buffer (loads use the default sequentially-consistent order).
// head_  -- consumer cursor: last slot already consumed.
// tail_  -- producer cursor: next slot a producer will claim.
// commit_-- publish cursor: slots strictly before it are fully written.
// NOTE(review): exact initial values of the cursors are not visible here --
// confirm against the constructor in the full file.
59 uint64_t
Head() {
return head_.load(); }
60 uint64_t
Tail() {
return tail_.load(); }
61 uint64_t
Commit() {
return commit_.load(); }
// Maps a monotonically increasing cursor value onto a slot of the backing
// ring buffer (implemented as num % pool_size_, see original line 231).
64 uint64_t GetIndex(uint64_t num);
// Number of slots in the ring buffer; Init sets it to size + 2.
70 uint64_t pool_size_ = 0;
// Pluggable blocking policy used by the Wait* operations; owned by the queue.
72 std::unique_ptr<WaitStrategy> wait_strategy_ =
nullptr;
// Flag polled by the Wait* retry loops to abort blocking on shutdown.
// NOTE(review): volatile is not a synchronization primitive in C++; this
// presumably relies on the wait strategy's own synchronization for visibility
// -- confirm; std::atomic<bool> would be the robust choice.
73 volatile bool break_all_wait_ =
false;
// Destructor fragment: iterates every slot of the pool. The loop body is not
// visible in this extraction -- presumably it runs each element's destructor
// before the calloc'd storage is freed (placement-new'd objects in Init need
// explicit destruction); confirm against the full file.
82 for (uint64_t i = 0; i < pool_size_; ++i) {
// Init(size) fragment: allocates and populates the ring buffer.
// Capacity is size + 2 slots -- presumably the extra slots keep the
// head/tail/commit cursors distinguishable without a separate count field;
// confirm against the full source.
97 pool_size_ = size + 2;
// Raw zeroed allocation; elements are then constructed in place below.
98 pool_ =
reinterpret_cast<T*
>(std::calloc(pool_size_,
sizeof(T)));
// Allocation-failure branch (body not visible; presumably `return false;`).
99 if (pool_ ==
nullptr) {
// Placement-new default-constructs a T in every slot of the pool.
102 for (uint64_t i = 0; i < pool_size_; ++i) {
103 new (&(pool_[i])) T();
// Takes ownership of the caller-provided wait strategy.
105 wait_strategy_.reset(strategy);
// Enqueue (copy overload) -- lock-free multi-producer push. The signature
// line (original line 110) is missing from this extraction; the doxygen index
// below confirms it as `bool Enqueue(const T &element)`.
109 template <
typename T>
// Stage 1: claim a slot by advancing tail_ in a CAS retry loop.
111 uint64_t new_tail = 0;
112 uint64_t old_commit = 0;
113 uint64_t old_tail = tail_.load(std::memory_order_acquire);
// (retry-loop opener, presumably `do {`, lost in extraction)
115 new_tail = old_tail + 1;
// Full-queue test: the slot after tail would collide with head.
// (branch body, lines 117-118, missing -- presumably `return false;`)
116 if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
119 }
// acq_rel on success publishes the claim; relaxed on failure merely reloads
// old_tail for the next attempt.
while (!tail_.compare_exchange_weak(old_tail, new_tail,
120 std::memory_order_acq_rel,
121 std::memory_order_relaxed));
// Stage 2: write the element into the claimed slot.
122 pool_[GetIndex(old_tail)] = element;
// Stage 3: publish -- advance commit_ from old_tail to new_tail, spinning
// until earlier producers have committed. The CAS-loop opener (original line
// 125, a commit_ compare-exchange inside a while) is missing here.
124 old_commit = old_tail;
126 old_commit, new_tail, std::memory_order_acq_rel,
127 std::memory_order_relaxed)));
// Wake one consumer blocked in WaitDequeue. (`return true;` on the missing
// following line, per the bool signature.)
128 wait_strategy_->NotifyOne();
// Enqueue (move overload) -- identical protocol to the copy overload above:
// claim a tail slot via CAS, move-assign into it, then publish via commit_.
// The signature line (original line 133) is missing from this extraction.
132 template <
typename T>
134 uint64_t new_tail = 0;
135 uint64_t old_commit = 0;
136 uint64_t old_tail = tail_.load(std::memory_order_acquire);
// (retry-loop opener, presumably `do {`, lost in extraction)
138 new_tail = old_tail + 1;
// Full-queue test; branch body (lines 140-141) missing -- presumably
// `return false;`.
139 if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
142 }
while (!tail_.compare_exchange_weak(old_tail, new_tail,
143 std::memory_order_acq_rel,
144 std::memory_order_relaxed));
// Move the element into the claimed slot (only difference from the copy
// overload).
145 pool_[GetIndex(old_tail)] = std::move(element);
// Publish by advancing commit_; the CAS-loop opener (original line 148) is
// missing here.
147 old_commit = old_tail;
149 old_commit, new_tail, std::memory_order_acq_rel,
150 std::memory_order_relaxed)));
// Wake one consumer blocked in WaitDequeue.
151 wait_strategy_->NotifyOne();
// Dequeue -- lock-free multi-consumer pop. The signature line (original line
// 156) is missing; the doxygen index confirms it as `bool Dequeue(T *element)`.
155 template <
typename T>
157 uint64_t new_head = 0;
158 uint64_t old_head = head_.load(std::memory_order_acquire);
// (retry-loop opener, presumably `do {`, lost in extraction)
160 new_head = old_head + 1;
// Empty test: the next head position has not been committed yet.
// (branch body, lines 162-163, missing -- presumably `return false;`)
161 if (new_head == commit_.load(std::memory_order_acquire)) {
// Copy out BEFORE the CAS: on a lost race the loop re-reads a fresh slot,
// so *element may be written more than once.
164 *element = pool_[GetIndex(new_head)];
165 }
while (!head_.compare_exchange_weak(old_head, new_head,
166 std::memory_order_acq_rel,
167 std::memory_order_relaxed));
// WaitEnqueue (copy overload) fragment -- blocking push: loops until
// break_all_wait_ is set, attempting the non-blocking Enqueue (call on the
// missing lines 174-176, presumably returning true on success) and otherwise
// parking in the wait strategy. Per the index: `bool WaitEnqueue(const T &)`.
171 template <
typename T>
173 while (!break_all_wait_) {
// EmptyWait() blocks until notified; its return value presumably signals
// whether to retry or give up -- the branch body is not visible here.
177 if (wait_strategy_->EmptyWait()) {
// WaitEnqueue (move overload) fragment -- same retry protocol as the copy
// overload: try the non-blocking move-Enqueue, park in the wait strategy on
// failure, abort when break_all_wait_ is raised.
187 template <
typename T>
189 while (!break_all_wait_) {
// Success branch (body on the missing lines, presumably `return true;`).
190 if (
Enqueue(std::move(element))) {
// EmptyWait() blocks until notified; branch body not visible here.
193 if (wait_strategy_->EmptyWait()) {
// WaitDequeue fragment -- blocking pop: loops until break_all_wait_ is set,
// attempting the non-blocking Dequeue (call on the missing lines 206-208) and
// otherwise parking in the wait strategy. Per the index:
// `bool WaitDequeue(T *element)`.
203 template <
typename T>
205 while (!break_all_wait_) {
// EmptyWait() blocks until a producer calls NotifyOne() (or BreakAllWait());
// the branch body is not visible here.
209 if (wait_strategy_->EmptyWait()) {
219 template <
typename T>
221 return tail_ - head_ - 1;
// Empty() fragment: only the template header survived extraction. Per the
// doxygen index the definition is `bool Empty()` at original line 225; its
// body is not visible here, so it is left untouched.
224 template <
typename T>
229 template <
typename T>
231 return num - (num / pool_size_) * pool_size_;
234 template <
typename T>
236 wait_strategy_.reset(strategy);
239 template <
typename T>
241 break_all_wait_ =
true;
242 wait_strategy_->BreakAllWait();
249 #endif // CYBER_BASE_BOUNDED_QUEUE_H_ bool Init(uint64_t size)
Definition: bounded_queue.h:90
uint64_t size_type
Definition: bounded_queue.h:40
uint64_t Tail()
Definition: bounded_queue.h:60
uint64_t Head()
Definition: bounded_queue.h:59
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: wait_strategy.h:30
~BoundedQueue()
Definition: bounded_queue.h:77
uint64_t Commit()
Definition: bounded_queue.h:61
Definition: wait_strategy.h:56
#define CACHELINE_SIZE
Definition: macros.h:31
bool Enqueue(const T &element)
Definition: bounded_queue.h:110
BoundedQueue & operator=(const BoundedQueue &other)=delete
Definition: bounded_queue.h:37
void BreakAllWait()
Definition: bounded_queue.h:240
bool WaitEnqueue(const T &element)
Definition: bounded_queue.h:172
uint64_t Size()
Definition: bounded_queue.h:220
void SetWaitStrategy(WaitStrategy *WaitStrategy)
Definition: bounded_queue.h:235
bool WaitDequeue(T *element)
Definition: bounded_queue.h:204
bool Empty()
Definition: bounded_queue.h:225
std::function< void()> value_type
Definition: bounded_queue.h:39
bool Dequeue(T *element)
Definition: bounded_queue.h:156
BoundedQueue()
Definition: bounded_queue.h:43
#define cyber_unlikely(x)
Definition: macros.h:28