Apollo  6.0
Open source self driving car software
writer.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_NODE_WRITER_H_
18 #define CYBER_NODE_WRITER_H_
19 
20 #include <memory>
21 #include <string>
22 #include <vector>
23 
24 #include "cyber/proto/topology_change.pb.h"
25 
26 #include "cyber/common/log.h"
27 #include "cyber/node/writer_base.h"
30 
31 namespace apollo {
32 namespace cyber {
33 
41 template <typename MessageT>
42 class Writer : public WriterBase {
43  public:
44  using TransmitterPtr = std::shared_ptr<transport::Transmitter<MessageT>>;
45  using ChangeConnection =
47 
53  explicit Writer(const proto::RoleAttributes& role_attr);
54  virtual ~Writer();
55 
62  bool Init() override;
63 
67  void Shutdown() override;
68 
76  virtual bool Write(const MessageT& msg);
77 
85  virtual bool Write(const std::shared_ptr<MessageT>& msg_ptr);
86 
94  bool HasReader() override;
95 
101  void GetReaders(std::vector<proto::RoleAttributes>* readers) override;
102 
103  private:
104  void JoinTheTopology();
105  void LeaveTheTopology();
106  void OnChannelChange(const proto::ChangeMsg& change_msg);
107 
108  TransmitterPtr transmitter_;
109 
110  ChangeConnection change_conn_;
111  service_discovery::ChannelManagerPtr channel_manager_;
112 };
113 
114 template <typename MessageT>
115 Writer<MessageT>::Writer(const proto::RoleAttributes& role_attr)
116  : WriterBase(role_attr), transmitter_(nullptr), channel_manager_(nullptr) {}
117 
118 template <typename MessageT>
120  Shutdown();
121 }
122 
123 template <typename MessageT>
125  {
126  std::lock_guard<std::mutex> g(lock_);
127  if (init_) {
128  return true;
129  }
130  transmitter_ =
131  transport::Transport::Instance()->CreateTransmitter<MessageT>(
132  role_attr_);
133  if (transmitter_ == nullptr) {
134  return false;
135  }
136  init_ = true;
137  }
138  this->role_attr_.set_id(transmitter_->id().HashValue());
139  channel_manager_ =
141  JoinTheTopology();
142  return true;
143 }
144 
145 template <typename MessageT>
147  {
148  std::lock_guard<std::mutex> g(lock_);
149  if (!init_) {
150  return;
151  }
152  init_ = false;
153  }
154  LeaveTheTopology();
155  transmitter_ = nullptr;
156  channel_manager_ = nullptr;
157 }
158 
159 template <typename MessageT>
160 bool Writer<MessageT>::Write(const MessageT& msg) {
162  auto msg_ptr = std::make_shared<MessageT>(msg);
163  return Write(msg_ptr);
164 }
165 
166 template <typename MessageT>
167 bool Writer<MessageT>::Write(const std::shared_ptr<MessageT>& msg_ptr) {
169  return transmitter_->Transmit(msg_ptr);
170 }
171 
172 template <typename MessageT>
174  // add listener
175  change_conn_ = channel_manager_->AddChangeListener(std::bind(
176  &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));
177 
178  // get peer readers
179  const std::string& channel_name = this->role_attr_.channel_name();
180  std::vector<proto::RoleAttributes> readers;
181  channel_manager_->GetReadersOfChannel(channel_name, &readers);
182  for (auto& reader : readers) {
183  transmitter_->Enable(reader);
184  }
185 
186  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,
188 }
189 
190 template <typename MessageT>
192  channel_manager_->RemoveChangeListener(change_conn_);
193  channel_manager_->Leave(this->role_attr_, proto::RoleType::ROLE_WRITER);
194 }
195 
196 template <typename MessageT>
197 void Writer<MessageT>::OnChannelChange(const proto::ChangeMsg& change_msg) {
198  if (change_msg.role_type() != proto::RoleType::ROLE_READER) {
199  return;
200  }
201 
202  auto& reader_attr = change_msg.role_attr();
203  if (reader_attr.channel_name() != this->role_attr_.channel_name()) {
204  return;
205  }
206 
207  auto operate_type = change_msg.operate_type();
208  if (operate_type == proto::OperateType::OPT_JOIN) {
209  transmitter_->Enable(reader_attr);
210  } else {
211  transmitter_->Disable(reader_attr);
212  }
213 }
214 
215 template <typename MessageT>
218  return channel_manager_->HasReader(role_attr_.channel_name());
219 }
220 
221 template <typename MessageT>
222 void Writer<MessageT>::GetReaders(std::vector<proto::RoleAttributes>* readers) {
223  if (readers == nullptr) {
224  return;
225  }
226 
227  if (!WriterBase::IsInit()) {
228  return;
229  }
230 
231  channel_manager_->GetReadersOfChannel(role_attr_.channel_name(), readers);
232 }
233 
234 } // namespace cyber
235 } // namespace apollo
236 
237 #endif // CYBER_NODE_WRITER_H_
std::mutex lock_
Definition: writer_base.h:99
proto::RoleAttributes role_attr_
Definition: writer_base.h:98
std::shared_ptr< transport::Transmitter< MessageT > > TransmitterPtr
Definition: writer.h:44
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
bool init_
Definition: writer_base.h:100
#define RETURN_VAL_IF(condition, val)
Definition: log.h:114
std::shared_ptr< ChannelManager > ChannelManagerPtr
Definition: topology_manager.h:43
virtual ~Writer()
Definition: writer.h:119
Definition: writer.h:42
base::Connection< const ChangeMsg & > ChangeConnection
Definition: manager.h:55
bool HasReader() override
Is there any Reader that subscribes our Channel? You can publish message when this return true...
Definition: writer.h:216
typename service_discovery::Manager::ChangeConnection ChangeConnection
Definition: writer.h:46
virtual bool Write(const MessageT &msg)
Write a MessageT instance.
Definition: writer.h:160
bool IsInit() const
Is Writer initialized?
Definition: writer_base.h:92
void Shutdown() override
Shutdown the Writer.
Definition: writer.h:146
Definition: message_traits.h:45
Base class for a Writer. A Writer is an object to send messages through a &#39;Channel&#39;.
Definition: writer_base.h:37
Writer(const proto::RoleAttributes &role_attr)
Construct a new Writer object.
Definition: writer.h:115
bool Init() override
Init the Writer.
Definition: writer.h:124
void GetReaders(std::vector< proto::RoleAttributes > *readers) override
Get all Readers that subscriber our writing channel.
Definition: writer.h:222