Apollo  6.0
Open source self driving car software
client.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_SERVICE_CLIENT_H_
18 #define CYBER_SERVICE_CLIENT_H_
19 
20 #include <future>
21 #include <map>
22 #include <memory>
23 #include <sstream>
24 #include <string>
25 #include <tuple>
26 #include <unordered_map>
27 #include <utility>
28 
29 #include "cyber/common/log.h"
30 #include "cyber/common/types.h"
33 
34 namespace apollo {
35 namespace cyber {
36 
46 template <typename Request, typename Response>
47 class Client : public ClientBase {
48  public:
49  using SharedRequest = typename std::shared_ptr<Request>;
50  using SharedResponse = typename std::shared_ptr<Response>;
51  using Promise = std::promise<SharedResponse>;
52  using SharedPromise = std::shared_ptr<Promise>;
53  using SharedFuture = std::shared_future<SharedResponse>;
54  using CallbackType = std::function<void(SharedFuture)>;
55 
62  Client(const std::string& node_name, const std::string& service_name)
63  : ClientBase(service_name),
64  node_name_(node_name),
65  request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
66  response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX),
67  sequence_number_(0) {}
68 
72  Client() = delete;
73 
74  virtual ~Client() {}
75 
82  bool Init();
83 
92  SharedRequest request,
93  const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
94 
103  const Request& request,
104  const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
105 
110 
114  SharedFuture AsyncSendRequest(const Request& request);
115 
125 
129  bool ServiceIsReady() const;
130 
134  void Destroy();
135 
144  template <typename RatioT = std::milli>
145  bool WaitForService(std::chrono::duration<int64_t, RatioT> timeout =
146  std::chrono::duration<int64_t, RatioT>(-1)) {
148  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
149  }
150 
151  private:
152  void HandleResponse(const std::shared_ptr<Response>& response,
153  const transport::MessageInfo& request_info);
154 
155  bool IsInit(void) const { return response_receiver_ != nullptr; }
156 
157  std::string node_name_;
158 
159  std::function<void(const std::shared_ptr<Response>&,
160  const transport::MessageInfo&)>
161  response_callback_;
162 
163  std::unordered_map<uint64_t,
164  std::tuple<SharedPromise, CallbackType, SharedFuture>>
165  pending_requests_;
166  std::mutex pending_requests_mutex_;
167 
168  std::shared_ptr<transport::Transmitter<Request>> request_transmitter_;
169  std::shared_ptr<transport::Receiver<Response>> response_receiver_;
170  std::string request_channel_;
171  std::string response_channel_;
172 
173  transport::Identity writer_id_;
174  uint64_t sequence_number_;
175 };
176 
177 template <typename Request, typename Response>
179 
180 template <typename Request, typename Response>
182  proto::RoleAttributes role;
183  role.set_node_name(node_name_);
184  role.set_channel_name(request_channel_);
185  auto channel_id = common::GlobalData::RegisterChannel(request_channel_);
186  role.set_channel_id(channel_id);
187  role.mutable_qos_profile()->CopyFrom(
189  auto transport = transport::Transport::Instance();
190  request_transmitter_ =
191  transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
192  if (request_transmitter_ == nullptr) {
193  AERROR << "Create request pub failed.";
194  return false;
195  }
196  writer_id_ = request_transmitter_->id();
197 
198  response_callback_ =
200  std::placeholders::_1, std::placeholders::_2);
201 
202  role.set_channel_name(response_channel_);
203  channel_id = common::GlobalData::RegisterChannel(response_channel_);
204  role.set_channel_id(channel_id);
205  response_receiver_ = transport->CreateReceiver<Response>(
206  role,
207  [=](const std::shared_ptr<Response>& response,
208  const transport::MessageInfo& message_info,
209  const proto::RoleAttributes& reader_attr) {
210  (void)message_info;
211  (void)reader_attr;
212  response_callback_(response, message_info);
213  },
214  proto::OptionalMode::RTPS);
215  if (response_receiver_ == nullptr) {
216  AERROR << "Create response sub failed.";
217  request_transmitter_.reset();
218  return false;
219  }
220  return true;
221 }
222 
223 template <typename Request, typename Response>
226  const std::chrono::seconds& timeout_s) {
227  if (!IsInit()) {
228  return nullptr;
229  }
230  auto future = AsyncSendRequest(request);
231  if (!future.valid()) {
232  return nullptr;
233  }
234  auto status = future.wait_for(timeout_s);
235  if (status == std::future_status::ready) {
236  return future.get();
237  } else {
238  return nullptr;
239  }
240 }
241 
242 template <typename Request, typename Response>
245  const std::chrono::seconds& timeout_s) {
246  if (!IsInit()) {
247  return nullptr;
248  }
249  auto request_ptr = std::make_shared<const Request>(request);
250  return SendRequest(request_ptr, timeout_s);
251 }
252 
253 template <typename Request, typename Response>
256  auto request_ptr = std::make_shared<const Request>(request);
257  return AsyncSendRequest(request_ptr);
258 }
259 
260 template <typename Request, typename Response>
263  return AsyncSendRequest(request, [](SharedFuture) {});
264 }
265 
266 template <typename Request, typename Response>
269  CallbackType&& cb) {
270  if (IsInit()) {
271  std::lock_guard<std::mutex> lock(pending_requests_mutex_);
272  sequence_number_++;
273  transport::MessageInfo info(writer_id_, sequence_number_, writer_id_);
274  request_transmitter_->Transmit(request, info);
275  SharedPromise call_promise = std::make_shared<Promise>();
276  SharedFuture f(call_promise->get_future());
277  pending_requests_[info.seq_num()] =
278  std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
279  return f;
280  } else {
281  return std::shared_future<std::shared_ptr<Response>>();
282  }
283 }
284 
285 template <typename Request, typename Response>
287  return true;
288 }
289 
290 template <typename Request, typename Response>
292  const std::shared_ptr<Response>& response,
293  const transport::MessageInfo& request_header) {
294  ADEBUG << "client recv response.";
295  std::lock_guard<std::mutex> lock(pending_requests_mutex_);
296  if (request_header.spare_id() != writer_id_) {
297  return;
298  }
299  uint64_t sequence_number = request_header.seq_num();
300  if (this->pending_requests_.count(sequence_number) == 0) {
301  return;
302  }
303  auto tuple = this->pending_requests_[sequence_number];
304  auto call_promise = std::get<0>(tuple);
305  auto callback = std::get<1>(tuple);
306  auto future = std::get<2>(tuple);
307  this->pending_requests_.erase(sequence_number);
308  call_promise->set_value(response);
309  callback(future);
310 }
311 
312 } // namespace cyber
313 } // namespace apollo
314 
315 #endif // CYBER_SERVICE_CLIENT_H_
bool WaitForService(std::chrono::duration< int64_t, RatioT > timeout=std::chrono::duration< int64_t, RatioT >(-1))
wait for the connection with the Service established
Definition: client.h:145
Client()=delete
forbid Constructing a new Client object with empty params
Client get Response from a responding Service by sending a Request.
Definition: client.h:47
bool WaitForServiceNanoseconds(std::chrono::nanoseconds time_out)
Definition: client_base.h:62
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
bool ServiceIsReady() const
Is the Service is ready?
Definition: client.h:286
virtual ~Client()
Definition: client.h:74
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT
Definition: qos_profile_conf.h:49
#define ADEBUG
Definition: log.h:41
Client(const std::string &node_name, const std::string &service_name)
Construct a new Client object.
Definition: client.h:62
std::shared_future< SharedResponse > SharedFuture
Definition: client.h:53
Definition: identity.h:30
static uint64_t RegisterChannel(const std::string &channel)
uint64_t seq_num() const
Definition: message_info.h:55
typename std::shared_ptr< Response > SharedResponse
Definition: client.h:50
Base class of Client.
Definition: client_base.h:33
Definition: message_info.h:30
SharedResponse SendRequest(SharedRequest request, const std::chrono::seconds &timeout_s=std::chrono::seconds(5))
Request the Service with a shared ptr Request type.
Definition: client.h:225
std::shared_ptr< Promise > SharedPromise
Definition: client.h:52
typename std::shared_ptr< Request > SharedRequest
Definition: client.h:49
void Destroy()
destroy this Client
Definition: client.h:178
SharedFuture AsyncSendRequest(SharedRequest request)
Send Request shared ptr asynchronously.
Definition: client.h:262
bool Init()
Init the Client.
Definition: client.h:181
#define AERROR
Definition: log.h:44
std::promise< SharedResponse > Promise
Definition: client.h:51
std::function< void(SharedFuture)> CallbackType
Definition: client.h:54
const Identity & spare_id() const
Definition: message_info.h:58