Apollo  6.0
Open source self driving car software
blocker.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_BLOCKER_BLOCKER_H_
18 #define CYBER_BLOCKER_BLOCKER_H_
19 
20 #include <cstddef>
21 #include <functional>
22 #include <list>
23 #include <memory>
24 #include <mutex>
25 #include <string>
26 #include <unordered_map>
27 #include <vector>
28 
29 namespace apollo {
30 namespace cyber {
31 namespace blocker {
32 
/**
 * @brief Type-independent interface implemented by every Blocker<T>.
 *
 * It exposes only the operations whose signatures do not mention the message
 * type, so blockers for different message types can be handled through a
 * common base pointer or reference.
 */
class BlockerBase {
 public:
  virtual ~BlockerBase() = default;

  /// Drops all state held by the blocker.
  virtual void Reset() = 0;
  /// Empties the observed message queue.
  virtual void ClearObserved() = 0;
  /// Empties the published message queue.
  virtual void ClearPublished() = 0;
  /// Refreshes the observed messages from the published ones.
  virtual void Observe() = 0;
  /// @return true when no observed message is available.
  virtual bool IsObservedEmpty() const = 0;
  /// @return true when no published message is available.
  virtual bool IsPublishedEmpty() const = 0;
  /// Removes the callback registered under @p callback_id.
  /// @return true when a callback was removed.
  virtual bool Unsubscribe(const std::string& callback_id) = 0;

  /// @return the maximum number of messages the blocker retains.
  virtual size_t capacity() const = 0;
  /// Changes the maximum number of messages the blocker retains.
  virtual void set_capacity(size_t capacity) = 0;
  /// @return the name of the channel this blocker is attached to.
  virtual const std::string& channel_name() const = 0;
};
49 
/**
 * @brief Construction parameters for a Blocker: queue capacity and channel.
 *
 * Copy and move operations are left to the compiler; the struct owns nothing
 * beyond a size_t and a std::string, so the implicit versions are correct and
 * a hand-written copy constructor would only suppress the implicit move.
 */
struct BlockerAttr {
  /// Default attributes: capacity 10 and an empty channel name.
  BlockerAttr() = default;

  /// Attributes for @p channel with the default capacity of 10.
  explicit BlockerAttr(const std::string& channel) : channel_name(channel) {}

  /// Attributes for @p channel retaining at most @p cap messages.
  BlockerAttr(size_t cap, const std::string& channel)
      : capacity(cap), channel_name(channel) {}

  /// Maximum number of messages kept; 10 unless stated otherwise.
  size_t capacity = 10;
  /// Name of the channel the blocker is attached to.
  std::string channel_name;
};
62 
63 template <typename T>
64 class Blocker : public BlockerBase {
65  friend class BlockerManager;
66 
67  public:
68  using MessageType = T;
69  using MessagePtr = std::shared_ptr<T>;
70  using MessageQueue = std::list<MessagePtr>;
71  using Callback = std::function<void(const MessagePtr&)>;
72  using CallbackMap = std::unordered_map<std::string, Callback>;
73  using Iterator = typename std::list<std::shared_ptr<T>>::const_iterator;
74 
75  explicit Blocker(const BlockerAttr& attr);
76  virtual ~Blocker();
77 
78  void Publish(const MessageType& msg);
79  void Publish(const MessagePtr& msg);
80 
81  void ClearObserved() override;
82  void ClearPublished() override;
83  void Observe() override;
84  bool IsObservedEmpty() const override;
85  bool IsPublishedEmpty() const override;
86 
87  bool Subscribe(const std::string& callback_id, const Callback& callback);
88  bool Unsubscribe(const std::string& callback_id) override;
89 
90  const MessageType& GetLatestObserved() const;
91  const MessagePtr GetLatestObservedPtr() const;
92  const MessagePtr GetOldestObservedPtr() const;
93  const MessagePtr GetLatestPublishedPtr() const;
94 
95  Iterator ObservedBegin() const;
96  Iterator ObservedEnd() const;
97 
98  size_t capacity() const override;
99  void set_capacity(size_t capacity) override;
100  const std::string& channel_name() const override;
101 
102  private:
103  void Reset() override;
104  void Enqueue(const MessagePtr& msg);
105  void Notify(const MessagePtr& msg);
106 
107  BlockerAttr attr_;
108  MessageQueue observed_msg_queue_;
109  MessageQueue published_msg_queue_;
110  mutable std::mutex msg_mutex_;
111 
112  CallbackMap published_callbacks_;
113  mutable std::mutex cb_mutex_;
114 
115  MessageType dummy_msg_;
116 };
117 
118 template <typename T>
119 Blocker<T>::Blocker(const BlockerAttr& attr) : attr_(attr), dummy_msg_() {}
120 
121 template <typename T>
123  published_msg_queue_.clear();
124  observed_msg_queue_.clear();
125  published_callbacks_.clear();
126 }
127 
128 template <typename T>
130  Publish(std::make_shared<MessageType>(msg));
131 }
132 
133 template <typename T>
134 void Blocker<T>::Publish(const MessagePtr& msg) {
135  Enqueue(msg);
136  Notify(msg);
137 }
138 
139 template <typename T>
140 void Blocker<T>::Reset() {
141  {
142  std::lock_guard<std::mutex> lock(msg_mutex_);
143  observed_msg_queue_.clear();
144  published_msg_queue_.clear();
145  }
146  {
147  std::lock_guard<std::mutex> lock(cb_mutex_);
148  published_callbacks_.clear();
149  }
150 }
151 
152 template <typename T>
154  std::lock_guard<std::mutex> lock(msg_mutex_);
155  observed_msg_queue_.clear();
156 }
157 
158 template <typename T>
160  std::lock_guard<std::mutex> lock(msg_mutex_);
161  published_msg_queue_.clear();
162 }
163 
164 template <typename T>
166  std::lock_guard<std::mutex> lock(msg_mutex_);
167  observed_msg_queue_ = published_msg_queue_;
168 }
169 
170 template <typename T>
172  std::lock_guard<std::mutex> lock(msg_mutex_);
173  return observed_msg_queue_.empty();
174 }
175 
176 template <typename T>
178  std::lock_guard<std::mutex> lock(msg_mutex_);
179  return published_msg_queue_.empty();
180 }
181 
182 template <typename T>
183 bool Blocker<T>::Subscribe(const std::string& callback_id,
184  const Callback& callback) {
185  std::lock_guard<std::mutex> lock(cb_mutex_);
186  if (published_callbacks_.find(callback_id) != published_callbacks_.end()) {
187  return false;
188  }
189  published_callbacks_[callback_id] = callback;
190  return true;
191 }
192 
193 template <typename T>
194 bool Blocker<T>::Unsubscribe(const std::string& callback_id) {
195  std::lock_guard<std::mutex> lock(cb_mutex_);
196  return published_callbacks_.erase(callback_id) != 0;
197 }
198 
199 template <typename T>
201  std::lock_guard<std::mutex> lock(msg_mutex_);
202  if (observed_msg_queue_.empty()) {
203  return dummy_msg_;
204  }
205  return *observed_msg_queue_.front();
206 }
207 
208 template <typename T>
210  std::lock_guard<std::mutex> lock(msg_mutex_);
211  if (observed_msg_queue_.empty()) {
212  return nullptr;
213  }
214  return observed_msg_queue_.front();
215 }
216 
217 template <typename T>
219  std::lock_guard<std::mutex> lock(msg_mutex_);
220  if (observed_msg_queue_.empty()) {
221  return nullptr;
222  }
223  return observed_msg_queue_.back();
224 }
225 
226 template <typename T>
228  std::lock_guard<std::mutex> lock(msg_mutex_);
229  if (published_msg_queue_.empty()) {
230  return nullptr;
231  }
232  return published_msg_queue_.front();
233 }
234 
235 template <typename T>
237  return observed_msg_queue_.begin();
238 }
239 
240 template <typename T>
242  return observed_msg_queue_.end();
243 }
244 
245 template <typename T>
246 size_t Blocker<T>::capacity() const {
247  return attr_.capacity;
248 }
249 
250 template <typename T>
252  std::lock_guard<std::mutex> lock(msg_mutex_);
253  attr_.capacity = capacity;
254  while (published_msg_queue_.size() > capacity) {
255  published_msg_queue_.pop_back();
256  }
257 }
258 
259 template <typename T>
260 const std::string& Blocker<T>::channel_name() const {
261  return attr_.channel_name;
262 }
263 
264 template <typename T>
265 void Blocker<T>::Enqueue(const MessagePtr& msg) {
266  if (attr_.capacity == 0) {
267  return;
268  }
269  std::lock_guard<std::mutex> lock(msg_mutex_);
270  published_msg_queue_.push_front(msg);
271  while (published_msg_queue_.size() > attr_.capacity) {
272  published_msg_queue_.pop_back();
273  }
274 }
275 
276 template <typename T>
277 void Blocker<T>::Notify(const MessagePtr& msg) {
278  std::lock_guard<std::mutex> lock(cb_mutex_);
279  for (const auto& item : published_callbacks_) {
280  item.second(msg);
281  }
282 }
283 
284 } // namespace blocker
285 } // namespace cyber
286 } // namespace apollo
287 
288 #endif // CYBER_BLOCKER_BLOCKER_H_
std::shared_ptr< T > MessagePtr
Definition: blocker.h:69
const std::string & channel_name() const override
Definition: blocker.h:260
void ClearPublished() override
Definition: blocker.h:159
Blocker(const BlockerAttr &attr)
Definition: blocker.h:119
virtual const std::string & channel_name() const =0
virtual ~Blocker()
Definition: blocker.h:122
virtual void set_capacity(size_t capacity)=0
std::function< void(const MessagePtr &)> Callback
Definition: blocker.h:71
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::string channel_name
Definition: blocker.h:60
std::unordered_map< std::string, Callback > CallbackMap
Definition: blocker.h:72
bool IsPublishedEmpty() const override
Definition: blocker.h:177
const MessagePtr GetLatestPublishedPtr() const
Definition: blocker.h:227
virtual size_t capacity() const =0
typename std::list< std::shared_ptr< T > >::const_iterator Iterator
Definition: blocker.h:73
BlockerAttr()
Definition: blocker.h:51
bool Unsubscribe(const std::string &callback_id) override
Definition: blocker.h:194
std::list< MessagePtr > MessageQueue
Definition: blocker.h:70
void set_capacity(size_t capacity) override
Definition: blocker.h:251
BlockerAttr(size_t cap, const std::string &channel)
Definition: blocker.h:54
BlockerAttr
Definition: blocker.h:50
size_t capacity() const override
Definition: blocker.h:246
bool Subscribe(const std::string &callback_id, const Callback &callback)
Definition: blocker.h:183
Iterator ObservedBegin() const
Definition: blocker.h:236
bool IsObservedEmpty() const override
Definition: blocker.h:171
const MessagePtr GetLatestObservedPtr() const
Definition: blocker.h:209
BlockerAttr(const std::string &channel)
Definition: blocker.h:52
BlockerAttr(const BlockerAttr &attr)
Definition: blocker.h:56
virtual bool Unsubscribe(const std::string &callback_id)=0
T MessageType
Definition: blocker.h:68
BlockerBase
Definition: blocker.h:33
void Observe() override
Definition: blocker.h:165
const MessageType & GetLatestObserved() const
Definition: blocker.h:200
void ClearObserved() override
Definition: blocker.h:153
void Publish(const MessageType &msg)
Definition: blocker.h:129
const MessagePtr GetOldestObservedPtr() const
Definition: blocker.h:218
virtual bool IsPublishedEmpty() const =0
Iterator ObservedEnd() const
Definition: blocker.h:241
size_t capacity
Definition: blocker.h:59
Blocker
Definition: blocker.h:64
Definition: blocker_manager.h:31
virtual bool IsObservedEmpty() const =0