17 #ifndef CYBER_SERVICE_CLIENT_H_ 18 #define CYBER_SERVICE_CLIENT_H_ 26 #include <unordered_map> 46 template <
typename Request,
typename Response>
51 using Promise = std::promise<SharedResponse>;
62 Client(
const std::string& node_name,
const std::string& service_name)
64 node_name_(node_name),
65 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
66 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX),
67 sequence_number_(0) {}
93 const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
103 const Request& request,
104 const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
144 template <
typename RatioT = std::milli>
146 std::chrono::duration<int64_t, RatioT>(-1)) {
148 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
152 void HandleResponse(
const std::shared_ptr<Response>& response,
155 bool IsInit(
void)
const {
return response_receiver_ !=
nullptr; }
157 std::string node_name_;
159 std::function<void(const std::shared_ptr<Response>&,
163 std::unordered_map<uint64_t,
164 std::tuple<SharedPromise, CallbackType, SharedFuture>>
166 std::mutex pending_requests_mutex_;
168 std::shared_ptr<transport::Transmitter<Request>> request_transmitter_;
169 std::shared_ptr<transport::Receiver<Response>> response_receiver_;
170 std::string request_channel_;
171 std::string response_channel_;
174 uint64_t sequence_number_;
177 template <
typename Request,
typename Response>
180 template <
typename Request,
typename Response>
182 proto::RoleAttributes role;
183 role.set_node_name(node_name_);
184 role.set_channel_name(request_channel_);
186 role.set_channel_id(channel_id);
187 role.mutable_qos_profile()->CopyFrom(
190 request_transmitter_ =
191 transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
192 if (request_transmitter_ ==
nullptr) {
193 AERROR <<
"Create request pub failed.";
196 writer_id_ = request_transmitter_->id();
200 std::placeholders::_1, std::placeholders::_2);
202 role.set_channel_name(response_channel_);
204 role.set_channel_id(channel_id);
205 response_receiver_ = transport->CreateReceiver<Response>(
207 [=](
const std::shared_ptr<Response>& response,
209 const proto::RoleAttributes& reader_attr) {
212 response_callback_(response, message_info);
214 proto::OptionalMode::RTPS);
215 if (response_receiver_ ==
nullptr) {
216 AERROR <<
"Create response sub failed.";
217 request_transmitter_.reset();
223 template <
typename Request,
typename Response>
226 const std::chrono::seconds& timeout_s) {
231 if (!future.valid()) {
234 auto status = future.wait_for(timeout_s);
235 if (status == std::future_status::ready) {
242 template <
typename Request,
typename Response>
245 const std::chrono::seconds& timeout_s) {
249 auto request_ptr = std::make_shared<const Request>(request);
253 template <
typename Request,
typename Response>
256 auto request_ptr = std::make_shared<const Request>(request);
260 template <
typename Request,
typename Response>
266 template <
typename Request,
typename Response>
271 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
274 request_transmitter_->Transmit(request, info);
277 pending_requests_[info.
seq_num()] =
278 std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
281 return std::shared_future<std::shared_ptr<Response>>();
285 template <
typename Request,
typename Response>
290 template <
typename Request,
typename Response>
292 const std::shared_ptr<Response>& response,
294 ADEBUG <<
"client recv response.";
295 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
296 if (request_header.
spare_id() != writer_id_) {
299 uint64_t sequence_number = request_header.
seq_num();
300 if (this->pending_requests_.count(sequence_number) == 0) {
303 auto tuple = this->pending_requests_[sequence_number];
304 auto call_promise = std::get<0>(tuple);
305 auto callback = std::get<1>(tuple);
306 auto future = std::get<2>(tuple);
307 this->pending_requests_.erase(sequence_number);
308 call_promise->set_value(response);
315 #endif // CYBER_SERVICE_CLIENT_H_ bool WaitForService(std::chrono::duration< int64_t, RatioT > timeout=std::chrono::duration< int64_t, RatioT >(-1))
wait for the connection with the Service established
Definition: client.h:145
Client()=delete
forbid Constructing a new Client object with empty params
Client get Response from a responding Service by sending a Request.
Definition: client.h:47
bool WaitForServiceNanoseconds(std::chrono::nanoseconds time_out)
Definition: client_base.h:62
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
bool ServiceIsReady() const
Is the Service is ready?
Definition: client.h:286
virtual ~Client()
Definition: client.h:74
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT
Definition: qos_profile_conf.h:49
#define ADEBUG
Definition: log.h:41
Client(const std::string &node_name, const std::string &service_name)
Construct a new Client object.
Definition: client.h:62
std::shared_future< SharedResponse > SharedFuture
Definition: client.h:53
Definition: identity.h:30
static uint64_t RegisterChannel(const std::string &channel)
uint64_t seq_num() const
Definition: message_info.h:55
typename std::shared_ptr< Response > SharedResponse
Definition: client.h:50
Base class of Client.
Definition: client_base.h:33
Definition: message_info.h:30
SharedResponse SendRequest(SharedRequest request, const std::chrono::seconds &timeout_s=std::chrono::seconds(5))
Request the Service with a shared ptr Request type.
Definition: client.h:225
std::shared_ptr< Promise > SharedPromise
Definition: client.h:52
typename std::shared_ptr< Request > SharedRequest
Definition: client.h:49
void Destroy()
destroy this Client
Definition: client.h:178
SharedFuture AsyncSendRequest(SharedRequest request)
Send Request shared ptr asynchronously.
Definition: client.h:262
bool Init()
Init the Client.
Definition: client.h:181
#define AERROR
Definition: log.h:44
std::promise< SharedResponse > Promise
Definition: client.h:51
std::function< void(SharedFuture)> CallbackType
Definition: client.h:54
const Identity & spare_id() const
Definition: message_info.h:58