17 #ifndef CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_ 18 #define CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_ 24 #include <unordered_map> 31 #include "cyber/proto/role_attributes.pb.h" 44 using apollo::cyber::proto::OptionalMode;
45 using apollo::cyber::proto::QosDurabilityPolicy;
46 using apollo::cyber::proto::RoleAttributes;
54 std::unordered_map<OptionalMode, ReceiverPtr, std::hash<int>>;
56 std::unordered_map<OptionalMode,
57 std::unordered_map<uint64_t, RoleAttributes>,
61 std::unordered_map<Relation, OptionalMode, std::hash<int>>;
71 void Enable(
const RoleAttributes& opposite_attr)
override;
72 void Disable(
const RoleAttributes& opposite_attr)
override;
79 void ClearReceivers();
80 void InitTransmitters();
81 void ClearTransmitters();
82 void ReceiveHistoryMsg(
const RoleAttributes& opposite_attr);
83 void ThreadFunc(
const RoleAttributes& opposite_attr);
84 Relation GetRelation(
const RoleAttributes& opposite_attr);
99 const RoleAttributes& attr,
104 participant_(participant) {
112 template <
typename M>
118 template <
typename M>
120 std::lock_guard<std::mutex> lock(mutex_);
121 for (
auto& item : receivers_) {
122 item.second->Enable();
126 template <
typename M>
128 std::lock_guard<std::mutex> lock(mutex_);
129 for (
auto& item : receivers_) {
130 item.second->Disable();
134 template <
typename M>
136 auto relation = GetRelation(opposite_attr);
139 uint64_t
id = opposite_attr.id();
140 std::lock_guard<std::mutex> lock(mutex_);
141 if (transmitters_[mapping_table_[relation]].count(
id) == 0) {
142 transmitters_[mapping_table_[relation]].insert(
143 std::make_pair(
id, opposite_attr));
144 receivers_[mapping_table_[relation]]->Enable(opposite_attr);
145 ReceiveHistoryMsg(opposite_attr);
149 template <
typename M>
151 auto relation = GetRelation(opposite_attr);
154 uint64_t
id = opposite_attr.id();
155 std::lock_guard<std::mutex> lock(mutex_);
156 if (transmitters_[mapping_table_[relation]].count(
id) > 0) {
157 transmitters_[mapping_table_[relation]].erase(
id);
158 receivers_[mapping_table_[relation]]->Disable(opposite_attr);
162 template <
typename M>
164 mode_ = std::make_shared<proto::CommunicationMode>();
165 mapping_table_[
SAME_PROC] = mode_->same_proc();
166 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
167 mapping_table_[
DIFF_HOST] = mode_->diff_host();
170 template <
typename M>
173 if (!global_conf.has_transport_conf()) {
176 if (!global_conf.transport_conf().has_communication_mode()) {
179 mode_->CopyFrom(global_conf.transport_conf().communication_mode());
181 mapping_table_[
SAME_PROC] = mode_->same_proc();
182 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
183 mapping_table_[
DIFF_HOST] = mode_->diff_host();
186 template <
typename M>
189 this->
attr_.qos_profile().depth());
190 history_ = std::make_shared<History<M>>(history_attr);
191 if (this->
attr_.qos_profile().durability() ==
192 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
197 template <
typename M>
199 std::set<OptionalMode> modes;
200 modes.insert(mode_->same_proc());
201 modes.insert(mode_->diff_proc());
202 modes.insert(mode_->diff_host());
204 std::placeholders::_1, std::placeholders::_2);
205 for (
auto& mode : modes) {
207 case OptionalMode::INTRA:
209 std::make_shared<IntraReceiver<M>>(this->
attr_, listener);
211 case OptionalMode::SHM:
213 std::make_shared<ShmReceiver<M>>(this->
attr_, listener);
217 std::make_shared<RtpsReceiver<M>>(this->
attr_, listener);
223 template <
typename M>
228 template <
typename M>
230 std::unordered_map<uint64_t, RoleAttributes> empty;
231 for (
auto& item : receivers_) {
232 transmitters_[item.first] = empty;
236 template <
typename M>
238 for (
auto& item : transmitters_) {
239 for (
auto& upper_reach : item.second) {
240 receivers_[item.first]->Disable(upper_reach.second);
243 transmitters_.clear();
246 template <
typename M>
249 if (opposite_attr.qos_profile().durability() !=
250 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
254 auto attr = opposite_attr;
258 template <
typename M>
260 std::string channel_name =
261 std::to_string(opposite_attr.id()) + std::to_string(this->
attr_.id());
264 RoleAttributes attr(this->
attr_);
265 attr.set_channel_name(channel_name);
266 attr.set_channel_id(channel_id);
267 attr.mutable_qos_profile()->CopyFrom(opposite_attr.qos_profile());
269 volatile bool is_msg_arrived =
false;
270 auto listener = [&](
const std::shared_ptr<M>& msg,
271 const MessageInfo& msg_info,
const RoleAttributes& attr) {
272 is_msg_arrived =
true;
276 auto receiver = std::make_shared<RtpsReceiver<M>>(attr, listener);
280 if (is_msg_arrived) {
281 is_msg_arrived =
false;
283 cyber::USleep(1000000);
284 }
while (is_msg_arrived);
287 ADEBUG <<
"recv threadfunc exit.";
290 template <
typename M>
292 if (opposite_attr.channel_name() != this->
attr_.channel_name()) {
296 if (opposite_attr.host_ip() != this->
attr_.host_ip()) {
300 if (opposite_attr.process_id() != this->
attr_.process_id()) {
311 #endif // CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
virtual ~HybridReceiver()
Definition: hybrid_receiver.h:113
Definition: receiver.h:32
Definition: history_attributes.h:28
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: hybrid_receiver.h:49
#define RETURN_IF(condition)
Definition: log.h:106
std::unordered_map< OptionalMode, std::unordered_map< uint64_t, RoleAttributes >, std::hash< int > > TransmitterContainer
Definition: hybrid_receiver.h:58
RoleAttributes attr_
Definition: endpoint.h:47
#define ADEBUG
Definition: log.h:41
static uint64_t RegisterChannel(const std::string &channel)
Relation
Describe relation between nodes, writers/readers...
Definition: types.h:36
std::shared_ptr< Receiver< M > > ReceiverPtr
Definition: hybrid_receiver.h:52
std::shared_ptr< History< M > > HistoryPtr
Definition: hybrid_receiver.h:51
Definition: message_info.h:30
void OnNewMessage(const MessagePtr &msg, const MessageInfo &msg_info)
Definition: receiver.h:61
HybridReceiver(const RoleAttributes &attr, const typename Receiver< M >::MessageListener &msg_listener, const ParticipantPtr &participant)
Definition: hybrid_receiver.h:98
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
Definition: hybrid_receiver.h:61
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
Definition: hybrid_receiver.h:59
std::shared_ptr< Participant > ParticipantPtr
Definition: participant.h:37
std::function< void(const MessagePtr &, const MessageInfo &, const RoleAttributes &)> MessageListener
Definition: receiver.h:36
std::unordered_map< OptionalMode, ReceiverPtr, std::hash< int > > ReceiverContainer
Definition: hybrid_receiver.h:54
void Enable() override
Definition: hybrid_receiver.h:119
void Disable() override
Definition: hybrid_receiver.h:127