Apollo  6.0
Open source self driving car software
udp_listener.h
Go to the documentation of this file.
1 /******************************************************************************
2  * Copyright 2018 The Apollo Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *****************************************************************************/
16 
17 #pragma once
18 
19 #include <arpa/inet.h>
20 #include <fcntl.h>
21 #include <netinet/in.h>
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/resource.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
27 #include <sys/wait.h>
28 #include <unistd.h>
29 
30 namespace apollo {
31 namespace bridge {
32 
// Shared size constant for the UDP listener: used in this file as the
// RLIMIT_NOFILE value set during initialization, as the size hint given to
// epoll_create(), and as the length of the epoll event buffer.
constexpr int MAXEPOLLSIZE = 100;
34 
/**
 * @brief Waits for datagrams on a UDP port with epoll and hands every readable
 * event to a member function of a receiver object.
 *
 * The listener owns two descriptors: the UDP socket and the epoll instance.
 * Both are released by the destructor.
 *
 * NOTE(review): the class is still copyable although it owns descriptors;
 * copying an initialized listener would make two objects close the same
 * descriptors. Callers should hold it by pointer or reference.
 *
 * @tparam T Receiver type that provides the callback member function.
 */
template <typename T>
class UDPListener {
 public:
  /// Callback signature: member function of T taking the readable descriptor.
  typedef bool (T::*func)(int fd);

  /// Creates an unconfigured listener; Initialize() supplies the settings.
  UDPListener() {}

  /**
   * @brief Stores the receiver, port and callback without opening anything.
   * @param receiver Object on which the callback is invoked.
   * @param port UDP port to listen on.
   * @param msg_handle Member function of T called for every readable event.
   */
  UDPListener(T *receiver, uint16_t port, func msg_handle)
      : receiver_(receiver), listened_port_(port), msg_handle_(msg_handle) {}

  /// Closes the socket and the epoll instance if they are open.
  ~UDPListener() {
    if (listener_sock_ != -1) {
      close(listener_sock_);
      listener_sock_ = -1;
    }
    if (kdpfd_ != -1) {
      close(kdpfd_);
      kdpfd_ = -1;
    }
  }

  /// Replaces the callback invoked for readable events.
  void SetMsgHandle(func msg_handle) { msg_handle_ = msg_handle; }

  /// Opens and binds the socket and registers it with epoll.
  bool Initialize(T *receiver, func msg_handle, uint16_t port);

  /// Runs the epoll wait loop, spawning a detached thread per event.
  bool Listen();

  /// Thread entry point; `param` is a heap-allocated Param it takes over.
  static void *pthread_handle_message(void *param);

 public:
  /// Argument bundle passed to pthread_handle_message().
  struct Param {
    int fd_ = 0;                          // descriptor that became readable
    UDPListener<T> *listener_ = nullptr;  // listener that spawned the thread
  };

 private:
  bool setnonblocking(int sockfd);
  void MessageHandle(int fd);

 private:
  T *receiver_ = nullptr;
  uint16_t listened_port_ = 0;
  int listener_sock_ = -1;  // -1 means "no socket open"
  func msg_handle_ = nullptr;
  // epoll descriptor; -1 rather than 0 so that "not open" can never be
  // confused with descriptor 0 (stdin).
  int kdpfd_ = -1;
};
74 
75 template <typename T>
76 bool UDPListener<T>::Initialize(T *receiver, func msg_handle, uint16_t port) {
77  msg_handle_ = msg_handle;
78  if (!msg_handle_) {
79  return false;
80  }
81 
82  receiver_ = receiver;
83  if (!receiver_) {
84  return false;
85  }
86  listened_port_ = port;
87  struct rlimit rt;
88  rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
89  if (setrlimit(RLIMIT_NOFILE, &rt) == -1) {
90  return false;
91  }
92 
93  listener_sock_ = socket(AF_INET, SOCK_DGRAM, 0);
94  if (listener_sock_ == -1) {
95  return false;
96  }
97  int opt = SO_REUSEADDR;
98  setsockopt(listener_sock_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
99  setnonblocking(listener_sock_);
100 
101  struct sockaddr_in serv_addr;
102  serv_addr.sin_family = PF_INET;
103  serv_addr.sin_port = htons((uint16_t)listened_port_);
104  serv_addr.sin_addr.s_addr = INADDR_ANY;
105  if (bind(listener_sock_, (struct sockaddr *)&serv_addr,
106  sizeof(struct sockaddr)) == -1) {
107  close(listener_sock_);
108  return false;
109  }
110  kdpfd_ = epoll_create(MAXEPOLLSIZE);
111  struct epoll_event ev;
112  ev.events = EPOLLIN | EPOLLET;
113  ev.data.fd = listener_sock_;
114  if (epoll_ctl(kdpfd_, EPOLL_CTL_ADD, listener_sock_, &ev) < 0) {
115  close(listener_sock_);
116  return false;
117  }
118  return true;
119 }
120 
121 template <typename T>
123  int nfds = -1;
124  bool res = true;
125  struct epoll_event events[MAXEPOLLSIZE];
126  while (true) {
127  nfds = epoll_wait(kdpfd_, events, 10000, -1);
128  if (nfds == -1) {
129  res = false;
130  break;
131  }
132 
133  for (int i = 0; i < nfds; ++i) {
134  if (events[i].data.fd == listener_sock_) {
135  pthread_t thread;
136  pthread_attr_t attr;
137  pthread_attr_init(&attr);
138  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
139  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
140  Param *par = new Param;
141  par->fd_ = events[i].data.fd;
142  par->listener_ = this;
143  if (pthread_create(&thread, &attr,
145  reinterpret_cast<void *>(par))) {
146  res = false;
147  return res;
148  }
149  }
150  }
151  }
152  close(listener_sock_);
153  return res;
154 }
155 
156 template <typename T>
157 bool UDPListener<T>::setnonblocking(int sockfd) {
158  if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK) == -1) {
159  return false;
160  }
161  return true;
162 }
163 
164 template <typename T>
166  Param *par = static_cast<Param *>(param);
167  int fd = par->fd_;
168  UDPListener<T> *listener = par->listener_;
169  if (par) {
170  delete par;
171  }
172  par = nullptr;
173  if (!listener) {
174  pthread_exit(nullptr);
175  }
176  listener->MessageHandle(fd);
177  pthread_exit(nullptr);
178 }
179 
180 template <typename T>
181 void UDPListener<T>::MessageHandle(int fd) {
182  if (!receiver_ || !msg_handle_) {
183  return;
184  }
185  (receiver_->*msg_handle_)(fd);
186 }
187 
188 } // namespace bridge
189 } // namespace apollo
PlanningContext is the runtime context in planning. It is persistent across multiple frames...
Definition: atomic_hash_map.h:25
~UDPListener()
Definition: udp_listener.h:45
constexpr int MAXEPOLLSIZE
Definition: udp_listener.h:33
bool(T::* func)(int fd)
Definition: udp_listener.h:38
bool Initialize(T *receiver, func msg_handle, uint16_t port)
Definition: udp_listener.h:76
UDPListener(T *receiver, uint16_t port, func msg_handle)
Definition: udp_listener.h:40
UDPListener< T > * listener_
Definition: udp_listener.h:60
bool Listen()
Definition: udp_listener.h:122
static void * pthread_handle_message(void *param)
Definition: udp_listener.h:165
int fd_
Definition: udp_listener.h:59
void SetMsgHandle(func msg_handle)
Definition: udp_listener.h:51
Definition: udp_listener.h:36
UDPListener()
Definition: udp_listener.h:39
Definition: udp_listener.h:58