Apollo  6.0
Open source self driving car software
unbounded_queue.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_BASE_UNBOUNDED_QUEUE_H_
18 #define CYBER_BASE_UNBOUNDED_QUEUE_H_
19 
20 #include <unistd.h>
21 
22 #include <atomic>
23 #include <cstdint>
24 #include <memory>
25 
26 namespace apollo {
27 namespace cyber {
28 namespace base {
29 
/**
 * @brief Unbounded FIFO queue stored as a singly linked list of heap nodes,
 * with the head and tail pointers updated through atomic operations.
 *
 * The list always holds one sentinel node: head_ points at the sentinel and
 * the oldest queued element lives in the node after it. Dequeue() advances
 * head_ by one node, so the node that carried the returned element becomes
 * the new sentinel.
 *
 * Every node starts with a reference count of 2. One reference is dropped by
 * the Enqueue() call that links a successor behind the node, the other by
 * the Dequeue() call that moves head_ past it; whichever happens last frees
 * the node.
 *
 * T must be default-constructible (the sentinel holds a default T) and
 * copy-assignable.
 *
 * NOTE(review): Dequeue() reads old_head->next and head_next->data without
 * holding a reference on those nodes, so with more than one concurrent
 * consumer another Dequeue() could free a node while it is still being read.
 * Concurrent producers with a single consumer look safe; confirm the intended
 * usage against callers.
 */
template <typename T>
class UnboundedQueue {
 public:
  /// Creates an empty queue containing only the sentinel node.
  UnboundedQueue() { Reset(); }
  UnboundedQueue& operator=(const UnboundedQueue& other) = delete;
  UnboundedQueue(const UnboundedQueue& other) = delete;

  /// Frees every node still linked from head_.
  ~UnboundedQueue() { Destroy(); }

  /**
   * @brief Drops all queued elements and re-creates the sentinel node.
   *
   * Nodes are deleted unconditionally (reference counts are ignored), so this
   * must not run concurrently with Enqueue() or Dequeue().
   */
  void Clear() {
    Destroy();
    Reset();
  }

  /**
   * @brief Appends a copy of @p element to the back of the queue.
   * @param element value to copy into a newly allocated node.
   */
  void Enqueue(const T& element) {
    // Own the node until it is fully initialised so that a throwing copy
    // assignment of T does not leak it.
    std::unique_ptr<Node> fresh(new Node());
    fresh->data = element;
    Node* node = fresh.release();

    // Atomically make the new node the tail and learn the previous tail.
    Node* old_tail = tail_.exchange(node);

    // Linking publishes the element to Dequeue(); the previous tail no longer
    // needs the "awaiting successor" reference afterwards.
    old_tail->next.store(node);
    old_tail->release();
    size_.fetch_add(1);
  }

  /**
   * @brief Removes the oldest element and copies it into @p element.
   * @param element destination for the dequeued value; must not be null.
   * @return true if an element was removed, false if no element is linked
   * behind the current sentinel.
   */
  bool Dequeue(T* element) {
    Node* old_head = head_.load();
    Node* head_next = nullptr;
    do {
      head_next = old_head->next.load();

      if (head_next == nullptr) {
        return false;
      }
      // On failure compare_exchange_strong refreshes old_head with the
      // current head, and the loop retries from there.
    } while (!head_.compare_exchange_strong(old_head, head_next));
    *element = head_next->data;
    size_.fetch_sub(1);
    old_head->release();
    return true;
  }

  /**
   * @brief Returns the current value of the element counter.
   *
   * The counter is incremented after a node has been linked, so under
   * concurrent use the value is only a snapshot and can briefly disagree
   * with the list contents.
   */
  size_t Size() const { return size_.load(); }

  /// Returns true when the element counter reads zero.
  bool Empty() const { return size_.load() == 0; }

 private:
  struct Node {
    T data;
    // One reference for the producer that links a successor, one for the
    // consumer that advances head_ past this node.
    std::atomic<uint32_t> ref_count{2};
    // Atomic because Enqueue() stores it while Dequeue() may be loading it.
    std::atomic<Node*> next{nullptr};

    Node() = default;

    /// Drops one reference and deletes the node when it was the last one.
    void release() {
      // Decide on the value returned by fetch_sub itself. Re-reading the
      // counter afterwards would let two releasers both observe zero (double
      // delete) or read from a node that the other one already freed.
      if (ref_count.fetch_sub(1) == 1) {
        delete this;
      }
    }
  };

  /// Installs a fresh sentinel node and zeroes the element counter.
  void Reset() {
    auto node = new Node();
    head_.store(node);
    tail_.store(node);
    size_.store(0);
  }

  /// Deletes every node reachable from head_, ignoring reference counts.
  void Destroy() {
    Node* ite = head_.load();
    while (ite != nullptr) {
      Node* following = ite->next.load();
      delete ite;
      ite = following;
    }
  }

  std::atomic<Node*> head_;
  std::atomic<Node*> tail_;
  std::atomic<size_t> size_;
};
114 
115 } // namespace base
116 } // namespace cyber
117 } // namespace apollo
118 
119 #endif // CYBER_BASE_UNBOUNDED_QUEUE_H_
size_t Size()
Definition: unbounded_queue.h:75
~UnboundedQueue()
Definition: unbounded_queue.h:37
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
UnboundedQueue()
Definition: unbounded_queue.h:33
bool Empty()
Definition: unbounded_queue.h:77
void Clear()
Definition: unbounded_queue.h:39
UnboundedQueue & operator=(const UnboundedQueue &other)=delete
bool Dequeue(T *element)
Definition: unbounded_queue.h:59
void Enqueue(const T &element)
Definition: unbounded_queue.h:44
apollo::cyber::base::UnboundedQueue
Definition: unbounded_queue.h:31