17 #ifndef CYBER_NODE_READER_H_ 18 #define CYBER_NODE_READER_H_ 25 #include <unordered_map> 29 #include "cyber/proto/topology_change.pb.h" 44 template <
typename M0>
45 using CallbackFunc = std::function<void(const std::shared_ptr<M0>&)>;
47 using proto::RoleType;
67 template <
typename MessageT>
70 using BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT>>;
71 using ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT>>;
75 typename std::list<std::shared_ptr<MessageT>>::const_iterator;
86 explicit Reader(
const proto::RoleAttributes& role_attr,
88 uint32_t pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE);
128 bool Empty()
const override;
149 virtual void Enqueue(
const std::shared_ptr<MessageT>& msg);
207 void GetWriters(std::vector<proto::RoleAttributes>* writers)
override;
215 void JoinTheTopology();
216 void LeaveTheTopology();
217 void OnChannelChange(
const proto::ChangeMsg& change_msg);
221 std::string croutine_name_;
229 template <
typename MessageT>
232 uint32_t pending_queue_size)
235 reader_func_(reader_func) {
237 role_attr.qos_profile().depth(), role_attr.channel_name())));
240 template <
typename MessageT>
245 template <
typename MessageT>
249 blocker_->Publish(msg);
252 template <
typename MessageT>
257 template <
typename MessageT>
259 if (
init_.exchange(
true)) {
262 std::function<void(const std::shared_ptr<MessageT>&)>
func;
263 if (reader_func_ !=
nullptr) {
264 func = [
this](
const std::shared_ptr<MessageT>& msg) {
266 this->reader_func_(msg);
269 func = [
this](
const std::shared_ptr<MessageT>& msg) { this->
Enqueue(msg); };
273 auto dv = std::make_shared<data::DataVisitor<MessageT>>(
277 croutine::CreateRoutineFactory<MessageT>(std::move(
func), dv);
278 if (!sched->CreateTask(factory, croutine_name_)) {
279 AERROR <<
"Create Task Failed!";
285 this->
role_attr_.set_id(receiver_->id().HashValue());
293 template <
typename MessageT>
295 if (!
init_.exchange(
false)) {
300 channel_manager_ =
nullptr;
302 if (!croutine_name_.empty()) {
307 template <
typename MessageT>
310 change_conn_ = channel_manager_->AddChangeListener(std::bind(
314 const std::string& channel_name = this->
role_attr_.channel_name();
315 std::vector<proto::RoleAttributes> writers;
316 channel_manager_->GetWritersOfChannel(channel_name, &writers);
317 for (
auto& writer : writers) {
318 receiver_->Enable(writer);
320 channel_manager_->Join(this->
role_attr_, proto::RoleType::ROLE_READER,
324 template <
typename MessageT>
326 channel_manager_->RemoveChangeListener(change_conn_);
327 channel_manager_->Leave(this->
role_attr_, proto::RoleType::ROLE_READER);
330 template <
typename MessageT>
332 if (change_msg.role_type() != proto::RoleType::ROLE_WRITER) {
336 auto& writer_attr = change_msg.role_attr();
337 if (writer_attr.channel_name() != this->
role_attr_.channel_name()) {
341 auto operate_type = change_msg.operate_type();
342 if (operate_type == proto::OperateType::OPT_JOIN) {
343 receiver_->Enable(writer_attr);
345 receiver_->Disable(writer_attr);
349 template <
typename MessageT>
351 return !blocker_->IsPublishedEmpty();
354 template <
typename MessageT>
356 return blocker_->IsObservedEmpty();
359 template <
typename MessageT>
371 template <
typename MessageT>
376 template <
typename MessageT>
378 return blocker_->GetLatestObservedPtr();
381 template <
typename MessageT>
383 return blocker_->GetOldestObservedPtr();
386 template <
typename MessageT>
388 blocker_->ClearPublished();
389 blocker_->ClearObserved();
392 template <
typename MessageT>
394 blocker_->set_capacity(depth);
397 template <
typename MessageT>
399 return static_cast<uint32_t
>(blocker_->capacity());
402 template <
typename MessageT>
408 return channel_manager_->HasWriter(
role_attr_.channel_name());
411 template <
typename MessageT>
413 if (writers ==
nullptr) {
421 channel_manager_->GetWritersOfChannel(
role_attr_.channel_name(), writers);
427 #endif // CYBER_NODE_READER_H_ virtual void Enqueue(const std::shared_ptr< MessageT > &msg)
Push msg to Blocker's PublishQueue
Definition: reader.h:246
void(* func)(void *)
Definition: routine_context.h:41
double GetDelaySec() const override
Get the time interval since the last received message.
Definition: reader.h:360
double latest_recv_time_sec_
Definition: reader.h:210
virtual Iterator End() const
Get the end iterator of ObserveQueue, used to traverse.
Definition: reader.h:191
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::atomic< bool > init_
Definition: reader_base.h:155
std::shared_ptr< ChannelManager > ChannelManagerPtr
Definition: topology_manager.h:43
Reader subscribes to a channel; it has two main functions:
Definition: reader.h:68
bool HasWriter() override
Is there at least one writer publishing to the channel that we subscribe to?
Definition: reader.h:403
Reader(const proto::RoleAttributes &role_attr, const CallbackFunc< MessageT > &reader_func=nullptr, uint32_t pending_queue_size=DEFAULT_PENDING_QUEUE_SIZE)
Definition: reader.h:230
Definition: routine_factory.h:33
base::Connection< const ChangeMsg & > ChangeConnection
Definition: manager.h:55
double second_to_lastest_recv_time_sec_
Definition: reader.h:211
virtual bool RemoveTask(const std::string &name)=0
virtual void SetHistoryDepth(const uint32_t &depth)
Set Blocker's PublishQueue's capacity to depth
Definition: reader.h:393
bool HasReceived() const override
Query whether we have received data since last clear.
Definition: reader.h:350
bool Init() override
Init Reader.
Definition: reader.h:258
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
Definition: reader.h:75
virtual std::shared_ptr< MessageT > GetOldestObserved() const
Get the oldest message we Observe
Definition: reader.h:382
std::shared_ptr< transport::Receiver< MessageT > > ReceiverPtr
Definition: reader.h:71
bool Empty() const override
Query whether the Reader has data to be handled.
Definition: reader.h:355
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
Definition: reader.h:49
virtual ~Reader()
Definition: reader.h:241
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
Definition: reader.h:45
virtual uint32_t GetHistoryDepth() const
Get Blocker's PublishQueue's capacity.
Definition: reader.h:398
Definition: message_traits.h:45
typename service_discovery::Manager::ChangeConnection ChangeConnection
Definition: reader.h:73
virtual std::shared_ptr< MessageT > GetLatestObserved() const
Get the latest message we Observe
Definition: reader.h:377
uint32_t pending_queue_size_
Definition: reader.h:212
std::unique_ptr< blocker::Blocker< MessageT > > BlockerPtr
Definition: reader.h:70
uint32_t PendingQueueSize() const override
Get pending_queue_size configuration.
Definition: reader.h:372
double ToSecond() const
Convert time to seconds.
virtual Iterator Begin() const
Get the begin iterator of ObserveQueue, used to traverse.
Definition: reader.h:184
Base class for Reader. A Reader is identified by one apollo::cyber::proto::RoleAttributes, which contains the channel_name and channel_id that we subscribe to, the host_name, process_id and node where we are located, and the qos that describes our transport quality.
Definition: reader_base.h:46
void Shutdown() override
Shutdown Reader.
Definition: reader.h:294
#define AERROR
Definition: log.h:44
static Time Now()
get the current time.
proto::RoleAttributes role_attr_
Definition: reader_base.h:154
void ClearData() override
Clear Blocker's data.
Definition: reader.h:387
void Observe() override
Get All data that Blocker stores.
Definition: reader.h:253
void GetWriters(std::vector< proto::RoleAttributes > *writers) override
Get all writers publishing to the channel we subscribe to.
Definition: reader.h:412