Apollo  6.0
Open source self driving car software
record_file_writer.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
18 #define CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
19 
20 #include <condition_variable>
21 #include <fstream>
22 #include <memory>
23 #include <string>
24 #include <thread>
25 #include <type_traits>
26 #include <unordered_map>
27 #include <utility>
28 
29 #include "google/protobuf/io/zero_copy_stream_impl.h"
30 #include "google/protobuf/message.h"
31 #include "google/protobuf/text_format.h"
32 
33 #include "cyber/common/log.h"
36 #include "cyber/time/time.h"
37 
38 namespace apollo {
39 namespace cyber {
40 namespace record {
41 
42 struct Chunk {
43  Chunk() { clear(); }
44 
45  inline void clear() {
46  body_.reset(new proto::ChunkBody());
47  header_.set_begin_time(0);
48  header_.set_end_time(0);
49  header_.set_message_number(0);
50  header_.set_raw_size(0);
51  }
52 
53  inline void add(const proto::SingleMessage& message) {
54  std::lock_guard<std::mutex> lock(mutex_);
55  proto::SingleMessage* p_message = body_->add_messages();
56  *p_message = message;
57  if (header_.begin_time() == 0) {
58  header_.set_begin_time(message.time());
59  }
60  if (header_.begin_time() > message.time()) {
61  header_.set_begin_time(message.time());
62  }
63  if (header_.end_time() < message.time()) {
64  header_.set_end_time(message.time());
65  }
66  header_.set_message_number(header_.message_number() + 1);
67  header_.set_raw_size(header_.raw_size() + message.content().size());
68  }
69 
70  inline bool empty() { return header_.message_number() == 0; }
71 
72  std::mutex mutex_;
73  proto::ChunkHeader header_;
74  std::unique_ptr<proto::ChunkBody> body_ = nullptr;
75 };
76 
78  public:
80  virtual ~RecordFileWriter();
81  bool Open(const std::string& path) override;
82  void Close() override;
83  bool WriteHeader(const proto::Header& header);
84  bool WriteChannel(const proto::Channel& channel);
85  bool WriteMessage(const proto::SingleMessage& message);
86  uint64_t GetMessageNumber(const std::string& channel_name) const;
87 
88  private:
89  bool WriteChunk(const proto::ChunkHeader& chunk_header,
90  const proto::ChunkBody& chunk_body);
91  template <typename T>
92  bool WriteSection(const T& message);
93  bool WriteIndex();
94  void Flush();
95  std::atomic_bool is_writing_;
96  std::unique_ptr<Chunk> chunk_active_ = nullptr;
97  std::unique_ptr<Chunk> chunk_flush_ = nullptr;
98  std::shared_ptr<std::thread> flush_thread_ = nullptr;
99  std::mutex flush_mutex_;
100  std::condition_variable flush_cv_;
101  std::unordered_map<std::string, uint64_t> channel_message_number_map_;
102 };
103 
104 template <typename T>
105 bool RecordFileWriter::WriteSection(const T& message) {
106  proto::SectionType type;
108  type = proto::SectionType::SECTION_CHUNK_HEADER;
110  type = proto::SectionType::SECTION_CHUNK_BODY;
112  type = proto::SectionType::SECTION_CHANNEL;
114  type = proto::SectionType::SECTION_HEADER;
115  if (!SetPosition(0)) {
116  AERROR << "Jump to position #0 failed";
117  return false;
118  }
120  type = proto::SectionType::SECTION_INDEX;
121  } else {
122  AERROR << "Do not support this template typename.";
123  return false;
124  }
125  Section section;
127  memset(&section, 0, sizeof(section));
128  section = {type, static_cast<int64_t>(message.ByteSizeLong())};
129  ssize_t count = write(fd_, &section, sizeof(section));
130  if (count < 0) {
131  AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno;
132  return false;
133  }
134  if (count != sizeof(section)) {
135  AERROR << "Write fd failed, fd: " << fd_
136  << ", expect count: " << sizeof(section)
137  << ", actual count: " << count;
138  return false;
139  }
140  {
141  google::protobuf::io::FileOutputStream raw_output(fd_);
142  message.SerializeToZeroCopyStream(&raw_output);
143  }
144  if (type == proto::SectionType::SECTION_HEADER) {
145  static char blank[HEADER_LENGTH] = {'0'};
146  count = write(fd_, &blank, HEADER_LENGTH - message.ByteSizeLong());
147  if (count < 0) {
148  AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno;
149  return false;
150  }
151  if (static_cast<size_t>(count) != HEADER_LENGTH - message.ByteSizeLong()) {
152  AERROR << "Write fd failed, fd: " << fd_
153  << ", expect count: " << sizeof(section)
154  << ", actual count: " << count;
155  return false;
156  }
157  }
158  header_.set_size(CurrentPosition());
159  return true;
160 }
161 
162 } // namespace record
163 } // namespace cyber
164 } // namespace apollo
165 
166 #endif // CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
void clear()
Definition: record_file_writer.h:45
void add(const proto::SingleMessage &message)
Definition: record_file_writer.h:53
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::unique_ptr< proto::ChunkBody > body_
Definition: record_file_writer.h:74
Definition: section.h:24
Definition: record_file_writer.h:77
std::mutex mutex_
Definition: record_file_writer.h:72
Definition: record_file_base.h:31
const int HEADER_LENGTH
Definition: record_file_base.h:29
proto::ChunkHeader header_
Definition: record_file_writer.h:73
apollo::cyber::base::std value
#define AERROR
Definition: log.h:44
bool empty()
Definition: record_file_writer.h:70
Definition: record_file_writer.h:42
Chunk()
Definition: record_file_writer.h:43