Apollo  6.0
Open source self driving car software
record_writer.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_RECORD_RECORD_WRITER_H_
18 #define CYBER_RECORD_RECORD_WRITER_H_
19 
20 #include <cstdint>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <sstream>
25 #include <string>
26 #include <unordered_map>
27 
28 #include "cyber/proto/record.pb.h"
29 
30 #include "cyber/common/log.h"
36 
37 namespace apollo {
38 namespace cyber {
39 namespace record {
40 
44 class RecordWriter : public RecordBase {
45  public:
46  using MessageNumberMap = std::unordered_map<std::string, uint64_t>;
47  using MessageTypeMap = std::unordered_map<std::string, std::string>;
48  using MessageProtoDescMap = std::unordered_map<std::string, std::string>;
49  using FileWriterPtr = std::unique_ptr<RecordFileWriter>;
50 
54  RecordWriter();
55 
61  explicit RecordWriter(const proto::Header& header);
62 
66  virtual ~RecordWriter();
67 
75  bool Open(const std::string& file);
76 
80  void Close();
81 
91  bool WriteChannel(const std::string& channel_name,
92  const std::string& message_type,
93  const std::string& proto_desc);
94 
106  template <typename MessageT>
107  bool WriteMessage(const std::string& channel_name, const MessageT& message,
108  const uint64_t time_nanosec,
109  const std::string& proto_desc = "");
110 
118  bool SetSizeOfFileSegmentation(uint64_t size_kilobytes);
119 
127  bool SetIntervalOfFileSegmentation(uint64_t time_sec);
128 
136  uint64_t GetMessageNumber(const std::string& channel_name) const override;
137 
145  const std::string& GetMessageType(
146  const std::string& channel_name) const override;
147 
155  const std::string& GetProtoDesc(
156  const std::string& channel_name) const override;
157 
163  std::set<std::string> GetChannelList() const override;
164 
170  bool IsNewChannel(const std::string& channel_name) const;
171 
172  private:
173  bool WriteMessage(const proto::SingleMessage& single_msg);
174  bool SplitOutfile();
175  void OnNewChannel(const std::string& channel_name,
176  const std::string& message_type,
177  const std::string& proto_desc);
178  void OnNewMessage(const std::string& channel_name);
179 
180  std::string path_;
181  uint64_t segment_raw_size_ = 0;
182  uint64_t segment_begin_time_ = 0;
183  uint32_t file_index_ = 0;
184  MessageNumberMap channel_message_number_map_;
185  MessageTypeMap channel_message_type_map_;
186  MessageProtoDescMap channel_proto_desc_map_;
187  FileWriterPtr file_writer_ = nullptr;
188  FileWriterPtr file_writer_backup_ = nullptr;
189  std::mutex mutex_;
190  std::stringstream sstream_;
191 };
192 
193 template <>
194 inline bool RecordWriter::WriteMessage(const std::string& channel_name,
195  const std::string& message,
196  const uint64_t time_nanosec,
197  const std::string& proto_desc) {
198  proto::SingleMessage single_msg;
199  single_msg.set_channel_name(channel_name);
200  single_msg.set_content(message);
201  single_msg.set_time(time_nanosec);
202  return WriteMessage(single_msg);
203 }
204 
205 template <>
207  const std::string& channel_name,
208  const std::shared_ptr<message::RawMessage>& message,
209  const uint64_t time_nanosec, const std::string& proto_desc) {
210  if (message == nullptr) {
211  AERROR << "nullptr error, channel: " << channel_name;
212  return false;
213  }
214  return WriteMessage(channel_name, message->message, time_nanosec);
215 }
216 
217 template <typename MessageT>
218 bool RecordWriter::WriteMessage(const std::string& channel_name,
219  const MessageT& message,
220  const uint64_t time_nanosec,
221  const std::string& proto_desc) {
222  const std::string& message_type = GetMessageType(channel_name);
223  if (message_type.empty()) {
224  if (!WriteChannel(channel_name, message::GetMessageName<MessageT>(),
225  proto_desc)) {
226  AERROR << "Failed to write meta data to channel [" << channel_name
227  << "].";
228  return false;
229  }
230  } else {
231  if (MessageT::descriptor()->full_name() != message_type) {
232  AERROR << "Message type is invalid, expect: " << message_type
233  << ", actual: " << message::GetMessageName<MessageT>();
234  return false;
235  }
236  }
237  std::string content("");
238  if (!message.SerializeToString(&content)) {
239  AERROR << "Failed to serialize message, channel: " << channel_name;
240  return false;
241  }
242  return WriteMessage(channel_name, content, time_nanosec);
243 }
244 
245 } // namespace record
246 } // namespace cyber
247 } // namespace apollo
248 
249 #endif // CYBER_RECORD_RECORD_WRITER_H_
std::unordered_map< std::string, std::string > MessageProtoDescMap
Definition: record_writer.h:48
bool WriteChannel(const std::string &channel_name, const std::string &message_type, const std::string &proto_desc)
Write a channel to record.
RecordWriter()
The default constructor.
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
The record writer.
Definition: record_writer.h:44
bool SetIntervalOfFileSegmentation(uint64_t time_sec)
Set max interval (Second) to segment record file.
bool SetSizeOfFileSegmentation(uint64_t size_kilobytes)
Set max size (KB) to segment record file.
const std::string & GetProtoDesc(const std::string &channel_name) const override
Get proto descriptor string by channel name.
std::unique_ptr< RecordFileWriter > FileWriterPtr
Definition: record_writer.h:49
Base class for record reader and writer.
Definition: record_base.h:35
std::unordered_map< std::string, uint64_t > MessageNumberMap
Definition: record_writer.h:46
bool WriteMessage(const std::string &channel_name, const MessageT &message, const uint64_t time_nanosec, const std::string &proto_desc="")
Write a message to record.
Definition: record_writer.h:218
bool IsNewChannel(const std::string &channel_name) const
Is a new channel recording or not.
const std::string & GetMessageType(const std::string &channel_name) const override
Get message type by channel name.
void Close()
Clean the record.
virtual ~RecordWriter()
Virtual Destructor.
bool Open(const std::string &file)
Open a record to write.
uint64_t GetMessageNumber(const std::string &channel_name) const override
Get message number by channel name.
#define AERROR
Definition: log.h:44
std::set< std::string > GetChannelList() const override
Get channel list.
std::unordered_map< std::string, std::string > MessageTypeMap
Definition: record_writer.h:47