17 #ifndef CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_ 18 #define CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_ 25 #include <unordered_map> 31 #include "cyber/proto/role_attributes.pb.h" 32 #include "cyber/proto/transport_conf.pb.h" 45 using apollo::cyber::proto::OptionalMode;
46 using apollo::cyber::proto::QosDurabilityPolicy;
47 using apollo::cyber::proto::RoleAttributes;
56 std::unordered_map<OptionalMode, TransmitterPtr, std::hash<int>>;
58 std::unordered_map<OptionalMode, std::set<uint64_t>, std::hash<int>>;
61 std::unordered_map<Relation, OptionalMode, std::hash<int>>;
69 void Enable(
const RoleAttributes& opposite_attr)
override;
70 void Disable(
const RoleAttributes& opposite_attr)
override;
78 void InitTransmitters();
79 void ClearTransmitters();
81 void ClearReceivers();
82 void TransmitHistoryMsg(
const RoleAttributes& opposite_attr);
83 void ThreadFunc(
const RoleAttributes& opposite_attr,
85 Relation GetRelation(
const RoleAttributes& opposite_attr);
104 participant_(participant) {
112 template <
typename M>
118 template <
typename M>
120 std::lock_guard<std::mutex> lock(mutex_);
121 for (
auto& item : transmitters_) {
122 item.second->Enable();
126 template <
typename M>
128 std::lock_guard<std::mutex> lock(mutex_);
129 for (
auto& item : transmitters_) {
130 item.second->Disable();
134 template <
typename M>
136 auto relation = GetRelation(opposite_attr);
141 uint64_t
id = opposite_attr.id();
142 std::lock_guard<std::mutex> lock(mutex_);
143 receivers_[mapping_table_[relation]].insert(
id);
144 transmitters_[mapping_table_[relation]]->Enable();
145 TransmitHistoryMsg(opposite_attr);
148 template <
typename M>
150 auto relation = GetRelation(opposite_attr);
155 uint64_t
id = opposite_attr.id();
156 std::lock_guard<std::mutex> lock(mutex_);
157 receivers_[mapping_table_[relation]].erase(
id);
158 if (receivers_[mapping_table_[relation]].empty()) {
159 transmitters_[mapping_table_[relation]]->Disable();
163 template <
typename M>
166 std::lock_guard<std::mutex> lock(mutex_);
167 history_->Add(msg, msg_info);
168 for (
auto& item : transmitters_) {
169 item.second->Transmit(msg, msg_info);
174 template <
typename M>
176 mode_ = std::make_shared<proto::CommunicationMode>();
177 mapping_table_[
SAME_PROC] = mode_->same_proc();
178 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
179 mapping_table_[
DIFF_HOST] = mode_->diff_host();
182 template <
typename M>
185 if (!global_conf.has_transport_conf()) {
188 if (!global_conf.transport_conf().has_communication_mode()) {
191 mode_->CopyFrom(global_conf.transport_conf().communication_mode());
193 mapping_table_[
SAME_PROC] = mode_->same_proc();
194 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
195 mapping_table_[
DIFF_HOST] = mode_->diff_host();
198 template <
typename M>
201 this->
attr_.qos_profile().depth());
202 history_ = std::make_shared<History<M>>(history_attr);
203 if (this->
attr_.qos_profile().durability() ==
204 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
209 template <
typename M>
211 std::set<OptionalMode> modes;
212 modes.insert(mode_->same_proc());
213 modes.insert(mode_->diff_proc());
214 modes.insert(mode_->diff_host());
215 for (
auto& mode : modes) {
217 case OptionalMode::INTRA:
218 transmitters_[mode] =
219 std::make_shared<IntraTransmitter<M>>(this->
attr_);
221 case OptionalMode::SHM:
222 transmitters_[mode] = std::make_shared<ShmTransmitter<M>>(this->
attr_);
225 transmitters_[mode] =
226 std::make_shared<RtpsTransmitter<M>>(this->
attr_, participant_);
232 template <
typename M>
234 for (
auto& item : transmitters_) {
237 transmitters_.clear();
240 template <
typename M>
242 std::set<uint64_t> empty;
243 for (
auto& item : transmitters_) {
244 receivers_[item.first] = empty;
248 template <
typename M>
253 template <
typename M>
255 const RoleAttributes& opposite_attr) {
257 if (this->
attr_.qos_profile().durability() !=
258 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
263 std::vector<typename History<M>::CachedMessage> unsent_msgs;
264 history_->GetCachedMessage(&unsent_msgs);
265 if (unsent_msgs.empty()) {
269 auto attr = opposite_attr;
273 template <
typename M>
275 const RoleAttributes& opposite_attr,
278 RoleAttributes new_attr;
279 new_attr.CopyFrom(this->
attr_);
280 std::string new_channel_name =
281 std::to_string(this->
attr_.id()) + std::to_string(opposite_attr.id());
283 new_attr.set_channel_name(new_channel_name);
284 new_attr.set_channel_id(channel_id);
285 auto new_transmitter =
286 std::make_shared<RtpsTransmitter<M>>(new_attr, participant_);
287 new_transmitter->Enable();
289 for (
auto& item : msgs) {
290 new_transmitter->Transmit(item.msg, item.msg_info);
293 new_transmitter->Disable();
294 ADEBUG <<
"trans threadfunc exit.";
297 template <
typename M>
299 const RoleAttributes& opposite_attr) {
300 if (opposite_attr.channel_name() != this->
attr_.channel_name()) {
303 if (opposite_attr.host_ip() != this->
attr_.host_ip()) {
306 if (opposite_attr.process_id() != this->
attr_.process_id()) {
317 #endif // CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
Definition: hybrid_transmitter.h:59
virtual ~HybridTransmitter()
Definition: hybrid_transmitter.h:113
std::unordered_map< OptionalMode, TransmitterPtr, std::hash< int > > TransmitterMap
Definition: hybrid_transmitter.h:56
Definition: history_attributes.h:28
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::unordered_map< OptionalMode, std::set< uint64_t >, std::hash< int > > ReceiverMap
Definition: hybrid_transmitter.h:58
HybridTransmitter(const RoleAttributes &attr, const ParticipantPtr &participant)
Definition: hybrid_transmitter.h:99
std::shared_ptr< History< M > > HistoryPtr
Definition: hybrid_transmitter.h:53
std::shared_ptr< M > MessagePtr
Definition: hybrid_transmitter.h:52
Definition: hybrid_transmitter.h:50
void Disable() override
Definition: hybrid_transmitter.h:127
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
Definition: hybrid_transmitter.h:164
RoleAttributes attr_
Definition: endpoint.h:47
#define ADEBUG
Definition: log.h:41
std::shared_ptr< Transmitter< M > > TransmitterPtr
Definition: hybrid_transmitter.h:54
static uint64_t RegisterChannel(const std::string &channel)
Relation
Describe relation between nodes, writers/readers...
Definition: types.h:36
void Enable() override
Definition: hybrid_transmitter.h:119
Definition: message_info.h:30
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
Definition: hybrid_transmitter.h:61
std::shared_ptr< Participant > ParticipantPtr
Definition: participant.h:37
Definition: transmitter.h:36