17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
26 #ifndef TBB_VERSION_MAJOR
27 #include <tbb/version.h>
30 #include <google/protobuf/message.h>
33 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
34 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
36 #include <boost/asio.hpp>
37 #include <boost/bind.hpp>
38 #include <boost/function.hpp>
39 #include <boost/thread.hpp>
40 #include <boost/tuple/tuple.hpp>
53 #if TBB_VERSION_MAJOR >= 2021
58 #define HEADER_LENGTH 8
73 #if TBB_VERSION_MAJOR < 2021
74 class GZ_TRANSPORT_VISIBLE ConnectionReadTask :
public tbb::task
76 class GZ_TRANSPORT_VISIBLE ConnectionReadTask
83 public: ConnectionReadTask(
84 boost::function<
void (
const std::string &)> _func,
85 const std::string &_data) :
91 #if TBB_VERSION_MAJOR < 2021
94 public: tbb::task *execute()
96 this->func(this->data);
101 public:
void operator()()
const
102 { this->func(this->data); }
105 private: boost::function<void (
const std::string &)> func;
108 private: std::string data;
127 public boost::enable_shared_from_this<Connection>
/// \brief Connect to a remote host.
/// \param[in] _host Remote host to connect to.
/// \param[in] _port Port on the remote host.
/// \return NOTE(review): the meaning of the returned bool is not visible
/// in this header excerpt -- presumably true on success; confirm against
/// the implementation.
139 public:
bool Connect(
const std::string &_host,
unsigned int _port);
169 private:
void Close();
/// \brief Read data from the socket.
/// \param[out] _data Passed by non-const reference; presumably receives
/// the data that was read -- TODO confirm against the implementation.
/// \return NOTE(review): the meaning of the returned bool is not visible
/// in this header excerpt -- presumably true when data was read; confirm.
177 public:
bool Read(std::string &_data);
187 boost::function<
void(uint32_t)> _cb, uint32_t _id,
188 bool _force =
false);
/// \brief Write data to the socket.
/// \param[in] _buffer Data to write.
/// \param[in] _force Defaults to false. NOTE(review): its effect is not
/// visible in this header excerpt -- confirm against the implementation.
194 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
230 public:
template<
typename Handler>
233 boost::mutex::scoped_lock lock(this->socketMutex);
236 gzerr <<
"AsyncRead on a closed socket\n";
240 void (
Connection::*f)(
const boost::system::error_code &,
241 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
244 boost::asio::async_read(*this->socket,
245 boost::asio::buffer(this->inboundHeader),
247 boost::asio::placeholders::error,
248 boost::make_tuple(_handler)));
258 private:
template<
typename Handler>
259 void OnReadHeader(
const boost::system::error_code &_e,
260 boost::tuple<Handler> _handler)
264 if (_e.value() == boost::asio::error::eof)
265 this->isOpen =
false;
269 std::size_t inboundData_size = 0;
270 std::string header(&this->inboundHeader[0],
271 this->inboundHeader.size());
272 this->inboundHeader.clear();
274 inboundData_size = this->ParseHeader(header);
276 if (inboundData_size > 0)
279 this->inboundData.resize(inboundData_size);
281 void (Connection::*f)(
const boost::system::error_code &e,
282 boost::tuple<Handler>) =
283 &Connection::OnReadData<Handler>;
285 boost::asio::async_read(*this->socket,
286 boost::asio::buffer(this->inboundData),
288 boost::asio::placeholders::error,
293 gzerr <<
"Header is empty\n";
294 boost::get<0>(_handler)(
"");
318 private:
template<
typename Handler>
319 void OnReadData(
const boost::system::error_code &_e,
320 boost::tuple<Handler> _handler)
324 if (_e.value() == boost::asio::error::eof)
325 this->isOpen =
false;
329 std::string data(&this->inboundData[0],
330 this->inboundData.size());
331 this->inboundData.clear();
334 gzerr <<
"OnReadData got empty data!!!\n";
338 #if TBB_VERSION_MAJOR < 2021
339 ConnectionReadTask *task =
new(tbb::task::allocate_root())
340 ConnectionReadTask(boost::get<0>(_handler), data);
341 tbb::task::enqueue(*task);
346 this->taskGroup.run<ConnectionReadTask>(boost::get<0>(_handler), data);
356 {
return this->
shutdown.Connect(_subscriber); }
377 private:
void PostWrite();
382 private:
void OnWrite(
const boost::system::error_code &_e);
386 private:
void OnAccept(
const boost::system::error_code &_e);
/// \brief Parse a message header and return a size.
/// OnReadHeader passes the contents of inboundHeader as a string and,
/// when the returned value is greater than zero, resizes inboundData to
/// that value before reading the message body.
/// \param[in] _header Header contents as a string.
/// \return Size derived from the header. NOTE(review): the exact header
/// encoding is not visible in this excerpt -- confirm in the
/// implementation.
390 private: std::size_t ParseHeader(
const std::string &_header);
397 private:
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
401 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
405 private:
static std::string GetHostname(
406 boost::asio::ip::tcp::endpoint _ep);
411 private:
void OnConnect(
const boost::system::error_code &_error,
412 boost::asio::ip::tcp::resolver::iterator _endPointIter);
415 private: boost::asio::ip::tcp::socket *socket;
418 private: boost::asio::ip::tcp::acceptor *acceptor;
421 private: std::deque<std::string> writeQueue;
425 private: std::deque< std::vector<
426 std::pair<boost::function<void(uint32_t)>, uint32_t> > >
430 private: boost::mutex connectMutex;
433 private: boost::recursive_mutex writeMutex;
436 private: boost::recursive_mutex readMutex;
439 private:
mutable boost::mutex socketMutex;
442 private: boost::condition_variable connectCondition;
448 private: std::vector<char> inboundHeader;
451 private: std::vector<char> inboundData;
454 private:
bool readQuit;
457 private:
unsigned int id;
460 private:
static unsigned int idCounter;
472 private:
unsigned int writeCount;
475 private: std::string localURI;
478 private: std::string localAddress;
481 private: std::string remoteURI;
484 private: std::string remoteAddress;
487 private:
bool connectError;
490 private: std::string ipWhiteList;
493 private:
bool dropMsgLogged;
496 private:
bool isOpen;
498 #if TBB_VERSION_MAJOR >= 2021
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:38
#define HEADER_LENGTH
Definition: Connection.hh:58
A class for event processing.
Definition: Event.hh:100
Single TCP/IP connection manager.
Definition: Connection.hh:128
bool IsOpen() const
Is the connection open?
void ProcessWriteQueue(bool _blocking=false)
Handle on-write callbacks.
bool Connect(const std::string &_host, unsigned int _port)
Connect to a remote host.
void EnqueueMsg(const std::string &_buffer, boost::function< void(uint32_t)> _cb, uint32_t _id, bool _force=false)
Write data to the socket.
std::string GetLocalURI() const
Get the local URI.
unsigned int GetId() const
Get the ID of the connection.
void Listen(unsigned int _port, const AcceptCallback &_acceptCB)
Start a server that listens on a port.
void StartRead(const ReadCallback &_cb)
Start a thread that reads from the connection and passes each new message to the ReadCallback.
std::string GetRemoteURI() const
Get the remote URI.
std::string GetRemoteHostname() const
Get the remote hostname.
std::string GetIPWhiteList() const
Get the IP white list, from GAZEBO_IP_WHITE_LIST environment variable.
static std::string GetLocalHostname()
Get the local hostname.
unsigned int GetLocalPort() const
Get the port of this connection.
void Cancel()
Cancel all async operations on an open socket.
std::string GetRemoteAddress() const
Get the remote address.
void EnqueueMsg(const std::string &_buffer, bool _force=false)
Write data to the socket.
unsigned int GetRemotePort() const
Get the remote port number.
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:354
void Shutdown()
Shutdown the socket.
void StopRead()
Stop the read loop.
bool Read(std::string &_data)
Read data from the socket.
std::string GetLocalAddress() const
Get the local address of this connection.
virtual ~Connection()
Destructor.
static bool ValidateIP(const std::string &_ip)
Return true if _ip is valid.
void AsyncRead(Handler _handler)
Perform an asynchronous read. \param[in] _handler Callback to invoke on received data.
Definition: Connection.hh:231
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:151
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:142
Manages boost::asio IO.
Definition: IOManager.hh:36
Definition: TaskGroup.hh:30
auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition: WeakBind.hh:114
#define gzerr
Output an error message.
Definition: Console.hh:50
bool is_stopped()
Is the transport system stopped?
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:67
Forward declarations for the common classes.
Definition: Animation.hh:27
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.