Apollo  6.0
Open-source self-driving car software
shm_dispatcher.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_TRANSPORT_DISPATCHER_SHM_DISPATCHER_H_
18 #define CYBER_TRANSPORT_DISPATCHER_SHM_DISPATCHER_H_
19 
20 #include <cstring>
21 #include <memory>
22 #include <string>
23 #include <thread>
24 #include <unordered_map>
25 
28 #include "cyber/common/log.h"
29 #include "cyber/common/macros.h"
34 
35 namespace apollo {
36 namespace cyber {
37 namespace transport {
38 
39 class ShmDispatcher;
44 
// Dispatcher subclass named for shared memory ("shm"). Presumably it delivers
// messages that are read out of shared-memory segments; the member function
// definitions live outside this header, so confirm the details there.
//
// The two AddListener templates are defined further down in this header: each
// wraps the typed listener in an adapter that accepts a ReadableBlock,
// registers that adapter through Dispatcher::AddListener<ReadableBlock>, and
// then calls AddSegment(self_attr).
45 class ShmDispatcher : public Dispatcher {
46  public:
47  // key: channel_id
48  using SegmentContainer = std::unordered_map<uint64_t, SegmentPtr>;
49 
50  virtual ~ShmDispatcher();
51 
// Overrides Dispatcher::Shutdown().
52  void Shutdown() override;
53 
// Registers `listener` for MessageT using only this side's attributes.
54  template <typename MessageT>
55  void AddListener(const RoleAttributes& self_attr,
56  const MessageListener<MessageT>& listener);
57 
// Same as above, but additionally forwards `opposite_attr` to the base
// dispatcher when registering the adapter.
58  template <typename MessageT>
59  void AddListener(const RoleAttributes& self_attr,
60  const RoleAttributes& opposite_attr,
61  const MessageListener<MessageT>& listener);
62 
63  private:
// The helpers below are only declared here; their definitions are not part of
// this listing.
// AddSegment is called by both AddListener templates after registration.
64  void AddSegment(const RoleAttributes& self_attr);
65  void ReadMessage(uint64_t channel_id, uint32_t block_index);
66  void OnMessage(uint64_t channel_id, const std::shared_ptr<ReadableBlock>& rb,
67  const MessageInfo& msg_info);
// NOTE(review): presumably the entry point executed by thread_ — confirm.
68  void ThreadFunc();
69  bool Init();
70 
71  uint64_t host_id_;
// Segments keyed by channel id (see SegmentContainer).
72  SegmentContainer segments_;
// One uint32_t per uint64_t key; presumably the last block index seen for each
// channel id — confirm against ReadMessage.
73  std::unordered_map<uint64_t, uint32_t> previous_indexes_;
// NOTE(review): by its name this lock guards segments_ — confirm.
74  AtomicRWLock segments_lock_;
75  std::thread thread_;
76  NotifierPtr notifier_;
77 
// NOTE(review): this listing jumps from line 77 to line 79, so whatever the
// original header declares on line 78 is not shown here — confirm against the
// original file before relying on this class definition being complete.
79 };
80 
// Registers `listener` to receive messages of type MessageT for `self_attr`.
//
// The typed listener is wrapped in an adapter that accepts a ReadableBlock.
// The adapter parses rb->block->msg_size() bytes starting at rb->buf into a
// freshly allocated MessageT and then hands the message and its MessageInfo to
// `listener`. The adapter is registered through
// Dispatcher::AddListener<ReadableBlock>, after which AddSegment(self_attr) is
// called.
//
// @param self_attr  attributes forwarded to the base dispatcher and AddSegment.
// @param listener   callback for each parsed message; captured by value, so
//                   the adapter keeps its own copy.
81 template <typename MessageT>
82 void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
83  const MessageListener<MessageT>& listener) {
84  // FIXME: make this cleaner
85  auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
86  const MessageInfo& msg_info) {
87  auto msg = std::make_shared<MessageT>();
// Leave the adapter without invoking the listener when the payload cannot be
// parsed into MessageT (RETURN_IF is expected to return on a true condition).
88  RETURN_IF(!message::ParseFromArray(
89  rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
90  listener(msg, msg_info);
91  };
92 
93  Dispatcher::AddListener<ReadableBlock>(self_attr, listener_adapter);
94  AddSegment(self_attr);
95 }
96 
// Registers `listener` to receive messages of type MessageT for `self_attr`,
// additionally passing `opposite_attr` to the base dispatcher.
//
// The typed listener is wrapped in an adapter that accepts a ReadableBlock.
// The adapter parses rb->block->msg_size() bytes starting at rb->buf into a
// freshly allocated MessageT and then hands the message and its MessageInfo to
// `listener`. The adapter is registered through
// Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr, ...), after
// which AddSegment(self_attr) is called.
//
// @param self_attr      attributes forwarded to the base dispatcher and
//                       AddSegment.
// @param opposite_attr  attributes of the other side, forwarded only to the
//                       base dispatcher.
// @param listener       callback for each parsed message; captured by value,
//                       so the adapter keeps its own copy.
97 template <typename MessageT>
98 void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
99  const RoleAttributes& opposite_attr,
100  const MessageListener<MessageT>& listener) {
101  // FIXME: make this cleaner
102  auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
103  const MessageInfo& msg_info) {
104  auto msg = std::make_shared<MessageT>();
// Leave the adapter without invoking the listener when the payload cannot be
// parsed into MessageT (RETURN_IF is expected to return on a true condition).
105  RETURN_IF(!message::ParseFromArray(
106  rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
107  listener(msg, msg_info);
108  };
109 
110  Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
111  listener_adapter);
112  AddSegment(self_attr);
113 }
114 
115 } // namespace transport
116 } // namespace cyber
117 } // namespace apollo
118 
119 #endif // CYBER_TRANSPORT_DISPATCHER_SHM_DISPATCHER_H_
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition: shm_dispatcher.h:82
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
Definition: rw_lock_guard.h:48
Definition: atomic_rw_lock.h:36
#define RETURN_IF(condition)
Definition: log.h:106
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromArray(const void *data, int size, T *message)
Definition: message_traits.h:145
Definition: notifier_base.h:31
std::unordered_map< uint64_t, SegmentPtr > SegmentContainer
Definition: shm_dispatcher.h:48
Definition: shm_dispatcher.h:45
Definition: dispatcher.h:54
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
Definition: rw_lock_guard.h:35
Definition: message_info.h:30
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition: dispatcher.h:52