Apollo  6.0
Open-source self-driving car software
channel_buffer.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_DATA_CHANNEL_BUFFER_H_
18 #define CYBER_DATA_CHANNEL_BUFFER_H_
19 
20 #include <algorithm>
21 #include <functional>
22 #include <memory>
23 #include <vector>
24 
26 #include "cyber/common/log.h"
28 
29 namespace apollo {
30 namespace cyber {
31 namespace data {
32 
34 
35 template <typename T>
37  public:
39  ChannelBuffer(uint64_t channel_id, BufferType* buffer)
40  : channel_id_(channel_id), buffer_(buffer) {}
41 
42  bool Fetch(uint64_t* index, std::shared_ptr<T>& m); // NOLINT
43 
44  bool Latest(std::shared_ptr<T>& m); // NOLINT
45 
46  bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);
47 
48  uint64_t channel_id() const { return channel_id_; }
49  std::shared_ptr<BufferType> Buffer() const { return buffer_; }
50 
51  private:
52  uint64_t channel_id_;
53  std::shared_ptr<BufferType> buffer_;
54 };
55 
56 template <typename T>
57 bool ChannelBuffer<T>::Fetch(uint64_t* index,
58  std::shared_ptr<T>& m) { // NOLINT
59  std::lock_guard<std::mutex> lock(buffer_->Mutex());
60  if (buffer_->Empty()) {
61  return false;
62  }
63 
64  if (*index == 0) {
65  *index = buffer_->Tail();
66  } else if (*index == buffer_->Tail() + 1) {
67  return false;
68  } else if (*index < buffer_->Head()) {
69  auto interval = buffer_->Tail() - *index;
70  AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] "
71  << "read buffer overflow, drop_message[" << interval << "] pre_index["
72  << *index << "] current_index[" << buffer_->Tail() << "] ";
73  *index = buffer_->Tail();
74  }
75  m = buffer_->at(*index);
76  return true;
77 }
78 
79 template <typename T>
80 bool ChannelBuffer<T>::Latest(std::shared_ptr<T>& m) { // NOLINT
81  std::lock_guard<std::mutex> lock(buffer_->Mutex());
82  if (buffer_->Empty()) {
83  return false;
84  }
85 
86  m = buffer_->Back();
87  return true;
88 }
89 
90 template <typename T>
91 bool ChannelBuffer<T>::FetchMulti(uint64_t fetch_size,
92  std::vector<std::shared_ptr<T>>* vec) {
93  std::lock_guard<std::mutex> lock(buffer_->Mutex());
94  if (buffer_->Empty()) {
95  return false;
96  }
97 
98  auto num = std::min(buffer_->Size(), fetch_size);
99  vec->reserve(num);
100  for (auto index = buffer_->Tail() - num + 1; index <= buffer_->Tail();
101  ++index) {
102  vec->emplace_back(buffer_->at(index));
103  }
104  return true;
105 }
106 
107 } // namespace data
108 } // namespace cyber
109 } // namespace apollo
110 
111 #endif // CYBER_DATA_CHANNEL_BUFFER_H_
bool Fetch(uint64_t *index, std::shared_ptr< T > &m)
Definition: channel_buffer.h:57
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
bool Latest(std::shared_ptr< T > &m)
Definition: channel_buffer.h:80
uint64_t channel_id() const
Definition: channel_buffer.h:48
ChannelBuffer(uint64_t channel_id, BufferType *buffer)
Definition: channel_buffer.h:39
static std::string GetChannelById(uint64_t id)
Definition: channel_buffer.h:36
Definition: cache_buffer.h:30
std::shared_ptr< BufferType > Buffer() const
Definition: channel_buffer.h:49
Definition: global_data.h:40
#define AWARN
Definition: log.h:43
bool FetchMulti(uint64_t fetch_size, std::vector< std::shared_ptr< T >> *vec)
Definition: channel_buffer.h:91