Apollo  6.0
Open source self driving car software
shm_transmitter.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
18 #define CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
19 
20 #include <cstring>
21 #include <iostream>
22 #include <memory>
23 #include <string>
24 
26 #include "cyber/common/log.h"
27 #include "cyber/common/util.h"
33 
34 namespace apollo {
35 namespace cyber {
36 namespace transport {
37 
// ShmTransmitter<M> is the Transmitter<M> specialization used for the
// shared-memory transport: Transmit() serializes a message of type M into a
// block acquired from a Segment and then announces that block through a
// Notifier.
38 template <typename M>
39 class ShmTransmitter : public Transmitter<M> {
40  public:
41  using MessagePtr = std::shared_ptr<M>;
42 
    // Stores the channel id and a hash of the host ip taken from attr.
    // segment_ and notifier_ start as nullptr; Enable() creates them.
43  explicit ShmTransmitter(const RoleAttributes& attr);
    // Calls Disable().
44  virtual ~ShmTransmitter();
45 
    // Creates the segment and notifier and sets enabled_; does nothing if
    // already enabled.
46  void Enable() override;
    // Drops the segment and notifier and clears enabled_; does nothing if
    // not enabled.
47  void Disable() override;
48 
    // Dereferences msg and forwards to the private const-reference overload.
49  bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) override;
50 
51  private:
    // Does the actual work: writes msg and msg_info into a block and notifies.
    // Returns false when disabled or when any step fails.
52  bool Transmit(const M& msg, const MessageInfo& msg_info);
53 
54  SegmentPtr segment_;     // created per channel by Enable(), reset by Disable()
55  uint64_t channel_id_;    // attr.channel_id()
56  uint64_t host_id_;       // common::Hash(attr.host_ip())
57  NotifierPtr notifier_;   // created by Enable(), reset by Disable()
58 };
59 
// Constructor. Passes attr to the Transmitter<M> base, records the channel id,
// and leaves segment_ / notifier_ null (they are created later by Enable()).
60 template <typename M>
61 ShmTransmitter<M>::ShmTransmitter(const RoleAttributes& attr)
62  : Transmitter<M>(attr),
63  segment_(nullptr),
64  channel_id_(attr.channel_id()),
65  notifier_(nullptr) {
    // host_id_ is set in the body (not the init list); it is the hash of the
    // host ip string and is later placed in every ReadableInfo sent out.
66  host_id_ = common::Hash(attr.host_ip());
67 }
68 
// Destructor. Delegates to Disable(), which resets segment_ and notifier_ when
// the transmitter is still enabled.
69 template <typename M>
70 ShmTransmitter<M>::~ShmTransmitter() {
71  Disable();
72 }
73 
// Enable() prepares the transmitter for sending: it creates the segment for
// channel_id_ and a notifier, then marks the endpoint as enabled.
// Calling it while already enabled is a no-op.
74 template <typename M>
75 void ShmTransmitter<M>::Enable() {
76  if (this->enabled_) {
77  return;
78  }
79 
80  segment_ = SegmentFactory::CreateSegment(channel_id_);
81  notifier_ = NotifierFactory::CreateNotifier();
    // NOTE(review): the factory results are not checked here; confirm they
    // cannot return nullptr, since Transmit() dereferences both.
82  this->enabled_ = true;
83 }
84 
// Disable() undoes Enable(): it resets segment_ and notifier_ to nullptr and
// clears enabled_. Calling it while not enabled is a no-op.
85 template <typename M>
86 void ShmTransmitter<M>::Disable() {
87  if (this->enabled_) {
88  segment_ = nullptr;
89  notifier_ = nullptr;
90  this->enabled_ = false;
91  }
92 }
93 
// Public Transmit overload taking a shared pointer. It forwards the pointee
// and msg_info to the const-reference overload and returns its result.
// NOTE(review): msg is dereferenced without a null check, so callers must
// pass a non-null pointer.
94 template <typename M>
95 bool ShmTransmitter<M>::Transmit(const MessagePtr& msg,
96  const MessageInfo& msg_info) {
97  return Transmit(*msg, msg_info);
98 }
99 
// Writes one message into the segment and notifies readers.
//
// Steps: acquire a writable block sized for msg, serialize msg into the
// block buffer, serialize msg_info directly after the message bytes, record
// both sizes on the block, release the block, then send a ReadableInfo
// (host id, block index, channel id) through the notifier.
//
// Returns false if the transmitter is not enabled, if no block could be
// acquired, or if either serialization fails; otherwise returns whatever
// notifier_->Notify() returns. Once a block has been acquired it is released
// on every path.
100 template <typename M>
101 bool ShmTransmitter<M>::Transmit(const M& msg, const MessageInfo& msg_info) {
102  if (!this->enabled_) {
103  ADEBUG << "not enable.";
104  return false;
105  }
106 
107  WritableBlock wb;
108  std::size_t msg_size = message::ByteSize(msg);
     // Only msg_size is requested here, yet MessageInfo::kSize more bytes are
     // written after the message below.
     // NOTE(review): presumably the segment reserves that extra room itself;
     // confirm in Segment::AcquireBlockToWrite.
109  if (!segment_->AcquireBlockToWrite(msg_size, &wb)) {
110  AERROR << "acquire block failed.";
111  return false;
112  }
113 
114  ADEBUG << "block index: " << wb.index;
115  if (!message::SerializeToArray(msg, wb.buf, static_cast<int>(msg_size))) {
116  AERROR << "serialize to array failed.";
117  segment_->ReleaseWrittenBlock(wb);
118  return false;
119  }
120  wb.block->set_msg_size(msg_size);
121 
     // The message info is placed immediately behind the serialized message.
122  char* msg_info_addr = reinterpret_cast<char*>(wb.buf) + msg_size;
123  if (!msg_info.SerializeTo(msg_info_addr, MessageInfo::kSize)) {
124  AERROR << "serialize message info failed.";
125  segment_->ReleaseWrittenBlock(wb);
126  return false;
127  }
     // Record the info size on the block, mirroring set_msg_size above.
128  wb.block->set_msg_info_size(MessageInfo::kSize);
129  segment_->ReleaseWrittenBlock(wb);
130 
     // The block is released before notifying.
131  ReadableInfo readable_info(host_id_, wb.index, channel_id_);
132 
133  ADEBUG << "Writing sharedmem message: "
134  << common::GlobalData::GetChannelById(channel_id_)
135  << " to block: " << wb.index;
136  return notifier_->Notify(readable_info);
137 }
138 
139 } // namespace transport
140 } // namespace cyber
141 } // namespace apollo
142 
143 #endif // CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
void set_msg_info_size(uint64_t msg_info_size)
Definition: block.h:38
bool enabled_
Definition: endpoint.h:45
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: readable_info.h:32
ShmTransmitter(const RoleAttributes &attr)
Definition: shm_transmitter.h:61
std::enable_if< HasByteSize< T >::value, int >::type ByteSize(const T &message)
Definition: message_traits.h:123
virtual bool Notify(const ReadableInfo &info)=0
Definition: notifier_base.h:31
void set_msg_size(uint64_t msg_size)
Definition: block.h:35
Definition: shm_transmitter.h:39
uint8_t * buf
Definition: segment.h:39
static std::string GetChannelById(uint64_t id)
virtual ~ShmTransmitter()
Definition: shm_transmitter.h:70
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToArray(const T &message, void *data, int size)
Definition: message_traits.h:189
void Disable() override
Definition: shm_transmitter.h:86
std::shared_ptr< Segment > SegmentPtr
Definition: segment.h:34
#define ADEBUG
Definition: log.h:41
void Enable() override
Definition: shm_transmitter.h:75
static const std::size_t kSize
Definition: message_info.h:61
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
Definition: shm_transmitter.h:95
Definition: message_info.h:30
bool SerializeTo(std::string *dst) const
std::size_t Hash(const std::string &key)
Definition: util.h:27
uint32_t index
Definition: segment.h:37
static SegmentPtr CreateSegment(uint64_t channel_id)
#define AERROR
Definition: log.h:44
Definition: transmitter.h:36
Block * block
Definition: segment.h:38
std::shared_ptr< M > MessagePtr
Definition: shm_transmitter.h:41