17 #ifndef CYBER_DATA_CHANNEL_BUFFER_H_ 18 #define CYBER_DATA_CHANNEL_BUFFER_H_ 40 : channel_id_(channel_id), buffer_(buffer) {}
// Reads one message from the underlying buffer into `m`, using `*index` as
// the caller's read cursor (in/out: the definition may overwrite `*index`).
// Visible parts of the definition show that it runs while holding
// buffer_->Mutex(), and that when `*index` is older than buffer_->Head() it
// logs a "read buffer overflow" warning and moves `*index` to
// buffer_->Tail() before assigning `m = buffer_->at(*index)`.
// NOTE(review): the return statements are not visible in this view --
// presumably false when the buffer is empty or when
// `*index == buffer_->Tail() + 1` (nothing new); confirm against the full
// definition.
42 bool Fetch(uint64_t* index, std::shared_ptr<T>& m);
// Output parameter `m` receives a message from the underlying buffer.
// The visible part of the definition takes buffer_->Mutex() and checks
// buffer_->Empty() first.
// NOTE(review): presumably stores the newest buffered message in `m` and
// returns false when the buffer is empty -- the rest of the body is not
// visible here; confirm against the full definition.
44 bool Latest(std::shared_ptr<T>& m);
// Appends buffered messages to `*vec` while holding buffer_->Mutex().
// The definition takes num = min(buffer_->Size(), fetch_size) and
// emplaces buffer_->at(index) for index in [Tail() - num + 1, Tail()], so
// at most `fetch_size` messages are appended, in ascending index order
// ending at Tail(). `*vec` is appended to; no clear() is visible.
// NOTE(review): the return statements are not visible in this view --
// presumably false when the buffer is empty, true otherwise; confirm.
46 bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);
// Accessor for the underlying buffer. Returns the buffer_ member by value,
// i.e. a copy of the shared_ptr that shares ownership of the same
// BufferType object. No lock is taken in this accessor.
// NOTE(review): the Fetch/Latest/FetchMulti definitions lock
// buffer_->Mutex() before touching the buffer; callers using the returned
// pointer directly presumably need to do the same -- confirm.
49 std::shared_ptr<BufferType>
Buffer()
const {
return buffer_; }
53 std::shared_ptr<BufferType> buffer_;
58 std::shared_ptr<T>& m) {
59 std::lock_guard<std::mutex> lock(buffer_->Mutex());
60 if (buffer_->Empty()) {
65 *index = buffer_->Tail();
66 }
else if (*index == buffer_->Tail() + 1) {
68 }
else if (*index < buffer_->Head()) {
69 auto interval = buffer_->Tail() - *index;
71 <<
"read buffer overflow, drop_message[" << interval <<
"] pre_index[" 72 << *index <<
"] current_index[" << buffer_->Tail() <<
"] ";
73 *index = buffer_->Tail();
75 m = buffer_->at(*index);
81 std::lock_guard<std::mutex> lock(buffer_->Mutex());
82 if (buffer_->Empty()) {
92 std::vector<std::shared_ptr<T>>* vec) {
93 std::lock_guard<std::mutex> lock(buffer_->Mutex());
94 if (buffer_->Empty()) {
98 auto num = std::min(buffer_->Size(), fetch_size);
100 for (
auto index = buffer_->Tail() - num + 1; index <= buffer_->Tail();
102 vec->emplace_back(buffer_->at(index));
111 #endif // CYBER_DATA_CHANNEL_BUFFER_H_ bool Fetch(uint64_t *index, std::shared_ptr< T > &m)
Definition: channel_buffer.h:57
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
bool Latest(std::shared_ptr< T > &m)
Definition: channel_buffer.h:80
uint64_t channel_id() const
Definition: channel_buffer.h:48
ChannelBuffer(uint64_t channel_id, BufferType *buffer)
Definition: channel_buffer.h:39
static std::string GetChannelById(uint64_t id)
Definition: channel_buffer.h:36
Definition: cache_buffer.h:30
std::shared_ptr< BufferType > Buffer() const
Definition: channel_buffer.h:49
Definition: global_data.h:40
#define AWARN
Definition: log.h:43
bool FetchMulti(uint64_t fetch_size, std::vector< std::shared_ptr< T >> *vec)
Definition: channel_buffer.h:91