Apollo  6.0
Open source self driving car software
dispatcher.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
18 #define CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
19 
#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "cyber/base/atomic_hash_map.h"
#include "cyber/base/atomic_rw_lock.h"
#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/proto/role_attributes.pb.h"
#include "cyber/transport/message/listener_handler.h"
#include "cyber/transport/message/message_info.h"
35 
36 namespace apollo {
37 namespace cyber {
38 namespace transport {
39 
45 using cyber::proto::RoleAttributes;
46 
47 class Dispatcher;
48 using DispatcherPtr = std::shared_ptr<Dispatcher>;
49 
50 template <typename MessageT>
51 using MessageListener =
52  std::function<void(const std::shared_ptr<MessageT>&, const MessageInfo&)>;
53 
54 class Dispatcher {
55  public:
56  Dispatcher();
57  virtual ~Dispatcher();
58 
59  virtual void Shutdown();
60 
61  template <typename MessageT>
62  void AddListener(const RoleAttributes& self_attr,
63  const MessageListener<MessageT>& listener);
64 
65  template <typename MessageT>
66  void AddListener(const RoleAttributes& self_attr,
67  const RoleAttributes& opposite_attr,
68  const MessageListener<MessageT>& listener);
69 
70  template <typename MessageT>
71  void RemoveListener(const RoleAttributes& self_attr);
72 
73  template <typename MessageT>
74  void RemoveListener(const RoleAttributes& self_attr,
75  const RoleAttributes& opposite_attr);
76 
77  bool HasChannel(uint64_t channel_id);
78 
79  protected:
80  std::atomic<bool> is_shutdown_;
81  // key: channel_id of message
84 };
85 
86 template <typename MessageT>
87 void Dispatcher::AddListener(const RoleAttributes& self_attr,
88  const MessageListener<MessageT>& listener) {
89  if (is_shutdown_.load()) {
90  return;
91  }
92  uint64_t channel_id = self_attr.channel_id();
93 
94  std::shared_ptr<ListenerHandler<MessageT>> handler;
95  ListenerHandlerBasePtr* handler_base = nullptr;
96  if (msg_listeners_.Get(channel_id, &handler_base)) {
97  handler =
98  std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
99  if (handler == nullptr) {
100  AERROR << "please ensure that readers with the same channel["
101  << self_attr.channel_name()
102  << "] in the same process have the same message type";
103  return;
104  }
105  } else {
106  ADEBUG << "new reader for channel:"
107  << GlobalData::GetChannelById(channel_id);
108  handler.reset(new ListenerHandler<MessageT>());
109  msg_listeners_.Set(channel_id, handler);
110  }
111  handler->Connect(self_attr.id(), listener);
112 }
113 
114 template <typename MessageT>
115 void Dispatcher::AddListener(const RoleAttributes& self_attr,
116  const RoleAttributes& opposite_attr,
117  const MessageListener<MessageT>& listener) {
118  if (is_shutdown_.load()) {
119  return;
120  }
121  uint64_t channel_id = self_attr.channel_id();
122 
123  std::shared_ptr<ListenerHandler<MessageT>> handler;
124  ListenerHandlerBasePtr* handler_base = nullptr;
125  if (msg_listeners_.Get(channel_id, &handler_base)) {
126  handler =
127  std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
128  if (handler == nullptr) {
129  AERROR << "please ensure that readers with the same channel["
130  << self_attr.channel_name()
131  << "] in the same process have the same message type";
132  return;
133  }
134  } else {
135  ADEBUG << "new reader for channel:"
136  << GlobalData::GetChannelById(channel_id);
137  handler.reset(new ListenerHandler<MessageT>());
138  msg_listeners_.Set(channel_id, handler);
139  }
140  handler->Connect(self_attr.id(), opposite_attr.id(), listener);
141 }
142 
143 template <typename MessageT>
144 void Dispatcher::RemoveListener(const RoleAttributes& self_attr) {
145  if (is_shutdown_.load()) {
146  return;
147  }
148  uint64_t channel_id = self_attr.channel_id();
149 
150  ListenerHandlerBasePtr* handler_base = nullptr;
151  if (msg_listeners_.Get(channel_id, &handler_base)) {
152  (*handler_base)->Disconnect(self_attr.id());
153  }
154 }
155 
156 template <typename MessageT>
157 void Dispatcher::RemoveListener(const RoleAttributes& self_attr,
158  const RoleAttributes& opposite_attr) {
159  if (is_shutdown_.load()) {
160  return;
161  }
162  uint64_t channel_id = self_attr.channel_id();
163 
164  ListenerHandlerBasePtr* handler_base = nullptr;
165  if (msg_listeners_.Get(channel_id, &handler_base)) {
166  (*handler_base)->Disconnect(self_attr.id(), opposite_attr.id());
167  }
168 }
169 
170 } // namespace transport
171 } // namespace cyber
172 } // namespace apollo
173 
174 #endif // CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
Definition: listener_handler.h:41
void RemoveListener(const RoleAttributes &self_attr)
Definition: dispatcher.h:144
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: rw_lock_guard.h:48
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
Definition: dispatcher.h:82
Definition: atomic_rw_lock.h:36
bool Get(K key, V **value)
Definition: atomic_hash_map.h:51
static std::string GetChannelById(uint64_t id)
Definition: dispatcher.h:54
#define ADEBUG
Definition: log.h:41
An implementation of a lock-free, fixed-size hash map.
Definition: atomic_hash_map.h:40
Definition: rw_lock_guard.h:35
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition: dispatcher.h:87
void Set(K key)
Definition: atomic_hash_map.h:66
Definition: listener_handler.h:59
Definition: message_info.h:30
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition: dispatcher.h:52
Definition: global_data.h:40
#define AERROR
Definition: log.h:44
base::AtomicRWLock rw_lock_
Definition: dispatcher.h:83
std::atomic< bool > is_shutdown_
Definition: dispatcher.h:80
bool HasChannel(uint64_t channel_id)
std::shared_ptr< Dispatcher > DispatcherPtr
Definition: dispatcher.h:48