17 #ifndef CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_ 18 #define CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_ 26 #include <unordered_map> 32 #include "cyber/proto/role_attributes.pb.h" 45 using cyber::proto::RoleAttributes;
50 template <
typename MessageT>
52 std::function<void(const std::shared_ptr<MessageT>&,
const MessageInfo&)>;
61 template <
typename MessageT>
65 template <
typename MessageT>
67 const RoleAttributes& opposite_attr,
70 template <
typename MessageT>
73 template <
typename MessageT>
75 const RoleAttributes& opposite_attr);
86 template <
typename MessageT>
92 uint64_t channel_id = self_attr.channel_id();
94 std::shared_ptr<ListenerHandler<MessageT>> handler;
99 if (handler ==
nullptr) {
100 AERROR <<
"please ensure that readers with the same channel[" 101 << self_attr.channel_name()
102 <<
"] in the same process have the same message type";
106 ADEBUG <<
"new reader for channel:" 111 handler->Connect(self_attr.id(), listener);
114 template <
typename MessageT>
116 const RoleAttributes& opposite_attr,
121 uint64_t channel_id = self_attr.channel_id();
123 std::shared_ptr<ListenerHandler<MessageT>> handler;
128 if (handler ==
nullptr) {
129 AERROR <<
"please ensure that readers with the same channel[" 130 << self_attr.channel_name()
131 <<
"] in the same process have the same message type";
135 ADEBUG <<
"new reader for channel:" 140 handler->Connect(self_attr.id(), opposite_attr.id(), listener);
143 template <
typename MessageT>
148 uint64_t channel_id = self_attr.channel_id();
152 (*handler_base)->Disconnect(self_attr.id());
156 template <
typename MessageT>
158 const RoleAttributes& opposite_attr) {
162 uint64_t channel_id = self_attr.channel_id();
166 (*handler_base)->Disconnect(self_attr.id(), opposite_attr.id());
174 #endif // CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
Definition: listener_handler.h:41
void RemoveListener(const RoleAttributes &self_attr)
Definition: dispatcher.h:144
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: rw_lock_guard.h:48
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
Definition: dispatcher.h:82
Definition: atomic_rw_lock.h:36
bool Get(K key, V **value)
Definition: atomic_hash_map.h:51
static std::string GetChannelById(uint64_t id)
Definition: dispatcher.h:54
#define ADEBUG
Definition: log.h:41
An implementation of a lock-free, fixed-size hash map.
Definition: atomic_hash_map.h:40
Definition: rw_lock_guard.h:35
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition: dispatcher.h:87
void Set(K key)
Definition: atomic_hash_map.h:66
Definition: listener_handler.h:59
Definition: message_info.h:30
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition: dispatcher.h:52
Definition: global_data.h:40
#define AERROR
Definition: log.h:44
base::AtomicRWLock rw_lock_
Definition: dispatcher.h:83
std::atomic< bool > is_shutdown_
Definition: dispatcher.h:80
bool HasChannel(uint64_t channel_id)
std::shared_ptr< Dispatcher > DispatcherPtr
Definition: dispatcher.h:48