Apollo  6.0
Open source self driving car software
component.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_COMPONENT_COMPONENT_H_
18 #define CYBER_COMPONENT_COMPONENT_H_
19 
20 #include <memory>
21 #include <utility>
22 #include <vector>
23 
24 #include "cyber/base/macros.h"
27 #include "cyber/common/types.h"
28 #include "cyber/common/util.h"
33 
34 namespace apollo {
35 namespace cyber {
36 
38 using apollo::cyber::proto::RoleAttributes;
39 
56 template <typename M0 = NullType, typename M1 = NullType,
57  typename M2 = NullType, typename M3 = NullType>
58 class Component : public ComponentBase {
59  public:
60  Component() {}
61  ~Component() override {}
62 
70  bool Initialize(const ComponentConfig& config) override;
71  bool Process(const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1,
72  const std::shared_ptr<M2>& msg2,
73  const std::shared_ptr<M3>& msg3);
74 
75  private:
86  virtual bool Proc(const std::shared_ptr<M0>& msg0,
87  const std::shared_ptr<M1>& msg1,
88  const std::shared_ptr<M2>& msg2,
89  const std::shared_ptr<M3>& msg3) = 0;
90 };
91 
92 template <>
94  public:
95  Component() {}
96  ~Component() override {}
97  bool Initialize(const ComponentConfig& config) override;
98 };
99 
100 template <typename M0>
102  public:
104  ~Component() override {}
105  bool Initialize(const ComponentConfig& config) override;
106  bool Process(const std::shared_ptr<M0>& msg);
107 
108  private:
109  virtual bool Proc(const std::shared_ptr<M0>& msg) = 0;
110 };
111 
112 template <typename M0, typename M1>
113 class Component<M0, M1, NullType, NullType> : public ComponentBase {
114  public:
116  ~Component() override {}
117  bool Initialize(const ComponentConfig& config) override;
118  bool Process(const std::shared_ptr<M0>& msg0,
119  const std::shared_ptr<M1>& msg1);
120 
121  private:
122  virtual bool Proc(const std::shared_ptr<M0>& msg,
123  const std::shared_ptr<M1>& msg1) = 0;
124 };
125 
126 template <typename M0, typename M1, typename M2>
127 class Component<M0, M1, M2, NullType> : public ComponentBase {
128  public:
130  ~Component() override {}
131  bool Initialize(const ComponentConfig& config) override;
132  bool Process(const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1,
133  const std::shared_ptr<M2>& msg2);
134 
135  private:
136  virtual bool Proc(const std::shared_ptr<M0>& msg,
137  const std::shared_ptr<M1>& msg1,
138  const std::shared_ptr<M2>& msg2) = 0;
139 };
140 
141 template <typename M0>
143  const std::shared_ptr<M0>& msg) {
144  if (is_shutdown_.load()) {
145  return true;
146  }
147  return Proc(msg);
148 }
149 
151  const ComponentConfig& config) {
152  node_.reset(new Node(config.name()));
153  LoadConfigFiles(config);
154  if (!Init()) {
155  AERROR << "Component Init() failed." << std::endl;
156  return false;
157  }
158  return true;
159 }
160 
161 template <typename M0>
163  const ComponentConfig& config) {
164  node_.reset(new Node(config.name()));
165  LoadConfigFiles(config);
166 
167  if (config.readers_size() < 1) {
168  AERROR << "Invalid config file: too few readers.";
169  return false;
170  }
171 
172  if (!Init()) {
173  AERROR << "Component Init() failed.";
174  return false;
175  }
176 
177  bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
178 
179  ReaderConfig reader_cfg;
180  reader_cfg.channel_name = config.readers(0).channel();
181  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
182  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
183 
184  std::weak_ptr<Component<M0>> self =
185  std::dynamic_pointer_cast<Component<M0>>(shared_from_this());
186  auto func = [self](const std::shared_ptr<M0>& msg) {
187  auto ptr = self.lock();
188  if (ptr) {
189  ptr->Process(msg);
190  } else {
191  AERROR << "Component object has been destroyed.";
192  }
193  };
194 
195  std::shared_ptr<Reader<M0>> reader = nullptr;
196 
197  if (cyber_likely(is_reality_mode)) {
198  reader = node_->CreateReader<M0>(reader_cfg);
199  } else {
200  reader = node_->CreateReader<M0>(reader_cfg, func);
201  }
202 
203  if (reader == nullptr) {
204  AERROR << "Component create reader failed.";
205  return false;
206  }
207  readers_.emplace_back(std::move(reader));
208 
209  if (cyber_unlikely(!is_reality_mode)) {
210  return true;
211  }
212 
213  data::VisitorConfig conf = {readers_[0]->ChannelId(),
214  readers_[0]->PendingQueueSize()};
215  auto dv = std::make_shared<data::DataVisitor<M0>>(conf);
216  croutine::RoutineFactory factory =
217  croutine::CreateRoutineFactory<M0>(func, dv);
218  auto sched = scheduler::Instance();
219  return sched->CreateTask(factory, node_->Name());
220 }
221 
222 template <typename M0, typename M1>
224  const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) {
225  if (is_shutdown_.load()) {
226  return true;
227  }
228  return Proc(msg0, msg1);
229 }
230 
231 template <typename M0, typename M1>
233  const ComponentConfig& config) {
234  node_.reset(new Node(config.name()));
235  LoadConfigFiles(config);
236 
237  if (config.readers_size() < 2) {
238  AERROR << "Invalid config file: too few readers.";
239  return false;
240  }
241 
242  if (!Init()) {
243  AERROR << "Component Init() failed.";
244  return false;
245  }
246 
247  bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
248 
249  ReaderConfig reader_cfg;
250  reader_cfg.channel_name = config.readers(1).channel();
251  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
252  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
253 
254  auto reader1 = node_->template CreateReader<M1>(reader_cfg);
255 
256  reader_cfg.channel_name = config.readers(0).channel();
257  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
258  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
259 
260  std::shared_ptr<Reader<M0>> reader0 = nullptr;
261  if (cyber_likely(is_reality_mode)) {
262  reader0 = node_->template CreateReader<M0>(reader_cfg);
263  } else {
264  std::weak_ptr<Component<M0, M1>> self =
265  std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
266 
267  auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
268  config.readers(1).channel());
269 
270  auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
271  auto ptr = self.lock();
272  if (ptr) {
273  if (!blocker1->IsPublishedEmpty()) {
274  auto msg1 = blocker1->GetLatestPublishedPtr();
275  ptr->Process(msg0, msg1);
276  }
277  } else {
278  AERROR << "Component object has been destroyed.";
279  }
280  };
281 
282  reader0 = node_->template CreateReader<M0>(reader_cfg, func);
283  }
284  if (reader0 == nullptr || reader1 == nullptr) {
285  AERROR << "Component create reader failed.";
286  return false;
287  }
288  readers_.push_back(std::move(reader0));
289  readers_.push_back(std::move(reader1));
290 
291  if (cyber_unlikely(!is_reality_mode)) {
292  return true;
293  }
294 
295  auto sched = scheduler::Instance();
296  std::weak_ptr<Component<M0, M1>> self =
297  std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
298  auto func = [self](const std::shared_ptr<M0>& msg0,
299  const std::shared_ptr<M1>& msg1) {
300  auto ptr = self.lock();
301  if (ptr) {
302  ptr->Process(msg0, msg1);
303  } else {
304  AERROR << "Component object has been destroyed.";
305  }
306  };
307 
308  std::vector<data::VisitorConfig> config_list;
309  for (auto& reader : readers_) {
310  config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
311  }
312  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
313  croutine::RoutineFactory factory =
314  croutine::CreateRoutineFactory<M0, M1>(func, dv);
315  return sched->CreateTask(factory, node_->Name());
316 }
317 
318 template <typename M0, typename M1, typename M2>
319 bool Component<M0, M1, M2, NullType>::Process(const std::shared_ptr<M0>& msg0,
320  const std::shared_ptr<M1>& msg1,
321  const std::shared_ptr<M2>& msg2) {
322  if (is_shutdown_.load()) {
323  return true;
324  }
325  return Proc(msg0, msg1, msg2);
326 }
327 
328 template <typename M0, typename M1, typename M2>
330  const ComponentConfig& config) {
331  node_.reset(new Node(config.name()));
332  LoadConfigFiles(config);
333 
334  if (config.readers_size() < 3) {
335  AERROR << "Invalid config file: too few readers.";
336  return false;
337  }
338 
339  if (!Init()) {
340  AERROR << "Component Init() failed.";
341  return false;
342  }
343 
344  bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
345 
346  ReaderConfig reader_cfg;
347  reader_cfg.channel_name = config.readers(1).channel();
348  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
349  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
350 
351  auto reader1 = node_->template CreateReader<M1>(reader_cfg);
352 
353  reader_cfg.channel_name = config.readers(2).channel();
354  reader_cfg.qos_profile.CopyFrom(config.readers(2).qos_profile());
355  reader_cfg.pending_queue_size = config.readers(2).pending_queue_size();
356 
357  auto reader2 = node_->template CreateReader<M2>(reader_cfg);
358 
359  reader_cfg.channel_name = config.readers(0).channel();
360  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
361  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
362  std::shared_ptr<Reader<M0>> reader0 = nullptr;
363  if (cyber_likely(is_reality_mode)) {
364  reader0 = node_->template CreateReader<M0>(reader_cfg);
365  } else {
366  std::weak_ptr<Component<M0, M1, M2, NullType>> self =
367  std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
368  shared_from_this());
369 
370  auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
371  config.readers(1).channel());
372  auto blocker2 = blocker::BlockerManager::Instance()->GetBlocker<M2>(
373  config.readers(2).channel());
374 
375  auto func = [self, blocker1, blocker2](const std::shared_ptr<M0>& msg0) {
376  auto ptr = self.lock();
377  if (ptr) {
378  if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty()) {
379  auto msg1 = blocker1->GetLatestPublishedPtr();
380  auto msg2 = blocker2->GetLatestPublishedPtr();
381  ptr->Process(msg0, msg1, msg2);
382  }
383  } else {
384  AERROR << "Component object has been destroyed.";
385  }
386  };
387 
388  reader0 = node_->template CreateReader<M0>(reader_cfg, func);
389  }
390 
391  if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr) {
392  AERROR << "Component create reader failed.";
393  return false;
394  }
395  readers_.push_back(std::move(reader0));
396  readers_.push_back(std::move(reader1));
397  readers_.push_back(std::move(reader2));
398 
399  if (cyber_unlikely(!is_reality_mode)) {
400  return true;
401  }
402 
403  auto sched = scheduler::Instance();
404  std::weak_ptr<Component<M0, M1, M2, NullType>> self =
405  std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
406  shared_from_this());
407  auto func = [self](const std::shared_ptr<M0>& msg0,
408  const std::shared_ptr<M1>& msg1,
409  const std::shared_ptr<M2>& msg2) {
410  auto ptr = self.lock();
411  if (ptr) {
412  ptr->Process(msg0, msg1, msg2);
413  } else {
414  AERROR << "Component object has been destroyed.";
415  }
416  };
417 
418  std::vector<data::VisitorConfig> config_list;
419  for (auto& reader : readers_) {
420  config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
421  }
422  auto dv = std::make_shared<data::DataVisitor<M0, M1, M2>>(config_list);
423  croutine::RoutineFactory factory =
424  croutine::CreateRoutineFactory<M0, M1, M2>(func, dv);
425  return sched->CreateTask(factory, node_->Name());
426 }
427 
428 template <typename M0, typename M1, typename M2, typename M3>
429 bool Component<M0, M1, M2, M3>::Process(const std::shared_ptr<M0>& msg0,
430  const std::shared_ptr<M1>& msg1,
431  const std::shared_ptr<M2>& msg2,
432  const std::shared_ptr<M3>& msg3) {
433  if (is_shutdown_.load()) {
434  return true;
435  }
436  return Proc(msg0, msg1, msg2, msg3);
437 }
438 
439 template <typename M0, typename M1, typename M2, typename M3>
440 bool Component<M0, M1, M2, M3>::Initialize(const ComponentConfig& config) {
441  node_.reset(new Node(config.name()));
442  LoadConfigFiles(config);
443 
444  if (config.readers_size() < 4) {
445  AERROR << "Invalid config file: too few readers_." << std::endl;
446  return false;
447  }
448 
449  if (!Init()) {
450  AERROR << "Component Init() failed." << std::endl;
451  return false;
452  }
453 
454  bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
455 
456  ReaderConfig reader_cfg;
457  reader_cfg.channel_name = config.readers(1).channel();
458  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
459  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
460 
461  auto reader1 = node_->template CreateReader<M1>(reader_cfg);
462 
463  reader_cfg.channel_name = config.readers(2).channel();
464  reader_cfg.qos_profile.CopyFrom(config.readers(2).qos_profile());
465  reader_cfg.pending_queue_size = config.readers(2).pending_queue_size();
466 
467  auto reader2 = node_->template CreateReader<M2>(reader_cfg);
468 
469  reader_cfg.channel_name = config.readers(3).channel();
470  reader_cfg.qos_profile.CopyFrom(config.readers(3).qos_profile());
471  reader_cfg.pending_queue_size = config.readers(3).pending_queue_size();
472 
473  auto reader3 = node_->template CreateReader<M3>(reader_cfg);
474 
475  reader_cfg.channel_name = config.readers(0).channel();
476  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
477  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
478 
479  std::shared_ptr<Reader<M0>> reader0 = nullptr;
480  if (cyber_likely(is_reality_mode)) {
481  reader0 = node_->template CreateReader<M0>(reader_cfg);
482  } else {
483  std::weak_ptr<Component<M0, M1, M2, M3>> self =
484  std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(
485  shared_from_this());
486 
487  auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
488  config.readers(1).channel());
489  auto blocker2 = blocker::BlockerManager::Instance()->GetBlocker<M2>(
490  config.readers(2).channel());
491  auto blocker3 = blocker::BlockerManager::Instance()->GetBlocker<M3>(
492  config.readers(3).channel());
493 
494  auto func = [self, blocker1, blocker2,
495  blocker3](const std::shared_ptr<M0>& msg0) {
496  auto ptr = self.lock();
497  if (ptr) {
498  if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty() &&
499  !blocker3->IsPublishedEmpty()) {
500  auto msg1 = blocker1->GetLatestPublishedPtr();
501  auto msg2 = blocker2->GetLatestPublishedPtr();
502  auto msg3 = blocker3->GetLatestPublishedPtr();
503  ptr->Process(msg0, msg1, msg2, msg3);
504  }
505  } else {
506  AERROR << "Component object has been destroyed.";
507  }
508  };
509 
510  reader0 = node_->template CreateReader<M0>(reader_cfg, func);
511  }
512 
513  if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr ||
514  reader3 == nullptr) {
515  AERROR << "Component create reader failed." << std::endl;
516  return false;
517  }
518  readers_.push_back(std::move(reader0));
519  readers_.push_back(std::move(reader1));
520  readers_.push_back(std::move(reader2));
521  readers_.push_back(std::move(reader3));
522 
523  if (cyber_unlikely(!is_reality_mode)) {
524  return true;
525  }
526 
527  auto sched = scheduler::Instance();
528  std::weak_ptr<Component<M0, M1, M2, M3>> self =
529  std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(shared_from_this());
530  auto func =
531  [self](const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1,
532  const std::shared_ptr<M2>& msg2, const std::shared_ptr<M3>& msg3) {
533  auto ptr = self.lock();
534  if (ptr) {
535  ptr->Process(msg0, msg1, msg2, msg3);
536  } else {
537  AERROR << "Component object has been destroyed." << std::endl;
538  }
539  };
540 
541  std::vector<data::VisitorConfig> config_list;
542  for (auto& reader : readers_) {
543  config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
544  }
545  auto dv = std::make_shared<data::DataVisitor<M0, M1, M2, M3>>(config_list);
546  croutine::RoutineFactory factory =
547  croutine::CreateRoutineFactory<M0, M1, M2, M3>(func, dv);
548  return sched->CreateTask(factory, node_->Name());
549 }
550 
551 #define CYBER_REGISTER_COMPONENT(name) \
552  CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase)
553 
554 } // namespace cyber
555 } // namespace apollo
556 
557 #endif // CYBER_COMPONENT_COMPONENT_H_
~Component() override
Definition: component.h:104
void(* func)(void *)
Definition: routine_context.h:41
std::vector< std::shared_ptr< ReaderBase > > readers_
Definition: component_base.h:115
#define cyber_likely(x)
Definition: macros.h:27
Definition: node_channel_impl.h:37
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
static const std::shared_ptr< BlockerManager > & Instance()
Definition: blocker_manager.h:38
bool Process(const std::shared_ptr< M0 > &msg0, const std::shared_ptr< M1 > &msg1, const std::shared_ptr< M2 > &msg2, const std::shared_ptr< M3 > &msg3)
Definition: component.h:429
uint32_t pending_queue_size
configuration for responding ChannelBuffer. Older messages will dropped if you have no time to handle...
Definition: node_channel_impl.h:59
~Component() override
Definition: component.h:61
Definition: routine_factory.h:33
Node is the fundamental building block of Cyber RT. every module contains and communicates through th...
Definition: node.h:44
~Component() override
Definition: component.h:130
proto::QosProfile qos_profile
Definition: node_channel_impl.h:54
void LoadConfigFiles(const ComponentConfig &config)
Definition: component_base.h:72
Definition: types.h:25
Definition: data_visitor.h:36
Definition: component_base.h:41
std::shared_ptr< Node > node_
Definition: component_base.h:113
Definition: global_data.h:40
~Component() override
Definition: component.h:116
Component()
Definition: component.h:60
#define AERROR
Definition: log.h:44
bool Initialize(const ComponentConfig &config) override
init the component by protobuf object.
Definition: component.h:150
std::atomic< bool > is_shutdown_
Definition: component_base.h:112
std::string channel_name
Definition: node_channel_impl.h:53
The Component can process up to four channels of messages. The message type is specified when the com...
Definition: component.h:58
#define cyber_unlikely(x)
Definition: macros.h:28