18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
24 #ifndef TBB_VERSION_MAJOR
25 #include <tbb/version.h>
29 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
30 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
33 #include <boost/bind.hpp>
34 #include <boost/enable_shared_from_this.hpp>
39 #if TBB_VERSION_MAJOR >= 2021
52 #if TBB_VERSION_MAJOR < 2021
53 class GZ_TRANSPORT_VISIBLE PublishTask :
public tbb::task
55 class GZ_TRANSPORT_VISIBLE PublishTask
62 const google::protobuf::Message &_message)
65 this->msg = _message.New();
66 this->msg->CopyFrom(_message);
69 #if TBB_VERSION_MAJOR < 2021
72 public: tbb::task *execute()
75 public:
void operator()()
const
78 this->pub->WaitForConnection();
79 this->pub->Publish(*this->msg,
true);
80 this->pub->SendMessage();
82 #if TBB_VERSION_MAJOR < 2021
92 private: google::protobuf::Message *msg;
102 class GZ_TRANSPORT_VISIBLE
Node :
103 public boost::enable_shared_from_this<Node>
119 public:
void Init(
const std::string &_space =
"");
181 public:
template<
typename M>
183 const google::protobuf::Message &_message)
186 #if TBB_VERSION_MAJOR < 2021
187 PublishTask *task =
new(tbb::task::allocate_root())
188 PublishTask(pub, _message);
190 tbb::task::enqueue(*task);
193 this->taskGroup.run<PublishTask>(pub, _message);
204 public:
template<
typename M>
206 unsigned int _queueLimit = 1000,
209 std::string decodedTopic = this->DecodeTopicName(_topic);
212 decodedTopic, _queueLimit, _hzRate);
214 boost::mutex::scoped_lock lock(this->publisherMutex);
215 publisher->SetNode(shared_from_this());
216 this->publishers.push_back(publisher);
227 public:
void Publish(
const std::string &_topic,
228 const google::protobuf::Message &_message)
231 _message.GetTypeName());
232 pub->WaitForConnection();
234 pub->Publish(_message,
true);
245 const std::string &_msgTypeName,
246 unsigned int _queueLimit = 1000,
249 std::string decodedTopic = this->DecodeTopicName(_topic);
252 decodedTopic, _msgTypeName, _queueLimit, _hzRate);
254 boost::mutex::scoped_lock lock(this->publisherMutex);
255 publisher->SetNode(shared_from_this());
256 this->publishers.push_back(publisher);
268 public:
template<
typename M,
typename T>
270 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
271 bool _latching =
false)
274 std::string decodedTopic = this->DecodeTopicName(_topic);
275 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
278 using namespace boost::placeholders;
279 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
287 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
298 public:
template<
typename M>
300 void(*_fp)(
const boost::shared_ptr<M const> &),
301 bool _latching =
false)
304 std::string decodedTopic = this->DecodeTopicName(_topic);
305 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
308 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
309 this->callbacks[decodedTopic].push_back(
316 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
330 void(T::*_fp)(
const std::string &), T *_obj,
331 bool _latching =
false)
334 std::string decodedTopic = this->DecodeTopicName(_topic);
335 ops.
Init(decodedTopic, shared_from_this(), _latching);
338 using namespace boost::placeholders;
339 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
347 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
360 void(*_fp)(
const std::string &),
bool _latching =
false)
363 std::string decodedTopic = this->DecodeTopicName(_topic);
364 ops.
Init(decodedTopic, shared_from_this(), _latching);
367 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
368 this->callbacks[decodedTopic].push_back(
375 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
385 const std::string &_msg);
400 const std::string &_msg);
414 public: std::string
GetMsgType(
const std::string &_topic)
const;
434 private:
bool PrivateInit(
const std::string &_space,
436 const bool _fallbackToDefault);
438 private: std::string topicNamespace;
439 private: std::vector<PublisherPtr> publishers;
440 private: std::vector<PublisherPtr>::iterator publishersIter;
441 private:
static unsigned int idCounter;
442 private:
unsigned int id;
444 private:
typedef std::list<CallbackHelperPtr> Callback_L;
445 private:
typedef std::map<std::string, Callback_L> Callback_M;
446 private: Callback_M callbacks;
447 private: std::map<std::string, std::list<std::string> > incomingMsgs;
450 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
452 #if TBB_VERSION_MAJOR >= 2021
457 private: boost::mutex publisherMutex;
458 private: boost::mutex publisherDeleteMutex;
459 private: boost::recursive_mutex incomingMutex;
463 private: boost::recursive_mutex processIncomingMutex;
465 private:
bool initialized;
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:38
Forward declarations for transport.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:48
Callback helper Template.
Definition: CallbackHelper.hh:112
A node can advertise and subscribe to topics, publish on advertised topics, and listen to subscribed topics.
Definition: Node.hh:104
void Fini()
Finalize the node.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:299
std::string GetTopicNamespace() const
Get the topic namespace for this node.
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:329
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:244
bool IsInitialized() const
Check if this Node has been initialized.
unsigned int GetId() const
Get the unique ID of the node.
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:205
void ProcessIncoming()
Process incoming messages.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:227
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
void Init(const std::string &_space="")
Init the node.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:269
virtual ~Node()
Destructor.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
void RemoveCallback(const std::string &_topic, unsigned int _id)
void ProcessPublishers()
Process all publishers, which has each publisher send its most recent message over the wire.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:182
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:359
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178
Options for a subscription.
Definition: SubscribeOptions.hh:36
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
Definition: TaskGroup.hh:30
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
Forward declarations for the common classes.
Definition: Animation.hh:27