Apollo  6.0
Open source self driving car software
data_dispatcher.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #ifndef CYBER_DATA_DATA_DISPATCHER_H_
18 #define CYBER_DATA_DATA_DISPATCHER_H_
19 
20 #include <memory>
21 #include <mutex>
22 #include <vector>
23 
24 #include "cyber/common/log.h"
25 #include "cyber/common/macros.h"
27 #include "cyber/state.h"
28 #include "cyber/time/time.h"
29 
30 namespace apollo {
31 namespace cyber {
32 namespace data {
33 
36 
37 template <typename T>
39  public:
40  using BufferVector =
41  std::vector<std::weak_ptr<CacheBuffer<std::shared_ptr<T>>>>;
43 
44  void AddBuffer(const ChannelBuffer<T>& channel_buffer);
45 
46  bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T>& msg);
47 
48  private:
49  DataNotifier* notifier_ = DataNotifier::Instance();
50  std::mutex buffers_map_mutex_;
52 
54 };
55 
56 template <typename T>
58 
59 template <typename T>
60 void DataDispatcher<T>::AddBuffer(const ChannelBuffer<T>& channel_buffer) {
61  std::lock_guard<std::mutex> lock(buffers_map_mutex_);
62  auto buffer = channel_buffer.Buffer();
63  BufferVector* buffers = nullptr;
64  if (buffers_map_.Get(channel_buffer.channel_id(), &buffers)) {
65  buffers->emplace_back(buffer);
66  } else {
67  BufferVector new_buffers = {buffer};
68  buffers_map_.Set(channel_buffer.channel_id(), new_buffers);
69  }
70 }
71 
72 template <typename T>
73 bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,
74  const std::shared_ptr<T>& msg) {
75  BufferVector* buffers = nullptr;
77  return false;
78  }
79  if (buffers_map_.Get(channel_id, &buffers)) {
80  for (auto& buffer_wptr : *buffers) {
81  if (auto buffer = buffer_wptr.lock()) {
82  std::lock_guard<std::mutex> lock(buffer->Mutex());
83  buffer->Fill(msg);
84  }
85  }
86  } else {
87  return false;
88  }
89  return notifier_->Notify(channel_id);
90 }
91 
92 } // namespace data
93 } // namespace cyber
94 } // namespace apollo
95 
96 #endif // CYBER_DATA_DATA_DISPATCHER_H_
::apollo::cyber::Time Time
Definition: racobit_radar_message_manager.h:41
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::vector< std::weak_ptr< CacheBuffer< std::shared_ptr< T > >> > BufferVector
Definition: data_dispatcher.h:41
Definition: data_dispatcher.h:38
bool Get(K key, V **value)
Definition: atomic_hash_map.h:51
uint64_t channel_id() const
Definition: channel_buffer.h:48
Definition: channel_buffer.h:36
bool IsShutdown()
Definition: state.h:46
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
A implementation of lock-free fixed size hash map.
Definition: atomic_hash_map.h:40
Definition: data_notifier.h:42
void AddBuffer(const ChannelBuffer< T > &channel_buffer)
Definition: data_dispatcher.h:60
void Set(K key)
Definition: atomic_hash_map.h:66
std::shared_ptr< BufferType > Buffer() const
Definition: channel_buffer.h:49
bool Dispatch(const uint64_t channel_id, const std::shared_ptr< T > &msg)
Definition: data_dispatcher.h:73
~DataDispatcher()
Definition: data_dispatcher.h:42
bool Notify(const uint64_t channel_id)
Definition: data_notifier.h:73