Apollo  6.0
Open source self driving car software
listener_handler.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
18 #define CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
19 
20 #include <functional>
21 #include <memory>
22 #include <string>
23 #include <unordered_map>
24 
26 #include "cyber/base/signal.h"
27 #include "cyber/common/log.h"
31 
32 namespace apollo {
33 namespace cyber {
34 namespace transport {
35 
39 
40 class ListenerHandlerBase;
41 using ListenerHandlerBasePtr = std::shared_ptr<ListenerHandlerBase>;
42 
// Abstract base of the listener handlers; shared through
// ListenerHandlerBasePtr (a std::shared_ptr alias declared just above).
// NOTE(review): the class-head line (listing line 43) and the constructor line
// (listing line 45) are absent from this extract. The symbol index at the end
// of the listing mentions `ListenerHandlerBase()` at line 45 — confirm both
// against the original header.
44  public:
46  virtual ~ListenerHandlerBase() {}
47 
// Pure-virtual disconnect hooks: by `self_id` alone, or by the
// (`self_id`, `oppo_id`) pair.
48  virtual void Disconnect(uint64_t self_id) = 0;
49  virtual void Disconnect(uint64_t self_id, uint64_t oppo_id) = 0;
// Returns the value of is_raw_message_.
50  inline bool IsRawMessage() const { return is_raw_message_; }
// Pure-virtual entry point taking a string plus its MessageInfo; the
// ListenerHandler<MessageT> override parses the string into a MessageT.
51  virtual void RunFromString(const std::string& str,
52  const MessageInfo& msg_info) = 0;
53 
54  protected:
// Defaults to false; visible to derived classes so they can change it.
55  bool is_raw_message_ = false;
56 };
57 
// Listener handler for messages of type MessageT. It keeps one signal that is
// invoked for every message (`signal_`) and a map of signals keyed by
// `oppo_id` (`signals_`), together with the connections made to each so that
// they can be disconnected later by `self_id`.
// NOTE(review): several listing lines are absent from this extract — the
// class-head (59), the `MessageSignal` alias (62), the right-hand side of
// `MessageConnection` (66) and the constructor (69). Restore them from the
// original header before compiling.
58 template <typename MessageT>
60  public:
// Messages are passed around as shared pointers to MessageT.
61  using Message = std::shared_ptr<MessageT>;
63 
// Callback type: receives the message and its MessageInfo.
64  using Listener = std::function<void(const Message&, const MessageInfo&)>;
// NOTE(review): the continuation of this alias (listing line 66) is missing.
65  using MessageConnection =
67  using ConnectionMap = std::unordered_map<uint64_t, MessageConnection>;
68 
70  virtual ~ListenerHandler() {}
71 
// Register `listener` either on the common signal or on the signal kept for
// `oppo_id`; the resulting connection is stored under `self_id`.
72  void Connect(uint64_t self_id, const Listener& listener);
73  void Connect(uint64_t self_id, uint64_t oppo_id, const Listener& listener);
74 
// Disconnect and forget the connection previously stored for `self_id`
// (optionally scoped to `oppo_id`). Unknown ids are ignored.
75  void Disconnect(uint64_t self_id) override;
76  void Disconnect(uint64_t self_id, uint64_t oppo_id) override;
77 
// Run() invokes the listeners; RunFromString() first parses `str` into a
// MessageT and then calls Run().
78  void Run(const Message& msg, const MessageInfo& msg_info);
79  void RunFromString(const std::string& str,
80  const MessageInfo& msg_info) override;
81 
82  private:
83  using SignalPtr = std::shared_ptr<MessageSignal>;
84  using MessageSignalMap = std::unordered_map<uint64_t, SignalPtr>;
85  // used for self_id
86  MessageSignal signal_;
87  ConnectionMap signal_conns_; // key: self_id
88 
89  // used for self_id and oppo_id
90  MessageSignalMap signals_; // key: oppo_id
91  // key: oppo_id
92  std::unordered_map<uint64_t, ConnectionMap> signals_conns_;
93 
// Guards the connection and signal maps: write-locked in Connect/Disconnect,
// read-locked in Run.
94  base::AtomicRWLock rw_lock_;
95 };
96 
// Explicit specialization (`template <>`) whose body sets is_raw_message_ to
// true, so IsRawMessage() returns true for that instantiation.
// NOTE(review): the signature line (listing line 98) is absent from this
// extract — presumably the ListenerHandler constructor specialized for a
// raw-message type; confirm against the original header.
97 template <>
99  is_raw_message_ = true;
100 }
101 
// Connects `listener` to the common signal (`signal_`) and stores the
// resulting connection under `self_id`.
// NOTE(review): the first signature line (listing line 103) is absent from
// this extract; the symbol index at the end of the listing gives it as
// `void Connect(uint64_t self_id, const Listener &listener)` — confirm.
102 template <typename MessageT>
104  const Listener& listener) {
// The connection is made before the lock is taken.
105  auto connection = signal_.Connect(listener);
106  if (!connection.IsConnected()) {
// Nothing is recorded for a connection that reports not connected.
107  return;
108  }
109 
// The write lock covers only the map update. operator[] assignment replaces
// any entry already stored for `self_id`.
110  WriteLockGuard<AtomicRWLock> lock(rw_lock_);
111  signal_conns_[self_id] = connection;
112 }
113 
114 template <typename MessageT>
115 void ListenerHandler<MessageT>::Connect(uint64_t self_id, uint64_t oppo_id,
116  const Listener& listener) {
117  WriteLockGuard<AtomicRWLock> lock(rw_lock_);
118  if (signals_.find(oppo_id) == signals_.end()) {
119  signals_[oppo_id] = std::make_shared<MessageSignal>();
120  }
121 
122  auto connection = signals_[oppo_id]->Connect(listener);
123  if (!connection.IsConnected()) {
124  AWARN << oppo_id << " " << self_id << " connect failed!";
125  return;
126  }
127 
128  if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
129  signals_conns_[oppo_id] = ConnectionMap();
130  }
131 
132  signals_conns_[oppo_id][self_id] = connection;
133 }
134 
// Disconnects and removes the common-signal connection stored for `self_id`.
// Does nothing when no connection is stored under that id.
// NOTE(review): the signature line (listing line 136) is absent from this
// extract; the symbol index at the end of the listing gives it as
// `void Disconnect(uint64_t self_id) override` — confirm.
135 template <typename MessageT>
137  WriteLockGuard<AtomicRWLock> lock(rw_lock_);
138  if (signal_conns_.find(self_id) == signal_conns_.end()) {
139  return;
140  }
141 
// Disconnect the stored connection first, then drop its map entry.
142  signal_conns_[self_id].Disconnect();
143  signal_conns_.erase(self_id);
144 }
145 
146 template <typename MessageT>
147 void ListenerHandler<MessageT>::Disconnect(uint64_t self_id, uint64_t oppo_id) {
148  WriteLockGuard<AtomicRWLock> lock(rw_lock_);
149  if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
150  return;
151  }
152 
153  if (signals_conns_[oppo_id].find(self_id) == signals_conns_[oppo_id].end()) {
154  return;
155  }
156 
157  signals_conns_[oppo_id][self_id].Disconnect();
158  signals_conns_[oppo_id].erase(self_id);
159 }
160 
// Delivers `msg` to the listeners: first through the common signal, then
// through the signal stored for the sender's id, if there is one.
// NOTE(review): the first signature line (listing line 162) is absent from
// this extract; the symbol index at the end of the listing gives it as
// `void Run(const Message &msg, const MessageInfo &msg_info)` — confirm.
161 template <typename MessageT>
163  const MessageInfo& msg_info) {
// The common signal is invoked before the read lock is taken.
164  signal_(msg, msg_info);
// The per-sender signal is looked up by the hash of the sender id in msg_info.
165  uint64_t oppo_id = msg_info.sender_id().HashValue();
166  ReadLockGuard<AtomicRWLock> lock(rw_lock_);
167  if (signals_.find(oppo_id) == signals_.end()) {
168  return;
169  }
170 
// The per-sender signal is invoked while the read lock is still held.
171  (*signals_[oppo_id])(msg, msg_info);
172 }
173 
174 template <typename MessageT>
175 void ListenerHandler<MessageT>::RunFromString(const std::string& str,
176  const MessageInfo& msg_info) {
177  auto msg = std::make_shared<MessageT>();
178  if (message::ParseFromHC(str.data(), static_cast<int>(str.size()),
179  msg.get())) {
180  Run(msg, msg_info);
181  } else {
182  AWARN << "Failed to parse message. Content: " << str;
183  }
184 }
185 
186 } // namespace transport
187 } // namespace cyber
188 } // namespace apollo
189 
190 #endif // CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromHC(const void *data, int size, T *message)
Definition: message_traits.h:169
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
Definition: listener_handler.h:41
virtual ~ListenerHandlerBase()
Definition: listener_handler.h:46
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
void RunFromString(const std::string &str, const MessageInfo &msg_info) override
Definition: listener_handler.h:175
Definition: rw_lock_guard.h:48
std::shared_ptr< MessageT > Message
Definition: listener_handler.h:61
Definition: atomic_rw_lock.h:36
bool IsRawMessage() const
Definition: listener_handler.h:50
ListenerHandlerBase()
Definition: listener_handler.h:45
bool is_raw_message_
Definition: listener_handler.h:55
void Connect(uint64_t self_id, const Listener &listener)
Definition: listener_handler.h:103
std::function< void(const Message &, const MessageInfo &)> Listener
Definition: listener_handler.h:64
Definition: rw_lock_guard.h:35
virtual ~ListenerHandler()
Definition: listener_handler.h:70
const Identity & sender_id() const
Definition: message_info.h:49
void Disconnect(uint64_t self_id) override
Definition: listener_handler.h:136
Definition: listener_handler.h:59
Definition: message_info.h:30
Definition: listener_handler.h:43
std::unordered_map< uint64_t, MessageConnection > ConnectionMap
Definition: listener_handler.h:67
virtual void Disconnect(uint64_t self_id)=0
Definition: signal.h:37
#define AWARN
Definition: log.h:43
Definition: signal.h:34
ListenerHandler()
Definition: listener_handler.h:69
virtual void RunFromString(const std::string &str, const MessageInfo &msg_info)=0
void Run(const Message &msg, const MessageInfo &msg_info)
Definition: listener_handler.h:162