Apollo  6.0
Open source self driving car software
intra_reader.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_BLOCKER_INTRA_READER_H_
18 #define CYBER_BLOCKER_INTRA_READER_H_
19 
20 #include <functional>
21 #include <list>
22 #include <memory>
23 
25 #include "cyber/common/log.h"
26 #include "cyber/node/reader.h"
27 #include "cyber/time/time.h"
28 
29 namespace apollo {
30 namespace cyber {
31 namespace blocker {
32 
// Reader<MessageT> subclass whose overrides delegate to the BlockerManager
// singleton, addressing the blocker by the channel name stored in the
// reader's role attributes. An optional user callback is invoked from
// OnMessage().
33 template <typename MessageT>
34 class IntraReader : public apollo::cyber::Reader<MessageT> {
35  public:
// Shared-ownership handle to a message.
36  using MessagePtr = std::shared_ptr<MessageT>;
// User callback signature; receives the message pointer by const reference.
37  using Callback = std::function<void(const std::shared_ptr<MessageT>&)>;
// Read-only iterator over a list of message pointers (returned by Begin/End).
38  using Iterator =
39  typename std::list<std::shared_ptr<MessageT>>::const_iterator;
40 
// attr: role attributes forwarded to the Reader base.
// callback: copied into msg_callback_; may be empty.
41  IntraReader(const proto::RoleAttributes& attr, const Callback& callback);
42  virtual ~IntraReader();
43 
// Init the Reader object.
44  bool Init() override;
// Shutdown the Reader object.
45  void Shutdown() override;
46 
// Clear local data.
47  void ClearData() override;
// Get stored data.
48  void Observe() override;
// Query whether the Reader has data to be handled.
49  bool Empty() const override;
// Query whether we have received data since last clear.
50  bool HasReceived() const override;
51 
// Publish msg on this reader's channel through BlockerManager.
52  void Enqueue(const std::shared_ptr<MessageT>& msg) override;
// Set the capacity of the channel's blocker to depth.
53  void SetHistoryDepth(const uint32_t& depth) override;
// Get the capacity of the channel's blocker.
54  uint32_t GetHistoryDepth() const override;
// Get the latest message we Observe.
55  std::shared_ptr<MessageT> GetLatestObserved() const override;
// Get the oldest message we Observe.
56  std::shared_ptr<MessageT> GetOldestObserved() const override;
// Get the begin iterator of the observed queue, used to traverse.
57  Iterator Begin() const override;
// Get the end iterator of the observed queue, used to traverse.
58  Iterator End() const override;
59 
60  private:
// Handler bound in Init() and passed to BlockerManager::Subscribe.
61  void OnMessage(const MessagePtr& msg_ptr);
62 
// User callback invoked from OnMessage() when it is non-empty.
63  Callback msg_callback_;
64 };
65 
// Constructor: forwards attr to the Reader<MessageT> base and stores a copy of
// callback in msg_callback_. No subscription happens here.
66 template <typename MessageT>
67 IntraReader<MessageT>::IntraReader(const proto::RoleAttributes& attr,
68  const Callback& callback)
69  : Reader<MessageT>(attr), msg_callback_(callback) {}
70 
// Destructor body: calls Shutdown() on destruction.
// NOTE(review): the definition line (listing line 72) is absent from this
// extract; the member index attributes this body to ~IntraReader().
71 template <typename MessageT>
73  Shutdown();
74 }
75 
// Init the Reader object.
// Atomically sets init_; if it was already set, returns true without
// subscribing again. Otherwise subscribes OnMessage (bound to this object) for
// the reader's channel, passing the QoS depth and node name from role_attr_,
// and returns the result of BlockerManager::Subscribe.
// NOTE(review): the definition line (listing line 77) is absent from this
// extract; the member index attributes this body to Init().
76 template <typename MessageT>
78  if (this->init_.exchange(true)) {
79  return true;
80  }
81  return BlockerManager::Instance()->Subscribe<MessageT>(
82  this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
83  this->role_attr_.node_name(),
84  std::bind(&IntraReader<MessageT>::OnMessage, this,
85  std::placeholders::_1));
86 }
87 
// Shutdown the Reader object.
// Atomically clears init_; if it was not set, returns without doing anything.
// Otherwise unsubscribes this reader's node name from its channel via
// BlockerManager.
// NOTE(review): the definition line (listing line 89) is absent from this
// extract; the member index attributes this body to Shutdown().
88 template <typename MessageT>
90  if (!this->init_.exchange(false)) {
91  return;
92  }
93  BlockerManager::Instance()->Unsubscribe<MessageT>(
94  this->role_attr_.channel_name(), this->role_attr_.node_name());
95 }
96 
// Clear local data.
// Looks up the blocker for this reader's channel and, when one exists, clears
// both its observed and its published messages. Does nothing when no blocker
// is found.
// NOTE(review): the definition line (listing line 98) is absent from this
// extract; the member index attributes this body to ClearData().
97 template <typename MessageT>
99  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
100  this->role_attr_.channel_name());
101  if (blocker != nullptr) {
102  blocker->ClearObserved();
103  blocker->ClearPublished();
104  }
105 }
106 
// Get stored data.
// Forwards to Observe() on the blocker for this reader's channel; does nothing
// when no blocker is found.
// NOTE(review): the definition line (listing line 108) is absent from this
// extract; the member index attributes this body to Observe().
107 template <typename MessageT>
109  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
110  this->role_attr_.channel_name());
111  if (blocker != nullptr) {
112  blocker->Observe();
113  }
114 }
115 
// Query whether the Reader has data to be handled.
// Returns the blocker's IsObservedEmpty() result, or true when no blocker
// exists for this reader's channel.
// NOTE(review): the definition line (listing line 117) is absent from this
// extract; the member index attributes this body to Empty().
116 template <typename MessageT>
118  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
119  this->role_attr_.channel_name());
120  if (blocker != nullptr) {
121  return blocker->IsObservedEmpty();
122  }
123  return true;
124 }
125 
// Query whether we have received data since last clear.
// Returns true when the blocker's published messages are non-empty; returns
// false when no blocker exists for this reader's channel.
// NOTE(review): the definition line (listing line 127) is absent from this
// extract; the member index attributes this body to HasReceived().
126 template <typename MessageT>
128  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
129  this->role_attr_.channel_name());
130  if (blocker != nullptr) {
131  return !blocker->IsPublishedEmpty();
132  }
133  return false;
134 }
135 
// Publishes msg on this reader's channel through the BlockerManager singleton.
// msg: shared pointer to the message; passed through unchanged.
136 template <typename MessageT>
137 void IntraReader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
138  BlockerManager::Instance()->Publish<MessageT>(this->role_attr_.channel_name(),
139  msg);
140 }
141 
// Sets the capacity of the blocker for this reader's channel to depth.
// The request is silently ignored when no blocker exists for the channel.
142 template <typename MessageT>
143 void IntraReader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
144  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
145  this->role_attr_.channel_name());
146  if (blocker != nullptr) {
147  blocker->set_capacity(depth);
148  }
149 }
150 
// Returns the capacity of the blocker for this reader's channel, converted to
// uint32_t, or 0 when no blocker exists for the channel.
// NOTE(review): the definition line (listing line 152) is absent from this
// extract; the member index attributes this body to GetHistoryDepth().
151 template <typename MessageT>
153  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
154  this->role_attr_.channel_name());
155  if (blocker != nullptr) {
156  return static_cast<uint32_t>(blocker->capacity());
157  }
158  return 0;
159 }
160 
// Get the latest message we Observe.
// Returns the blocker's GetLatestObservedPtr() result, or nullptr when no
// blocker exists for this reader's channel.
161 template <typename MessageT>
162 std::shared_ptr<MessageT> IntraReader<MessageT>::GetLatestObserved() const {
163  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
164  this->role_attr_.channel_name());
165  if (blocker != nullptr) {
166  return blocker->GetLatestObservedPtr();
167  }
168  return nullptr;
169 }
170 
// Get the oldest message we Observe.
// Returns the blocker's GetOldestObservedPtr() result, or nullptr when no
// blocker exists for this reader's channel.
171 template <typename MessageT>
172 std::shared_ptr<MessageT> IntraReader<MessageT>::GetOldestObserved() const {
173  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
174  this->role_attr_.channel_name());
175  if (blocker != nullptr) {
176  return blocker->GetOldestObservedPtr();
177  }
178  return nullptr;
179 }
180 
// Get the begin iterator of the observed queue, used to traverse.
// Unlike the other accessors there is no fallback value: the blocker for this
// reader's channel is checked with ACHECK before it is dereferenced
// (presumably a fatal check -- confirm in log.h).
// NOTE(review): the definition line (listing line 182) is absent from this
// extract; the member index attributes this body to Begin().
181 template <typename MessageT>
183  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
184  this->role_attr_.channel_name());
185  ACHECK(blocker != nullptr);
186  return blocker->ObservedBegin();
187 }
188 
// Get the end iterator of the observed queue, used to traverse.
// Must return the blocker's ObservedEnd(): returning ObservedBegin() here
// would make Begin() == End(), so every [Begin(), End()) traversal would be
// empty regardless of how many messages were observed.
// The blocker for this reader's channel is checked with ACHECK before it is
// dereferenced (presumably a fatal check -- confirm in log.h).
// NOTE(review): the definition line (listing line 190) is absent from this
// extract; the member index attributes this body to End().
189 template <typename MessageT>
191  auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
192  this->role_attr_.channel_name());
193  ACHECK(blocker != nullptr);
194  return blocker->ObservedEnd();
195 }
196 
// Message handler registered with BlockerManager in Init().
// Invokes the user callback with msg_ptr when a callback was supplied; an
// empty callback is skipped.
// NOTE(review): listing lines 199-200 are absent from this extract, so the
// body shown here may be missing statements that precede the callback check.
197 template <typename MessageT>
198 void IntraReader<MessageT>::OnMessage(const MessagePtr& msg_ptr) {
201  if (msg_callback_ != nullptr) {
202  msg_callback_(msg_ptr);
203  }
204 }
205 
206 } // namespace blocker
207 } // namespace cyber
208 } // namespace apollo
209 
210 #endif // CYBER_BLOCKER_INTRA_READER_H_
double latest_recv_time_sec_
Definition: reader.h:210
bool HasReceived() const override
Query whether we have received data since last clear.
Definition: intra_reader.h:127
Definition: intra_reader.h:34
bool Init() override
Init the Reader object.
Definition: intra_reader.h:77
#define ACHECK(cond)
Definition: log.h:80
void SetHistoryDepth(const uint32_t &depth) override
Set Blocker's PublishQueue's capacity to depth
Definition: intra_reader.h:143
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::atomic< bool > init_
Definition: reader_base.h:155
std::function< void(const std::shared_ptr< MessageT > &)> Callback
Definition: intra_reader.h:37
void Shutdown() override
Shutdown the Reader object.
Definition: intra_reader.h:89
static const std::shared_ptr< BlockerManager > & Instance()
Definition: blocker_manager.h:38
void Enqueue(const std::shared_ptr< MessageT > &msg) override
Push msg to Blocker's PublishQueue
Definition: intra_reader.h:137
uint32_t GetHistoryDepth() const override
Get Blocker's PublishQueue's capacity.
Definition: intra_reader.h:152
Reader subscribes a channel, it has two main functions:
Definition: reader.h:68
std::shared_ptr< MessageT > MessagePtr
Definition: intra_reader.h:36
std::shared_ptr< MessageT > GetOldestObserved() const override
Get the oldest message we Observe
Definition: intra_reader.h:172
virtual ~IntraReader()
Definition: intra_reader.h:72
Iterator End() const override
Get the end iterator of ObserveQueue, used to traverse.
Definition: intra_reader.h:190
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
Definition: intra_reader.h:39
double second_to_lastest_recv_time_sec_
Definition: reader.h:211
bool Empty() const override
Query whether the Reader has data to be handled.
Definition: intra_reader.h:117
void Observe() override
Get stored data.
Definition: intra_reader.h:108
Iterator Begin() const override
Get the begin iterator of ObserveQueue, used to traverse.
Definition: intra_reader.h:182
double ToSecond() const
convert time to second.
void ClearData() override
Clear local data.
Definition: intra_reader.h:98
IntraReader(const proto::RoleAttributes &attr, const Callback &callback)
Definition: intra_reader.h:67
static Time Now()
get the current time.
proto::RoleAttributes role_attr_
Definition: reader_base.h:154
std::shared_ptr< MessageT > GetLatestObserved() const override
Get the latest message we Observe
Definition: intra_reader.h:162