Apollo  6.0
Open source self driving car software
service.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_SERVICE_SERVICE_H_
18 #define CYBER_SERVICE_SERVICE_H_
19 
20 #include <list>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 
25 #include "cyber/common/types.h"
29 
30 namespace apollo {
31 namespace cyber {
32 
41 template <typename Request, typename Response>
42 class Service : public ServiceBase {
43  public:
44  using ServiceCallback = std::function<void(const std::shared_ptr<Request>&,
45  std::shared_ptr<Response>&)>;
53  Service(const std::string& node_name, const std::string& service_name,
54  const ServiceCallback& service_callback)
55  : ServiceBase(service_name),
56  node_name_(node_name),
57  service_callback_(service_callback),
58  request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
59  response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}
60 
68  Service(const std::string& node_name, const std::string& service_name,
69  ServiceCallback&& service_callback)
70  : ServiceBase(service_name),
71  node_name_(node_name),
72  service_callback_(service_callback),
73  request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
74  response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}
75 
79  Service() = delete;
80 
81  ~Service() { destroy(); }
82 
86  bool Init();
87 
91  void destroy();
92 
93  private:
94  void HandleRequest(const std::shared_ptr<Request>& request,
95  const transport::MessageInfo& message_info);
96 
97  void SendResponse(const transport::MessageInfo& message_info,
98  const std::shared_ptr<Response>& response);
99 
100  bool IsInit(void) const { return request_receiver_ != nullptr; }
101 
102  std::string node_name_;
103  ServiceCallback service_callback_;
104 
105  std::function<void(const std::shared_ptr<Request>&,
106  const transport::MessageInfo&)>
107  request_callback_;
108  std::shared_ptr<transport::Transmitter<Response>> response_transmitter_;
109  std::shared_ptr<transport::Receiver<Request>> request_receiver_;
110  std::string request_channel_;
111  std::string response_channel_;
112  std::mutex service_handle_request_mutex_;
113 
114  volatile bool inited_ = false;
115  void Enqueue(std::function<void()>&& task);
116  void Process();
117  std::thread thread_;
118  std::mutex queue_mutex_;
119  std::condition_variable condition_;
120  std::list<std::function<void()>> tasks_;
121 };
122 
123 template <typename Request, typename Response>
125  inited_ = false;
126  {
127  std::lock_guard<std::mutex> lg(queue_mutex_);
128  this->tasks_.clear();
129  }
130  condition_.notify_all();
131  if (thread_.joinable()) {
132  thread_.join();
133  }
134 }
135 
136 template <typename Request, typename Response>
137 inline void Service<Request, Response>::Enqueue(std::function<void()>&& task) {
138  std::lock_guard<std::mutex> lg(queue_mutex_);
139  tasks_.emplace_back(std::move(task));
140  condition_.notify_one();
141 }
142 
143 template <typename Request, typename Response>
145  while (!cyber::IsShutdown()) {
146  std::unique_lock<std::mutex> ul(queue_mutex_);
147  condition_.wait(ul, [this]() { return !inited_ || !this->tasks_.empty(); });
148  if (!inited_) {
149  break;
150  }
151  if (!tasks_.empty()) {
152  auto task = tasks_.front();
153  tasks_.pop_front();
154  ul.unlock();
155  task();
156  }
157  }
158 }
159 
160 template <typename Request, typename Response>
162  if (IsInit()) {
163  return true;
164  }
165  proto::RoleAttributes role;
166  role.set_node_name(node_name_);
167  role.set_channel_name(response_channel_);
168  auto channel_id = common::GlobalData::RegisterChannel(response_channel_);
169  role.set_channel_id(channel_id);
170  role.mutable_qos_profile()->CopyFrom(
172  auto transport = transport::Transport::Instance();
173  response_transmitter_ =
174  transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
175  if (response_transmitter_ == nullptr) {
176  AERROR << " Create response pub failed.";
177  return false;
178  }
179 
180  request_callback_ =
182  std::placeholders::_1, std::placeholders::_2);
183 
184  role.set_channel_name(request_channel_);
185  channel_id = common::GlobalData::RegisterChannel(request_channel_);
186  role.set_channel_id(channel_id);
187  request_receiver_ = transport->CreateReceiver<Request>(
188  role,
189  [=](const std::shared_ptr<Request>& request,
190  const transport::MessageInfo& message_info,
191  const proto::RoleAttributes& reader_attr) {
192  (void)reader_attr;
193  auto task = [this, request, message_info]() {
194  this->HandleRequest(request, message_info);
195  };
196  Enqueue(std::move(task));
197  },
198  proto::OptionalMode::RTPS);
199  inited_ = true;
200  thread_ = std::thread(&Service<Request, Response>::Process, this);
201  if (request_receiver_ == nullptr) {
202  AERROR << " Create request sub failed." << request_channel_;
203  response_transmitter_.reset();
204  return false;
205  }
206  return true;
207 }
208 
209 template <typename Request, typename Response>
211  const std::shared_ptr<Request>& request,
212  const transport::MessageInfo& message_info) {
213  if (!IsInit()) {
214  // LOG_DEBUG << "not inited error.";
215  return;
216  }
217  ADEBUG << "handling request:" << request_channel_;
218  std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
219  auto response = std::make_shared<Response>();
220  service_callback_(request, response);
221  transport::MessageInfo msg_info(message_info);
222  msg_info.set_sender_id(response_transmitter_->id());
223  SendResponse(msg_info, response);
224 }
225 
226 template <typename Request, typename Response>
228  const transport::MessageInfo& message_info,
229  const std::shared_ptr<Response>& response) {
230  if (!IsInit()) {
231  // LOG_DEBUG << "not inited error.";
232  return;
233  }
234  // publish return value ?
235  // LOG_DEBUG << "send response id:" << message_id.sequence_number;
236  response_transmitter_->Transmit(response, message_info);
237 }
238 
239 } // namespace cyber
240 } // namespace apollo
241 
242 #endif // CYBER_SERVICE_SERVICE_H_
Base class for Service.
Definition: service_base.h:30
void set_sender_id(const Identity &sender_id)
Definition: message_info.h:50
~Service()
Definition: service.h:81
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
const std::string & service_name() const
Get the service name.
Definition: service_base.h:47
Service handles a Request from the Client and sends a Response to it.
Definition: service.h:42
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT
Definition: qos_profile_conf.h:49
std::function< void(const std::shared_ptr< Request > &, std::shared_ptr< Response > &)> ServiceCallback
Definition: service.h:45
void destroy()
Destroy the Service.
Definition: service.h:124
bool IsShutdown()
Definition: state.h:46
#define ADEBUG
Definition: log.h:41
static uint64_t RegisterChannel(const std::string &channel)
Definition: message_info.h:30
Service(const std::string &node_name, const std::string &service_name, ServiceCallback &&service_callback)
Construct a new Service object.
Definition: service.h:68
#define AERROR
Definition: log.h:44
bool Init()
Init the Service.
Definition: service.h:161
Service()=delete
Forbid default constructing.
Service(const std::string &node_name, const std::string &service_name, const ServiceCallback &service_callback)
Construct a new Service object.
Definition: service.h:53