Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #undef emit
21 #include <tbb/task.h>
22 #define emit
23 
24 // If TBB_VERSION_MAJOR is not defined, this means that
25 // tbb >= 2021 and we can include the tbb/version.h header
26 #ifndef TBB_VERSION_MAJOR
27 #include <tbb/version.h>
28 #endif
29 
30 #include <google/protobuf/message.h>
31 
32 // This fixes compiler warnings, see #3147 and #3160
33 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
34 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
35 #endif
36 #include <boost/asio.hpp>
37 #include <boost/bind.hpp>
38 #include <boost/function.hpp>
39 #include <boost/thread.hpp>
40 #include <boost/tuple/tuple.hpp>
41 
42 #include <string>
43 #include <vector>
44 #include <iostream>
45 #include <iomanip>
46 #include <deque>
47 #include <utility>
48 
49 #include "gazebo/common/Event.hh"
50 #include "gazebo/common/Console.hh"
53 #if TBB_VERSION_MAJOR >= 2021
55 #endif
56 #include "gazebo/util/system.hh"
57 
58 #define HEADER_LENGTH 8
59 
60 namespace gazebo
61 {
62  namespace transport
63  {
64  extern GZ_TRANSPORT_VISIBLE bool is_stopped();
65 
66  class IOManager;
67  class Connection;
68  typedef boost::shared_ptr<Connection> ConnectionPtr;
69 
73 #if TBB_VERSION_MAJOR < 2021
74  class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
75 #else
76  class GZ_TRANSPORT_VISIBLE ConnectionReadTask
77 #endif
78  {
83  public: ConnectionReadTask(
84  boost::function<void (const std::string &)> _func,
85  const std::string &_data) :
86  func(_func),
87  data(_data)
88  {
89  }
90 
91 #if TBB_VERSION_MAJOR < 2021
94  public: tbb::task *execute()
95  {
96  this->func(this->data);
97  return NULL;
98  }
99 #else
101  public: void operator()() const
102  { this->func(this->data); }
103 #endif
105  private: boost::function<void (const std::string &)> func;
106 
108  private: std::string data;
109  };
111 
126  class GZ_TRANSPORT_VISIBLE Connection :
127  public boost::enable_shared_from_this<Connection>
128  {
130  public: Connection();
131 
133  public: virtual ~Connection();
134 
139  public: bool Connect(const std::string &_host, unsigned int _port);
140 
142  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
143 
148  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
149 
151  typedef boost::function<void(const std::string &_data)> ReadCallback;
152 
156  public: void StartRead(const ReadCallback &_cb);
157 
159  public: void StopRead();
160 
162  public: void Shutdown();
163 
166  public: bool IsOpen() const;
167 
169  private: void Close();
170 
172  public: void Cancel();
173 
177  public: bool Read(std::string &_data);
178 
186  public: void EnqueueMsg(const std::string &_buffer,
187  boost::function<void(uint32_t)> _cb, uint32_t _id,
188  bool _force = false);
189 
194  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
195 
198  public: std::string GetLocalURI() const;
199 
202  public: std::string GetRemoteURI() const;
203 
206  public: std::string GetLocalAddress() const;
207 
210  public: unsigned int GetLocalPort() const;
211 
214  public: std::string GetRemoteAddress() const;
215 
218  public: unsigned int GetRemotePort() const;
219 
222  public: std::string GetRemoteHostname() const;
223 
226  public: static std::string GetLocalHostname();
227 
230  public: template<typename Handler>
231  void AsyncRead(Handler _handler)
232  {
233  boost::mutex::scoped_lock lock(this->socketMutex);
234  if (!this->IsOpen())
235  {
236  gzerr << "AsyncRead on a closed socket\n";
237  return;
238  }
239 
240  void (Connection::*f)(const boost::system::error_code &,
241  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
242 
243  this->inboundHeader.resize(HEADER_LENGTH);
244  boost::asio::async_read(*this->socket,
245  boost::asio::buffer(this->inboundHeader),
246  common::weakBind(f, this->shared_from_this(),
247  boost::asio::placeholders::error,
248  boost::make_tuple(_handler)));
249  }
250 
258  private: template<typename Handler>
259  void OnReadHeader(const boost::system::error_code &_e,
260  boost::tuple<Handler> _handler)
261  {
262  if (_e)
263  {
264  if (_e.value() == boost::asio::error::eof)
265  this->isOpen = false;
266  }
267  else
268  {
269  std::size_t inboundData_size = 0;
270  std::string header(&this->inboundHeader[0],
271  this->inboundHeader.size());
272  this->inboundHeader.clear();
273 
274  inboundData_size = this->ParseHeader(header);
275 
276  if (inboundData_size > 0)
277  {
278  // Start the asynchronous call to receive data
279  this->inboundData.resize(inboundData_size);
280 
281  void (Connection::*f)(const boost::system::error_code &e,
282  boost::tuple<Handler>) =
283  &Connection::OnReadData<Handler>;
284 
285  boost::asio::async_read(*this->socket,
286  boost::asio::buffer(this->inboundData),
287  common::weakBind(f, this->shared_from_this(),
288  boost::asio::placeholders::error,
289  _handler));
290  }
291  else
292  {
293  gzerr << "Header is empty\n";
294  boost::get<0>(_handler)("");
295  // This code tries to read the header again. We should
296  // never get here.
297  // this->inboundHeader.resize(HEADER_LENGTH);
298 
299  // void (Connection::*f)(const boost::system::error_code &,
300  // boost::tuple<Handler>) =
301  // &Connection::OnReadHeader<Handler>;
302 
303  // boost::asio::async_read(*this->socket,
304  // boost::asio::buffer(this->inboundHeader),
305  // common::weakBind(f, this->shared_from_this(),
306  // boost::asio::placeholders::error, _handler));
307  }
308  }
309  }
310 
318  private: template<typename Handler>
319  void OnReadData(const boost::system::error_code &_e,
320  boost::tuple<Handler> _handler)
321  {
322  if (_e)
323  {
324  if (_e.value() == boost::asio::error::eof)
325  this->isOpen = false;
326  }
327 
328  // Inform caller that data has been received
329  std::string data(&this->inboundData[0],
330  this->inboundData.size());
331  this->inboundData.clear();
332 
333  if (data.empty())
334  gzerr << "OnReadData got empty data!!!\n";
335 
336  if (!_e && !transport::is_stopped())
337  {
338 #if TBB_VERSION_MAJOR < 2021
339  ConnectionReadTask *task = new(tbb::task::allocate_root())
340  ConnectionReadTask(boost::get<0>(_handler), data);
341  tbb::task::enqueue(*task);
342 
343  // Non-tbb version:
344  // boost::get<0>(_handler)(data);
345 #else
346  this->taskGroup.run<ConnectionReadTask>(boost::get<0>(_handler), data);
347 #endif
348  }
349  }
350 
354  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
355  _subscriber)
356  { return this->shutdown.Connect(_subscriber); }
357 
359  public: void ProcessWriteQueue(bool _blocking = false);
360 
363  public: unsigned int GetId() const;
364 
368  public: static bool ValidateIP(const std::string &_ip);
369 
373  public: std::string GetIPWhiteList() const;
374 
377  private: void PostWrite();
378 
382  private: void OnWrite(const boost::system::error_code &_e);
383 
386  private: void OnAccept(const boost::system::error_code &_e);
387 
390  private: std::size_t ParseHeader(const std::string &_header);
391 
393  private: void ReadLoop(const ReadCallback &_cb);
394 
397  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
398 
401  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
402 
405  private: static std::string GetHostname(
406  boost::asio::ip::tcp::endpoint _ep);
407 
411  private: void OnConnect(const boost::system::error_code &_error,
412  boost::asio::ip::tcp::resolver::iterator _endPointIter);
413 
415  private: boost::asio::ip::tcp::socket *socket;
416 
418  private: boost::asio::ip::tcp::acceptor *acceptor;
419 
421  private: std::deque<std::string> writeQueue;
422 
425  private: std::deque< std::vector<
426  std::pair<boost::function<void(uint32_t)>, uint32_t> > >
427  callbacks;
428 
430  private: boost::mutex connectMutex;
431 
433  private: boost::recursive_mutex writeMutex;
434 
436  private: boost::recursive_mutex readMutex;
437 
439  private: mutable boost::mutex socketMutex;
440 
442  private: boost::condition_variable connectCondition;
443 
445  private: AcceptCallback acceptCB;
446 
448  private: std::vector<char> inboundHeader;
449 
451  private: std::vector<char> inboundData;
452 
454  private: bool readQuit;
455 
457  private: unsigned int id;
458 
460  private: static unsigned int idCounter;
461 
463  private: ConnectionPtr acceptConn;
464 
466  private: event::EventT<void()> shutdown;
467 
469  private: static IOManager *iomanager;
470 
472  private: unsigned int writeCount;
473 
475  private: std::string localURI;
476 
478  private: std::string localAddress;
479 
481  private: std::string remoteURI;
482 
484  private: std::string remoteAddress;
485 
487  private: bool connectError;
488 
490  private: std::string ipWhiteList;
491 
493  private: bool dropMsgLogged;
494 
496  private: bool isOpen;
497 
498 #if TBB_VERSION_MAJOR >= 2021
500  private: TaskGroup taskGroup;
501 #endif
502  };
504  }
505 }
506 #endif
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:38
#define HEADER_LENGTH
Definition: Connection.hh:58
A class for event processing.
Definition: Event.hh:100
Single TCP/IP connection manager.
Definition: Connection.hh:128
bool IsOpen() const
Is the connection open?
void ProcessWriteQueue(bool _blocking=false)
Handle on-write callbacks.
bool Connect(const std::string &_host, unsigned int _port)
Connect to a remote host.
void EnqueueMsg(const std::string &_buffer, boost::function< void(uint32_t)> _cb, uint32_t _id, bool _force=false)
Write data to the socket.
std::string GetLocalURI() const
Get the local URI.
unsigned int GetId() const
Get the ID of the connection.
void Listen(unsigned int _port, const AcceptCallback &_acceptCB)
Start a server that listens on a port.
void StartRead(const ReadCallback &_cb)
Start a thread that reads from the connection and passes new message to the ReadCallback.
std::string GetRemoteURI() const
Get the remote URI.
std::string GetRemoteHostname() const
Get the remote hostname.
std::string GetIPWhiteList() const
Get the IP white list, from GAZEBO_IP_WHITE_LIST environment variable.
static std::string GetLocalHostname()
Get the local hostname.
unsigned int GetLocalPort() const
Get the port of this connection.
void Cancel()
Cancel all async operations on an open socket.
std::string GetRemoteAddress() const
Get the remote address.
void EnqueueMsg(const std::string &_buffer, bool _force=false)
Write data to the socket.
unsigned int GetRemotePort() const
Get the remote port number.
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:354
void Shutdown()
Shutdown the socket.
void StopRead()
Stop the read loop.
bool Read(std::string &_data)
Read data from the socket.
std::string GetLocalAddress() const
Get the local address of this connection.
virtual ~Connection()
Destructor.
static bool ValidateIP(const std::string &_ip)
Return true if the _ip is a valid.
void AsyncRead(Handler _handler)
Peform an asyncronous read param[in] _handler Callback to invoke on received data.
Definition: Connection.hh:231
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:151
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:142
Manages boost::asio IO.
Definition: IOManager.hh:36
Definition: TaskGroup.hh:30
auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition: WeakBind.hh:114
#define gzerr
Output an error message.
Definition: Console.hh:50
bool is_stopped()
Is the transport system stopped?
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:67
Forward declarations for the common classes.
Definition: Animation.hh:27
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.