17 #ifndef CYBER_COMPONENT_COMPONENT_H_ 18 #define CYBER_COMPONENT_COMPONENT_H_ 38 using apollo::cyber::proto::RoleAttributes;
56 template <
typename M0 = NullType,
typename M1 = NullType,
57 typename M2 = NullType,
typename M3 = NullType>
70 bool Initialize(
const ComponentConfig& config)
override;
71 bool Process(
const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1,
72 const std::shared_ptr<M2>& msg2,
73 const std::shared_ptr<M3>& msg3);
86 virtual bool Proc(
const std::shared_ptr<M0>& msg0,
87 const std::shared_ptr<M1>& msg1,
88 const std::shared_ptr<M2>& msg2,
89 const std::shared_ptr<M3>& msg3) = 0;
97 bool Initialize(
const ComponentConfig& config)
override;
100 template <
typename M0>
105 bool Initialize(
const ComponentConfig& config)
override;
106 bool Process(
const std::shared_ptr<M0>& msg);
109 virtual bool Proc(
const std::shared_ptr<M0>& msg) = 0;
112 template <
typename M0,
typename M1>
117 bool Initialize(
const ComponentConfig& config)
override;
118 bool Process(
const std::shared_ptr<M0>& msg0,
119 const std::shared_ptr<M1>& msg1);
122 virtual bool Proc(
const std::shared_ptr<M0>& msg,
123 const std::shared_ptr<M1>& msg1) = 0;
126 template <
typename M0,
typename M1,
typename M2>
131 bool Initialize(
const ComponentConfig& config)
override;
132 bool Process(
const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1,
133 const std::shared_ptr<M2>& msg2);
136 virtual bool Proc(
const std::shared_ptr<M0>& msg,
137 const std::shared_ptr<M1>& msg1,
138 const std::shared_ptr<M2>& msg2) = 0;
141 template <
typename M0>
143 const std::shared_ptr<M0>& msg) {
151 const ComponentConfig& config) {
155 AERROR <<
"Component Init() failed." << std::endl;
161 template <
typename M0>
163 const ComponentConfig& config) {
167 if (config.readers_size() < 1) {
168 AERROR <<
"Invalid config file: too few readers.";
173 AERROR <<
"Component Init() failed.";
181 reader_cfg.
qos_profile.CopyFrom(config.readers(0).qos_profile());
184 std::weak_ptr<Component<M0>>
self =
185 std::dynamic_pointer_cast<
Component<M0>>(shared_from_this());
186 auto func = [
self](
const std::shared_ptr<M0>& msg) {
187 auto ptr =
self.lock();
191 AERROR <<
"Component object has been destroyed.";
195 std::shared_ptr<Reader<M0>> reader =
nullptr;
198 reader =
node_->CreateReader<M0>(reader_cfg);
200 reader =
node_->CreateReader<M0>(reader_cfg,
func);
203 if (reader ==
nullptr) {
204 AERROR <<
"Component create reader failed.";
207 readers_.emplace_back(std::move(reader));
215 auto dv = std::make_shared<data::DataVisitor<M0>>(conf);
217 croutine::CreateRoutineFactory<M0>(
func, dv);
219 return sched->CreateTask(factory,
node_->Name());
222 template <
typename M0,
typename M1>
224 const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1) {
228 return Proc(msg0, msg1);
231 template <
typename M0,
typename M1>
233 const ComponentConfig& config) {
237 if (config.readers_size() < 2) {
238 AERROR <<
"Invalid config file: too few readers.";
243 AERROR <<
"Component Init() failed.";
251 reader_cfg.
qos_profile.CopyFrom(config.readers(1).qos_profile());
254 auto reader1 =
node_->template CreateReader<M1>(reader_cfg);
257 reader_cfg.
qos_profile.CopyFrom(config.readers(0).qos_profile());
260 std::shared_ptr<Reader<M0>> reader0 =
nullptr;
262 reader0 =
node_->template CreateReader<M0>(reader_cfg);
264 std::weak_ptr<Component<M0, M1>>
self =
268 config.readers(1).channel());
270 auto func = [
self, blocker1](
const std::shared_ptr<M0>& msg0) {
271 auto ptr =
self.lock();
273 if (!blocker1->IsPublishedEmpty()) {
274 auto msg1 = blocker1->GetLatestPublishedPtr();
275 ptr->Process(msg0, msg1);
278 AERROR <<
"Component object has been destroyed.";
282 reader0 =
node_->template CreateReader<M0>(reader_cfg,
func);
284 if (reader0 ==
nullptr || reader1 ==
nullptr) {
285 AERROR <<
"Component create reader failed.";
288 readers_.push_back(std::move(reader0));
289 readers_.push_back(std::move(reader1));
296 std::weak_ptr<Component<M0, M1>>
self =
298 auto func = [
self](
const std::shared_ptr<M0>& msg0,
299 const std::shared_ptr<M1>& msg1) {
300 auto ptr =
self.lock();
302 ptr->Process(msg0, msg1);
304 AERROR <<
"Component object has been destroyed.";
308 std::vector<data::VisitorConfig> config_list;
310 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
312 auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
314 croutine::CreateRoutineFactory<M0, M1>(
func, dv);
315 return sched->CreateTask(factory,
node_->Name());
318 template <
typename M0,
typename M1,
typename M2>
320 const std::shared_ptr<M1>& msg1,
321 const std::shared_ptr<M2>& msg2) {
325 return Proc(msg0, msg1, msg2);
328 template <
typename M0,
typename M1,
typename M2>
330 const ComponentConfig& config) {
334 if (config.readers_size() < 3) {
335 AERROR <<
"Invalid config file: too few readers.";
340 AERROR <<
"Component Init() failed.";
348 reader_cfg.
qos_profile.CopyFrom(config.readers(1).qos_profile());
351 auto reader1 =
node_->template CreateReader<M1>(reader_cfg);
354 reader_cfg.
qos_profile.CopyFrom(config.readers(2).qos_profile());
357 auto reader2 =
node_->template CreateReader<M2>(reader_cfg);
360 reader_cfg.
qos_profile.CopyFrom(config.readers(0).qos_profile());
362 std::shared_ptr<Reader<M0>> reader0 =
nullptr;
364 reader0 =
node_->template CreateReader<M0>(reader_cfg);
366 std::weak_ptr<Component<M0, M1, M2, NullType>>
self =
371 config.readers(1).channel());
373 config.readers(2).channel());
375 auto func = [
self, blocker1, blocker2](
const std::shared_ptr<M0>& msg0) {
376 auto ptr =
self.lock();
378 if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty()) {
379 auto msg1 = blocker1->GetLatestPublishedPtr();
380 auto msg2 = blocker2->GetLatestPublishedPtr();
381 ptr->Process(msg0, msg1, msg2);
384 AERROR <<
"Component object has been destroyed.";
388 reader0 =
node_->template CreateReader<M0>(reader_cfg,
func);
391 if (reader0 ==
nullptr || reader1 ==
nullptr || reader2 ==
nullptr) {
392 AERROR <<
"Component create reader failed.";
395 readers_.push_back(std::move(reader0));
396 readers_.push_back(std::move(reader1));
397 readers_.push_back(std::move(reader2));
404 std::weak_ptr<Component<M0, M1, M2, NullType>>
self =
407 auto func = [
self](
const std::shared_ptr<M0>& msg0,
408 const std::shared_ptr<M1>& msg1,
409 const std::shared_ptr<M2>& msg2) {
410 auto ptr =
self.lock();
412 ptr->Process(msg0, msg1, msg2);
414 AERROR <<
"Component object has been destroyed.";
418 std::vector<data::VisitorConfig> config_list;
420 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
422 auto dv = std::make_shared<data::DataVisitor<M0, M1, M2>>(config_list);
424 croutine::CreateRoutineFactory<M0, M1, M2>(
func, dv);
425 return sched->CreateTask(factory,
node_->Name());
428 template <
typename M0,
typename M1,
typename M2,
typename M3>
430 const std::shared_ptr<M1>& msg1,
431 const std::shared_ptr<M2>& msg2,
432 const std::shared_ptr<M3>& msg3) {
436 return Proc(msg0, msg1, msg2, msg3);
439 template <
typename M0,
typename M1,
typename M2,
typename M3>
444 if (config.readers_size() < 4) {
445 AERROR <<
"Invalid config file: too few readers_." << std::endl;
450 AERROR <<
"Component Init() failed." << std::endl;
458 reader_cfg.
qos_profile.CopyFrom(config.readers(1).qos_profile());
461 auto reader1 =
node_->template CreateReader<M1>(reader_cfg);
464 reader_cfg.
qos_profile.CopyFrom(config.readers(2).qos_profile());
467 auto reader2 =
node_->template CreateReader<M2>(reader_cfg);
470 reader_cfg.
qos_profile.CopyFrom(config.readers(3).qos_profile());
473 auto reader3 =
node_->template CreateReader<M3>(reader_cfg);
476 reader_cfg.
qos_profile.CopyFrom(config.readers(0).qos_profile());
479 std::shared_ptr<Reader<M0>> reader0 =
nullptr;
481 reader0 =
node_->template CreateReader<M0>(reader_cfg);
483 std::weak_ptr<Component<M0, M1, M2, M3>>
self =
488 config.readers(1).channel());
490 config.readers(2).channel());
492 config.readers(3).channel());
494 auto func = [
self, blocker1, blocker2,
495 blocker3](
const std::shared_ptr<M0>& msg0) {
496 auto ptr =
self.lock();
498 if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty() &&
499 !blocker3->IsPublishedEmpty()) {
500 auto msg1 = blocker1->GetLatestPublishedPtr();
501 auto msg2 = blocker2->GetLatestPublishedPtr();
502 auto msg3 = blocker3->GetLatestPublishedPtr();
503 ptr->Process(msg0, msg1, msg2, msg3);
506 AERROR <<
"Component object has been destroyed.";
510 reader0 =
node_->template CreateReader<M0>(reader_cfg,
func);
513 if (reader0 ==
nullptr || reader1 ==
nullptr || reader2 ==
nullptr ||
514 reader3 ==
nullptr) {
515 AERROR <<
"Component create reader failed." << std::endl;
518 readers_.push_back(std::move(reader0));
519 readers_.push_back(std::move(reader1));
520 readers_.push_back(std::move(reader2));
521 readers_.push_back(std::move(reader3));
528 std::weak_ptr<Component<M0, M1, M2, M3>>
self =
531 [
self](
const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1,
532 const std::shared_ptr<M2>& msg2,
const std::shared_ptr<M3>& msg3) {
533 auto ptr =
self.lock();
535 ptr->Process(msg0, msg1, msg2, msg3);
537 AERROR <<
"Component object has been destroyed." << std::endl;
541 std::vector<data::VisitorConfig> config_list;
543 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
545 auto dv = std::make_shared<data::DataVisitor<M0, M1, M2, M3>>(config_list);
547 croutine::CreateRoutineFactory<M0, M1, M2, M3>(
func, dv);
548 return sched->CreateTask(factory,
node_->Name());
551 #define CYBER_REGISTER_COMPONENT(name) \ 552 CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase) 557 #endif // CYBER_COMPONENT_COMPONENT_H_ ~Component() override
Definition: component.h:104
void(* func)(void *)
Definition: routine_context.h:41
Component()
Definition: component.h:95
std::vector< std::shared_ptr< ReaderBase > > readers_
Definition: component_base.h:115
#define cyber_likely(x)
Definition: macros.h:27
Definition: node_channel_impl.h:37
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
static const std::shared_ptr< BlockerManager > & Instance()
Definition: blocker_manager.h:38
Component()
Definition: component.h:129
Component()
Definition: component.h:103
bool Process(const std::shared_ptr< M0 > &msg0, const std::shared_ptr< M1 > &msg1, const std::shared_ptr< M2 > &msg2, const std::shared_ptr< M3 > &msg3)
Definition: component.h:429
uint32_t pending_queue_size
Configuration for the corresponding ChannelBuffer. Older messages will be dropped if you have no time to handle...
Definition: node_channel_impl.h:59
~Component() override
Definition: component.h:61
Definition: routine_factory.h:33
Definition: component.h:127
Node is the fundamental building block of Cyber RT. Every module contains and communicates through th...
Definition: node.h:44
~Component() override
Definition: component.h:130
proto::QosProfile qos_profile
Definition: node_channel_impl.h:54
void LoadConfigFiles(const ComponentConfig &config)
Definition: component_base.h:72
Definition: data_visitor.h:36
Definition: component_base.h:41
std::shared_ptr< Node > node_
Definition: component_base.h:113
~Component() override
Definition: component.h:96
Definition: global_data.h:40
~Component() override
Definition: component.h:116
Component()
Definition: component.h:60
#define AERROR
Definition: log.h:44
bool Initialize(const ComponentConfig &config) override
Initialize the component from a protobuf object.
Definition: component.h:150
Component()
Definition: component.h:115
std::atomic< bool > is_shutdown_
Definition: component_base.h:112
std::string channel_name
Definition: node_channel_impl.h:53
The Component can process up to four channels of messages. The message type is specified when the com...
Definition: component.h:58
#define cyber_unlikely(x)
Definition: macros.h:28