Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #include <tbb/task.h>
22 
23 // This fixes compiler warnings, see #3147 and #3160
24 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
25 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
26 #endif
27 #include <boost/bind.hpp>
28 #include <boost/enable_shared_from_this.hpp>
29 #include <map>
30 #include <list>
31 #include <string>
32 #include <vector>
33 
36 #include "gazebo/util/system.hh"
37 
38 namespace gazebo
39 {
40  namespace transport
41  {
44  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
45  {
49  public: PublishTask(transport::PublisherPtr _pub,
50  const google::protobuf::Message &_message)
51  : pub(_pub)
52  {
53  this->msg = _message.New();
54  this->msg->CopyFrom(_message);
55  }
56 
59  public: tbb::task *execute()
60  {
61  this->pub->WaitForConnection();
62  this->pub->Publish(*this->msg, true);
63  this->pub->SendMessage();
64  delete this->msg;
65  this->pub.reset();
66  return NULL;
67  }
68 
70  private: transport::PublisherPtr pub;
71 
73  private: google::protobuf::Message *msg;
74  };
76 
79 
83  class GZ_TRANSPORT_VISIBLE Node :
84  public boost::enable_shared_from_this<Node>
85  {
87  public: Node();
88 
90  public: virtual ~Node();
91 
100  public: void Init(const std::string &_space ="");
101 
113  public: bool TryInit(
114  const common::Time &_maxWait = common::Time(1, 0));
115 
120  public: bool IsInitialized() const;
121 
123  public: void Fini();
124 
127  public: std::string GetTopicNamespace() const;
128 
132  public: std::string DecodeTopicName(const std::string &_topic);
133 
137  public: std::string EncodeTopicName(const std::string &_topic);
138 
141  public: unsigned int GetId() const;
142 
145  public: void ProcessPublishers();
146 
148  public: void ProcessIncoming();
149 
153  public: bool HasLatchedSubscriber(const std::string &_topic) const;
154 
155 
162  public: template<typename M>
163  void Publish(const std::string &_topic,
164  const google::protobuf::Message &_message)
165  {
166  transport::PublisherPtr pub = this->Advertise<M>(_topic);
167  PublishTask *task = new(tbb::task::allocate_root())
168  PublishTask(pub, _message);
169 
170  tbb::task::enqueue(*task);
171  return;
172  }
173 
181  public: template<typename M>
182  transport::PublisherPtr Advertise(const std::string &_topic,
183  unsigned int _queueLimit = 1000,
184  double _hzRate = 0)
185  {
186  std::string decodedTopic = this->DecodeTopicName(_topic);
187  PublisherPtr publisher =
188  transport::TopicManager::Instance()->Advertise<M>(
189  decodedTopic, _queueLimit, _hzRate);
190 
191  boost::mutex::scoped_lock lock(this->publisherMutex);
192  publisher->SetNode(shared_from_this());
193  this->publishers.push_back(publisher);
194 
195  return publisher;
196  }
197 
204  public: void Publish(const std::string &_topic,
205  const google::protobuf::Message &_message)
206  {
207  transport::PublisherPtr pub = this->Advertise(_topic,
208  _message.GetTypeName());
209  pub->WaitForConnection();
210 
211  pub->Publish(_message, true);
212  }
213 
221  public: transport::PublisherPtr Advertise(const std::string &_topic,
222  const std::string &_msgTypeName,
223  unsigned int _queueLimit = 1000,
224  double _hzRate = 0)
225  {
226  std::string decodedTopic = this->DecodeTopicName(_topic);
227  PublisherPtr publisher =
229  decodedTopic, _msgTypeName, _queueLimit, _hzRate);
230 
231  boost::mutex::scoped_lock lock(this->publisherMutex);
232  publisher->SetNode(shared_from_this());
233  this->publishers.push_back(publisher);
234 
235  return publisher;
236  }
237 
245  public: template<typename M, typename T>
246  SubscriberPtr Subscribe(const std::string &_topic,
247  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
248  bool _latching = false)
249  {
250  SubscribeOptions ops;
251  std::string decodedTopic = this->DecodeTopicName(_topic);
252  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
253 
254  {
255  using namespace boost::placeholders;
256  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
257  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
258  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
259  }
260 
261  SubscriberPtr result =
262  transport::TopicManager::Instance()->Subscribe(ops);
263 
264  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
265 
266  return result;
267  }
268 
275  public: template<typename M>
276  SubscriberPtr Subscribe(const std::string &_topic,
277  void(*_fp)(const boost::shared_ptr<M const> &),
278  bool _latching = false)
279  {
280  SubscribeOptions ops;
281  std::string decodedTopic = this->DecodeTopicName(_topic);
282  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
283 
284  {
285  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
286  this->callbacks[decodedTopic].push_back(
287  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
288  }
289 
290  SubscriberPtr result =
291  transport::TopicManager::Instance()->Subscribe(ops);
292 
293  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
294 
295  return result;
296  }
297 
305  template<typename T>
306  SubscriberPtr Subscribe(const std::string &_topic,
307  void(T::*_fp)(const std::string &), T *_obj,
308  bool _latching = false)
309  {
310  SubscribeOptions ops;
311  std::string decodedTopic = this->DecodeTopicName(_topic);
312  ops.Init(decodedTopic, shared_from_this(), _latching);
313 
314  {
315  using namespace boost::placeholders;
316  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
317  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
318  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
319  }
320 
321  SubscriberPtr result =
322  transport::TopicManager::Instance()->Subscribe(ops);
323 
324  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
325 
326  return result;
327  }
328 
329 
336  SubscriberPtr Subscribe(const std::string &_topic,
337  void(*_fp)(const std::string &), bool _latching = false)
338  {
339  SubscribeOptions ops;
340  std::string decodedTopic = this->DecodeTopicName(_topic);
341  ops.Init(decodedTopic, shared_from_this(), _latching);
342 
343  {
344  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
345  this->callbacks[decodedTopic].push_back(
347  }
348 
349  SubscriberPtr result =
350  transport::TopicManager::Instance()->Subscribe(ops);
351 
352  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
353 
354  return result;
355  }
356 
361  public: bool HandleData(const std::string &_topic,
362  const std::string &_msg);
363 
368  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
369 
376  public: void InsertLatchedMsg(const std::string &_topic,
377  const std::string &_msg);
378 
385  public: void InsertLatchedMsg(const std::string &_topic,
386  MessagePtr _msg);
387 
391  public: std::string GetMsgType(const std::string &_topic) const;
392 
398  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
399 
411  private: bool PrivateInit(const std::string &_space,
412  const common::Time &_maxWait,
413  const bool _fallbackToDefault);
414 
415  private: std::string topicNamespace;
416  private: std::vector<PublisherPtr> publishers;
417  private: std::vector<PublisherPtr>::iterator publishersIter;
418  private: static unsigned int idCounter;
419  private: unsigned int id;
420 
421  private: typedef std::list<CallbackHelperPtr> Callback_L;
422  private: typedef std::map<std::string, Callback_L> Callback_M;
423  private: Callback_M callbacks;
424  private: std::map<std::string, std::list<std::string> > incomingMsgs;
425 
427  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
428 
429  private: boost::mutex publisherMutex;
430  private: boost::mutex publisherDeleteMutex;
431  private: boost::recursive_mutex incomingMutex;
432 
435  private: boost::recursive_mutex processIncomingMutex;
436 
437  private: bool initialized;
438  };
440  }
441 }
442 #endif
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:35
Forward declarations for transport.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:48
Callback helper Template.
Definition: CallbackHelper.hh:112
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:85
void Fini()
Finalize the node.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:276
std::string GetTopicNamespace() const
Get the topic namespace for this node.
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:306
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:221
bool IsInitialized() const
Check if this Node has been initialized.
unsigned int GetId() const
Get the unique ID of the node.
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:182
void ProcessIncoming()
Process incoming messages.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:204
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
void Init(const std::string &_space="")
Init the node.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:246
virtual ~Node()
Destructor.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
void RemoveCallback(const std::string &_topic, unsigned int _id)
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:163
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:336
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178
Options for a subscription.
Definition: SubscribeOptions.hh:36
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
Forward declarations for the common classes.
Definition: Animation.hh:27