17 #ifndef CYBER_BLOCKER_INTRA_READER_H_ 18 #define CYBER_BLOCKER_INTRA_READER_H_ 33 template <
typename MessageT>
37 using Callback = std::function<void(const std::shared_ptr<MessageT>&)>;
39 typename std::list<std::shared_ptr<MessageT>>::const_iterator;
49 bool Empty()
const override;
52 void Enqueue(
const std::shared_ptr<MessageT>& msg)
override;
66 template <
typename MessageT>
69 :
Reader<MessageT>(attr), msg_callback_(callback) {}
71 template <
typename MessageT>
76 template <
typename MessageT>
78 if (this->
init_.exchange(
true)) {
85 std::placeholders::_1));
88 template <
typename MessageT>
90 if (!this->
init_.exchange(
false)) {
97 template <
typename MessageT>
101 if (blocker !=
nullptr) {
102 blocker->ClearObserved();
103 blocker->ClearPublished();
107 template <
typename MessageT>
111 if (blocker !=
nullptr) {
116 template <
typename MessageT>
120 if (blocker !=
nullptr) {
121 return blocker->IsObservedEmpty();
126 template <
typename MessageT>
130 if (blocker !=
nullptr) {
131 return !blocker->IsPublishedEmpty();
136 template <
typename MessageT>
142 template <
typename MessageT>
146 if (blocker !=
nullptr) {
147 blocker->set_capacity(depth);
151 template <
typename MessageT>
155 if (blocker !=
nullptr) {
156 return static_cast<uint32_t
>(blocker->capacity());
161 template <
typename MessageT>
165 if (blocker !=
nullptr) {
166 return blocker->GetLatestObservedPtr();
171 template <
typename MessageT>
175 if (blocker !=
nullptr) {
176 return blocker->GetOldestObservedPtr();
181 template <
typename MessageT>
185 ACHECK(blocker !=
nullptr);
186 return blocker->ObservedBegin();
189 template <
typename MessageT>
193 ACHECK(blocker !=
nullptr);
194 return blocker->ObservedBegin();
197 template <
typename MessageT>
201 if (msg_callback_ !=
nullptr) {
202 msg_callback_(msg_ptr);
210 #endif // CYBER_BLOCKER_INTRA_READER_H_
double latest_recv_time_sec_
Definition: reader.h:210
bool HasReceived() const override
Query whether we have received data since last clear.
Definition: intra_reader.h:127
Definition: intra_reader.h:34
bool Init() override
Init the Reader object.
Definition: intra_reader.h:77
#define ACHECK(cond)
Definition: log.h:80
void SetHistoryDepth(const uint32_t &depth) override
Set Blocker's PublishQueue's capacity to depth
Definition: intra_reader.h:143
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::atomic< bool > init_
Definition: reader_base.h:155
std::function< void(const std::shared_ptr< MessageT > &)> Callback
Definition: intra_reader.h:37
void Shutdown() override
Shutdown the Reader object.
Definition: intra_reader.h:89
static const std::shared_ptr< BlockerManager > & Instance()
Definition: blocker_manager.h:38
void Enqueue(const std::shared_ptr< MessageT > &msg) override
Push msg to Blocker's PublishQueue
Definition: intra_reader.h:137
uint32_t GetHistoryDepth() const override
Get Blocker's PublishQueue's capacity.
Definition: intra_reader.h:152
Reader subscribes to a channel; it has two main functions:
Definition: reader.h:68
std::shared_ptr< MessageT > MessagePtr
Definition: intra_reader.h:36
std::shared_ptr< MessageT > GetOldestObserved() const override
Get the oldest message we Observe
Definition: intra_reader.h:172
virtual ~IntraReader()
Definition: intra_reader.h:72
Iterator End() const override
Get the end iterator of ObserveQueue, used to traverse.
Definition: intra_reader.h:190
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
Definition: intra_reader.h:39
double second_to_lastest_recv_time_sec_
Definition: reader.h:211
bool Empty() const override
Query whether the Reader has data to be handled.
Definition: intra_reader.h:117
void Observe() override
Get stored data.
Definition: intra_reader.h:108
Iterator Begin() const override
Get the begin iterator of ObserveQueue, used to traverse.
Definition: intra_reader.h:182
double ToSecond() const
convert time to second.
void ClearData() override
Clear local data.
Definition: intra_reader.h:98
IntraReader(const proto::RoleAttributes &attr, const Callback &callback)
Definition: intra_reader.h:67
static Time Now()
get the current time.
proto::RoleAttributes role_attr_
Definition: reader_base.h:154
std::shared_ptr< MessageT > GetLatestObserved() const override
Get the latest message we Observe
Definition: intra_reader.h:162