Apollo  6.0
Open source self driving car software
node_channel_impl.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_NODE_NODE_CHANNEL_IMPL_H_
18 #define CYBER_NODE_NODE_CHANNEL_IMPL_H_
19 
20 #include <memory>
21 #include <string>
22 
23 #include "cyber/proto/run_mode_conf.pb.h"
24 
29 #include "cyber/node/reader.h"
30 #include "cyber/node/writer.h"
31 
32 namespace apollo {
33 namespace cyber {
34 
35 class Node;
36 
37 struct ReaderConfig {
39  qos_profile.set_history(proto::QosHistoryPolicy::HISTORY_KEEP_LAST);
40  qos_profile.set_depth(1);
41  qos_profile.set_mps(0);
42  qos_profile.set_reliability(
43  proto::QosReliabilityPolicy::RELIABILITY_RELIABLE);
44  qos_profile.set_durability(proto::QosDurabilityPolicy::DURABILITY_VOLATILE);
45 
47  }
48  ReaderConfig(const ReaderConfig& other)
49  : channel_name(other.channel_name),
50  qos_profile(other.qos_profile),
52 
53  std::string channel_name; //< channel reads
54  proto::QosProfile qos_profile; //< the qos configuration
60 };
61 
68  friend class Node;
69 
70  public:
71  using NodeManagerPtr = std::shared_ptr<service_discovery::NodeManager>;
72 
78  explicit NodeChannelImpl(const std::string& node_name)
79  : is_reality_mode_(true), node_name_(node_name) {
80  node_attr_.set_host_name(common::GlobalData::Instance()->HostName());
81  node_attr_.set_host_ip(common::GlobalData::Instance()->HostIp());
82  node_attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
83  node_attr_.set_node_name(node_name);
84  uint64_t node_id = common::GlobalData::RegisterNode(node_name);
85  node_attr_.set_node_id(node_id);
86 
87  is_reality_mode_ = common::GlobalData::Instance()->IsRealityMode();
88 
89  if (is_reality_mode_) {
90  node_manager_ =
92  node_manager_->Join(node_attr_, RoleType::ROLE_NODE);
93  }
94  }
95 
99  virtual ~NodeChannelImpl() {
100  if (is_reality_mode_) {
101  node_manager_->Leave(node_attr_, RoleType::ROLE_NODE);
102  node_manager_ = nullptr;
103  }
104  }
105 
111  const std::string& NodeName() const { return node_name_; }
112 
113  private:
114  template <typename MessageT>
115  auto CreateWriter(const proto::RoleAttributes& role_attr)
116  -> std::shared_ptr<Writer<MessageT>>;
117 
118  template <typename MessageT>
119  auto CreateWriter(const std::string& channel_name)
120  -> std::shared_ptr<Writer<MessageT>>;
121 
122  template <typename MessageT>
123  auto CreateReader(const std::string& channel_name,
124  const CallbackFunc<MessageT>& reader_func)
125  -> std::shared_ptr<Reader<MessageT>>;
126 
127  template <typename MessageT>
128  auto CreateReader(const ReaderConfig& config,
129  const CallbackFunc<MessageT>& reader_func)
130  -> std::shared_ptr<Reader<MessageT>>;
131 
132  template <typename MessageT>
133  auto CreateReader(const proto::RoleAttributes& role_attr,
134  const CallbackFunc<MessageT>& reader_func,
136  -> std::shared_ptr<Reader<MessageT>>;
137 
138  template <typename MessageT>
139  auto CreateReader(const proto::RoleAttributes& role_attr)
140  -> std::shared_ptr<Reader<MessageT>>;
141 
142  template <typename MessageT>
143  void FillInAttr(proto::RoleAttributes* attr);
144 
145  bool is_reality_mode_;
146  std::string node_name_;
147  proto::RoleAttributes node_attr_;
148  NodeManagerPtr node_manager_ = nullptr;
149 };
150 
151 template <typename MessageT>
152 auto NodeChannelImpl::CreateWriter(const proto::RoleAttributes& role_attr)
153  -> std::shared_ptr<Writer<MessageT>> {
154  if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
155  AERROR << "Can't create a writer with empty channel name!";
156  return nullptr;
157  }
158  proto::RoleAttributes new_attr(role_attr);
159  FillInAttr<MessageT>(&new_attr);
160 
161  std::shared_ptr<Writer<MessageT>> writer_ptr = nullptr;
162  if (!is_reality_mode_) {
163  writer_ptr = std::make_shared<blocker::IntraWriter<MessageT>>(new_attr);
164  } else {
165  writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
166  }
167 
168  RETURN_VAL_IF_NULL(writer_ptr, nullptr);
169  RETURN_VAL_IF(!writer_ptr->Init(), nullptr);
170  return writer_ptr;
171 }
172 
173 template <typename MessageT>
174 auto NodeChannelImpl::CreateWriter(const std::string& channel_name)
175  -> std::shared_ptr<Writer<MessageT>> {
176  proto::RoleAttributes role_attr;
177  role_attr.set_channel_name(channel_name);
178  return this->CreateWriter<MessageT>(role_attr);
179 }
180 
181 template <typename MessageT>
182 auto NodeChannelImpl::CreateReader(const std::string& channel_name,
183  const CallbackFunc<MessageT>& reader_func)
184  -> std::shared_ptr<Reader<MessageT>> {
185  proto::RoleAttributes role_attr;
186  role_attr.set_channel_name(channel_name);
187  return this->template CreateReader<MessageT>(role_attr, reader_func);
188 }
189 
190 template <typename MessageT>
191 auto NodeChannelImpl::CreateReader(const ReaderConfig& config,
192  const CallbackFunc<MessageT>& reader_func)
193  -> std::shared_ptr<Reader<MessageT>> {
194  proto::RoleAttributes role_attr;
195  role_attr.set_channel_name(config.channel_name);
196  role_attr.mutable_qos_profile()->CopyFrom(config.qos_profile);
197  return this->template CreateReader<MessageT>(role_attr, reader_func,
198  config.pending_queue_size);
199 }
200 
201 template <typename MessageT>
202 auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr,
203  const CallbackFunc<MessageT>& reader_func,
204  uint32_t pending_queue_size)
205  -> std::shared_ptr<Reader<MessageT>> {
206  if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
207  AERROR << "Can't create a reader with empty channel name!";
208  return nullptr;
209  }
210 
211  proto::RoleAttributes new_attr(role_attr);
212  FillInAttr<MessageT>(&new_attr);
213 
214  std::shared_ptr<Reader<MessageT>> reader_ptr = nullptr;
215  if (!is_reality_mode_) {
216  reader_ptr =
217  std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
218  } else {
219  reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func,
221  }
222 
223  RETURN_VAL_IF_NULL(reader_ptr, nullptr);
224  RETURN_VAL_IF(!reader_ptr->Init(), nullptr);
225  return reader_ptr;
226 }
227 
228 template <typename MessageT>
229 auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr)
230  -> std::shared_ptr<Reader<MessageT>> {
231  return this->template CreateReader<MessageT>(role_attr, nullptr);
232 }
233 
234 template <typename MessageT>
235 void NodeChannelImpl::FillInAttr(proto::RoleAttributes* attr) {
236  attr->set_host_name(node_attr_.host_name());
237  attr->set_host_ip(node_attr_.host_ip());
238  attr->set_process_id(node_attr_.process_id());
239  attr->set_node_name(node_attr_.node_name());
240  attr->set_node_id(node_attr_.node_id());
241  auto channel_id = GlobalData::RegisterChannel(attr->channel_name());
242  attr->set_channel_id(channel_id);
243  if (!attr->has_message_type()) {
244  attr->set_message_type(message::MessageType<MessageT>());
245  }
246  if (!attr->has_proto_desc()) {
247  std::string proto_desc("");
248  message::GetDescriptorString<MessageT>(attr->message_type(), &proto_desc);
249  attr->set_proto_desc(proto_desc);
250  }
251  if (!attr->has_qos_profile()) {
252  attr->mutable_qos_profile()->CopyFrom(
254  }
255 }
256 
257 } // namespace cyber
258 } // namespace apollo
259 
260 #endif // CYBER_NODE_NODE_CHANNEL_IMPL_H_
Definition: node.h:31
Definition: node_channel_impl.h:37
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
virtual ~NodeChannelImpl()
Destroy the Node Channel Impl object.
Definition: node_channel_impl.h:99
#define RETURN_VAL_IF(condition, val)
Definition: log.h:114
static uint64_t RegisterNode(const std::string &node_name)
const std::string & NodeName() const
get name of this node
Definition: node_channel_impl.h:111
uint32_t pending_queue_size
configuration for responding ChannelBuffer. Older messages will be dropped if you have no time to handle...
Definition: node_channel_impl.h:59
Node is the fundamental building block of Cyber RT. Every module contains and communicates through th...
Definition: node.h:44
proto::QosProfile qos_profile
Definition: node_channel_impl.h:54
static uint64_t RegisterChannel(const std::string &channel)
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
Definition: reader.h:49
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
Definition: reader.h:45
NodeChannelImpl(const std::string &node_name)
Construct a new Node Channel Impl object.
Definition: node_channel_impl.h:78
static const QosProfile QOS_PROFILE_DEFAULT
Definition: qos_profile_conf.h:46
#define AERROR
Definition: log.h:44
ReaderConfig(const ReaderConfig &other)
Definition: node_channel_impl.h:48
The implementation for Node to create Objects connected by Channels. e.g. Channel Reader and Writer...
Definition: node_channel_impl.h:67
std::shared_ptr< service_discovery::NodeManager > NodeManagerPtr
Definition: node_channel_impl.h:71
std::string channel_name
Definition: node_channel_impl.h:53
#define RETURN_VAL_IF_NULL(ptr, val)
Definition: log.h:98
ReaderConfig()
< configurations for a Reader
Definition: node_channel_impl.h:38