Apollo  6.0
Open source self driving car software
py_cyber.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_PYTHON_INTERNAL_PY_CYBER_H_
18 #define CYBER_PYTHON_INTERNAL_PY_CYBER_H_
19 
#include <unistd.h>

#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "cyber/cyber.h"
#include "cyber/init.h"
#include "cyber/node/node.h"
#include "cyber/node/reader.h"
#include "cyber/node/writer.h"
42 
43 namespace apollo {
44 namespace cyber {
45 
46 inline bool py_init(const std::string& module_name) {
47  static bool inited = false;
48  if (inited) {
49  AINFO << "cyber already inited.";
50  return true;
51  }
52 
53  if (!Init(module_name.c_str())) {
54  AERROR << "cyber::Init failed:" << module_name;
55  return false;
56  }
57  inited = true;
58  AINFO << "cyber init succ.";
59  return true;
60 }
61 
62 inline bool py_ok() { return OK(); }
63 
64 inline void py_shutdown() { return Clear(); }
65 
66 inline bool py_is_shutdown() { return IsShutdown(); }
67 
68 inline void py_waitforshutdown() { return WaitForShutdown(); }
69 
70 class PyWriter {
71  public:
72  PyWriter(const std::string& channel, const std::string& type,
73  const uint32_t qos_depth, Node* node)
74  : channel_name_(channel),
75  data_type_(type),
76  qos_depth_(qos_depth),
77  node_(node) {
78  std::string proto_desc;
79  message::ProtobufFactory::Instance()->GetDescriptorString(type,
80  &proto_desc);
81  if (proto_desc.empty()) {
82  AWARN << "cpp can't find proto_desc msgtype->" << data_type_;
83  return;
84  }
85  proto::RoleAttributes role_attr;
86  role_attr.set_channel_name(channel_name_);
87  role_attr.set_message_type(data_type_);
88  role_attr.set_proto_desc(proto_desc);
89  auto qos_profile = role_attr.mutable_qos_profile();
90  qos_profile->set_depth(qos_depth_);
91  writer_ = node_->CreateWriter<message::PyMessageWrap>(role_attr);
92  }
93 
94  int write(const std::string& data) {
95  auto message = std::make_shared<message::PyMessageWrap>(data, data_type_);
96  message->set_type_name(data_type_);
97  return writer_->Write(message);
98  }
99 
100  private:
101  std::string channel_name_;
102  std::string data_type_;
103  uint32_t qos_depth_;
104  Node* node_ = nullptr;
105  std::shared_ptr<Writer<message::PyMessageWrap>> writer_;
106 };
107 
108 const char RAWDATATYPE[] = "RawData";
109 class PyReader {
110  public:
111  PyReader(const std::string& channel, const std::string& type, Node* node)
112  : channel_name_(channel), data_type_(type), node_(node), func_(nullptr) {
113  if (data_type_.compare(RAWDATATYPE) == 0) {
114  auto f =
115  [this](const std::shared_ptr<const message::PyMessageWrap>& request) {
116  this->cb(request);
117  };
118  reader_ = node_->CreateReader<message::PyMessageWrap>(channel, f);
119  } else {
120  auto f =
121  [this](const std::shared_ptr<const message::RawMessage>& request) {
122  this->cb_rawmsg(request);
123  };
124  reader_rawmsg_ = node_->CreateReader<message::RawMessage>(channel, f);
125  }
126  }
127 
128  void register_func(int (*func)(const char*)) { func_ = func; }
129 
130  std::string read(bool wait = false) {
131  std::string msg("");
132  std::unique_lock<std::mutex> ul(msg_lock_);
133  if (!cache_.empty()) {
134  msg = std::move(cache_.front());
135  cache_.pop_front();
136  }
137 
138  if (!wait) {
139  return msg;
140  }
141 
142  msg_cond_.wait(ul, [this] { return !this->cache_.empty(); });
143  if (!cache_.empty()) {
144  msg = std::move(cache_.front());
145  cache_.pop_front();
146  }
147 
148  return msg;
149  }
150 
151  private:
152  void cb(const std::shared_ptr<const message::PyMessageWrap>& message) {
153  {
154  std::lock_guard<std::mutex> lg(msg_lock_);
155  cache_.push_back(message->data());
156  }
157  if (func_) {
158  func_(channel_name_.c_str());
159  }
160  msg_cond_.notify_one();
161  }
162 
163  void cb_rawmsg(const std::shared_ptr<const message::RawMessage>& message) {
164  {
165  std::lock_guard<std::mutex> lg(msg_lock_);
166  cache_.push_back(message->message);
167  }
168  if (func_) {
169  func_(channel_name_.c_str());
170  }
171  msg_cond_.notify_one();
172  }
173 
174  std::string channel_name_;
175  std::string data_type_;
176  Node* node_ = nullptr;
177  int (*func_)(const char*) = nullptr;
178  std::shared_ptr<Reader<message::PyMessageWrap>> reader_ = nullptr;
179  std::deque<std::string> cache_;
180  std::mutex msg_lock_;
181  std::condition_variable msg_cond_;
182 
183  std::shared_ptr<Reader<message::RawMessage>> reader_rawmsg_ = nullptr;
184 };
185 
186 using PyMsgWrapPtr = std::shared_ptr<message::PyMessageWrap>;
187 class PyService {
188  public:
189  PyService(const std::string& service_name, const std::string& data_type,
190  Node* node)
191  : node_(node),
192  service_name_(service_name),
193  data_type_(data_type),
194  func_(nullptr) {
195  auto f = [this](
196  const std::shared_ptr<const message::PyMessageWrap>& request,
197  std::shared_ptr<message::PyMessageWrap>& response) {
198  response = this->cb(request);
199  };
200  service_ =
201  node_->CreateService<message::PyMessageWrap, message::PyMessageWrap>(
202  service_name, f);
203  }
204 
205  void register_func(int (*func)(const char*)) { func_ = func; }
206 
207  std::string read() {
208  std::string msg("");
209  if (!request_cache_.empty()) {
210  msg = std::move(request_cache_.front());
211  request_cache_.pop_front();
212  }
213  return msg;
214  }
215 
216  int write(const std::string& data) {
217  response_cache_.push_back(data);
218  return SUCC;
219  }
220 
221  private:
222  PyMsgWrapPtr cb(
223  const std::shared_ptr<const message::PyMessageWrap>& request) {
224  std::lock_guard<std::mutex> lg(msg_lock_);
225 
226  request_cache_.push_back(request->data());
227 
228  if (func_) {
229  func_(service_name_.c_str());
230  }
231 
232  std::string msg("");
233  if (!response_cache_.empty()) {
234  msg = std::move(response_cache_.front());
235  response_cache_.pop_front();
236  }
237 
238  PyMsgWrapPtr response;
239  response.reset(new message::PyMessageWrap(msg, data_type_));
240  return response;
241  }
242 
243  Node* node_;
244  std::string service_name_;
245  std::string data_type_;
246  int (*func_)(const char*) = nullptr;
247  std::shared_ptr<Service<message::PyMessageWrap, message::PyMessageWrap>>
248  service_;
249  std::mutex msg_lock_;
250  std::deque<std::string> request_cache_;
251  std::deque<std::string> response_cache_;
252 };
253 
254 class PyClient {
255  public:
256  PyClient(const std::string& name, const std::string& data_type, Node* node)
257  : node_(node), service_name_(name), data_type_(data_type) {
258  client_ =
259  node_->CreateClient<message::PyMessageWrap, message::PyMessageWrap>(
260  name);
261  }
262 
263  std::string send_request(std::string request) {
264  std::shared_ptr<message::PyMessageWrap> m;
265  m.reset(new message::PyMessageWrap(request, data_type_));
266 
267  auto response = client_->SendRequest(m);
268  if (response == nullptr) {
269  AINFO << "SendRequest:response is null";
270  return std::string("");
271  }
272  response->ParseFromString(response->data());
273 
274  return response->data();
275  }
276 
277  private:
278  Node* node_;
279  std::string service_name_;
280  std::string data_type_;
281  std::shared_ptr<Client<message::PyMessageWrap, message::PyMessageWrap>>
282  client_;
283 };
284 
285 class PyNode {
286  public:
287  explicit PyNode(const std::string& node_name) : node_name_(node_name) {
288  node_ = CreateNode(node_name);
289  }
290 
291  void shutdown() {
292  node_.reset();
293  AINFO << "PyNode " << node_name_ << " exit.";
294  }
295 
296  PyWriter* create_writer(const std::string& channel, const std::string& type,
297  uint32_t qos_depth = 1) {
298  if (node_) {
299  return new PyWriter(channel, type, qos_depth, node_.get());
300  }
301  AINFO << "Py_Node: node_ is null, new PyWriter failed!";
302  return nullptr;
303  }
304 
305  void register_message(const std::string& desc) {
306  message::ProtobufFactory::Instance()->RegisterPythonMessage(desc);
307  }
308 
309  PyReader* create_reader(const std::string& channel, const std::string& type) {
310  if (node_) {
311  return new PyReader(channel, type, node_.get());
312  }
313  return nullptr;
314  }
315 
316  PyService* create_service(const std::string& service,
317  const std::string& type) {
318  if (node_) {
319  return new PyService(service, type, node_.get());
320  }
321  return nullptr;
322  }
323 
324  PyClient* create_client(const std::string& service, const std::string& type) {
325  if (node_) {
326  return new PyClient(service, type, node_.get());
327  }
328  return nullptr;
329  }
330 
331  std::shared_ptr<Node> get_node() { return node_; }
332 
333  private:
334  std::string node_name_;
335  std::shared_ptr<Node> node_ = nullptr;
336 };
337 
339  public:
340  // Get debugstring of rawmsgdata
341  // Pls make sure the msg_type of rawmsg is matching
342  // Used in cyber_channel echo command
344  const std::string& msg_type, const std::string& rawmsgdata) {
345  if (msg_type.empty()) {
346  AERROR << "parse rawmessage the msg_type is null";
347  return "";
348  }
349  if (rawmsgdata.empty()) {
350  AERROR << "parse rawmessage the rawmsgdata is null";
351  return "";
352  }
353 
354  if (raw_msg_class_ == nullptr) {
355  auto rawFactory = message::ProtobufFactory::Instance();
356  raw_msg_class_ = rawFactory->GenerateMessageByType(msg_type);
357  }
358 
359  if (raw_msg_class_ == nullptr) {
360  AERROR << "raw_msg_class_ is null";
361  return "";
362  }
363 
364  if (!raw_msg_class_->ParseFromString(rawmsgdata)) {
365  AERROR << "Cannot parse the msg [ " << msg_type << " ]";
366  return "";
367  }
368 
369  return raw_msg_class_->DebugString();
370  }
371 
372  static std::string get_msgtype_by_channelname(const std::string& channel_name,
373  uint8_t sleep_s = 0) {
374  if (channel_name.empty()) {
375  AERROR << "channel_name is null";
376  return "";
377  }
379  sleep(sleep_s);
380  auto channel_manager = topology->channel_manager();
381  std::string msg_type("");
382  channel_manager->GetMsgType(channel_name, &msg_type);
383  return msg_type;
384  }
385 
386  static std::vector<std::string> get_active_channels(uint8_t sleep_s = 2) {
388  sleep(sleep_s);
389  auto channel_manager = topology->channel_manager();
390  std::vector<std::string> channels;
391  channel_manager->GetChannelNames(&channels);
392  return channels;
393  }
394 
395  static std::unordered_map<std::string, std::vector<std::string>>
396  get_channels_info(uint8_t sleep_s = 2) {
398  sleep(sleep_s);
399  std::vector<proto::RoleAttributes> tmpVec;
400  topology->channel_manager()->GetWriters(&tmpVec);
401  std::unordered_map<std::string, std::vector<std::string>> roles_info;
402 
403  for (auto& attr : tmpVec) {
404  std::string channel_name = attr.channel_name();
405  std::string msgdata;
406  attr.SerializeToString(&msgdata);
407  roles_info[channel_name].emplace_back(msgdata);
408  }
409 
410  tmpVec.clear();
411  topology->channel_manager()->GetReaders(&tmpVec);
412  for (auto& attr : tmpVec) {
413  std::string channel_name = attr.channel_name();
414  std::string msgdata;
415  attr.SerializeToString(&msgdata);
416  roles_info[channel_name].emplace_back(msgdata);
417  }
418  return roles_info;
419  }
420 
421  private:
422  static google::protobuf::Message* raw_msg_class_;
423 };
424 
425 class PyNodeUtils {
426  public:
427  static std::vector<std::string> get_active_nodes(uint8_t sleep_s = 2) {
429  sleep(sleep_s);
430  std::vector<std::string> node_names;
431  std::vector<RoleAttributes> nodes;
432  topology->node_manager()->GetNodes(&nodes);
433  if (nodes.empty()) {
434  AERROR << "no node found.";
435  return node_names;
436  }
437 
438  std::sort(nodes.begin(), nodes.end(),
439  [](const RoleAttributes& na, const RoleAttributes& nb) -> bool {
440  return na.node_name().compare(nb.node_name()) <= 0;
441  });
442  for (auto& node : nodes) {
443  node_names.emplace_back(node.node_name());
444  }
445  return node_names;
446  }
447 
448  static std::string get_node_attr(const std::string& node_name,
449  uint8_t sleep_s = 2) {
451  sleep(sleep_s);
452 
453  if (!topology->node_manager()->HasNode(node_name)) {
454  AERROR << "no node named: " << node_name;
455  return "";
456  }
457 
458  std::vector<RoleAttributes> nodes;
459  topology->node_manager()->GetNodes(&nodes);
460  std::string msgdata;
461  for (auto& node_attr : nodes) {
462  if (node_attr.node_name() == node_name) {
463  node_attr.SerializeToString(&msgdata);
464  return msgdata;
465  }
466  }
467  return "";
468  }
469 
470  static std::vector<std::string> get_readersofnode(
471  const std::string& node_name, uint8_t sleep_s = 2) {
472  std::vector<std::string> reader_channels;
474  sleep(sleep_s);
475  if (!topology->node_manager()->HasNode(node_name)) {
476  AERROR << "no node named: " << node_name;
477  return reader_channels;
478  }
479 
480  std::vector<RoleAttributes> readers;
481  auto channel_mgr = topology->channel_manager();
482  channel_mgr->GetReadersOfNode(node_name, &readers);
483  for (auto& reader : readers) {
484  if (reader.channel_name() == "param_event") {
485  continue;
486  }
487  reader_channels.emplace_back(reader.channel_name());
488  }
489  return reader_channels;
490  }
491 
492  static std::vector<std::string> get_writersofnode(
493  const std::string& node_name, uint8_t sleep_s = 2) {
494  std::vector<std::string> writer_channels;
496  sleep(sleep_s);
497  if (!topology->node_manager()->HasNode(node_name)) {
498  AERROR << "no node named: " << node_name;
499  return writer_channels;
500  }
501 
502  std::vector<RoleAttributes> writers;
503  auto channel_mgr = topology->channel_manager();
504  channel_mgr->GetWritersOfNode(node_name, &writers);
505  for (auto& writer : writers) {
506  if (writer.channel_name() == "param_event") {
507  continue;
508  }
509  writer_channels.emplace_back(writer.channel_name());
510  }
511  return writer_channels;
512  }
513 };
514 
516  public:
517  static std::vector<std::string> get_active_services(uint8_t sleep_s = 2) {
519  sleep(sleep_s);
520  std::vector<std::string> srv_names;
521  std::vector<RoleAttributes> services;
522  topology->service_manager()->GetServers(&services);
523  if (services.empty()) {
524  AERROR << "no service found.";
525  return srv_names;
526  }
527 
528  std::sort(services.begin(), services.end(),
529  [](const RoleAttributes& sa, const RoleAttributes& sb) -> bool {
530  return sa.service_name().compare(sb.service_name()) <= 0;
531  });
532  for (auto& service : services) {
533  srv_names.emplace_back(service.service_name());
534  }
535  return srv_names;
536  }
537 
538  static std::string get_service_attr(const std::string& service_name,
539  uint8_t sleep_s = 2) {
541  sleep(sleep_s);
542 
543  if (!topology->service_manager()->HasService(service_name)) {
544  AERROR << "no service: " << service_name;
545  return "";
546  }
547 
548  std::vector<RoleAttributes> services;
549  topology->service_manager()->GetServers(&services);
550  std::string msgdata;
551  for (auto& service_attr : services) {
552  if (service_attr.service_name() == service_name) {
553  service_attr.SerializeToString(&msgdata);
554  return msgdata;
555  }
556  }
557  return "";
558  }
559 };
560 
561 } // namespace cyber
562 } // namespace apollo
563 
564 #endif // CYBER_PYTHON_INTERNAL_PY_CYBER_H_
bool py_ok()
Definition: py_cyber.h:62
static std::unordered_map< std::string, std::vector< std::string > > get_channels_info(uint8_t sleep_s=2)
Definition: py_cyber.h:396
int write(const std::string &data)
Definition: py_cyber.h:94
void(* func)(void *)
Definition: routine_context.h:41
PyReader * create_reader(const std::string &channel, const std::string &type)
Definition: py_cyber.h:309
const char RAWDATATYPE[]
Definition: py_cyber.h:108
static std::string get_node_attr(const std::string &node_name, uint8_t sleep_s=2)
Definition: py_cyber.h:448
bool py_init(const std::string &module_name)
Definition: py_cyber.h:46
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
static std::string get_service_attr(const std::string &service_name, uint8_t sleep_s=2)
Definition: py_cyber.h:538
Definition: py_cyber.h:109
PyClient(const std::string &name, const std::string &data_type, Node *node)
Definition: py_cyber.h:256
PyService(const std::string &service_name, const std::string &data_type, Node *node)
Definition: py_cyber.h:189
static std::string get_msgtype_by_channelname(const std::string &channel_name, uint8_t sleep_s=0)
Definition: py_cyber.h:372
static std::vector< std::string > get_readersofnode(const std::string &node_name, uint8_t sleep_s=2)
Definition: py_cyber.h:470
Definition: raw_message.h:31
Definition: types.h:29
void shutdown()
Definition: py_cyber.h:291
Definition: py_cyber.h:70
Definition: py_cyber.h:187
PyReader(const std::string &channel, const std::string &type, Node *node)
Definition: py_cyber.h:111
static std::string get_debugstring_by_msgtype_rawmsgdata(const std::string &msg_type, const std::string &rawmsgdata)
Definition: py_cyber.h:343
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space="")
Node is the fundamental building block of Cyber RT. every module contains and communicates through th...
Definition: node.h:44
int write(const std::string &data)
Definition: py_cyber.h:216
PyService * create_service(const std::string &service, const std::string &type)
Definition: py_cyber.h:316
bool IsShutdown()
Definition: state.h:46
Definition: py_cyber.h:425
PyWriter(const std::string &channel, const std::string &type, const uint32_t qos_depth, Node *node)
Definition: py_cyber.h:72
std::string send_request(std::string request)
Definition: py_cyber.h:263
void register_func(int(*func)(const char *))
Definition: py_cyber.h:205
static std::vector< std::string > get_active_nodes(uint8_t sleep_s=2)
Definition: py_cyber.h:427
Definition: py_cyber.h:254
void register_func(int(*func)(const char *))
Definition: py_cyber.h:128
void py_waitforshutdown()
Definition: py_cyber.h:68
static std::vector< std::string > get_active_services(uint8_t sleep_s=2)
Definition: py_cyber.h:517
Definition: py_message.h:33
auto CreateWriter(const proto::RoleAttributes &role_attr) -> std::shared_ptr< Writer< MessageT >>
Create a Writer with specific message type.
Definition: node.h:189
std::shared_ptr< message::PyMessageWrap > PyMsgWrapPtr
Definition: py_cyber.h:186
Definition: py_cyber.h:285
bool Init(const char *binary_name)
static std::vector< std::string > get_writersofnode(const std::string &node_name, uint8_t sleep_s=2)
Definition: py_cyber.h:492
bool py_is_shutdown()
Definition: py_cyber.h:66
void register_message(const std::string &desc)
Definition: py_cyber.h:305
PyNode(const std::string &node_name)
Definition: py_cyber.h:287
std::shared_ptr< Node > get_node()
Definition: py_cyber.h:331
std::string read()
Definition: py_cyber.h:207
std::string read(bool wait=false)
Definition: py_cyber.h:130
#define AERROR
Definition: log.h:44
PyWriter * create_writer(const std::string &channel, const std::string &type, uint32_t qos_depth=1)
Definition: py_cyber.h:296
PyClient * create_client(const std::string &service, const std::string &type)
Definition: py_cyber.h:324
#define AWARN
Definition: log.h:43
bool OK()
Definition: state.h:44
static std::vector< std::string > get_active_channels(uint8_t sleep_s=2)
Definition: py_cyber.h:386
Definition: py_cyber.h:338
#define AINFO
Definition: log.h:42
Definition: py_cyber.h:515
void WaitForShutdown()
Definition: state.h:50
void py_shutdown()
Definition: py_cyber.h:64