Apollo  6.0
Open source self driving car software
reader.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_NODE_READER_H_
18 #define CYBER_NODE_READER_H_
19 
20 #include <algorithm>
21 #include <list>
22 #include <memory>
23 #include <mutex>
24 #include <string>
25 #include <unordered_map>
26 #include <utility>
27 #include <vector>
28 
29 #include "cyber/proto/topology_change.pb.h"
30 
31 #include "cyber/blocker/blocker.h"
35 #include "cyber/node/reader_base.h"
38 #include "cyber/time/time.h"
40 
41 namespace apollo {
42 namespace cyber {
43 
/**
 * @brief Signature of the user callback a Reader invokes per message: a
 * callable taking a const reference to a shared pointer to the message.
 *
 * @tparam MessageT message type carried by the shared pointer.
 */
template <typename MessageT>
using CallbackFunc = std::function<void(const std::shared_ptr<MessageT>&)>;
46 
47 using proto::RoleType;
48 
/// Default value of the Reader constructor's `pending_queue_size` argument.
/// Declared constexpr so it is guaranteed to be a compile-time constant.
constexpr uint32_t DEFAULT_PENDING_QUEUE_SIZE = 1;
50 
67 template <typename MessageT>
68 class Reader : public ReaderBase {
69  public:
70  using BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT>>;
71  using ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT>>;
72  using ChangeConnection =
74  using Iterator =
75  typename std::list<std::shared_ptr<MessageT>>::const_iterator;
76 
86  explicit Reader(const proto::RoleAttributes& role_attr,
87  const CallbackFunc<MessageT>& reader_func = nullptr,
88  uint32_t pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE);
89  virtual ~Reader();
90 
97  bool Init() override;
98 
102  void Shutdown() override;
103 
107  void Observe() override;
108 
112  void ClearData() override;
113 
120  bool HasReceived() const override;
121 
128  bool Empty() const override;
129 
135  double GetDelaySec() const override;
136 
142  uint32_t PendingQueueSize() const override;
143 
149  virtual void Enqueue(const std::shared_ptr<MessageT>& msg);
150 
156  virtual void SetHistoryDepth(const uint32_t& depth);
157 
163  virtual uint32_t GetHistoryDepth() const;
164 
170  virtual std::shared_ptr<MessageT> GetLatestObserved() const;
171 
177  virtual std::shared_ptr<MessageT> GetOldestObserved() const;
178 
184  virtual Iterator Begin() const { return blocker_->ObservedBegin(); }
185 
191  virtual Iterator End() const { return blocker_->ObservedEnd(); }
192 
200  bool HasWriter() override;
201 
207  void GetWriters(std::vector<proto::RoleAttributes>* writers) override;
208 
209  protected:
210  double latest_recv_time_sec_ = -1.0;
213 
214  private:
215  void JoinTheTopology();
216  void LeaveTheTopology();
217  void OnChannelChange(const proto::ChangeMsg& change_msg);
218 
219  CallbackFunc<MessageT> reader_func_;
220  ReceiverPtr receiver_ = nullptr;
221  std::string croutine_name_;
222 
223  BlockerPtr blocker_ = nullptr;
224 
225  ChangeConnection change_conn_;
226  service_discovery::ChannelManagerPtr channel_manager_ = nullptr;
227 };
228 
229 template <typename MessageT>
230 Reader<MessageT>::Reader(const proto::RoleAttributes& role_attr,
231  const CallbackFunc<MessageT>& reader_func,
232  uint32_t pending_queue_size)
233  : ReaderBase(role_attr),
234  pending_queue_size_(pending_queue_size),
235  reader_func_(reader_func) {
237  role_attr.qos_profile().depth(), role_attr.channel_name())));
238 }
239 
240 template <typename MessageT>
242  Shutdown();
243 }
244 
245 template <typename MessageT>
246 void Reader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
249  blocker_->Publish(msg);
250 }
251 
252 template <typename MessageT>
254  blocker_->Observe();
255 }
256 
257 template <typename MessageT>
259  if (init_.exchange(true)) {
260  return true;
261  }
262  std::function<void(const std::shared_ptr<MessageT>&)> func;
263  if (reader_func_ != nullptr) {
264  func = [this](const std::shared_ptr<MessageT>& msg) {
265  this->Enqueue(msg);
266  this->reader_func_(msg);
267  };
268  } else {
269  func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };
270  }
271  auto sched = scheduler::Instance();
272  croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
273  auto dv = std::make_shared<data::DataVisitor<MessageT>>(
274  role_attr_.channel_id(), pending_queue_size_);
275  // Using factory to wrap templates.
276  croutine::RoutineFactory factory =
277  croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
278  if (!sched->CreateTask(factory, croutine_name_)) {
279  AERROR << "Create Task Failed!";
280  init_.store(false);
281  return false;
282  }
283 
284  receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
285  this->role_attr_.set_id(receiver_->id().HashValue());
286  channel_manager_ =
288  JoinTheTopology();
289 
290  return true;
291 }
292 
293 template <typename MessageT>
295  if (!init_.exchange(false)) {
296  return;
297  }
298  LeaveTheTopology();
299  receiver_ = nullptr;
300  channel_manager_ = nullptr;
301 
302  if (!croutine_name_.empty()) {
303  scheduler::Instance()->RemoveTask(croutine_name_);
304  }
305 }
306 
307 template <typename MessageT>
309  // add listener
310  change_conn_ = channel_manager_->AddChangeListener(std::bind(
311  &Reader<MessageT>::OnChannelChange, this, std::placeholders::_1));
312 
313  // get peer writers
314  const std::string& channel_name = this->role_attr_.channel_name();
315  std::vector<proto::RoleAttributes> writers;
316  channel_manager_->GetWritersOfChannel(channel_name, &writers);
317  for (auto& writer : writers) {
318  receiver_->Enable(writer);
319  }
320  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_READER,
322 }
323 
324 template <typename MessageT>
326  channel_manager_->RemoveChangeListener(change_conn_);
327  channel_manager_->Leave(this->role_attr_, proto::RoleType::ROLE_READER);
328 }
329 
330 template <typename MessageT>
331 void Reader<MessageT>::OnChannelChange(const proto::ChangeMsg& change_msg) {
332  if (change_msg.role_type() != proto::RoleType::ROLE_WRITER) {
333  return;
334  }
335 
336  auto& writer_attr = change_msg.role_attr();
337  if (writer_attr.channel_name() != this->role_attr_.channel_name()) {
338  return;
339  }
340 
341  auto operate_type = change_msg.operate_type();
342  if (operate_type == proto::OperateType::OPT_JOIN) {
343  receiver_->Enable(writer_attr);
344  } else {
345  receiver_->Disable(writer_attr);
346  }
347 }
348 
349 template <typename MessageT>
351  return !blocker_->IsPublishedEmpty();
352 }
353 
354 template <typename MessageT>
356  return blocker_->IsObservedEmpty();
357 }
358 
359 template <typename MessageT>
361  if (latest_recv_time_sec_ < 0) {
362  return -1.0;
363  }
366  }
367  return std::max((Time::Now().ToSecond() - latest_recv_time_sec_),
369 }
370 
371 template <typename MessageT>
373  return pending_queue_size_;
374 }
375 
376 template <typename MessageT>
377 std::shared_ptr<MessageT> Reader<MessageT>::GetLatestObserved() const {
378  return blocker_->GetLatestObservedPtr();
379 }
380 
381 template <typename MessageT>
382 std::shared_ptr<MessageT> Reader<MessageT>::GetOldestObserved() const {
383  return blocker_->GetOldestObservedPtr();
384 }
385 
386 template <typename MessageT>
388  blocker_->ClearPublished();
389  blocker_->ClearObserved();
390 }
391 
392 template <typename MessageT>
393 void Reader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
394  blocker_->set_capacity(depth);
395 }
396 
397 template <typename MessageT>
399  return static_cast<uint32_t>(blocker_->capacity());
400 }
401 
402 template <typename MessageT>
404  if (!init_.load()) {
405  return false;
406  }
407 
408  return channel_manager_->HasWriter(role_attr_.channel_name());
409 }
410 
411 template <typename MessageT>
412 void Reader<MessageT>::GetWriters(std::vector<proto::RoleAttributes>* writers) {
413  if (writers == nullptr) {
414  return;
415  }
416 
417  if (!init_.load()) {
418  return;
419  }
420 
421  channel_manager_->GetWritersOfChannel(role_attr_.channel_name(), writers);
422 }
423 
424 } // namespace cyber
425 } // namespace apollo
426 
427 #endif // CYBER_NODE_READER_H_
virtual void Enqueue(const std::shared_ptr< MessageT > &msg)
Push msg to Blocker's PublishQueue.
Definition: reader.h:246
void(* func)(void *)
Definition: routine_context.h:41
double GetDelaySec() const override
Get the time interval since the last message was received.
Definition: reader.h:360
double latest_recv_time_sec_
Definition: reader.h:210
virtual Iterator End() const
Get the end iterator of ObserveQueue, used to traverse.
Definition: reader.h:191
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::atomic< bool > init_
Definition: reader_base.h:155
std::shared_ptr< ChannelManager > ChannelManagerPtr
Definition: topology_manager.h:43
Reader subscribes a channel, it has two main functions:
Definition: reader.h:68
bool HasWriter() override
Is there at least one writer publishing the channel that we subscribe to?
Definition: reader.h:403
Reader(const proto::RoleAttributes &role_attr, const CallbackFunc< MessageT > &reader_func=nullptr, uint32_t pending_queue_size=DEFAULT_PENDING_QUEUE_SIZE)
Definition: reader.h:230
Definition: routine_factory.h:33
Definition: blocker.h:50
base::Connection< const ChangeMsg & > ChangeConnection
Definition: manager.h:55
double second_to_lastest_recv_time_sec_
Definition: reader.h:211
virtual bool RemoveTask(const std::string &name)=0
virtual void SetHistoryDepth(const uint32_t &depth)
Set Blocker's PublishQueue's capacity to depth.
Definition: reader.h:393
bool HasReceived() const override
Query whether we have received data since last clear.
Definition: reader.h:350
bool Init() override
Init Reader.
Definition: reader.h:258
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
Definition: reader.h:75
virtual std::shared_ptr< MessageT > GetOldestObserved() const
Get the oldest message we Observe
Definition: reader.h:382
std::shared_ptr< transport::Receiver< MessageT > > ReceiverPtr
Definition: reader.h:71
bool Empty() const override
Query whether the Reader has data to be handled.
Definition: reader.h:355
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
Definition: reader.h:49
virtual ~Reader()
Definition: reader.h:241
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
Definition: reader.h:45
virtual uint32_t GetHistoryDepth() const
Get Blocker's PublishQueue's capacity.
Definition: reader.h:398
Definition: message_traits.h:45
typename service_discovery::Manager::ChangeConnection ChangeConnection
Definition: reader.h:73
virtual std::shared_ptr< MessageT > GetLatestObserved() const
Get the latest message we Observe
Definition: reader.h:377
uint32_t pending_queue_size_
Definition: reader.h:212
std::unique_ptr< blocker::Blocker< MessageT > > BlockerPtr
Definition: reader.h:70
uint32_t PendingQueueSize() const override
Get pending_queue_size configuration.
Definition: reader.h:372
double ToSecond() const
convert time to second.
virtual Iterator Begin() const
Get the begin iterator of ObserveQueue, used to traverse.
Definition: reader.h:184
Base class for Reader. A Reader is identified by one apollo::cyber::proto::RoleAttribute, which contains the channel_name and channel_id that we subscribe to, the host_name, process_id and node where we are located, and the qos that describes our transport quality.
Definition: reader_base.h:46
void Shutdown() override
Shutdown Reader.
Definition: reader.h:294
#define AERROR
Definition: log.h:44
static Time Now()
get the current time.
proto::RoleAttributes role_attr_
Definition: reader_base.h:154
Definition: blocker.h:64
void ClearData() override
Clear Blocker's data.
Definition: reader.h:387
void Observe() override
Get All data that Blocker stores.
Definition: reader.h:253
void GetWriters(std::vector< proto::RoleAttributes > *writers) override
Get all writers publishing the channel we subscribe to.
Definition: reader.h:412