Apollo  6.0
Open source self driving car software
apollo::cyber::Reader< MessageT > Class Template Reference

Reader subscribes to a channel. It has two main functions, listed under Detailed Description.

#include <reader.h>


Public Types

using BlockerPtr = std::unique_ptr< blocker::Blocker< MessageT > >
 
using ReceiverPtr = std::shared_ptr< transport::Receiver< MessageT > >
 
using ChangeConnection = typename service_discovery::Manager::ChangeConnection
 
using Iterator = typename std::list< std::shared_ptr< MessageT > >::const_iterator
 

Public Member Functions

 Reader (const proto::RoleAttributes &role_attr, const CallbackFunc< MessageT > &reader_func=nullptr, uint32_t pending_queue_size=DEFAULT_PENDING_QUEUE_SIZE)
 
virtual ~Reader ()
 
bool Init () override
 Init Reader.
 
void Shutdown () override
 Shutdown Reader.
 
void Observe () override
 Get all data that Blocker stores.
 
void ClearData () override
 Clear Blocker's data.
 
bool HasReceived () const override
 Query whether we have received data since last clear.
 
bool Empty () const override
 Query whether the Reader has data to be handled.
 
double GetDelaySec () const override
 Get the time interval since the last message was received.
 
uint32_t PendingQueueSize () const override
 Get pending_queue_size configuration.
 
virtual void Enqueue (const std::shared_ptr< MessageT > &msg)
 Push msg to Blocker's PublishQueue
 
virtual void SetHistoryDepth (const uint32_t &depth)
 Set Blocker's PublishQueue's capacity to depth
 
virtual uint32_t GetHistoryDepth () const
 Get Blocker's PublishQueue's capacity.
 
virtual std::shared_ptr< MessageT > GetLatestObserved () const
 Get the latest message we Observe
 
virtual std::shared_ptr< MessageT > GetOldestObserved () const
 Get the oldest message we Observe
 
virtual Iterator Begin () const
 Get the begin iterator of ObserveQueue, used to traverse.
 
virtual Iterator End () const
 Get the end iterator of ObserveQueue, used to traverse.
 
bool HasWriter () override
 Query whether at least one writer publishes to the channel that we subscribe to.
 
void GetWriters (std::vector< proto::RoleAttributes > *writers) override
 Get all writers that publish to the channel we subscribe to.
 
Public Member Functions inherited from apollo::cyber::ReaderBase
 ReaderBase (const proto::RoleAttributes &role_attr)
 
virtual ~ReaderBase ()
 
const std::string & GetChannelName () const
 Get Reader's Channel name.
 
uint64_t ChannelId () const
 Get Reader's Channel id.
 
const proto::QosProfile & QosProfile () const
 Get the QoS profile. See the QoS description for details.
 
bool IsInit () const
 Query whether the Reader is initialized.
 

Protected Attributes

double latest_recv_time_sec_ = -1.0
 
double second_to_lastest_recv_time_sec_ = -1.0
 
uint32_t pending_queue_size_
 
Protected Attributes inherited from apollo::cyber::ReaderBase
proto::RoleAttributes role_attr_
 
std::atomic< bool > init_
 

Detailed Description

template<typename MessageT>
class apollo::cyber::Reader< MessageT >

Reader subscribes to a channel. It has two main functions:

  1. You can pass a CallbackFunc to handle each message when it arrives.
  2. You can Observe messages that Blocker has cached. Reader automatically pushes each message to Blocker's PublishQueue, and Observe fetches messages from PublishQueue into ObserveQueue. If you have set a CallbackFunc, you can ignore this.

One Reader uses one ChannelBuffer; the message being handled is stored in the ChannelBuffer. Reader joins the topology on init and leaves the topology on shutdown.

    Warning
    To save resources, ChannelBuffer has a limited length, which is set through the pending_queue_size parameter. pending_queue_size defaults to 1, so if you handle messages more slowly than the writer sends them, older unhandled messages will be lost. Increase pending_queue_size to resolve this problem.
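
The warning above can be illustrated with a small standalone model. This is only a sketch of a bounded queue that evicts its oldest entry when full; `ToyPendingQueue` and `LostWhenReaderStalls` are hypothetical names invented for this example, and the code is not the actual ChannelBuffer implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <utility>

// Toy stand-in for a bounded message buffer (hypothetical, not ChannelBuffer).
template <typename MessageT>
class ToyPendingQueue {
 public:
  explicit ToyPendingQueue(uint32_t pending_queue_size)
      : capacity_(pending_queue_size) {
    assert(capacity_ > 0);  // a zero-length buffer could never hold a message
  }

  // Writer side: store a message, evicting the oldest one when the queue is full.
  void Push(std::shared_ptr<MessageT> msg) {
    if (queue_.size() == capacity_) {
      queue_.pop_front();
      ++dropped_;
    }
    queue_.push_back(std::move(msg));
  }

  // Reader side: take the oldest unhandled message, or nullptr if none is left.
  std::shared_ptr<MessageT> Pop() {
    if (queue_.empty()) return nullptr;
    std::shared_ptr<MessageT> msg = queue_.front();
    queue_.pop_front();
    return msg;
  }

  // Number of messages evicted before the reader could handle them.
  std::size_t dropped() const { return dropped_; }

 private:
  std::size_t capacity_;
  std::size_t dropped_ = 0;
  std::deque<std::shared_ptr<MessageT>> queue_;
};

// Hypothetical helper: the writer sends `count` messages before the reader
// handles any of them; returns how many messages were lost.
inline std::size_t LostWhenReaderStalls(uint32_t pending_queue_size, int count) {
  ToyPendingQueue<int> queue(pending_queue_size);
  for (int i = 0; i < count; ++i) queue.Push(std::make_shared<int>(i));
  return queue.dropped();
}
```

In this model, a queue of size 1 keeps only the newest message while the reader is stalled, and a queue at least as large as the burst loses nothing. That is the trade-off pending_queue_size controls.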

Member Typedef Documentation

◆ BlockerPtr

template<typename MessageT>
using apollo::cyber::Reader< MessageT >::BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT> >

◆ ChangeConnection

template<typename MessageT>
using apollo::cyber::Reader< MessageT >::ChangeConnection = typename service_discovery::Manager::ChangeConnection

◆ Iterator

template<typename MessageT>
using apollo::cyber::Reader< MessageT >::Iterator = typename std::list<std::shared_ptr<MessageT> >::const_iterator

◆ ReceiverPtr

template<typename MessageT>
using apollo::cyber::Reader< MessageT >::ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT> >

Constructor & Destructor Documentation

◆ Reader()

template<typename MessageT >
apollo::cyber::Reader< MessageT >::Reader ( const proto::RoleAttributes &  role_attr,
const CallbackFunc< MessageT > &  reader_func = nullptr,
uint32_t  pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE 
)
explicit

Constructs a Reader object.

Parameters
role_attr  is a protobuf message RoleAttributes, which includes the channel name and other info.
reader_func  is the callback function invoked when a message is received.
pending_queue_size  is the max depth of the message cache queue.
Warning
Received messages are enqueued in a queue whose depth is pending_queue_size.

◆ ~Reader()

template<typename MessageT >
apollo::cyber::Reader< MessageT >::~Reader ( )
virtual

Member Function Documentation

◆ Begin()

template<typename MessageT>
virtual Iterator apollo::cyber::Reader< MessageT >::Begin ( ) const
inline virtual

Get the begin iterator of ObserveQueue, used to traverse.

Returns
Iterator begin iterator

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.

◆ ClearData()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::ClearData ( )
override virtual

Clear Blocker's data.

Implements apollo::cyber::ReaderBase.

◆ Empty()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::Empty ( ) const
override virtual

Query whether the Reader has data to be handled.

Returns
true if blocker is empty
false if blocker has data

Implements apollo::cyber::ReaderBase.

◆ End()

template<typename MessageT>
virtual Iterator apollo::cyber::Reader< MessageT >::End ( ) const
inline virtual

Get the end iterator of ObserveQueue, used to traverse.

Returns
Iterator end iterator

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.
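
Begin() and End() are meant to be used as a pair to walk the ObserveQueue. The sketch below shows that traversal pattern against a stand-in type. `FakeReader` and `CollectObserved` are hypothetical names for this example only; `FakeReader` copies just the `Iterator` alias and the Begin()/End() signatures from this page, not the real class.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <vector>

// Hypothetical stand-in exposing the same Begin()/End() shape as Reader<int>.
struct FakeReader {
  using Iterator = std::list<std::shared_ptr<int>>::const_iterator;

  std::list<std::shared_ptr<int>> observed;  // plays the role of ObserveQueue

  Iterator Begin() const { return observed.begin(); }
  Iterator End() const { return observed.end(); }
};

// Walk [Begin(), End()) and copy every payload into a vector.
template <typename ReaderLike>
std::vector<int> CollectObserved(const ReaderLike& reader) {
  std::vector<int> out;
  for (auto it = reader.Begin(); it != reader.End(); ++it) {
    assert(*it != nullptr);  // this sketch assumes the queue holds no null pointers
    out.push_back(**it);
  }
  return out;
}
```

Because the iterators are const iterators over shared pointers, a traversal like this can read messages but cannot reorder or remove queue entries.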

◆ Enqueue()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::Enqueue ( const std::shared_ptr< MessageT > &  msg)
virtual

Push msg to Blocker's PublishQueue

Parameters
msg  message ptr to be pushed

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.

◆ GetDelaySec()

template<typename MessageT >
double apollo::cyber::Reader< MessageT >::GetDelaySec ( ) const
override virtual

Get the time interval since the last message was received.

Returns
double seconds delay

Implements apollo::cyber::ReaderBase.
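
This page does not spell out how the delay is computed. The sketch below is one plausible reading, based on the two protected timestamps documented here, which both start at -1.0. `DelaySecSketch` is a hypothetical free function. The -1.0 "nothing received" result and the use of the larger of the two gaps are assumptions for illustration, not confirmed behavior.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical sketch; argument names mirror the protected members
// latest_recv_time_sec_ and second_to_lastest_recv_time_sec_.
inline double DelaySecSketch(double now_sec, double latest_recv_time_sec,
                             double second_to_latest_recv_time_sec) {
  // Assumption: a negative timestamp means "no message received yet".
  if (latest_recv_time_sec < 0) return -1.0;

  assert(now_sec >= latest_recv_time_sec);  // the clock must not run backwards
  const double since_last = now_sec - latest_recv_time_sec;

  // Only one message so far: the delay is the time elapsed since it arrived.
  if (second_to_latest_recv_time_sec < 0) return since_last;

  // Assumption: report the larger of the elapsed time and the gap between
  // the two most recent messages.
  return std::max(since_last,
                  latest_recv_time_sec - second_to_latest_recv_time_sec);
}
```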

◆ GetHistoryDepth()

template<typename MessageT >
uint32_t apollo::cyber::Reader< MessageT >::GetHistoryDepth ( ) const
virtual

Get Blocker's PublishQueue's capacity.

Returns
uint32_t depth of the history

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.

◆ GetLatestObserved()

template<typename MessageT >
std::shared_ptr< MessageT > apollo::cyber::Reader< MessageT >::GetLatestObserved ( ) const
virtual

Get the latest message we Observe

Returns
std::shared_ptr<MessageT> the latest message

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.

◆ GetOldestObserved()

template<typename MessageT >
std::shared_ptr< MessageT > apollo::cyber::Reader< MessageT >::GetOldestObserved ( ) const
virtual

Get the oldest message we Observe

Returns
std::shared_ptr<MessageT> the oldest message

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.

◆ GetWriters()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::GetWriters ( std::vector< proto::RoleAttributes > *  writers)
override virtual

Get all writers that publish to the channel we subscribe to.

Parameters
writers  result vector of RoleAttributes

Reimplemented from apollo::cyber::ReaderBase.

◆ HasReceived()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::HasReceived ( ) const
override virtual

Query whether we have received data since last clear.

Returns
true if the reader has received data
false if the reader has not received data

Implements apollo::cyber::ReaderBase.

◆ HasWriter()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::HasWriter ( )
override virtual

Query whether at least one writer publishes to the channel that we subscribe to.

Returns
true if the channel has writer
false if the channel has no writer

Reimplemented from apollo::cyber::ReaderBase.

◆ Init()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::Init ( )
override virtual

Init Reader.

Returns
true if init successfully
false if init failed

Implements apollo::cyber::ReaderBase.

◆ Observe()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::Observe ( )
override virtual

Get all data that Blocker stores.

Implements apollo::cyber::ReaderBase.
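
The relationship between Enqueue(), Observe(), GetLatestObserved() and GetOldestObserved() can be pictured with a toy two-queue model. `ToyBlocker` is a hypothetical class written for this page, not the real blocker::Blocker. The newest-at-front ordering and the evict-oldest policy are assumptions made for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>

// Hypothetical two-queue model: a publish queue filled by Enqueue() and an
// observe queue that only changes when Observe() is called.
template <typename MessageT>
class ToyBlocker {
 public:
  using MessagePtr = std::shared_ptr<MessageT>;

  explicit ToyBlocker(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);  // a history depth of zero could hold nothing
  }

  // Assumption: the newest message goes to the front and the oldest is
  // evicted once the history depth is reached.
  void Enqueue(const MessagePtr& msg) {
    if (published_.size() >= capacity_) published_.pop_back();
    published_.push_front(msg);
  }

  // Snapshot the publish queue; later Enqueue() calls do not affect the
  // observed messages until Observe() is called again.
  void Observe() { observed_ = published_; }

  MessagePtr GetLatestObserved() const {
    if (observed_.empty()) return nullptr;
    return observed_.front();
  }

  MessagePtr GetOldestObserved() const {
    if (observed_.empty()) return nullptr;
    return observed_.back();
  }

  std::size_t ObservedSize() const { return observed_.size(); }

 private:
  std::size_t capacity_;
  std::list<MessagePtr> published_;
  std::list<MessagePtr> observed_;
};
```

The point of the snapshot is that code reading observed messages sees a stable view between two Observe() calls, even while new messages keep arriving.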

◆ PendingQueueSize()

template<typename MessageT >
uint32_t apollo::cyber::Reader< MessageT >::PendingQueueSize ( ) const
override virtual

Get pending_queue_size configuration.

Returns
uint32_t the value of pending queue size

Implements apollo::cyber::ReaderBase.

◆ SetHistoryDepth()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::SetHistoryDepth ( const uint32_t &  depth)
virtual

Set Blocker's PublishQueue's capacity to depth

Parameters
depth  the value you want to set

Reimplemented in apollo::cyber::blocker::IntraReader< MessageT >.

◆ Shutdown()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::Shutdown ( )
override virtual

Shutdown Reader.

Implements apollo::cyber::ReaderBase.

Member Data Documentation

◆ latest_recv_time_sec_

template<typename MessageT>
double apollo::cyber::Reader< MessageT >::latest_recv_time_sec_ = -1.0
protected

◆ pending_queue_size_

template<typename MessageT>
uint32_t apollo::cyber::Reader< MessageT >::pending_queue_size_
protected

◆ second_to_lastest_recv_time_sec_

template<typename MessageT>
double apollo::cyber::Reader< MessageT >::second_to_lastest_recv_time_sec_ = -1.0
protected

The documentation for this class was generated from the following file: