Apollo  6.0
Open source self driving car software
hybrid_transmitter.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
18 #define CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
19 
20 #include <chrono>
21 #include <map>
22 #include <memory>
23 #include <set>
24 #include <string>
25 #include <unordered_map>
26 #include <vector>
27 
29 #include "cyber/common/log.h"
30 #include "cyber/common/types.h"
31 #include "cyber/proto/role_attributes.pb.h"
32 #include "cyber/proto/transport_conf.pb.h"
33 #include "cyber/task/task.h"
40 
41 namespace apollo {
42 namespace cyber {
43 namespace transport {
44 
45 using apollo::cyber::proto::OptionalMode;
46 using apollo::cyber::proto::QosDurabilityPolicy;
47 using apollo::cyber::proto::RoleAttributes;
48 
49 template <typename M>
50 class HybridTransmitter : public Transmitter<M> {
51  public:
52  using MessagePtr = std::shared_ptr<M>;
53  using HistoryPtr = std::shared_ptr<History<M>>;
54  using TransmitterPtr = std::shared_ptr<Transmitter<M>>;
55  using TransmitterMap =
56  std::unordered_map<OptionalMode, TransmitterPtr, std::hash<int>>;
57  using ReceiverMap =
58  std::unordered_map<OptionalMode, std::set<uint64_t>, std::hash<int>>;
59  using CommunicationModePtr = std::shared_ptr<proto::CommunicationMode>;
60  using MappingTable =
61  std::unordered_map<Relation, OptionalMode, std::hash<int>>;
62 
63  HybridTransmitter(const RoleAttributes& attr,
64  const ParticipantPtr& participant);
65  virtual ~HybridTransmitter();
66 
67  void Enable() override;
68  void Disable() override;
69  void Enable(const RoleAttributes& opposite_attr) override;
70  void Disable(const RoleAttributes& opposite_attr) override;
71 
72  bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) override;
73 
74  private:
75  void InitMode();
76  void ObtainConfig();
77  void InitHistory();
78  void InitTransmitters();
79  void ClearTransmitters();
80  void InitReceivers();
81  void ClearReceivers();
82  void TransmitHistoryMsg(const RoleAttributes& opposite_attr);
83  void ThreadFunc(const RoleAttributes& opposite_attr,
84  const std::vector<typename History<M>::CachedMessage>& msgs);
85  Relation GetRelation(const RoleAttributes& opposite_attr);
86 
87  HistoryPtr history_;
88  TransmitterMap transmitters_;
89  ReceiverMap receivers_;
90  std::mutex mutex_;
91 
93  MappingTable mapping_table_;
94 
95  ParticipantPtr participant_;
96 };
97 
98 template <typename M>
99 HybridTransmitter<M>::HybridTransmitter(const RoleAttributes& attr,
100  const ParticipantPtr& participant)
101  : Transmitter<M>(attr),
102  history_(nullptr),
103  mode_(nullptr),
104  participant_(participant) {
105  InitMode();
106  ObtainConfig();
107  InitHistory();
108  InitTransmitters();
109  InitReceivers();
110 }
111 
112 template <typename M>
114  ClearReceivers();
115  ClearTransmitters();
116 }
117 
118 template <typename M>
120  std::lock_guard<std::mutex> lock(mutex_);
121  for (auto& item : transmitters_) {
122  item.second->Enable();
123  }
124 }
125 
126 template <typename M>
128  std::lock_guard<std::mutex> lock(mutex_);
129  for (auto& item : transmitters_) {
130  item.second->Disable();
131  }
132 }
133 
134 template <typename M>
135 void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {
136  auto relation = GetRelation(opposite_attr);
137  if (relation == NO_RELATION) {
138  return;
139  }
140 
141  uint64_t id = opposite_attr.id();
142  std::lock_guard<std::mutex> lock(mutex_);
143  receivers_[mapping_table_[relation]].insert(id);
144  transmitters_[mapping_table_[relation]]->Enable();
145  TransmitHistoryMsg(opposite_attr);
146 }
147 
148 template <typename M>
149 void HybridTransmitter<M>::Disable(const RoleAttributes& opposite_attr) {
150  auto relation = GetRelation(opposite_attr);
151  if (relation == NO_RELATION) {
152  return;
153  }
154 
155  uint64_t id = opposite_attr.id();
156  std::lock_guard<std::mutex> lock(mutex_);
157  receivers_[mapping_table_[relation]].erase(id);
158  if (receivers_[mapping_table_[relation]].empty()) {
159  transmitters_[mapping_table_[relation]]->Disable();
160  }
161 }
162 
163 template <typename M>
165  const MessageInfo& msg_info) {
166  std::lock_guard<std::mutex> lock(mutex_);
167  history_->Add(msg, msg_info);
168  for (auto& item : transmitters_) {
169  item.second->Transmit(msg, msg_info);
170  }
171  return true;
172 }
173 
174 template <typename M>
176  mode_ = std::make_shared<proto::CommunicationMode>();
177  mapping_table_[SAME_PROC] = mode_->same_proc();
178  mapping_table_[DIFF_PROC] = mode_->diff_proc();
179  mapping_table_[DIFF_HOST] = mode_->diff_host();
180 }
181 
182 template <typename M>
184  auto& global_conf = common::GlobalData::Instance()->Config();
185  if (!global_conf.has_transport_conf()) {
186  return;
187  }
188  if (!global_conf.transport_conf().has_communication_mode()) {
189  return;
190  }
191  mode_->CopyFrom(global_conf.transport_conf().communication_mode());
192 
193  mapping_table_[SAME_PROC] = mode_->same_proc();
194  mapping_table_[DIFF_PROC] = mode_->diff_proc();
195  mapping_table_[DIFF_HOST] = mode_->diff_host();
196 }
197 
198 template <typename M>
200  HistoryAttributes history_attr(this->attr_.qos_profile().history(),
201  this->attr_.qos_profile().depth());
202  history_ = std::make_shared<History<M>>(history_attr);
203  if (this->attr_.qos_profile().durability() ==
204  QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
205  history_->Enable();
206  }
207 }
208 
209 template <typename M>
211  std::set<OptionalMode> modes;
212  modes.insert(mode_->same_proc());
213  modes.insert(mode_->diff_proc());
214  modes.insert(mode_->diff_host());
215  for (auto& mode : modes) {
216  switch (mode) {
217  case OptionalMode::INTRA:
218  transmitters_[mode] =
219  std::make_shared<IntraTransmitter<M>>(this->attr_);
220  break;
221  case OptionalMode::SHM:
222  transmitters_[mode] = std::make_shared<ShmTransmitter<M>>(this->attr_);
223  break;
224  default:
225  transmitters_[mode] =
226  std::make_shared<RtpsTransmitter<M>>(this->attr_, participant_);
227  break;
228  }
229  }
230 }
231 
232 template <typename M>
234  for (auto& item : transmitters_) {
235  item.second->Disable();
236  }
237  transmitters_.clear();
238 }
239 
240 template <typename M>
242  std::set<uint64_t> empty;
243  for (auto& item : transmitters_) {
244  receivers_[item.first] = empty;
245  }
246 }
247 
248 template <typename M>
250  receivers_.clear();
251 }
252 
253 template <typename M>
255  const RoleAttributes& opposite_attr) {
256  // check qos
257  if (this->attr_.qos_profile().durability() !=
258  QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
259  return;
260  }
261 
262  // get unsent messages
263  std::vector<typename History<M>::CachedMessage> unsent_msgs;
264  history_->GetCachedMessage(&unsent_msgs);
265  if (unsent_msgs.empty()) {
266  return;
267  }
268 
269  auto attr = opposite_attr;
270  cyber::Async(&HybridTransmitter<M>::ThreadFunc, this, attr, unsent_msgs);
271 }
272 
273 template <typename M>
275  const RoleAttributes& opposite_attr,
276  const std::vector<typename History<M>::CachedMessage>& msgs) {
277  // create transmitter to transmit msgs
278  RoleAttributes new_attr;
279  new_attr.CopyFrom(this->attr_);
280  std::string new_channel_name =
281  std::to_string(this->attr_.id()) + std::to_string(opposite_attr.id());
282  uint64_t channel_id = common::GlobalData::RegisterChannel(new_channel_name);
283  new_attr.set_channel_name(new_channel_name);
284  new_attr.set_channel_id(channel_id);
285  auto new_transmitter =
286  std::make_shared<RtpsTransmitter<M>>(new_attr, participant_);
287  new_transmitter->Enable();
288 
289  for (auto& item : msgs) {
290  new_transmitter->Transmit(item.msg, item.msg_info);
291  cyber::USleep(1000);
292  }
293  new_transmitter->Disable();
294  ADEBUG << "trans threadfunc exit.";
295 }
296 
297 template <typename M>
299  const RoleAttributes& opposite_attr) {
300  if (opposite_attr.channel_name() != this->attr_.channel_name()) {
301  return NO_RELATION;
302  }
303  if (opposite_attr.host_ip() != this->attr_.host_ip()) {
304  return DIFF_HOST;
305  }
306  if (opposite_attr.process_id() != this->attr_.process_id()) {
307  return DIFF_PROC;
308  }
309 
310  return SAME_PROC;
311 }
312 
313 } // namespace transport
314 } // namespace cyber
315 } // namespace apollo
316 
317 #endif // CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
Definition: types.h:38
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
Definition: hybrid_transmitter.h:59
virtual ~HybridTransmitter()
Definition: hybrid_transmitter.h:113
std::unordered_map< OptionalMode, TransmitterPtr, std::hash< int > > TransmitterMap
Definition: hybrid_transmitter.h:56
Definition: history_attributes.h:28
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::unordered_map< OptionalMode, std::set< uint64_t >, std::hash< int > > ReceiverMap
Definition: hybrid_transmitter.h:58
HybridTransmitter(const RoleAttributes &attr, const ParticipantPtr &participant)
Definition: hybrid_transmitter.h:99
std::shared_ptr< History< M > > HistoryPtr
Definition: hybrid_transmitter.h:53
std::shared_ptr< M > MessagePtr
Definition: hybrid_transmitter.h:52
Definition: hybrid_transmitter.h:50
void Disable() override
Definition: hybrid_transmitter.h:127
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
Definition: hybrid_transmitter.h:164
RoleAttributes attr_
Definition: endpoint.h:47
#define ADEBUG
Definition: log.h:41
std::shared_ptr< Transmitter< M > > TransmitterPtr
Definition: hybrid_transmitter.h:54
static uint64_t RegisterChannel(const std::string &channel)
Relation
Describe relation between nodes, writers/readers...
Definition: types.h:36
Definition: types.h:37
void Enable() override
Definition: hybrid_transmitter.h:119
Definition: message_info.h:30
Definition: types.h:39
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
Definition: hybrid_transmitter.h:61
std::shared_ptr< Participant > ParticipantPtr
Definition: participant.h:37
Definition: types.h:40
Definition: transmitter.h:36