17 #ifndef CYBER_NODE_NODE_CHANNEL_IMPL_H_ 18 #define CYBER_NODE_NODE_CHANNEL_IMPL_H_ 23 #include "cyber/proto/run_mode_conf.pb.h" 39 qos_profile.set_history(proto::QosHistoryPolicy::HISTORY_KEEP_LAST);
43 proto::QosReliabilityPolicy::RELIABILITY_RELIABLE);
44 qos_profile.set_durability(proto::QosDurabilityPolicy::DURABILITY_VOLATILE);
79 : is_reality_mode_(true), node_name_(node_name) {
83 node_attr_.set_node_name(node_name);
85 node_attr_.set_node_id(node_id);
89 if (is_reality_mode_) {
92 node_manager_->Join(node_attr_, RoleType::ROLE_NODE);
100 if (is_reality_mode_) {
101 node_manager_->Leave(node_attr_, RoleType::ROLE_NODE);
102 node_manager_ =
nullptr;
111 const std::string&
NodeName()
const {
return node_name_; }
114 template <
typename MessageT>
115 auto CreateWriter(
const proto::RoleAttributes& role_attr)
116 -> std::shared_ptr<Writer<MessageT>>;
118 template <
typename MessageT>
120 -> std::shared_ptr<Writer<MessageT>>;
122 template <
typename MessageT>
123 auto CreateReader(
const std::string& channel_name,
125 -> std::shared_ptr<Reader<MessageT>>;
127 template <
typename MessageT>
130 -> std::shared_ptr<Reader<MessageT>>;
132 template <
typename MessageT>
133 auto CreateReader(
const proto::RoleAttributes& role_attr,
136 -> std::shared_ptr<Reader<MessageT>>;
138 template <
typename MessageT>
139 auto CreateReader(
const proto::RoleAttributes& role_attr)
140 -> std::shared_ptr<Reader<MessageT>>;
142 template <
typename MessageT>
143 void FillInAttr(proto::RoleAttributes* attr);
145 bool is_reality_mode_;
146 std::string node_name_;
147 proto::RoleAttributes node_attr_;
151 template <
typename MessageT>
152 auto NodeChannelImpl::CreateWriter(
const proto::RoleAttributes& role_attr)
153 -> std::shared_ptr<Writer<MessageT>> {
154 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
155 AERROR <<
"Can't create a writer with empty channel name!";
158 proto::RoleAttributes new_attr(role_attr);
159 FillInAttr<MessageT>(&new_attr);
161 std::shared_ptr<Writer<MessageT>> writer_ptr =
nullptr;
162 if (!is_reality_mode_) {
163 writer_ptr = std::make_shared<blocker::IntraWriter<MessageT>>(new_attr);
165 writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
173 template <
typename MessageT>
174 auto NodeChannelImpl::CreateWriter(
const std::string&
channel_name)
175 -> std::shared_ptr<Writer<MessageT>> {
176 proto::RoleAttributes role_attr;
177 role_attr.set_channel_name(channel_name);
178 return this->CreateWriter<MessageT>(role_attr);
181 template <
typename MessageT>
182 auto NodeChannelImpl::CreateReader(
const std::string& channel_name,
184 -> std::shared_ptr<Reader<MessageT>> {
185 proto::RoleAttributes role_attr;
186 role_attr.set_channel_name(channel_name);
187 return this->
template CreateReader<MessageT>(role_attr, reader_func);
190 template <
typename MessageT>
191 auto NodeChannelImpl::CreateReader(
const ReaderConfig& config,
193 -> std::shared_ptr<Reader<MessageT>> {
194 proto::RoleAttributes role_attr;
196 role_attr.mutable_qos_profile()->CopyFrom(config.
qos_profile);
197 return this->
template CreateReader<MessageT>(role_attr, reader_func,
201 template <
typename MessageT>
202 auto NodeChannelImpl::CreateReader(
const proto::RoleAttributes& role_attr,
205 -> std::shared_ptr<Reader<MessageT>> {
206 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
207 AERROR <<
"Can't create a reader with empty channel name!";
211 proto::RoleAttributes new_attr(role_attr);
212 FillInAttr<MessageT>(&new_attr);
214 std::shared_ptr<Reader<MessageT>> reader_ptr =
nullptr;
215 if (!is_reality_mode_) {
217 std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
219 reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func,
228 template <
typename MessageT>
229 auto NodeChannelImpl::CreateReader(
const proto::RoleAttributes& role_attr)
230 -> std::shared_ptr<Reader<MessageT>> {
231 return this->
template CreateReader<MessageT>(role_attr,
nullptr);
234 template <
typename MessageT>
235 void NodeChannelImpl::FillInAttr(proto::RoleAttributes* attr) {
236 attr->set_host_name(node_attr_.host_name());
237 attr->set_host_ip(node_attr_.host_ip());
238 attr->set_process_id(node_attr_.process_id());
239 attr->set_node_name(node_attr_.node_name());
240 attr->set_node_id(node_attr_.node_id());
242 attr->set_channel_id(channel_id);
243 if (!attr->has_message_type()) {
244 attr->set_message_type(message::MessageType<MessageT>());
246 if (!attr->has_proto_desc()) {
247 std::string proto_desc(
"");
248 message::GetDescriptorString<MessageT>(attr->message_type(), &proto_desc);
249 attr->set_proto_desc(proto_desc);
251 if (!attr->has_qos_profile()) {
252 attr->mutable_qos_profile()->CopyFrom(
260 #endif // CYBER_NODE_NODE_CHANNEL_IMPL_H_
Definition: node_channel_impl.h:37
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
virtual ~NodeChannelImpl()
Destroy the Node Channel Impl object.
Definition: node_channel_impl.h:99
#define RETURN_VAL_IF(condition, val)
Definition: log.h:114
static uint64_t RegisterNode(const std::string &node_name)
const std::string & NodeName() const
get name of this node
Definition: node_channel_impl.h:111
uint32_t pending_queue_size
configuration for responding ChannelBuffer. Older messages will dropped if you have no time to handle...
Definition: node_channel_impl.h:59
Node is the fundamental building block of Cyber RT. every module contains and communicates through th...
Definition: node.h:44
proto::QosProfile qos_profile
Definition: node_channel_impl.h:54
static uint64_t RegisterChannel(const std::string &channel)
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
Definition: reader.h:49
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
Definition: reader.h:45
NodeChannelImpl(const std::string &node_name)
Construct a new Node Channel Impl object.
Definition: node_channel_impl.h:78
static const QosProfile QOS_PROFILE_DEFAULT
Definition: qos_profile_conf.h:46
#define AERROR
Definition: log.h:44
ReaderConfig(const ReaderConfig &other)
Definition: node_channel_impl.h:48
The implementation for Node to create Objects connected by Channels. e.g. Channel Reader and Writer...
Definition: node_channel_impl.h:67
std::shared_ptr< service_discovery::NodeManager > NodeManagerPtr
Definition: node_channel_impl.h:71
std::string channel_name
Definition: node_channel_impl.h:53
#define RETURN_VAL_IF_NULL(ptr, val)
Definition: log.h:98
ReaderConfig()
< configurations for a Reader
Definition: node_channel_impl.h:38