// Include guard for this header, followed by the ServiceCallback alias
// template: a std::function returning void that takes the request as a
// const reference to std::shared_ptr<Request> and the response as a
// non-const reference to std::shared_ptr<Response>.
17 #ifndef CYBER_SERVICE_SERVICE_H_ 18 #define CYBER_SERVICE_SERVICE_H_ 41 template <
typename Request,
typename Response>
44 using ServiceCallback = std::function<void(const std::shared_ptr<Request>&,
45 std::shared_ptr<Response>&)>;
56 node_name_(node_name),
57 service_callback_(service_callback),
58 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
59 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}
71 node_name_(node_name),
72 service_callback_(service_callback),
73 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
74 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}
94 void HandleRequest(
const std::shared_ptr<Request>& request,
98 const std::shared_ptr<Response>& response);
// Reports whether a request receiver currently exists: returns true when
// request_receiver_ is non-null and false otherwise. Const; it only reads
// the pointer.
100 bool IsInit(
void)
const {
return request_receiver_ !=
nullptr; }
102 std::string node_name_;
105 std::function<void(const std::shared_ptr<Request>&,
108 std::shared_ptr<transport::Transmitter<Response>> response_transmitter_;
109 std::shared_ptr<transport::Receiver<Request>> request_receiver_;
110 std::string request_channel_;
111 std::string response_channel_;
112 std::mutex service_handle_request_mutex_;
// Flag consulted by the condition-variable wait predicate in this file: the
// wait ends once this flag is false or the task list is non-empty.
// NOTE(review): volatile does not provide inter-thread synchronization in
// C++. The writes to this flag are not visible in this excerpt — confirm
// they are made while holding queue_mutex_, or consider std::atomic<bool>.
114 volatile bool inited_ =
false;
// Accepts a task (a void() callable) by rvalue reference.
// NOTE(review): the definition is only partly visible in this excerpt; it
// appears to lock queue_mutex_, append the task to tasks_, and notify one
// waiter on condition_ — confirm against the full file.
115 void Enqueue(std::function<
void()>&& task);
118 std::mutex queue_mutex_;
119 std::condition_variable condition_;
120 std::list<std::function<void()>> tasks_;
123 template <
typename Request,
typename Response>
127 std::lock_guard<std::mutex> lg(queue_mutex_);
128 this->tasks_.clear();
130 condition_.notify_all();
131 if (thread_.joinable()) {
136 template <
typename Request,
typename Response>
138 std::lock_guard<std::mutex> lg(queue_mutex_);
139 tasks_.emplace_back(std::move(task));
140 condition_.notify_one();
143 template <
typename Request,
typename Response>
146 std::unique_lock<std::mutex> ul(queue_mutex_);
147 condition_.wait(ul, [
this]() {
return !inited_ || !this->tasks_.empty(); });
151 if (!tasks_.empty()) {
152 auto task = tasks_.front();
160 template <
typename Request,
typename Response>
165 proto::RoleAttributes role;
166 role.set_node_name(node_name_);
167 role.set_channel_name(response_channel_);
169 role.set_channel_id(channel_id);
170 role.mutable_qos_profile()->CopyFrom(
173 response_transmitter_ =
174 transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
175 if (response_transmitter_ ==
nullptr) {
176 AERROR <<
" Create response pub failed.";
182 std::placeholders::_1, std::placeholders::_2);
184 role.set_channel_name(request_channel_);
186 role.set_channel_id(channel_id);
187 request_receiver_ = transport->CreateReceiver<Request>(
189 [=](
const std::shared_ptr<Request>& request,
191 const proto::RoleAttributes& reader_attr) {
193 auto task = [
this, request, message_info]() {
194 this->HandleRequest(request, message_info);
196 Enqueue(std::move(task));
198 proto::OptionalMode::RTPS);
201 if (request_receiver_ ==
nullptr) {
202 AERROR <<
" Create request sub failed." << request_channel_;
203 response_transmitter_.reset();
209 template <
typename Request,
typename Response>
211 const std::shared_ptr<Request>& request,
217 ADEBUG <<
"handling request:" << request_channel_;
218 std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
219 auto response = std::make_shared<Response>();
220 service_callback_(request, response);
223 SendResponse(msg_info, response);
226 template <
typename Request,
typename Response>
229 const std::shared_ptr<Response>& response) {
236 response_transmitter_->Transmit(response, message_info);
242 #endif // CYBER_SERVICE_SERVICE_H_
Base class for Service.
Definition: service_base.h:30
void set_sender_id(const Identity &sender_id)
Definition: message_info.h:50
~Service()
Definition: service.h:81
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
const std::string & service_name() const
Get the service name.
Definition: service_base.h:47
Service handles a Request from the Client and sends a Response to it.
Definition: service.h:42
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT
Definition: qos_profile_conf.h:49
std::function< void(const std::shared_ptr< Request > &, std::shared_ptr< Response > &)> ServiceCallback
Definition: service.h:45
void destroy()
Destroy the Service.
Definition: service.h:124
bool IsShutdown()
Definition: state.h:46
#define ADEBUG
Definition: log.h:41
static uint64_t RegisterChannel(const std::string &channel)
Definition: message_info.h:30
Service(const std::string &node_name, const std::string &service_name, ServiceCallback &&service_callback)
Construct a new Service object.
Definition: service.h:68
#define AERROR
Definition: log.h:44
bool Init()
Init the Service.
Definition: service.h:161
Service()=delete
Forbid default constructing.
Service(const std::string &node_name, const std::string &service_name, const ServiceCallback &service_callback)
Construct a new Service object.
Definition: service.h:53