17 #ifndef CYBER_PYTHON_INTERNAL_PY_CYBER_H_ 18 #define CYBER_PYTHON_INTERNAL_PY_CYBER_H_ 29 #include <unordered_map> 46 inline bool py_init(
const std::string& module_name) {
47 static bool inited =
false;
49 AINFO <<
"cyber already inited.";
53 if (!
Init(module_name.c_str())) {
54 AERROR <<
"cyber::Init failed:" << module_name;
58 AINFO <<
"cyber init succ.";
72 PyWriter(
const std::string& channel,
const std::string& type,
73 const uint32_t qos_depth,
Node* node)
74 : channel_name_(channel),
76 qos_depth_(qos_depth),
78 std::string proto_desc;
81 if (proto_desc.empty()) {
82 AWARN <<
"cpp can't find proto_desc msgtype->" << data_type_;
85 proto::RoleAttributes role_attr;
86 role_attr.set_channel_name(channel_name_);
87 role_attr.set_message_type(data_type_);
88 role_attr.set_proto_desc(proto_desc);
89 auto qos_profile = role_attr.mutable_qos_profile();
90 qos_profile->set_depth(qos_depth_);
94 int write(
const std::string& data) {
95 auto message = std::make_shared<message::PyMessageWrap>(data, data_type_);
96 message->set_type_name(data_type_);
97 return writer_->Write(message);
101 std::string channel_name_;
102 std::string data_type_;
104 Node* node_ =
nullptr;
105 std::shared_ptr<Writer<message::PyMessageWrap>> writer_;
111 PyReader(
const std::string& channel,
const std::string& type,
Node* node)
112 : channel_name_(channel), data_type_(type), node_(node), func_(nullptr) {
113 if (data_type_.compare(RAWDATATYPE) == 0) {
115 [
this](
const std::shared_ptr<const message::PyMessageWrap>& request) {
121 [
this](
const std::shared_ptr<const message::RawMessage>& request) {
122 this->cb_rawmsg(request);
130 std::string
read(
bool wait =
false) {
132 std::unique_lock<std::mutex> ul(msg_lock_);
133 if (!cache_.empty()) {
134 msg = std::move(cache_.front());
142 msg_cond_.wait(ul, [
this] {
return !this->cache_.empty(); });
143 if (!cache_.empty()) {
144 msg = std::move(cache_.front());
152 void cb(
const std::shared_ptr<const message::PyMessageWrap>& message) {
154 std::lock_guard<std::mutex> lg(msg_lock_);
155 cache_.push_back(message->data());
158 func_(channel_name_.c_str());
160 msg_cond_.notify_one();
163 void cb_rawmsg(
const std::shared_ptr<const message::RawMessage>& message) {
165 std::lock_guard<std::mutex> lg(msg_lock_);
166 cache_.push_back(message->message);
169 func_(channel_name_.c_str());
171 msg_cond_.notify_one();
174 std::string channel_name_;
175 std::string data_type_;
176 Node* node_ =
nullptr;
177 int (*func_)(
const char*) =
nullptr;
178 std::shared_ptr<Reader<message::PyMessageWrap>> reader_ =
nullptr;
179 std::deque<std::string> cache_;
180 std::mutex msg_lock_;
181 std::condition_variable msg_cond_;
183 std::shared_ptr<Reader<message::RawMessage>> reader_rawmsg_ =
nullptr;
189 PyService(
const std::string& service_name,
const std::string& data_type,
192 service_name_(service_name),
193 data_type_(data_type),
196 const std::shared_ptr<const message::PyMessageWrap>& request,
197 std::shared_ptr<message::PyMessageWrap>& response) {
198 response = this->cb(request);
209 if (!request_cache_.empty()) {
210 msg = std::move(request_cache_.front());
211 request_cache_.pop_front();
216 int write(
const std::string& data) {
217 response_cache_.push_back(data);
223 const std::shared_ptr<const message::PyMessageWrap>& request) {
224 std::lock_guard<std::mutex> lg(msg_lock_);
226 request_cache_.push_back(request->data());
229 func_(service_name_.c_str());
233 if (!response_cache_.empty()) {
234 msg = std::move(response_cache_.front());
235 response_cache_.pop_front();
244 std::string service_name_;
245 std::string data_type_;
246 int (*func_)(
const char*) =
nullptr;
247 std::shared_ptr<Service<message::PyMessageWrap, message::PyMessageWrap>>
249 std::mutex msg_lock_;
250 std::deque<std::string> request_cache_;
251 std::deque<std::string> response_cache_;
256 PyClient(
const std::string& name,
const std::string& data_type,
Node* node)
257 : node_(node), service_name_(name), data_type_(data_type) {
264 std::shared_ptr<message::PyMessageWrap> m;
267 auto response = client_->SendRequest(m);
268 if (response ==
nullptr) {
269 AINFO <<
"SendRequest:response is null";
270 return std::string(
"");
272 response->ParseFromString(response->data());
274 return response->data();
279 std::string service_name_;
280 std::string data_type_;
281 std::shared_ptr<Client<message::PyMessageWrap, message::PyMessageWrap>>
287 explicit PyNode(
const std::string& node_name) : node_name_(node_name) {
293 AINFO <<
"PyNode " << node_name_ <<
" exit.";
297 uint32_t qos_depth = 1) {
299 return new PyWriter(channel, type, qos_depth, node_.get());
301 AINFO <<
"Py_Node: node_ is null, new PyWriter failed!";
311 return new PyReader(channel, type, node_.get());
317 const std::string& type) {
319 return new PyService(service, type, node_.get());
326 return new PyClient(service, type, node_.get());
334 std::string node_name_;
335 std::shared_ptr<Node> node_ =
nullptr;
344 const std::string& msg_type,
const std::string& rawmsgdata) {
345 if (msg_type.empty()) {
346 AERROR <<
"parse rawmessage the msg_type is null";
349 if (rawmsgdata.empty()) {
350 AERROR <<
"parse rawmessage the rawmsgdata is null";
354 if (raw_msg_class_ ==
nullptr) {
356 raw_msg_class_ = rawFactory->GenerateMessageByType(msg_type);
359 if (raw_msg_class_ ==
nullptr) {
360 AERROR <<
"raw_msg_class_ is null";
364 if (!raw_msg_class_->ParseFromString(rawmsgdata)) {
365 AERROR <<
"Cannot parse the msg [ " << msg_type <<
" ]";
369 return raw_msg_class_->DebugString();
373 uint8_t sleep_s = 0) {
374 if (channel_name.empty()) {
375 AERROR <<
"channel_name is null";
380 auto channel_manager = topology->channel_manager();
381 std::string msg_type(
"");
382 channel_manager->GetMsgType(channel_name, &msg_type);
389 auto channel_manager = topology->channel_manager();
390 std::vector<std::string> channels;
391 channel_manager->GetChannelNames(&channels);
395 static std::unordered_map<std::string, std::vector<std::string>>
399 std::vector<proto::RoleAttributes> tmpVec;
400 topology->channel_manager()->GetWriters(&tmpVec);
401 std::unordered_map<std::string, std::vector<std::string>> roles_info;
403 for (
auto& attr : tmpVec) {
404 std::string channel_name = attr.channel_name();
406 attr.SerializeToString(&msgdata);
407 roles_info[channel_name].emplace_back(msgdata);
411 topology->channel_manager()->GetReaders(&tmpVec);
412 for (
auto& attr : tmpVec) {
413 std::string channel_name = attr.channel_name();
415 attr.SerializeToString(&msgdata);
416 roles_info[channel_name].emplace_back(msgdata);
422 static google::protobuf::Message* raw_msg_class_;
430 std::vector<std::string> node_names;
431 std::vector<RoleAttributes> nodes;
432 topology->node_manager()->GetNodes(&nodes);
434 AERROR <<
"no node found.";
438 std::sort(nodes.begin(), nodes.end(),
439 [](
const RoleAttributes& na,
const RoleAttributes& nb) ->
bool {
440 return na.node_name().compare(nb.node_name()) <= 0;
442 for (
auto& node : nodes) {
443 node_names.emplace_back(node.node_name());
449 uint8_t sleep_s = 2) {
453 if (!topology->node_manager()->HasNode(node_name)) {
454 AERROR <<
"no node named: " << node_name;
458 std::vector<RoleAttributes> nodes;
459 topology->node_manager()->GetNodes(&nodes);
461 for (
auto& node_attr : nodes) {
462 if (node_attr.node_name() == node_name) {
463 node_attr.SerializeToString(&msgdata);
471 const std::string& node_name, uint8_t sleep_s = 2) {
472 std::vector<std::string> reader_channels;
475 if (!topology->node_manager()->HasNode(node_name)) {
476 AERROR <<
"no node named: " << node_name;
477 return reader_channels;
480 std::vector<RoleAttributes> readers;
481 auto channel_mgr = topology->channel_manager();
482 channel_mgr->GetReadersOfNode(node_name, &readers);
483 for (
auto& reader : readers) {
484 if (reader.channel_name() ==
"param_event") {
487 reader_channels.emplace_back(reader.channel_name());
489 return reader_channels;
493 const std::string& node_name, uint8_t sleep_s = 2) {
494 std::vector<std::string> writer_channels;
497 if (!topology->node_manager()->HasNode(node_name)) {
498 AERROR <<
"no node named: " << node_name;
499 return writer_channels;
502 std::vector<RoleAttributes> writers;
503 auto channel_mgr = topology->channel_manager();
504 channel_mgr->GetWritersOfNode(node_name, &writers);
505 for (
auto& writer : writers) {
506 if (writer.channel_name() ==
"param_event") {
509 writer_channels.emplace_back(writer.channel_name());
511 return writer_channels;
520 std::vector<std::string> srv_names;
521 std::vector<RoleAttributes> services;
522 topology->service_manager()->GetServers(&services);
523 if (services.empty()) {
524 AERROR <<
"no service found.";
528 std::sort(services.begin(), services.end(),
529 [](
const RoleAttributes& sa,
const RoleAttributes& sb) ->
bool {
530 return sa.service_name().compare(sb.service_name()) <= 0;
532 for (
auto& service : services) {
533 srv_names.emplace_back(service.service_name());
539 uint8_t sleep_s = 2) {
543 if (!topology->service_manager()->HasService(service_name)) {
544 AERROR <<
"no service: " << service_name;
548 std::vector<RoleAttributes> services;
549 topology->service_manager()->GetServers(&services);
551 for (
auto& service_attr : services) {
552 if (service_attr.service_name() == service_name) {
553 service_attr.SerializeToString(&msgdata);
564 #endif // CYBER_PYTHON_INTERNAL_PY_CYBER_H_ bool py_ok()
Definition: py_cyber.h:62
static std::unordered_map< std::string, std::vector< std::string > > get_channels_info(uint8_t sleep_s=2)
Definition: py_cyber.h:396
int write(const std::string &data)
Definition: py_cyber.h:94
void(* func)(void *)
Definition: routine_context.h:41
PyReader * create_reader(const std::string &channel, const std::string &type)
Definition: py_cyber.h:309
const char RAWDATATYPE[]
Definition: py_cyber.h:108
static std::string get_node_attr(const std::string &node_name, uint8_t sleep_s=2)
Definition: py_cyber.h:448
bool py_init(const std::string &module_name)
Definition: py_cyber.h:46
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
static std::string get_service_attr(const std::string &service_name, uint8_t sleep_s=2)
Definition: py_cyber.h:538
Definition: py_cyber.h:109
PyClient(const std::string &name, const std::string &data_type, Node *node)
Definition: py_cyber.h:256
PyService(const std::string &service_name, const std::string &data_type, Node *node)
Definition: py_cyber.h:189
static std::string get_msgtype_by_channelname(const std::string &channel_name, uint8_t sleep_s=0)
Definition: py_cyber.h:372
static std::vector< std::string > get_readersofnode(const std::string &node_name, uint8_t sleep_s=2)
Definition: py_cyber.h:470
Definition: raw_message.h:31
void shutdown()
Definition: py_cyber.h:291
Definition: py_cyber.h:70
Definition: py_cyber.h:187
PyReader(const std::string &channel, const std::string &type, Node *node)
Definition: py_cyber.h:111
static std::string get_debugstring_by_msgtype_rawmsgdata(const std::string &msg_type, const std::string &rawmsgdata)
Definition: py_cyber.h:343
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space="")
Node is the fundamental building block of Cyber RT. every module contains and communicates through th...
Definition: node.h:44
int write(const std::string &data)
Definition: py_cyber.h:216
PyService * create_service(const std::string &service, const std::string &type)
Definition: py_cyber.h:316
bool IsShutdown()
Definition: state.h:46
Definition: py_cyber.h:425
PyWriter(const std::string &channel, const std::string &type, const uint32_t qos_depth, Node *node)
Definition: py_cyber.h:72
std::string send_request(std::string request)
Definition: py_cyber.h:263
void register_func(int(*func)(const char *))
Definition: py_cyber.h:205
static std::vector< std::string > get_active_nodes(uint8_t sleep_s=2)
Definition: py_cyber.h:427
Definition: py_cyber.h:254
void register_func(int(*func)(const char *))
Definition: py_cyber.h:128
void py_waitforshutdown()
Definition: py_cyber.h:68
static std::vector< std::string > get_active_services(uint8_t sleep_s=2)
Definition: py_cyber.h:517
Definition: py_message.h:33
auto CreateWriter(const proto::RoleAttributes &role_attr) -> std::shared_ptr< Writer< MessageT >>
Create a Writer with specific message type.
Definition: node.h:189
std::shared_ptr< message::PyMessageWrap > PyMsgWrapPtr
Definition: py_cyber.h:186
Definition: py_cyber.h:285
bool Init(const char *binary_name)
static std::vector< std::string > get_writersofnode(const std::string &node_name, uint8_t sleep_s=2)
Definition: py_cyber.h:492
bool py_is_shutdown()
Definition: py_cyber.h:66
void register_message(const std::string &desc)
Definition: py_cyber.h:305
PyNode(const std::string &node_name)
Definition: py_cyber.h:287
std::shared_ptr< Node > get_node()
Definition: py_cyber.h:331
std::string read()
Definition: py_cyber.h:207
std::string read(bool wait=false)
Definition: py_cyber.h:130
#define AERROR
Definition: log.h:44
PyWriter * create_writer(const std::string &channel, const std::string &type, uint32_t qos_depth=1)
Definition: py_cyber.h:296
PyClient * create_client(const std::string &service, const std::string &type)
Definition: py_cyber.h:324
#define AWARN
Definition: log.h:43
bool OK()
Definition: state.h:44
static std::vector< std::string > get_active_channels(uint8_t sleep_s=2)
Definition: py_cyber.h:386
Definition: py_cyber.h:338
#define AINFO
Definition: log.h:42
Definition: py_cyber.h:515
void WaitForShutdown()
Definition: state.h:50
void py_shutdown()
Definition: py_cyber.h:64