Apollo  6.0
Open source self driving car software
intra_dispatcher.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_
18 #define CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_
19 
20 #include <iostream>
21 #include <map>
22 #include <memory>
23 #include <string>
24 #include <utility>
25 
28 #include "cyber/common/log.h"
29 #include "cyber/common/macros.h"
33 
34 namespace apollo {
35 namespace cyber {
36 namespace transport {
37 
38 class IntraDispatcher;
40 class ChannelChain;
41 using ChannelChainPtr = std::shared_ptr<ChannelChain>;
42 template <typename MessageT>
43 using MessageListener =
44  std::function<void(const std::shared_ptr<MessageT>&, const MessageInfo&)>;
45 
46 // use a channel chain to wrap specific ListenerHandler.
47 // If the message is MessageT, then we use pointer directly, or we first parse
48 // to a string, and use it to serialise to another message type.
49 class ChannelChain {
50  using BaseHandlersType =
51  std::map<uint64_t, std::map<std::string, ListenerHandlerBasePtr>>;
52 
53  public:
54  template <typename MessageT>
55  bool AddListener(uint64_t self_id, uint64_t channel_id,
56  const std::string& message_type,
57  const MessageListener<MessageT>& listener) {
59  auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers_);
60  auto handler = ret.first;
61  if (handler == nullptr) {
62  AERROR << "get handler failed. channel: "
63  << GlobalData::GetChannelById(channel_id)
64  << ", message type: " << message::GetMessageName<MessageT>();
65  return ret.second;
66  }
67  handler->Connect(self_id, listener);
68  return ret.second;
69  }
70 
71  template <typename MessageT>
72  bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
73  const std::string& message_type,
74  const MessageListener<MessageT>& listener) {
75  WriteLockGuard<base::AtomicRWLock> lg(oppo_rw_lock_);
76  if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
77  oppo_handlers_[oppo_id] = BaseHandlersType();
78  }
79  BaseHandlersType& handlers = oppo_handlers_[oppo_id];
80  auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers);
81  auto handler = ret.first;
82  if (handler == nullptr) {
83  AERROR << "get handler failed. channel: "
84  << GlobalData::GetChannelById(channel_id)
85  << ", message type: " << message_type;
86  return ret.second;
87  }
88  handler->Connect(self_id, oppo_id, listener);
89  return ret.second;
90  }
91 
92  template <typename MessageT>
93  void RemoveListener(uint64_t self_id, uint64_t channel_id,
94  const std::string& message_type) {
96  auto handler = RemoveHandler(channel_id, message_type, &handlers_);
97  if (handler) {
98  handler->Disconnect(self_id);
99  }
100  }
101 
102  template <typename MessageT>
103  void RemoveListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
104  const std::string& message_type) {
105  WriteLockGuard<base::AtomicRWLock> lg(oppo_rw_lock_);
106  if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
107  return;
108  }
109  BaseHandlersType& handlers = oppo_handlers_[oppo_id];
110  auto handler = RemoveHandler(channel_id, message_type, &handlers);
111  if (oppo_handlers_[oppo_id].empty()) {
112  oppo_handlers_.erase(oppo_id);
113  }
114  if (handler) {
115  handler->Disconnect(self_id, oppo_id);
116  }
117  }
118 
119  template <typename MessageT>
120  void Run(uint64_t self_id, uint64_t channel_id,
121  const std::string& message_type,
122  const std::shared_ptr<MessageT>& message,
123  const MessageInfo& message_info) {
125  Run(channel_id, message_type, handlers_, message, message_info);
126  }
127 
128  template <typename MessageT>
129  void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
130  const std::string& message_type,
131  const std::shared_ptr<MessageT>& message,
132  const MessageInfo& message_info) {
133  ReadLockGuard<base::AtomicRWLock> lg(oppo_rw_lock_);
134  if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
135  return;
136  }
137  BaseHandlersType& handlers = oppo_handlers_[oppo_id];
138  Run(channel_id, message_type, handlers, message, message_info);
139  }
140 
141  private:
142  // NOTE: lock hold
143  template <typename MessageT>
144  std::pair<std::shared_ptr<ListenerHandler<MessageT>>, bool> GetHandler(
145  uint64_t channel_id, const std::string& message_type,
146  BaseHandlersType* handlers) {
147  std::shared_ptr<ListenerHandler<MessageT>> handler;
148  bool created = false; // if the handler is created
149 
150  if (handlers->find(channel_id) == handlers->end()) {
151  (*handlers)[channel_id] = std::map<std::string, ListenerHandlerBasePtr>();
152  }
153 
154  if ((*handlers)[channel_id].find(message_type) ==
155  (*handlers)[channel_id].end()) {
156  ADEBUG << "Create new ListenerHandler for channel "
157  << GlobalData::GetChannelById(channel_id)
158  << ", message type: " << message_type;
159  handler.reset(new ListenerHandler<MessageT>());
160  (*handlers)[channel_id][message_type] = handler;
161  created = true;
162  } else {
163  ADEBUG << "Find channel " << GlobalData::GetChannelById(channel_id)
164  << "'s ListenerHandler, message type: " << message_type;
165  handler = std::dynamic_pointer_cast<ListenerHandler<MessageT>>(
166  (*handlers)[channel_id][message_type]);
167  }
168 
169  return std::make_pair(handler, created);
170  }
171 
172  // NOTE: Lock hold
173  ListenerHandlerBasePtr RemoveHandler(int64_t channel_id,
174  const std::string message_type,
175  BaseHandlersType* handlers) {
176  ListenerHandlerBasePtr handler_base;
177  if (handlers->find(channel_id) != handlers->end()) {
178  if ((*handlers)[channel_id].find(message_type) !=
179  (*handlers)[channel_id].end()) {
180  handler_base = (*handlers)[channel_id][message_type];
181  ADEBUG << "remove " << GlobalData::GetChannelById(channel_id) << "'s "
182  << message_type << " ListenerHandler";
183  (*handlers)[channel_id].erase(message_type);
184  }
185  if ((*handlers)[channel_id].empty()) {
186  ADEBUG << "remove " << GlobalData::GetChannelById(channel_id)
187  << "'s all ListenerHandler";
188  (*handlers).erase(channel_id);
189  }
190  }
191  return handler_base;
192  }
193 
194  template <typename MessageT>
195  void Run(const uint64_t channel_id, const std::string& message_type,
196  const BaseHandlersType& handlers,
197  const std::shared_ptr<MessageT>& message,
198  const MessageInfo& message_info) {
199  const auto channel_handlers_itr = handlers.find(channel_id);
200  if (channel_handlers_itr == handlers.end()) {
201  AERROR << "Cant find channel " << GlobalData::GetChannelById(channel_id)
202  << " in Chain";
203  return;
204  }
205  const auto& channel_handlers = channel_handlers_itr->second;
206 
207  ADEBUG << GlobalData::GetChannelById(channel_id)
208  << "'s chain run, size: " << channel_handlers.size()
209  << ", message type: " << message_type;
210  std::string msg;
211  for (const auto& ele : channel_handlers) {
212  auto handler_base = ele.second;
213  if (message_type == ele.first) {
214  ADEBUG << "Run handler for message type: " << ele.first << " directly";
215  auto handler =
216  std::static_pointer_cast<ListenerHandler<MessageT>>(handler_base);
217  if (handler == nullptr) {
218  continue;
219  }
220  handler->Run(message, message_info);
221  } else {
222  ADEBUG << "Run handler for message type: " << ele.first
223  << " from string";
224  if (msg.empty()) {
225  auto msg_size = message::FullByteSize(*message);
226  if (msg_size < 0) {
227  AERROR << "Failed to get message size. channel["
228  << common::GlobalData::GetChannelById(channel_id) << "]";
229  continue;
230  }
231  msg.resize(msg_size);
232  if (!message::SerializeToHC(*message, const_cast<char*>(msg.data()),
233  msg_size)) {
234  AERROR << "Chain Serialize error for channel id: " << channel_id;
235  msg.clear();
236  }
237  }
238  if (!msg.empty()) {
239  (handler_base)->RunFromString(msg, message_info);
240  }
241  }
242  }
243  }
244 
245  BaseHandlersType handlers_;
246  base::AtomicRWLock rw_lock_;
247  std::map<uint64_t, BaseHandlersType> oppo_handlers_;
248  base::AtomicRWLock oppo_rw_lock_;
249 };
250 
251 class IntraDispatcher : public Dispatcher {
252  public:
253  virtual ~IntraDispatcher();
254 
255  template <typename MessageT>
256  void OnMessage(uint64_t channel_id, const std::shared_ptr<MessageT>& message,
257  const MessageInfo& message_info);
258 
259  template <typename MessageT>
260  void AddListener(const RoleAttributes& self_attr,
261  const MessageListener<MessageT>& listener);
262 
263  template <typename MessageT>
264  void AddListener(const RoleAttributes& self_attr,
265  const RoleAttributes& opposite_attr,
266  const MessageListener<MessageT>& listener);
267 
268  template <typename MessageT>
269  void RemoveListener(const RoleAttributes& self_attr);
270 
271  template <typename MessageT>
272  void RemoveListener(const RoleAttributes& self_attr,
273  const RoleAttributes& opposite_attr);
274 
276 
277  private:
278  template <typename MessageT>
279  std::shared_ptr<ListenerHandler<MessageT>> GetHandler(uint64_t channel_id);
280 
281  ChannelChainPtr chain_;
282 };
283 
284 template <typename MessageT>
285 void IntraDispatcher::OnMessage(uint64_t channel_id,
286  const std::shared_ptr<MessageT>& message,
287  const MessageInfo& message_info) {
288  if (is_shutdown_.load()) {
289  return;
290  }
291  ListenerHandlerBasePtr* handler_base = nullptr;
292  ADEBUG << "intra on message, channel:"
293  << common::GlobalData::GetChannelById(channel_id);
294  if (msg_listeners_.Get(channel_id, &handler_base)) {
295  auto handler =
296  std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
297  if (handler) {
298  handler->Run(message, message_info);
299  } else {
300  auto msg_size = message::FullByteSize(*message);
301  if (msg_size < 0) {
302  AERROR << "Failed to get message size. channel["
303  << common::GlobalData::GetChannelById(channel_id) << "]";
304  return;
305  }
306  std::string msg;
307  msg.resize(msg_size);
308  if (message::SerializeToHC(*message, const_cast<char*>(msg.data()),
309  msg_size)) {
310  (*handler_base)->RunFromString(msg, message_info);
311  } else {
312  AERROR << "Failed to serialize message. channel["
313  << common::GlobalData::GetChannelById(channel_id) << "]";
314  }
315  }
316  }
317 }
318 
319 template <typename MessageT>
320 std::shared_ptr<ListenerHandler<MessageT>> IntraDispatcher::GetHandler(
321  uint64_t channel_id) {
322  std::shared_ptr<ListenerHandler<MessageT>> handler;
323  ListenerHandlerBasePtr* handler_base = nullptr;
324 
325  if (msg_listeners_.Get(channel_id, &handler_base)) {
326  handler =
327  std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
328  if (handler == nullptr) {
329  ADEBUG << "Find a new type for channel "
330  << GlobalData::GetChannelById(channel_id) << " with type "
331  << message::GetMessageName<MessageT>();
332  }
333  } else {
334  ADEBUG << "Create new ListenerHandler for channel "
335  << GlobalData::GetChannelById(channel_id) << " with type "
336  << message::GetMessageName<MessageT>();
337  handler.reset(new ListenerHandler<MessageT>());
338  msg_listeners_.Set(channel_id, handler);
339  }
340 
341  return handler;
342 }
343 
344 template <typename MessageT>
345 void IntraDispatcher::AddListener(const RoleAttributes& self_attr,
346  const MessageListener<MessageT>& listener) {
347  if (is_shutdown_.load()) {
348  return;
349  }
350 
351  auto channel_id = self_attr.channel_id();
352  std::string message_type = message::GetMessageName<MessageT>();
353  uint64_t self_id = self_attr.id();
354 
355  bool created =
356  chain_->AddListener(self_id, channel_id, message_type, listener);
357 
358  auto handler = GetHandler<MessageT>(self_attr.channel_id());
359  if (handler && created) {
360  auto listener_wrapper = [this, self_id, channel_id, message_type](
361  const std::shared_ptr<MessageT>& message,
362  const MessageInfo& message_info) {
363  this->chain_->Run<MessageT>(self_id, channel_id, message_type, message,
364  message_info);
365  };
366  handler->Connect(self_id, listener_wrapper);
367  }
368 }
369 
370 template <typename MessageT>
371 void IntraDispatcher::AddListener(const RoleAttributes& self_attr,
372  const RoleAttributes& opposite_attr,
373  const MessageListener<MessageT>& listener) {
374  if (is_shutdown_.load()) {
375  return;
376  }
377 
378  auto channel_id = self_attr.channel_id();
379  std::string message_type = message::GetMessageName<MessageT>();
380  uint64_t self_id = self_attr.id();
381  uint64_t oppo_id = opposite_attr.id();
382 
383  bool created =
384  chain_->AddListener(self_id, oppo_id, channel_id, message_type, listener);
385 
386  auto handler = GetHandler<MessageT>(self_attr.channel_id());
387  if (handler && created) {
388  auto listener_wrapper = [this, self_id, oppo_id, channel_id, message_type](
389  const std::shared_ptr<MessageT>& message,
390  const MessageInfo& message_info) {
391  this->chain_->Run<MessageT>(self_id, oppo_id, channel_id, message_type,
392  message, message_info);
393  };
394  handler->Connect(self_id, oppo_id, listener_wrapper);
395  }
396 }
397 
398 template <typename MessageT>
399 void IntraDispatcher::RemoveListener(const RoleAttributes& self_attr) {
400  if (is_shutdown_.load()) {
401  return;
402  }
403  Dispatcher::RemoveListener<MessageT>(self_attr);
404  chain_->RemoveListener<MessageT>(self_attr.id(), self_attr.channel_id(),
405  message::GetMessageName<MessageT>());
406 }
407 
408 template <typename MessageT>
409 void IntraDispatcher::RemoveListener(const RoleAttributes& self_attr,
410  const RoleAttributes& opposite_attr) {
411  if (is_shutdown_.load()) {
412  return;
413  }
414  Dispatcher::RemoveListener<MessageT>(self_attr, opposite_attr);
415  chain_->RemoveListener<MessageT>(self_attr.id(), opposite_attr.id(),
416  self_attr.channel_id(),
417  message::GetMessageName<MessageT>());
418 }
419 
420 } // namespace transport
421 } // namespace cyber
422 } // namespace apollo
423 
424 #endif // CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_
void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
Definition: intra_dispatcher.h:129
void Run(uint64_t self_id, uint64_t channel_id, const std::string &message_type, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
Definition: intra_dispatcher.h:120
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
Definition: listener_handler.h:41
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition: intra_dispatcher.h:345
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: rw_lock_guard.h:48
void RemoveListener(const RoleAttributes &self_attr)
Definition: intra_dispatcher.h:399
Definition: atomic_rw_lock.h:36
void RemoveListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type)
Definition: intra_dispatcher.h:103
Definition: intra_dispatcher.h:251
static std::string GetChannelById(uint64_t id)
Definition: dispatcher.h:54
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
#define ADEBUG
Definition: log.h:41
void RemoveListener(uint64_t self_id, uint64_t channel_id, const std::string &message_type)
Definition: intra_dispatcher.h:93
Definition: rw_lock_guard.h:35
std::shared_ptr< ChannelChain > ChannelChainPtr
Definition: intra_dispatcher.h:41
Definition: intra_dispatcher.h:49
Definition: listener_handler.h:59
bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type, const MessageListener< MessageT > &listener)
Definition: intra_dispatcher.h:72
Definition: message_info.h:30
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition: dispatcher.h:52
int FullByteSize(const T &message)
Definition: message_traits.h:136
bool AddListener(uint64_t self_id, uint64_t channel_id, const std::string &message_type, const MessageListener< MessageT > &listener)
Definition: intra_dispatcher.h:55
void OnMessage(uint64_t channel_id, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
Definition: intra_dispatcher.h:285
#define AERROR
Definition: log.h:44
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToHC(const T &message, void *data, int size)
Definition: message_traits.h:213
void Run(const Message &msg, const MessageInfo &msg_info)
Definition: listener_handler.h:162