17 #ifndef CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_ 18 #define CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_ 23 #include <unordered_map> 40 class ListenerHandlerBase;
49 virtual void Disconnect(uint64_t self_id, uint64_t oppo_id) = 0;
58 template <
typename MessageT>
61 using Message = std::shared_ptr<MessageT>;
64 using Listener = std::function<void(const Message&, const MessageInfo&)>;
72 void Connect(uint64_t self_id,
const Listener& listener);
73 void Connect(uint64_t self_id, uint64_t oppo_id,
const Listener& listener);
76 void Disconnect(uint64_t self_id, uint64_t oppo_id)
override;
83 using SignalPtr = std::shared_ptr<MessageSignal>;
84 using MessageSignalMap = std::unordered_map<uint64_t, SignalPtr>;
90 MessageSignalMap signals_;
92 std::unordered_map<uint64_t, ConnectionMap> signals_conns_;
102 template <
typename MessageT>
105 auto connection = signal_.Connect(listener);
106 if (!connection.IsConnected()) {
111 signal_conns_[self_id] = connection;
114 template <
typename MessageT>
118 if (signals_.find(oppo_id) == signals_.end()) {
119 signals_[oppo_id] = std::make_shared<MessageSignal>();
122 auto connection = signals_[oppo_id]->Connect(listener);
123 if (!connection.IsConnected()) {
124 AWARN << oppo_id <<
" " << self_id <<
" connect failed!";
128 if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
132 signals_conns_[oppo_id][self_id] = connection;
135 template <
typename MessageT>
138 if (signal_conns_.find(self_id) == signal_conns_.end()) {
142 signal_conns_[self_id].Disconnect();
143 signal_conns_.erase(self_id);
146 template <
typename MessageT>
149 if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
153 if (signals_conns_[oppo_id].find(self_id) == signals_conns_[oppo_id].end()) {
157 signals_conns_[oppo_id][self_id].Disconnect();
158 signals_conns_[oppo_id].erase(self_id);
161 template <
typename MessageT>
164 signal_(msg, msg_info);
167 if (signals_.find(oppo_id) == signals_.end()) {
171 (*signals_[oppo_id])(msg, msg_info);
174 template <
typename MessageT>
177 auto msg = std::make_shared<MessageT>();
182 AWARN <<
"Failed to parse message. Content: " << str;
190 #endif // CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromHC(const void *data, int size, T *message)
Definition: message_traits.h:169
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
Definition: listener_handler.h:41
virtual ~ListenerHandlerBase()
Definition: listener_handler.h:46
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
void RunFromString(const std::string &str, const MessageInfo &msg_info) override
Definition: listener_handler.h:175
Definition: rw_lock_guard.h:48
std::shared_ptr< MessageT > Message
Definition: listener_handler.h:61
Definition: atomic_rw_lock.h:36
bool IsRawMessage() const
Definition: listener_handler.h:50
ListenerHandlerBase()
Definition: listener_handler.h:45
bool is_raw_message_
Definition: listener_handler.h:55
void Connect(uint64_t self_id, const Listener &listener)
Definition: listener_handler.h:103
std::function< void(const Message &, const MessageInfo &)> Listener
Definition: listener_handler.h:64
Definition: rw_lock_guard.h:35
virtual ~ListenerHandler()
Definition: listener_handler.h:70
const Identity & sender_id() const
Definition: message_info.h:49
void Disconnect(uint64_t self_id) override
Definition: listener_handler.h:136
Definition: listener_handler.h:59
Definition: message_info.h:30
Definition: listener_handler.h:43
std::unordered_map< uint64_t, MessageConnection > ConnectionMap
Definition: listener_handler.h:67
virtual void Disconnect(uint64_t self_id)=0
#define AWARN
Definition: log.h:43
ListenerHandler()
Definition: listener_handler.h:69
virtual void RunFromString(const std::string &str, const MessageInfo &msg_info)=0
void Run(const Message &msg, const MessageInfo &msg_info)
Definition: listener_handler.h:162
uint64_t HashValue() const