17 #ifndef CYBER_BLOCKER_BLOCKER_H_ 18 #define CYBER_BLOCKER_BLOCKER_H_ 26 #include <unordered_map> 37 virtual void Reset() = 0;
43 virtual bool Unsubscribe(
const std::string& callback_id) = 0;
71 using Callback = std::function<void(const MessagePtr&)>;
72 using CallbackMap = std::unordered_map<std::string, Callback>;
73 using Iterator =
typename std::list<std::shared_ptr<T>>::const_iterator;
87 bool Subscribe(
const std::string& callback_id,
const Callback& callback);
88 bool Unsubscribe(
const std::string& callback_id)
override;
93 const MessagePtr GetLatestPublishedPtr()
const;
103 void Reset()
override;
110 mutable std::mutex msg_mutex_;
113 mutable std::mutex cb_mutex_;
118 template <
typename T>
121 template <
typename T>
123 published_msg_queue_.clear();
124 observed_msg_queue_.clear();
125 published_callbacks_.clear();
128 template <
typename T>
130 Publish(std::make_shared<MessageType>(msg));
133 template <
typename T>
139 template <
typename T>
142 std::lock_guard<std::mutex> lock(msg_mutex_);
143 observed_msg_queue_.clear();
144 published_msg_queue_.clear();
147 std::lock_guard<std::mutex> lock(cb_mutex_);
148 published_callbacks_.clear();
152 template <
typename T>
154 std::lock_guard<std::mutex> lock(msg_mutex_);
155 observed_msg_queue_.clear();
158 template <
typename T>
160 std::lock_guard<std::mutex> lock(msg_mutex_);
161 published_msg_queue_.clear();
164 template <
typename T>
166 std::lock_guard<std::mutex> lock(msg_mutex_);
167 observed_msg_queue_ = published_msg_queue_;
170 template <
typename T>
172 std::lock_guard<std::mutex> lock(msg_mutex_);
173 return observed_msg_queue_.empty();
176 template <
typename T>
178 std::lock_guard<std::mutex> lock(msg_mutex_);
179 return published_msg_queue_.empty();
182 template <
typename T>
185 std::lock_guard<std::mutex> lock(cb_mutex_);
186 if (published_callbacks_.find(callback_id) != published_callbacks_.end()) {
189 published_callbacks_[callback_id] = callback;
193 template <
typename T>
195 std::lock_guard<std::mutex> lock(cb_mutex_);
196 return published_callbacks_.erase(callback_id) != 0;
199 template <
typename T>
201 std::lock_guard<std::mutex> lock(msg_mutex_);
202 if (observed_msg_queue_.empty()) {
205 return *observed_msg_queue_.front();
208 template <
typename T>
210 std::lock_guard<std::mutex> lock(msg_mutex_);
211 if (observed_msg_queue_.empty()) {
214 return observed_msg_queue_.front();
217 template <
typename T>
219 std::lock_guard<std::mutex> lock(msg_mutex_);
220 if (observed_msg_queue_.empty()) {
223 return observed_msg_queue_.back();
226 template <
typename T>
228 std::lock_guard<std::mutex> lock(msg_mutex_);
229 if (published_msg_queue_.empty()) {
232 return published_msg_queue_.front();
235 template <
typename T>
237 return observed_msg_queue_.begin();
240 template <
typename T>
242 return observed_msg_queue_.end();
245 template <
typename T>
250 template <
typename T>
252 std::lock_guard<std::mutex> lock(msg_mutex_);
254 while (published_msg_queue_.size() >
capacity) {
255 published_msg_queue_.pop_back();
259 template <
typename T>
264 template <
typename T>
269 std::lock_guard<std::mutex> lock(msg_mutex_);
270 published_msg_queue_.push_front(msg);
271 while (published_msg_queue_.size() > attr_.
capacity) {
272 published_msg_queue_.pop_back();
276 template <
typename T>
278 std::lock_guard<std::mutex> lock(cb_mutex_);
279 for (
const auto& item : published_callbacks_) {
288 #endif // CYBER_BLOCKER_BLOCKER_H_ std::shared_ptr< T > MessagePtr
Definition: blocker.h:69
const std::string & channel_name() const override
Definition: blocker.h:260
void ClearPublished() override
Definition: blocker.h:159
Blocker(const BlockerAttr &attr)
Definition: blocker.h:119
virtual const std::string & channel_name() const =0
virtual ~Blocker()
Definition: blocker.h:122
virtual void ClearObserved()=0
virtual void set_capacity(size_t capacity)=0
std::function< void(const MessagePtr &)> Callback
Definition: blocker.h:71
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::string channel_name
Definition: blocker.h:60
std::unordered_map< std::string, Callback > CallbackMap
Definition: blocker.h:72
bool IsPublishedEmpty() const override
Definition: blocker.h:177
const MessagePtr GetLatestPublishedPtr() const
Definition: blocker.h:227
virtual size_t capacity() const =0
typename std::list< std::shared_ptr< T > >::const_iterator Iterator
Definition: blocker.h:73
BlockerAttr()
Definition: blocker.h:51
bool Unsubscribe(const std::string &callback_id) override
Definition: blocker.h:194
std::list< MessagePtr > MessageQueue
Definition: blocker.h:70
void set_capacity(size_t capacity) override
Definition: blocker.h:251
BlockerAttr(size_t cap, const std::string &channel)
Definition: blocker.h:54
size_t capacity() const override
Definition: blocker.h:246
bool Subscribe(const std::string &callback_id, const Callback &callback)
Definition: blocker.h:183
Iterator ObservedBegin() const
Definition: blocker.h:236
bool IsObservedEmpty() const override
Definition: blocker.h:171
const MessagePtr GetLatestObservedPtr() const
Definition: blocker.h:209
BlockerAttr(const std::string &channel)
Definition: blocker.h:52
virtual void ClearPublished()=0
BlockerAttr(const BlockerAttr &attr)
Definition: blocker.h:56
virtual bool Unsubscribe(const std::string &callback_id)=0
T MessageType
Definition: blocker.h:68
void Observe() override
Definition: blocker.h:165
const MessageType & GetLatestObserved() const
Definition: blocker.h:200
void ClearObserved() override
Definition: blocker.h:153
void Publish(const MessageType &msg)
Definition: blocker.h:129
const MessagePtr GetOldestObservedPtr() const
Definition: blocker.h:218
virtual bool IsPublishedEmpty() const =0
Iterator ObservedEnd() const
Definition: blocker.h:241
size_t capacity
Definition: blocker.h:59
virtual ~BlockerBase()=default
Definition: blocker_manager.h:31
virtual bool IsObservedEmpty() const =0