24 #include <boost/circular_buffer.hpp> 25 #include "gflags/gflags.h" 30 namespace perception {
43 MsgBuffer() : buffer_queue_(FLAGS_obs_msg_buffer_size) {}
49 void Init(
const std::string& channel,
const std::string& name);
57 std::vector<ObjectPair>* msgs);
60 void MsgCallback(
const ConstPtr& msg);
63 std::string node_name_;
64 std::unique_ptr<cyber::Node> node_;
65 std::shared_ptr<cyber::Reader<T>> msg_subscriber_;
66 std::mutex buffer_mutex_;
69 boost::circular_buffer<ObjectPair> buffer_queue_;
74 int index =
static_cast<int>(name.find_last_of(
'/'));
76 node_name_ = name.substr(index + 1) +
"_subscriber";
78 node_name_ = name +
"_subscriber";
82 std::function<void(const ConstPtr&)> register_call =
84 msg_subscriber_ = node_->CreateReader<T>(channel, register_call);
86 std::lock_guard<std::mutex> lock(buffer_mutex_);
87 buffer_queue_.set_capacity(FLAGS_obs_msg_buffer_size);
93 std::lock_guard<std::mutex> lock(buffer_mutex_);
94 double timestamp = msg->measurement_time();
95 buffer_queue_.push_back(std::make_pair(timestamp, msg));
100 std::lock_guard<std::mutex> lock(buffer_mutex_);
102 AERROR <<
"msg buffer is uninitialized.";
105 if (buffer_queue_.empty()) {
106 AERROR <<
"msg buffer is empty.";
109 if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
111 AERROR <<
"Your timestamp (" << timestamp
112 <<
") is earlier than the oldest timestamp (" 113 << buffer_queue_.front().first <<
").";
116 if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
118 AERROR <<
"Your timestamp (" << timestamp
119 <<
") is newer than the latest timestamp (" 120 << buffer_queue_.back().first <<
").";
125 double distance = std::numeric_limits<double>::max();
126 int idx =
static_cast<int>(buffer_queue_.size()) - 1;
127 for (; idx >= 0; --idx) {
128 double temp_distance = fabs(timestamp - buffer_queue_[idx].first);
129 if (temp_distance >= distance) {
132 distance = temp_distance;
134 *msg = buffer_queue_[idx + 1].second;
141 std::lock_guard<std::mutex> lock(buffer_mutex_);
143 AERROR <<
"Message buffer is uninitialized.";
146 if (buffer_queue_.empty()) {
147 AERROR <<
"Message buffer is empty.";
150 *msg = buffer_queue_.back().second;
156 std::vector<ObjectPair>* msgs) {
157 std::lock_guard<std::mutex> lock(buffer_mutex_);
159 AERROR <<
"Message buffer is uninitialized.";
162 if (buffer_queue_.empty()) {
163 AERROR <<
"Message buffer is empty.";
166 if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
168 AERROR <<
"Your timestamp (" << timestamp <<
") is earlier than the oldest " 169 <<
"timestamp (" << buffer_queue_.front().first <<
").";
172 if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
174 AERROR <<
"Your timestamp (" << timestamp <<
") is newer than the latest " 175 <<
"timestamp (" << buffer_queue_.back().first <<
").";
179 const double lower_timestamp = timestamp - period;
180 const double upper_timestamp = timestamp + period;
181 for (
const auto& obj_pair : buffer_queue_) {
182 if (obj_pair.first < lower_timestamp) {
185 if (obj_pair.first > upper_timestamp) {
188 msgs->push_back(obj_pair);
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: msg_buffer.h:37
int LookupNearest(double timestamp, ConstPtr *msg)
Definition: msg_buffer.h:99
DECLARE_double(obs_buffer_match_precision)
std::shared_ptr< T const > ConstPtr
Definition: msg_buffer.h:39
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space="")
void Init(const std::string &channel, const std::string &name)
Definition: msg_buffer.h:73
MsgBuffer operator=(const MsgBuffer &)=delete
std::pair< double, ConstPtr > ObjectPair
Definition: msg_buffer.h:40
int LookupPeriod(double timestamp, double period, std::vector< ObjectPair > *msgs)
Definition: msg_buffer.h:155
int LookupLatest(ConstPtr *msg)
Definition: msg_buffer.h:140
MsgBuffer()
Definition: msg_buffer.h:43
#define AERROR
Definition: log.h:44
DECLARE_int32(obs_msg_buffer_size)