Apollo  6.0
Open source self driving car software
msg_buffer.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 #pragma once
17 
18 #include <limits>
19 #include <memory>
20 #include <string>
21 #include <utility>
22 #include <vector>
23 
24 #include <boost/circular_buffer.hpp>
25 #include "gflags/gflags.h"
26 
27 #include "cyber/cyber.h"
28 
29 namespace apollo {
30 namespace perception {
31 namespace onboard {
32 
33 DECLARE_int32(obs_msg_buffer_size);
34 DECLARE_double(obs_buffer_match_precision);
35 
36 template <class T>
37 class MsgBuffer {
38  public:
39  typedef std::shared_ptr<T const> ConstPtr;
40  typedef std::pair<double, ConstPtr> ObjectPair;
41 
42  public:
43  MsgBuffer() : buffer_queue_(FLAGS_obs_msg_buffer_size) {}
44  ~MsgBuffer() = default;
45 
46  MsgBuffer(const MsgBuffer&) = delete;
47  MsgBuffer operator=(const MsgBuffer&) = delete;
48 
49  void Init(const std::string& channel, const std::string& name);
50 
51  // get nearest message
52  int LookupNearest(double timestamp, ConstPtr* msg);
53  // get latest message
54  int LookupLatest(ConstPtr* msg);
55  // get messages in (timestamp-period, timestamp+period)
56  int LookupPeriod(double timestamp, double period,
57  std::vector<ObjectPair>* msgs);
58 
59  private:
60  void MsgCallback(const ConstPtr& msg);
61 
62  private:
63  std::string node_name_;
64  std::unique_ptr<cyber::Node> node_;
65  std::shared_ptr<cyber::Reader<T>> msg_subscriber_;
66  std::mutex buffer_mutex_;
67 
68  bool init_ = false;
69  boost::circular_buffer<ObjectPair> buffer_queue_;
70 };
71 
72 template <class T>
73 void MsgBuffer<T>::Init(const std::string& channel, const std::string& name) {
74  int index = static_cast<int>(name.find_last_of('/'));
75  if (index != -1) {
76  node_name_ = name.substr(index + 1) + "_subscriber";
77  } else {
78  node_name_ = name + "_subscriber";
79  }
80  node_.reset(apollo::cyber::CreateNode(node_name_).release());
81 
82  std::function<void(const ConstPtr&)> register_call =
83  std::bind(&MsgBuffer<T>::MsgCallback, this, std::placeholders::_1);
84  msg_subscriber_ = node_->CreateReader<T>(channel, register_call);
85 
86  std::lock_guard<std::mutex> lock(buffer_mutex_);
87  buffer_queue_.set_capacity(FLAGS_obs_msg_buffer_size);
88  init_ = true;
89 }
90 
91 template <class T>
92 void MsgBuffer<T>::MsgCallback(const ConstPtr& msg) {
93  std::lock_guard<std::mutex> lock(buffer_mutex_);
94  double timestamp = msg->measurement_time();
95  buffer_queue_.push_back(std::make_pair(timestamp, msg));
96 }
97 
98 template <class T>
99 int MsgBuffer<T>::LookupNearest(double timestamp, ConstPtr* msg) {
100  std::lock_guard<std::mutex> lock(buffer_mutex_);
101  if (!init_) {
102  AERROR << "msg buffer is uninitialized.";
103  return false;
104  }
105  if (buffer_queue_.empty()) {
106  AERROR << "msg buffer is empty.";
107  return false;
108  }
109  if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
110  timestamp) {
111  AERROR << "Your timestamp (" << timestamp
112  << ") is earlier than the oldest timestamp ("
113  << buffer_queue_.front().first << ").";
114  return false;
115  }
116  if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
117  timestamp) {
118  AERROR << "Your timestamp (" << timestamp
119  << ") is newer than the latest timestamp ("
120  << buffer_queue_.back().first << ").";
121  return false;
122  }
123 
124  // loop to find nearest
125  double distance = std::numeric_limits<double>::max();
126  int idx = static_cast<int>(buffer_queue_.size()) - 1;
127  for (; idx >= 0; --idx) {
128  double temp_distance = fabs(timestamp - buffer_queue_[idx].first);
129  if (temp_distance >= distance) {
130  break;
131  }
132  distance = temp_distance;
133  }
134  *msg = buffer_queue_[idx + 1].second;
135 
136  return true;
137 }
138 
139 template <class T>
141  std::lock_guard<std::mutex> lock(buffer_mutex_);
142  if (!init_) {
143  AERROR << "Message buffer is uninitialized.";
144  return false;
145  }
146  if (buffer_queue_.empty()) {
147  AERROR << "Message buffer is empty.";
148  return false;
149  }
150  *msg = buffer_queue_.back().second;
151  return true;
152 }
153 
154 template <class T>
155 int MsgBuffer<T>::LookupPeriod(const double timestamp, const double period,
156  std::vector<ObjectPair>* msgs) {
157  std::lock_guard<std::mutex> lock(buffer_mutex_);
158  if (!init_) {
159  AERROR << "Message buffer is uninitialized.";
160  return false;
161  }
162  if (buffer_queue_.empty()) {
163  AERROR << "Message buffer is empty.";
164  return false;
165  }
166  if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
167  timestamp) {
168  AERROR << "Your timestamp (" << timestamp << ") is earlier than the oldest "
169  << "timestamp (" << buffer_queue_.front().first << ").";
170  return false;
171  }
172  if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
173  timestamp) {
174  AERROR << "Your timestamp (" << timestamp << ") is newer than the latest "
175  << "timestamp (" << buffer_queue_.back().first << ").";
176  return false;
177  }
178 
179  const double lower_timestamp = timestamp - period;
180  const double upper_timestamp = timestamp + period;
181  for (const auto& obj_pair : buffer_queue_) {
182  if (obj_pair.first < lower_timestamp) {
183  continue;
184  }
185  if (obj_pair.first > upper_timestamp) {
186  break;
187  }
188  msgs->push_back(obj_pair);
189  }
190 
191  return true;
192 }
193 
194 } // namespace onboard
195 } // namespace perception
196 } // namespace apollo
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: msg_buffer.h:37
int LookupNearest(double timestamp, ConstPtr *msg)
Definition: msg_buffer.h:99
DECLARE_double(obs_buffer_match_precision)
std::shared_ptr< T const > ConstPtr
Definition: msg_buffer.h:39
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space="")
void Init(const std::string &channel, const std::string &name)
Definition: msg_buffer.h:73
MsgBuffer operator=(const MsgBuffer &)=delete
std::pair< double, ConstPtr > ObjectPair
Definition: msg_buffer.h:40
int LookupPeriod(double timestamp, double period, std::vector< ObjectPair > *msgs)
Definition: msg_buffer.h:155
int LookupLatest(ConstPtr *msg)
Definition: msg_buffer.h:140
MsgBuffer()
Definition: msg_buffer.h:43
#define AERROR
Definition: log.h:44
DECLARE_int32(obs_msg_buffer_size)