Apollo  6.0
Open source self driving car software
reader_base.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_NODE_READER_BASE_H_
18 #define CYBER_NODE_READER_BASE_H_
19 
20 #include <atomic>
21 #include <memory>
22 #include <string>
23 #include <unordered_map>
24 #include <vector>
25 
26 #include "cyber/common/macros.h"
27 #include "cyber/common/util.h"
30 
31 namespace apollo {
32 namespace cyber {
33 
37 
/**
 * @class ReaderBase
 * @brief Abstract base class for Reader.
 *
 * A reader is described by one proto::RoleAttributes value, which is copied
 * into role_attr_ at construction. The channel name, channel id and QoS
 * profile accessors below all read from that copy. Derived classes implement
 * the pure-virtual lifecycle and data-access methods.
 */
46 class ReaderBase {
47  public:
// Stores a copy of role_attr and starts in the "not initialized" state
// (init_ == false).
48  explicit ReaderBase(const proto::RoleAttributes& role_attr)
49  : role_attr_(role_attr), init_(false) {}
// Virtual so that derived readers are destroyed correctly through a
// ReaderBase pointer.
50  virtual ~ReaderBase() {}
51 
/** @brief Init the Reader object. @return bool result reported by the implementation. */
58  virtual bool Init() = 0;
59 
/** @brief Shutdown the Reader object. */
63  virtual void Shutdown() = 0;
64 
/** @brief Clear local data. */
68  virtual void ClearData() = 0;
69 
/** @brief Get stored data. */
73  virtual void Observe() = 0;
74 
/** @brief Query whether the Reader has data to be handled. */
81  virtual bool Empty() const = 0;
82 
/** @brief Query whether we have received data since last clear. */
89  virtual bool HasReceived() const = 0;
90 
/** @brief Get the time interval, in seconds, since the last message was received. */
96  virtual double GetDelaySec() const = 0;
97 
/** @brief Get the value of the pending queue size. */
103  virtual uint32_t PendingQueueSize() const = 0;
104 
/**
 * @brief Query whether any writer publishes the subscribed channel.
 * The base implementation always returns false; derived classes may override.
 */
111  virtual bool HasWriter() { return false; }
112 
/**
 * @brief Get all writers publishing the channel we subscribe to.
 * @param writers output vector. The base implementation leaves it untouched.
 */
118  virtual void GetWriters(std::vector<proto::RoleAttributes>* writers) {}
119 
/** @brief Get the Reader's channel name, i.e. role_attr_.channel_name(). */
125  const std::string& GetChannelName() const {
126  return role_attr_.channel_name();
127  }
128 
/** @brief Get the Reader's channel id, i.e. role_attr_.channel_id(). */
134  uint64_t ChannelId() const { return role_attr_.channel_id(); }
135 
/** @brief Get the QoS profile, i.e. role_attr_.qos_profile(). */
141  const proto::QosProfile& QosProfile() const {
142  return role_attr_.qos_profile();
143  }
144 
/** @brief Query whether the Reader is initialized (atomic load of init_). */
151  bool IsInit() const { return init_.load(); }
152 
153  protected:
// Copy of the attributes passed to the constructor.
154  proto::RoleAttributes role_attr_;
// Initialization flag; constructed false. Nothing in this class sets it to
// true, so derived classes are presumably expected to — confirm.
155  std::atomic<bool> init_;
156 };
157 
/**
 * @brief One Channel is related to one Receiver; ReceiverManager hands out
 * the Receiver for a channel and keeps the instances in receiver_map_.
 *
 * NOTE(review): this rendered listing omits original line 168 (presumably the
 * `class ReceiverManager {` header) and line 187 (presumably a
 * DECLARE_SINGLETON(ReceiverManager<MessageT>) invocation, since the listing's
 * index references that macro) — confirm against the upstream header.
 * NOTE(review): std::mutex is used below but <mutex> is not among the visible
 * includes; presumably it arrives transitively — confirm.
 */
167 template <typename MessageT>
169  public:
// Drops every cached Receiver reference held by the map.
170  ~ReceiverManager() { receiver_map_.clear(); }
171 
/**
 * @brief Get the Receiver object.
 * @param role_attr attributes whose channel_name() selects the Receiver
 * @return shared pointer to the transport::Receiver<MessageT> for that channel
 */
178  auto GetReceiver(const proto::RoleAttributes& role_attr) ->
179  typename std::shared_ptr<transport::Receiver<MessageT>>;
180 
181  private:
// Receivers keyed by a string; GetReceiver uses the channel name as the key.
182  std::unordered_map<std::string,
183  typename std::shared_ptr<transport::Receiver<MessageT>>>
184  receiver_map_;
// Locked by GetReceiver while it reads or inserts into receiver_map_.
185  std::mutex receiver_map_mutex_;
186 
188 };
189 
195 template <typename MessageT>
197 
// Out-of-class definition of ReceiverManager<MessageT>::GetReceiver.
//
// While holding receiver_map_mutex_, looks up role_attr.channel_name() in
// receiver_map_. If there is no entry yet, a Receiver is created through
// transport::Transport::Instance()->CreateReceiver<MessageT>() with the
// callback below and stored under that channel name. The stored entry is
// returned either way.
//
// NOTE(review): this rendered listing omits original line 199 (the start of
// the function signature) and line 217 (the start of the call whose arguments
// continue on line 218) — confirm against the upstream header.
198 template <typename MessageT>
200  const proto::RoleAttributes& role_attr) ->
201  typename std::shared_ptr<transport::Receiver<MessageT>> {
202  std::lock_guard<std::mutex> lock(receiver_map_mutex_);
203  // Several readers on one channel would otherwise write the data cache
204  // several times, so the map keeps a single Receiver instance per channel.
205  const std::string& channel_name = role_attr.channel_name();
206  if (receiver_map_.count(channel_name) == 0) {
207  receiver_map_[channel_name] =
208  transport::Transport::Instance()->CreateReceiver<MessageT>(
209  role_attr, [](const std::shared_ptr<MessageT>& msg,
210  const transport::MessageInfo& msg_info,
211  const proto::RoleAttributes& reader_attr) {
// NOTE(review): both casts look redundant — msg_info and reader_attr are
// used by the statements that follow.
212  (void)msg_info;
213  (void)reader_attr;
// Record a DISPATCH perf event tagged with the channel id and the
// message sequence number.
214  PerfEventCache::Instance()->AddTransportEvent(
215  TransPerf::DISPATCH, reader_attr.channel_id(),
216  msg_info.seq_num());
// Tail of the call begun on the omitted line 217; it receives the channel id
// and the message.
218  reader_attr.channel_id(), msg);
// Record a NOTIFY perf event with the same channel id and sequence number.
219  PerfEventCache::Instance()->AddTransportEvent(
220  TransPerf::NOTIFY, reader_attr.channel_id(),
221  msg_info.seq_num());
222  });
223  }
// The entry exists at this point: it was either found or just inserted.
224  return receiver_map_[channel_name];
225 }
226 
227 } // namespace cyber
228 } // namespace apollo
229 
230 #endif // CYBER_NODE_READER_BASE_H_
One Channel is related to one Receiver. ReceiverManager is in charge of attaching one Receiver to its...
Definition: reader_base.h:168
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
auto GetReceiver(const proto::RoleAttributes &role_attr) -> typename std::shared_ptr< transport::Receiver< MessageT >>
Get the Receiver object.
Definition: reader_base.h:199
std::atomic< bool > init_
Definition: reader_base.h:155
uint64_t ChannelId() const
Get Reader's Channel id.
Definition: reader_base.h:134
virtual uint32_t PendingQueueSize() const =0
Get the value of pending queue size.
const proto::QosProfile & QosProfile() const
Get qos profile. You can see qos description.
Definition: reader_base.h:141
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
const std::string & GetChannelName() const
Get Reader's Channel name.
Definition: reader_base.h:125
Definition: message_info.h:30
virtual bool HasWriter()
Query whether there is any writer publishing the subscribed channel.
Definition: reader_base.h:111
TransPerf
Definition: perf_event.h:34
virtual ~ReaderBase()
Definition: reader_base.h:50
Definition: global_data.h:40
Definition: perf_event_cache.h:36
~ReceiverManager()
Definition: reader_base.h:170
bool IsInit() const
Query whether the Reader is initialized.
Definition: reader_base.h:151
virtual void GetWriters(std::vector< proto::RoleAttributes > *writers)
Get all writers publishing the channel we subscribe to.
Definition: reader_base.h:118
Base class for Reader. A Reader is identified by one apollo::cyber::proto::RoleAttributes, which contains the channel_name and channel_id that we subscribe to, the host_name, process_id and node where we are located, and the qos that describes our transportation quality.
Definition: reader_base.h:46
virtual void ClearData()=0
Clear local data.
virtual void Observe()=0
Get stored data.
virtual double GetDelaySec() const =0
Get the time interval since the last message was received.
ReaderBase(const proto::RoleAttributes &role_attr)
Definition: reader_base.h:48
virtual bool Empty() const =0
Query whether the Reader has data to be handled.
virtual bool Init()=0
Init the Reader object.
proto::RoleAttributes role_attr_
Definition: reader_base.h:154
virtual bool HasReceived() const =0
Query whether we have received data since last clear.
virtual void Shutdown()=0
Shutdown the Reader object.