17 #ifndef CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_ 18 #define CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_ 20 #include <condition_variable> 25 #include <type_traits> 26 #include <unordered_map> 29 #include "google/protobuf/io/zero_copy_stream_impl.h" 30 #include "google/protobuf/message.h" 31 #include "google/protobuf/text_format.h" 46 body_.reset(
new proto::ChunkBody());
53 inline void add(
const proto::SingleMessage& message) {
54 std::lock_guard<std::mutex> lock(
mutex_);
55 proto::SingleMessage* p_message =
body_->add_messages();
57 if (
header_.begin_time() == 0) {
58 header_.set_begin_time(message.time());
60 if (
header_.begin_time() > message.time()) {
61 header_.set_begin_time(message.time());
63 if (
header_.end_time() < message.time()) {
64 header_.set_end_time(message.time());
67 header_.set_raw_size(
header_.raw_size() + message.content().size());
74 std::unique_ptr<proto::ChunkBody>
body_ =
nullptr;
81 bool Open(
const std::string& path)
override;
82 void Close()
override;
83 bool WriteHeader(
const proto::Header& header);
84 bool WriteChannel(
const proto::Channel& channel);
85 bool WriteMessage(
const proto::SingleMessage& message);
86 uint64_t GetMessageNumber(
const std::string& channel_name)
const;
89 bool WriteChunk(
const proto::ChunkHeader& chunk_header,
90 const proto::ChunkBody& chunk_body);
92 bool WriteSection(
const T& message);
95 std::atomic_bool is_writing_;
96 std::unique_ptr<Chunk> chunk_active_ =
nullptr;
97 std::unique_ptr<Chunk> chunk_flush_ =
nullptr;
98 std::shared_ptr<std::thread> flush_thread_ =
nullptr;
99 std::mutex flush_mutex_;
100 std::condition_variable flush_cv_;
101 std::unordered_map<std::string, uint64_t> channel_message_number_map_;
104 template <
typename T>
105 bool RecordFileWriter::WriteSection(
const T& message) {
106 proto::SectionType type;
108 type = proto::SectionType::SECTION_CHUNK_HEADER;
110 type = proto::SectionType::SECTION_CHUNK_BODY;
112 type = proto::SectionType::SECTION_CHANNEL;
114 type = proto::SectionType::SECTION_HEADER;
115 if (!SetPosition(0)) {
116 AERROR <<
"Jump to position #0 failed";
120 type = proto::SectionType::SECTION_INDEX;
122 AERROR <<
"Do not support this template typename.";
127 memset(§ion, 0,
sizeof(section));
128 section = {type,
static_cast<int64_t
>(message.ByteSizeLong())};
129 ssize_t count = write(fd_, §ion,
sizeof(section));
131 AERROR <<
"Write fd failed, fd: " << fd_ <<
", errno: " << errno;
134 if (count !=
sizeof(section)) {
135 AERROR <<
"Write fd failed, fd: " << fd_
136 <<
", expect count: " <<
sizeof(section)
137 <<
", actual count: " << count;
141 google::protobuf::io::FileOutputStream raw_output(fd_);
142 message.SerializeToZeroCopyStream(&raw_output);
144 if (type == proto::SectionType::SECTION_HEADER) {
146 count = write(fd_, &blank,
HEADER_LENGTH - message.ByteSizeLong());
148 AERROR <<
"Write fd failed, fd: " << fd_ <<
", errno: " << errno;
151 if (static_cast<size_t>(count) !=
HEADER_LENGTH - message.ByteSizeLong()) {
152 AERROR <<
"Write fd failed, fd: " << fd_
153 <<
", expect count: " <<
sizeof(section)
154 <<
", actual count: " << count;
158 header_.set_size(CurrentPosition());
166 #endif // CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_ void clear()
Definition: record_file_writer.h:45
void add(const proto::SingleMessage &message)
Definition: record_file_writer.h:53
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::unique_ptr< proto::ChunkBody > body_
Definition: record_file_writer.h:74
Definition: record_file_writer.h:77
std::mutex mutex_
Definition: record_file_writer.h:72
Definition: record_file_base.h:31
const int HEADER_LENGTH
Definition: record_file_base.h:29
proto::ChunkHeader header_
Definition: record_file_writer.h:73
apollo::cyber::base::std value
#define AERROR
Definition: log.h:44
bool empty()
Definition: record_file_writer.h:70
Definition: record_file_writer.h:42
Chunk()
Definition: record_file_writer.h:43