Apollo  6.0
Open source self driving car software
hybrid_receiver.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
18 #define CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
19 
20 #include <map>
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <unordered_map>
25 #include <utility>
26 #include <vector>
27 
29 #include "cyber/common/log.h"
30 #include "cyber/common/types.h"
31 #include "cyber/proto/role_attributes.pb.h"
33 #include "cyber/task/task.h"
34 #include "cyber/time/time.h"
39 
40 namespace apollo {
41 namespace cyber {
42 namespace transport {
43 
44 using apollo::cyber::proto::OptionalMode;
45 using apollo::cyber::proto::QosDurabilityPolicy;
46 using apollo::cyber::proto::RoleAttributes;
47 
48 template <typename M>
49 class HybridReceiver : public Receiver<M> {
50  public:
51  using HistoryPtr = std::shared_ptr<History<M>>;
52  using ReceiverPtr = std::shared_ptr<Receiver<M>>;
53  using ReceiverContainer =
54  std::unordered_map<OptionalMode, ReceiverPtr, std::hash<int>>;
55  using TransmitterContainer =
56  std::unordered_map<OptionalMode,
57  std::unordered_map<uint64_t, RoleAttributes>,
58  std::hash<int>>;
59  using CommunicationModePtr = std::shared_ptr<proto::CommunicationMode>;
60  using MappingTable =
61  std::unordered_map<Relation, OptionalMode, std::hash<int>>;
62 
63  HybridReceiver(const RoleAttributes& attr,
64  const typename Receiver<M>::MessageListener& msg_listener,
65  const ParticipantPtr& participant);
66  virtual ~HybridReceiver();
67 
68  void Enable() override;
69  void Disable() override;
70 
71  void Enable(const RoleAttributes& opposite_attr) override;
72  void Disable(const RoleAttributes& opposite_attr) override;
73 
74  private:
75  void InitMode();
76  void ObtainConfig();
77  void InitHistory();
78  void InitReceivers();
79  void ClearReceivers();
80  void InitTransmitters();
81  void ClearTransmitters();
82  void ReceiveHistoryMsg(const RoleAttributes& opposite_attr);
83  void ThreadFunc(const RoleAttributes& opposite_attr);
84  Relation GetRelation(const RoleAttributes& opposite_attr);
85 
86  HistoryPtr history_;
87  ReceiverContainer receivers_;
88  TransmitterContainer transmitters_;
89  std::mutex mutex_;
90 
92  MappingTable mapping_table_;
93 
94  ParticipantPtr participant_;
95 };
96 
97 template <typename M>
99  const RoleAttributes& attr,
100  const typename Receiver<M>::MessageListener& msg_listener,
101  const ParticipantPtr& participant)
102  : Receiver<M>(attr, msg_listener),
103  history_(nullptr),
104  participant_(participant) {
105  InitMode();
106  ObtainConfig();
107  InitHistory();
108  InitReceivers();
109  InitTransmitters();
110 }
111 
112 template <typename M>
114  ClearTransmitters();
115  ClearReceivers();
116 }
117 
118 template <typename M>
120  std::lock_guard<std::mutex> lock(mutex_);
121  for (auto& item : receivers_) {
122  item.second->Enable();
123  }
124 }
125 
126 template <typename M>
128  std::lock_guard<std::mutex> lock(mutex_);
129  for (auto& item : receivers_) {
130  item.second->Disable();
131  }
132 }
133 
134 template <typename M>
135 void HybridReceiver<M>::Enable(const RoleAttributes& opposite_attr) {
136  auto relation = GetRelation(opposite_attr);
137  RETURN_IF(relation == NO_RELATION);
138 
139  uint64_t id = opposite_attr.id();
140  std::lock_guard<std::mutex> lock(mutex_);
141  if (transmitters_[mapping_table_[relation]].count(id) == 0) {
142  transmitters_[mapping_table_[relation]].insert(
143  std::make_pair(id, opposite_attr));
144  receivers_[mapping_table_[relation]]->Enable(opposite_attr);
145  ReceiveHistoryMsg(opposite_attr);
146  }
147 }
148 
149 template <typename M>
150 void HybridReceiver<M>::Disable(const RoleAttributes& opposite_attr) {
151  auto relation = GetRelation(opposite_attr);
152  RETURN_IF(relation == NO_RELATION);
153 
154  uint64_t id = opposite_attr.id();
155  std::lock_guard<std::mutex> lock(mutex_);
156  if (transmitters_[mapping_table_[relation]].count(id) > 0) {
157  transmitters_[mapping_table_[relation]].erase(id);
158  receivers_[mapping_table_[relation]]->Disable(opposite_attr);
159  }
160 }
161 
162 template <typename M>
164  mode_ = std::make_shared<proto::CommunicationMode>();
165  mapping_table_[SAME_PROC] = mode_->same_proc();
166  mapping_table_[DIFF_PROC] = mode_->diff_proc();
167  mapping_table_[DIFF_HOST] = mode_->diff_host();
168 }
169 
170 template <typename M>
172  auto& global_conf = common::GlobalData::Instance()->Config();
173  if (!global_conf.has_transport_conf()) {
174  return;
175  }
176  if (!global_conf.transport_conf().has_communication_mode()) {
177  return;
178  }
179  mode_->CopyFrom(global_conf.transport_conf().communication_mode());
180 
181  mapping_table_[SAME_PROC] = mode_->same_proc();
182  mapping_table_[DIFF_PROC] = mode_->diff_proc();
183  mapping_table_[DIFF_HOST] = mode_->diff_host();
184 }
185 
186 template <typename M>
188  HistoryAttributes history_attr(this->attr_.qos_profile().history(),
189  this->attr_.qos_profile().depth());
190  history_ = std::make_shared<History<M>>(history_attr);
191  if (this->attr_.qos_profile().durability() ==
192  QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
193  history_->Enable();
194  }
195 }
196 
197 template <typename M>
199  std::set<OptionalMode> modes;
200  modes.insert(mode_->same_proc());
201  modes.insert(mode_->diff_proc());
202  modes.insert(mode_->diff_host());
203  auto listener = std::bind(&HybridReceiver<M>::OnNewMessage, this,
204  std::placeholders::_1, std::placeholders::_2);
205  for (auto& mode : modes) {
206  switch (mode) {
207  case OptionalMode::INTRA:
208  receivers_[mode] =
209  std::make_shared<IntraReceiver<M>>(this->attr_, listener);
210  break;
211  case OptionalMode::SHM:
212  receivers_[mode] =
213  std::make_shared<ShmReceiver<M>>(this->attr_, listener);
214  break;
215  default:
216  receivers_[mode] =
217  std::make_shared<RtpsReceiver<M>>(this->attr_, listener);
218  break;
219  }
220  }
221 }
222 
223 template <typename M>
225  receivers_.clear();
226 }
227 
228 template <typename M>
230  std::unordered_map<uint64_t, RoleAttributes> empty;
231  for (auto& item : receivers_) {
232  transmitters_[item.first] = empty;
233  }
234 }
235 
236 template <typename M>
238  for (auto& item : transmitters_) {
239  for (auto& upper_reach : item.second) {
240  receivers_[item.first]->Disable(upper_reach.second);
241  }
242  }
243  transmitters_.clear();
244 }
245 
246 template <typename M>
247 void HybridReceiver<M>::ReceiveHistoryMsg(const RoleAttributes& opposite_attr) {
248  // check qos
249  if (opposite_attr.qos_profile().durability() !=
250  QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
251  return;
252  }
253 
254  auto attr = opposite_attr;
255  cyber::Async(&HybridReceiver<M>::ThreadFunc, this, attr);
256 }
257 
258 template <typename M>
259 void HybridReceiver<M>::ThreadFunc(const RoleAttributes& opposite_attr) {
260  std::string channel_name =
261  std::to_string(opposite_attr.id()) + std::to_string(this->attr_.id());
262  uint64_t channel_id = common::GlobalData::RegisterChannel(channel_name);
263 
264  RoleAttributes attr(this->attr_);
265  attr.set_channel_name(channel_name);
266  attr.set_channel_id(channel_id);
267  attr.mutable_qos_profile()->CopyFrom(opposite_attr.qos_profile());
268 
269  volatile bool is_msg_arrived = false;
270  auto listener = [&](const std::shared_ptr<M>& msg,
271  const MessageInfo& msg_info, const RoleAttributes& attr) {
272  is_msg_arrived = true;
273  this->OnNewMessage(msg, msg_info);
274  };
275 
276  auto receiver = std::make_shared<RtpsReceiver<M>>(attr, listener);
277  receiver->Enable();
278 
279  do {
280  if (is_msg_arrived) {
281  is_msg_arrived = false;
282  }
283  cyber::USleep(1000000);
284  } while (is_msg_arrived);
285 
286  receiver->Disable();
287  ADEBUG << "recv threadfunc exit.";
288 }
289 
290 template <typename M>
291 Relation HybridReceiver<M>::GetRelation(const RoleAttributes& opposite_attr) {
292  if (opposite_attr.channel_name() != this->attr_.channel_name()) {
293  return NO_RELATION;
294  }
295 
296  if (opposite_attr.host_ip() != this->attr_.host_ip()) {
297  return DIFF_HOST;
298  }
299 
300  if (opposite_attr.process_id() != this->attr_.process_id()) {
301  return DIFF_PROC;
302  }
303 
304  return SAME_PROC;
305 }
306 
307 } // namespace transport
308 } // namespace cyber
309 } // namespace apollo
310 
311 #endif // CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
Definition: types.h:38
virtual ~HybridReceiver()
Definition: hybrid_receiver.h:113
Definition: receiver.h:32
Definition: history_attributes.h:28
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: hybrid_receiver.h:49
#define RETURN_IF(condition)
Definition: log.h:106
std::unordered_map< OptionalMode, std::unordered_map< uint64_t, RoleAttributes >, std::hash< int > > TransmitterContainer
Definition: hybrid_receiver.h:58
RoleAttributes attr_
Definition: endpoint.h:47
#define ADEBUG
Definition: log.h:41
static uint64_t RegisterChannel(const std::string &channel)
Relation
Describe relation between nodes, writers/readers...
Definition: types.h:36
std::shared_ptr< Receiver< M > > ReceiverPtr
Definition: hybrid_receiver.h:52
std::shared_ptr< History< M > > HistoryPtr
Definition: hybrid_receiver.h:51
Definition: types.h:37
Definition: message_info.h:30
void OnNewMessage(const MessagePtr &msg, const MessageInfo &msg_info)
Definition: receiver.h:61
HybridReceiver(const RoleAttributes &attr, const typename Receiver< M >::MessageListener &msg_listener, const ParticipantPtr &participant)
Definition: hybrid_receiver.h:98
Definition: types.h:39
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
Definition: hybrid_receiver.h:61
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
Definition: hybrid_receiver.h:59
std::shared_ptr< Participant > ParticipantPtr
Definition: participant.h:37
Definition: types.h:40
std::function< void(const MessagePtr &, const MessageInfo &, const RoleAttributes &)> MessageListener
Definition: receiver.h:36
std::unordered_map< OptionalMode, ReceiverPtr, std::hash< int > > ReceiverContainer
Definition: hybrid_receiver.h:54
void Enable() override
Definition: hybrid_receiver.h:119
void Disable() override
Definition: hybrid_receiver.h:127