17 #ifndef CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_ 18 #define CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_ 38 class IntraDispatcher;
42 template <
typename MessageT>
44 std::function<void(const std::shared_ptr<MessageT>&,
const MessageInfo&)>;
50 using BaseHandlersType =
51 std::map<uint64_t, std::map<std::string, ListenerHandlerBasePtr>>;
54 template <
typename MessageT>
56 const std::string& message_type,
59 auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers_);
60 auto handler = ret.first;
61 if (handler ==
nullptr) {
62 AERROR <<
"get handler failed. channel: " 64 <<
", message type: " << message::GetMessageName<MessageT>();
67 handler->Connect(self_id, listener);
71 template <
typename MessageT>
72 bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
73 const std::string& message_type,
76 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
77 oppo_handlers_[oppo_id] = BaseHandlersType();
79 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
80 auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers);
81 auto handler = ret.first;
82 if (handler ==
nullptr) {
83 AERROR <<
"get handler failed. channel: " 85 <<
", message type: " << message_type;
88 handler->Connect(self_id, oppo_id, listener);
92 template <
typename MessageT>
94 const std::string& message_type) {
96 auto handler = RemoveHandler(channel_id, message_type, &handlers_);
98 handler->Disconnect(self_id);
102 template <
typename MessageT>
104 const std::string& message_type) {
106 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
109 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
110 auto handler = RemoveHandler(channel_id, message_type, &handlers);
111 if (oppo_handlers_[oppo_id].empty()) {
112 oppo_handlers_.erase(oppo_id);
115 handler->Disconnect(self_id, oppo_id);
119 template <
typename MessageT>
120 void Run(uint64_t self_id, uint64_t channel_id,
121 const std::string& message_type,
122 const std::shared_ptr<MessageT>& message,
125 Run(channel_id, message_type, handlers_, message, message_info);
128 template <
typename MessageT>
129 void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
130 const std::string& message_type,
131 const std::shared_ptr<MessageT>& message,
134 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
137 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
138 Run(channel_id, message_type, handlers, message, message_info);
143 template <
typename MessageT>
144 std::pair<std::shared_ptr<ListenerHandler<MessageT>>,
bool> GetHandler(
145 uint64_t channel_id,
const std::string& message_type,
146 BaseHandlersType* handlers) {
147 std::shared_ptr<ListenerHandler<MessageT>> handler;
148 bool created =
false;
150 if (handlers->find(channel_id) == handlers->end()) {
151 (*handlers)[channel_id] = std::map<std::string, ListenerHandlerBasePtr>();
154 if ((*handlers)[channel_id].find(message_type) ==
155 (*handlers)[channel_id].end()) {
156 ADEBUG <<
"Create new ListenerHandler for channel " 158 <<
", message type: " << message_type;
160 (*handlers)[channel_id][message_type] = handler;
164 <<
"'s ListenerHandler, message type: " << message_type;
166 (*handlers)[channel_id][message_type]);
169 return std::make_pair(handler, created);
174 const std::string message_type,
175 BaseHandlersType* handlers) {
177 if (handlers->find(channel_id) != handlers->end()) {
178 if ((*handlers)[channel_id].find(message_type) !=
179 (*handlers)[channel_id].end()) {
180 handler_base = (*handlers)[channel_id][message_type];
182 << message_type <<
" ListenerHandler";
183 (*handlers)[channel_id].erase(message_type);
185 if ((*handlers)[channel_id].empty()) {
187 <<
"'s all ListenerHandler";
188 (*handlers).erase(channel_id);
194 template <
typename MessageT>
195 void Run(
const uint64_t channel_id,
const std::string& message_type,
196 const BaseHandlersType& handlers,
197 const std::shared_ptr<MessageT>& message,
199 const auto channel_handlers_itr = handlers.find(channel_id);
200 if (channel_handlers_itr == handlers.end()) {
205 const auto& channel_handlers = channel_handlers_itr->second;
208 <<
"'s chain run, size: " << channel_handlers.size()
209 <<
", message type: " << message_type;
211 for (
const auto& ele : channel_handlers) {
212 auto handler_base = ele.second;
213 if (message_type == ele.first) {
214 ADEBUG <<
"Run handler for message type: " << ele.first <<
" directly";
217 if (handler ==
nullptr) {
220 handler->
Run(message, message_info);
222 ADEBUG <<
"Run handler for message type: " << ele.first
227 AERROR <<
"Failed to get message size. channel[" 231 msg.resize(msg_size);
234 AERROR <<
"Chain Serialize error for channel id: " << channel_id;
239 (handler_base)->RunFromString(msg, message_info);
245 BaseHandlersType handlers_;
247 std::map<uint64_t, BaseHandlersType> oppo_handlers_;
255 template <
typename MessageT>
256 void OnMessage(uint64_t channel_id,
const std::shared_ptr<MessageT>& message,
259 template <
typename MessageT>
263 template <
typename MessageT>
265 const RoleAttributes& opposite_attr,
268 template <
typename MessageT>
271 template <
typename MessageT>
273 const RoleAttributes& opposite_attr);
278 template <
typename MessageT>
279 std::shared_ptr<ListenerHandler<MessageT>> GetHandler(uint64_t channel_id);
284 template <
typename MessageT>
286 const std::shared_ptr<MessageT>& message,
288 if (is_shutdown_.load()) {
292 ADEBUG <<
"intra on message, channel:" 294 if (msg_listeners_.Get(channel_id, &handler_base)) {
298 handler->
Run(message, message_info);
302 AERROR <<
"Failed to get message size. channel[" 307 msg.resize(msg_size);
310 (*handler_base)->RunFromString(msg, message_info);
312 AERROR <<
"Failed to serialize message. channel[" 319 template <
typename MessageT>
320 std::shared_ptr<ListenerHandler<MessageT>> IntraDispatcher::GetHandler(
321 uint64_t channel_id) {
322 std::shared_ptr<ListenerHandler<MessageT>> handler;
325 if (msg_listeners_.Get(channel_id, &handler_base)) {
328 if (handler ==
nullptr) {
329 ADEBUG <<
"Find a new type for channel " 331 << message::GetMessageName<MessageT>();
334 ADEBUG <<
"Create new ListenerHandler for channel " 336 << message::GetMessageName<MessageT>();
338 msg_listeners_.Set(channel_id, handler);
344 template <
typename MessageT>
347 if (is_shutdown_.load()) {
351 auto channel_id = self_attr.channel_id();
352 std::string message_type = message::GetMessageName<MessageT>();
353 uint64_t self_id = self_attr.id();
356 chain_->AddListener(self_id, channel_id, message_type, listener);
358 auto handler = GetHandler<MessageT>(self_attr.channel_id());
359 if (handler && created) {
360 auto listener_wrapper = [
this, self_id, channel_id, message_type](
361 const std::shared_ptr<MessageT>& message,
363 this->chain_->Run<MessageT>(self_id, channel_id, message_type, message,
366 handler->Connect(self_id, listener_wrapper);
370 template <
typename MessageT>
372 const RoleAttributes& opposite_attr,
374 if (is_shutdown_.load()) {
378 auto channel_id = self_attr.channel_id();
379 std::string message_type = message::GetMessageName<MessageT>();
380 uint64_t self_id = self_attr.id();
381 uint64_t oppo_id = opposite_attr.id();
384 chain_->AddListener(self_id, oppo_id, channel_id, message_type, listener);
386 auto handler = GetHandler<MessageT>(self_attr.channel_id());
387 if (handler && created) {
388 auto listener_wrapper = [
this, self_id, oppo_id, channel_id, message_type](
389 const std::shared_ptr<MessageT>& message,
391 this->chain_->Run<MessageT>(self_id, oppo_id, channel_id, message_type,
392 message, message_info);
394 handler->Connect(self_id, oppo_id, listener_wrapper);
398 template <
typename MessageT>
400 if (is_shutdown_.load()) {
403 Dispatcher::RemoveListener<MessageT>(self_attr);
404 chain_->RemoveListener<MessageT>(self_attr.id(), self_attr.channel_id(),
405 message::GetMessageName<MessageT>());
408 template <
typename MessageT>
410 const RoleAttributes& opposite_attr) {
411 if (is_shutdown_.load()) {
414 Dispatcher::RemoveListener<MessageT>(self_attr, opposite_attr);
415 chain_->RemoveListener<MessageT>(self_attr.id(), opposite_attr.id(),
416 self_attr.channel_id(),
417 message::GetMessageName<MessageT>());
424 #endif // CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_ void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
Definition: intra_dispatcher.h:129
void Run(uint64_t self_id, uint64_t channel_id, const std::string &message_type, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
Definition: intra_dispatcher.h:120
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
Definition: listener_handler.h:41
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition: intra_dispatcher.h:345
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: rw_lock_guard.h:48
void RemoveListener(const RoleAttributes &self_attr)
Definition: intra_dispatcher.h:399
Definition: atomic_rw_lock.h:36
void RemoveListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type)
Definition: intra_dispatcher.h:103
Definition: intra_dispatcher.h:251
static std::string GetChannelById(uint64_t id)
Definition: dispatcher.h:54
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
#define ADEBUG
Definition: log.h:41
void RemoveListener(uint64_t self_id, uint64_t channel_id, const std::string &message_type)
Definition: intra_dispatcher.h:93
Definition: rw_lock_guard.h:35
std::shared_ptr< ChannelChain > ChannelChainPtr
Definition: intra_dispatcher.h:41
Definition: intra_dispatcher.h:49
Definition: listener_handler.h:59
bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type, const MessageListener< MessageT > &listener)
Definition: intra_dispatcher.h:72
Definition: message_info.h:30
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition: dispatcher.h:52
int FullByteSize(const T &message)
Definition: message_traits.h:136
bool AddListener(uint64_t self_id, uint64_t channel_id, const std::string &message_type, const MessageListener< MessageT > &listener)
Definition: intra_dispatcher.h:55
void OnMessage(uint64_t channel_id, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
Definition: intra_dispatcher.h:285
#define AERROR
Definition: log.h:44
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToHC(const T &message, void *data, int size)
Definition: message_traits.h:213
void Run(const Message &msg, const MessageInfo &msg_info)
Definition: listener_handler.h:162