17 #ifndef CYBER_NODE_WRITER_H_ 18 #define CYBER_NODE_WRITER_H_ 24 #include "cyber/proto/topology_change.pb.h" 41 template <
typename MessageT>
53 explicit Writer(
const proto::RoleAttributes& role_attr);
76 virtual bool Write(
const MessageT& msg);
85 virtual bool Write(
const std::shared_ptr<MessageT>& msg_ptr);
101 void GetReaders(std::vector<proto::RoleAttributes>* readers)
override;
104 void JoinTheTopology();
105 void LeaveTheTopology();
106 void OnChannelChange(
const proto::ChangeMsg& change_msg);
114 template <
typename MessageT>
116 :
WriterBase(role_attr), transmitter_(nullptr), channel_manager_(nullptr) {}
118 template <
typename MessageT>
123 template <
typename MessageT>
126 std::lock_guard<std::mutex> g(
lock_);
133 if (transmitter_ ==
nullptr) {
138 this->
role_attr_.set_id(transmitter_->id().HashValue());
145 template <
typename MessageT>
148 std::lock_guard<std::mutex> g(
lock_);
155 transmitter_ =
nullptr;
156 channel_manager_ =
nullptr;
159 template <
typename MessageT>
162 auto msg_ptr = std::make_shared<MessageT>(msg);
163 return Write(msg_ptr);
166 template <
typename MessageT>
169 return transmitter_->Transmit(msg_ptr);
172 template <
typename MessageT>
175 change_conn_ = channel_manager_->AddChangeListener(std::bind(
179 const std::string& channel_name = this->
role_attr_.channel_name();
180 std::vector<proto::RoleAttributes> readers;
181 channel_manager_->GetReadersOfChannel(channel_name, &readers);
182 for (
auto& reader : readers) {
183 transmitter_->Enable(reader);
186 channel_manager_->Join(this->
role_attr_, proto::RoleType::ROLE_WRITER,
190 template <
typename MessageT>
192 channel_manager_->RemoveChangeListener(change_conn_);
193 channel_manager_->Leave(this->
role_attr_, proto::RoleType::ROLE_WRITER);
196 template <
typename MessageT>
198 if (change_msg.role_type() != proto::RoleType::ROLE_READER) {
202 auto& reader_attr = change_msg.role_attr();
203 if (reader_attr.channel_name() != this->
role_attr_.channel_name()) {
207 auto operate_type = change_msg.operate_type();
208 if (operate_type == proto::OperateType::OPT_JOIN) {
209 transmitter_->Enable(reader_attr);
211 transmitter_->Disable(reader_attr);
215 template <
typename MessageT>
218 return channel_manager_->HasReader(
role_attr_.channel_name());
221 template <
typename MessageT>
223 if (readers ==
nullptr) {
231 channel_manager_->GetReadersOfChannel(
role_attr_.channel_name(), readers);
237 #endif // CYBER_NODE_WRITER_H_ std::mutex lock_
Definition: writer_base.h:99
proto::RoleAttributes role_attr_
Definition: writer_base.h:98
std::shared_ptr< transport::Transmitter< MessageT > > TransmitterPtr
Definition: writer.h:44
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
bool init_
Definition: writer_base.h:100
#define RETURN_VAL_IF(condition, val)
Definition: log.h:114
std::shared_ptr< ChannelManager > ChannelManagerPtr
Definition: topology_manager.h:43
virtual ~Writer()
Definition: writer.h:119
base::Connection< const ChangeMsg & > ChangeConnection
Definition: manager.h:55
bool HasReader() override
Is there any Reader that subscribes to our Channel? You can publish messages when this returns true...
Definition: writer.h:216
typename service_discovery::Manager::ChangeConnection ChangeConnection
Definition: writer.h:46
virtual bool Write(const MessageT &msg)
Write a MessageT instance.
Definition: writer.h:160
bool IsInit() const
Is Writer initialized?
Definition: writer_base.h:92
void Shutdown() override
Shutdown the Writer.
Definition: writer.h:146
Definition: message_traits.h:45
Base class for a Writer. A Writer is an object to send messages through a 'Channel'.
Definition: writer_base.h:37
Writer(const proto::RoleAttributes &role_attr)
Construct a new Writer object.
Definition: writer.h:115
bool Init() override
Init the Writer.
Definition: writer.h:124
void GetReaders(std::vector< proto::RoleAttributes > *readers) override
Get all Readers that subscribe to our writing channel.
Definition: writer.h:222