17 #ifndef CYBER_DATA_DATA_DISPATCHER_H_ 18 #define CYBER_DATA_DATA_DISPATCHER_H_ 41 std::vector<std::weak_ptr<CacheBuffer<std::shared_ptr<T>>>>;
46 bool Dispatch(
const uint64_t channel_id,
const std::shared_ptr<T>& msg);
50 std::mutex buffers_map_mutex_;
61 std::lock_guard<std::mutex> lock(buffers_map_mutex_);
62 auto buffer = channel_buffer.
Buffer();
64 if (buffers_map_.
Get(channel_buffer.
channel_id(), &buffers)) {
65 buffers->emplace_back(buffer);
74 const std::shared_ptr<T>& msg) {
79 if (buffers_map_.
Get(channel_id, &buffers)) {
80 for (
auto& buffer_wptr : *buffers) {
81 if (
auto buffer = buffer_wptr.lock()) {
82 std::lock_guard<std::mutex> lock(buffer->Mutex());
89 return notifier_->
Notify(channel_id);
96 #endif // CYBER_DATA_DATA_DISPATCHER_H_ ::apollo::cyber::Time Time
Definition: racobit_radar_message_manager.h:41
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
std::vector< std::weak_ptr< CacheBuffer< std::shared_ptr< T > >> > BufferVector
Definition: data_dispatcher.h:41
Definition: data_dispatcher.h:38
bool Get(K key, V **value)
Definition: atomic_hash_map.h:51
uint64_t channel_id() const
Definition: channel_buffer.h:48
Definition: channel_buffer.h:36
bool IsShutdown()
Definition: state.h:46
#define DECLARE_SINGLETON(classname)
Definition: macros.h:52
A implementation of lock-free fixed size hash map.
Definition: atomic_hash_map.h:40
Definition: data_notifier.h:42
void AddBuffer(const ChannelBuffer< T > &channel_buffer)
Definition: data_dispatcher.h:60
void Set(K key)
Definition: atomic_hash_map.h:66
std::shared_ptr< BufferType > Buffer() const
Definition: channel_buffer.h:49
bool Dispatch(const uint64_t channel_id, const std::shared_ptr< T > &msg)
Definition: data_dispatcher.h:73
~DataDispatcher()
Definition: data_dispatcher.h:42
bool Notify(const uint64_t channel_id)
Definition: data_notifier.h:73