Apollo  6.0
Open source self driving car software
rtps_dispatcher.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_DISPATCHER_RTPS_DISPATCHER_H_
18 #define CYBER_TRANSPORT_DISPATCHER_RTPS_DISPATCHER_H_
19 
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "cyber/common/log.h"
#include "cyber/common/macros.h"
#include "cyber/message/message_traits.h"
#include "cyber/transport/dispatcher/dispatcher.h"
#include "cyber/transport/rtps/participant.h"
#include "cyber/transport/rtps/sub_listener.h"
33 
34 namespace apollo {
35 namespace cyber {
36 namespace transport {
37 
38 struct Subscriber {
39  Subscriber() : sub(nullptr), sub_listener(nullptr) {}
40 
41  eprosima::fastrtps::Subscriber* sub;
43 };
44 
45 class RtpsDispatcher;
47 
48 class RtpsDispatcher : public Dispatcher {
49  public:
50  virtual ~RtpsDispatcher();
51 
52  void Shutdown() override;
53 
54  template <typename MessageT>
55  void AddListener(const RoleAttributes& self_attr,
56  const MessageListener<MessageT>& listener);
57 
58  template <typename MessageT>
59  void AddListener(const RoleAttributes& self_attr,
60  const RoleAttributes& opposite_attr,
61  const MessageListener<MessageT>& listener);
62 
63  void set_participant(const ParticipantPtr& participant) {
64  participant_ = participant;
65  }
66 
67  private:
68  void OnMessage(uint64_t channel_id,
69  const std::shared_ptr<std::string>& msg_str,
70  const MessageInfo& msg_info);
71  void AddSubscriber(const RoleAttributes& self_attr);
72  // key: channel_id
73  std::unordered_map<uint64_t, Subscriber> subs_;
74  std::mutex subs_mutex_;
75 
76  ParticipantPtr participant_;
77 
79 };
80 
81 template <typename MessageT>
82 void RtpsDispatcher::AddListener(const RoleAttributes& self_attr,
83  const MessageListener<MessageT>& listener) {
84  auto listener_adapter = [listener](
85  const std::shared_ptr<std::string>& msg_str,
86  const MessageInfo& msg_info) {
87  auto msg = std::make_shared<MessageT>();
88  RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
89  listener(msg, msg_info);
90  };
91 
92  Dispatcher::AddListener<std::string>(self_attr, listener_adapter);
93  AddSubscriber(self_attr);
94 }
95 
96 template <typename MessageT>
97 void RtpsDispatcher::AddListener(const RoleAttributes& self_attr,
98  const RoleAttributes& opposite_attr,
99  const MessageListener<MessageT>& listener) {
100  auto listener_adapter = [listener](
101  const std::shared_ptr<std::string>& msg_str,
102  const MessageInfo& msg_info) {
103  auto msg = std::make_shared<MessageT>();
104  RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
105  listener(msg, msg_info);
106  };
107 
108  Dispatcher::AddListener<std::string>(self_attr, opposite_attr,
109  listener_adapter);
110  AddSubscriber(self_attr);
111 }
112 
113 } // namespace transport
114 } // namespace cyber
115 } // namespace apollo
116 
117 #endif // CYBER_TRANSPORT_DISPATCHER_RTPS_DISPATCHER_H_
SubListenerPtr sub_listener
Definition: rtps_dispatcher.h:42
Subscriber()
Definition: rtps_dispatcher.h:39
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: rtps_dispatcher.h:48
#define RETURN_IF(condition)
Definition: log.h:106
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition: rtps_dispatcher.h:82
void set_participant(const ParticipantPtr &participant)
Definition: rtps_dispatcher.h:63
Definition: dispatcher.h:54
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
std::shared_ptr< SubListener > SubListenerPtr
Definition: sub_listener.h:39
std::enable_if< HasParseFromString< T >::value, bool >::type ParseFromString(const std::string &str, T *message)
Definition: message_traits.h:158
Definition: message_info.h:30
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition: dispatcher.h:52
eprosima::fastrtps::Subscriber * sub
Definition: rtps_dispatcher.h:41
std::shared_ptr< Participant > ParticipantPtr
Definition: participant.h:37
Definition: rtps_dispatcher.h:38