Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <map>
25 #include <list>
26 #include <string>
27 #include <vector>
28 
31 #include "gazebo/util/system.hh"
32 
33 namespace gazebo
34 {
35  namespace transport
36  {
39  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
40  {
44  public: PublishTask(transport::PublisherPtr _pub,
45  const google::protobuf::Message &_message)
46  : pub(_pub)
47  {
48  this->msg = _message.New();
49  this->msg->CopyFrom(_message);
50  }
51 
54  public: tbb::task *execute()
55  {
56  this->pub->WaitForConnection();
57  this->pub->Publish(*this->msg, true);
58  this->pub->SendMessage();
59  delete this->msg;
60  this->pub.reset();
61  return NULL;
62  }
63 
65  private: transport::PublisherPtr pub;
66 
68  private: google::protobuf::Message *msg;
69  };
71 
74 
78  class GZ_TRANSPORT_VISIBLE Node :
79  public boost::enable_shared_from_this<Node>
80  {
82  public: Node();
83 
85  public: virtual ~Node();
86 
95  public: void Init(const std::string &_space ="");
96 
108  public: bool TryInit(
109  const common::Time &_maxWait = common::Time(1, 0));
110 
115  public: bool IsInitialized() const;
116 
118  public: void Fini();
119 
122  public: std::string GetTopicNamespace() const;
123 
127  public: std::string DecodeTopicName(const std::string &_topic);
128 
132  public: std::string EncodeTopicName(const std::string &_topic);
133 
136  public: unsigned int GetId() const;
137 
140  public: void ProcessPublishers();
141 
143  public: void ProcessIncoming();
144 
148  public: bool HasLatchedSubscriber(const std::string &_topic) const;
149 
150 
157  public: template<typename M>
158  void Publish(const std::string &_topic,
159  const google::protobuf::Message &_message)
160  {
161  transport::PublisherPtr pub = this->Advertise<M>(_topic);
162  PublishTask *task = new(tbb::task::allocate_root())
163  PublishTask(pub, _message);
164 
165  tbb::task::enqueue(*task);
166  return;
167  }
168 
176  public: template<typename M>
177  transport::PublisherPtr Advertise(const std::string &_topic,
178  unsigned int _queueLimit = 1000,
179  double _hzRate = 0)
180  {
181  std::string decodedTopic = this->DecodeTopicName(_topic);
182  PublisherPtr publisher =
183  transport::TopicManager::Instance()->Advertise<M>(
184  decodedTopic, _queueLimit, _hzRate);
185 
186  boost::mutex::scoped_lock lock(this->publisherMutex);
187  publisher->SetNode(shared_from_this());
188  this->publishers.push_back(publisher);
189 
190  return publisher;
191  }
192 
199  public: void Publish(const std::string &_topic,
200  const google::protobuf::Message &_message)
201  {
202  transport::PublisherPtr pub = this->Advertise(_topic,
203  _message.GetTypeName());
204  pub->WaitForConnection();
205 
206  pub->Publish(_message, true);
207  }
208 
216  public: transport::PublisherPtr Advertise(const std::string &_topic,
217  const std::string &_msgTypeName,
218  unsigned int _queueLimit = 1000,
219  double _hzRate = 0)
220  {
221  std::string decodedTopic = this->DecodeTopicName(_topic);
222  PublisherPtr publisher =
224  decodedTopic, _msgTypeName, _queueLimit, _hzRate);
225 
226  boost::mutex::scoped_lock lock(this->publisherMutex);
227  publisher->SetNode(shared_from_this());
228  this->publishers.push_back(publisher);
229 
230  return publisher;
231  }
232 
240  public: template<typename M, typename T>
241  SubscriberPtr Subscribe(const std::string &_topic,
242  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
243  bool _latching = false)
244  {
245  SubscribeOptions ops;
246  std::string decodedTopic = this->DecodeTopicName(_topic);
247  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
248 
249  {
250  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
251  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
252  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
253  }
254 
255  SubscriberPtr result =
256  transport::TopicManager::Instance()->Subscribe(ops);
257 
258  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
259 
260  return result;
261  }
262 
269  public: template<typename M>
270  SubscriberPtr Subscribe(const std::string &_topic,
271  void(*_fp)(const boost::shared_ptr<M const> &),
272  bool _latching = false)
273  {
274  SubscribeOptions ops;
275  std::string decodedTopic = this->DecodeTopicName(_topic);
276  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
277 
278  {
279  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
280  this->callbacks[decodedTopic].push_back(
281  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
282  }
283 
284  SubscriberPtr result =
285  transport::TopicManager::Instance()->Subscribe(ops);
286 
287  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
288 
289  return result;
290  }
291 
299  template<typename T>
300  SubscriberPtr Subscribe(const std::string &_topic,
301  void(T::*_fp)(const std::string &), T *_obj,
302  bool _latching = false)
303  {
304  SubscribeOptions ops;
305  std::string decodedTopic = this->DecodeTopicName(_topic);
306  ops.Init(decodedTopic, shared_from_this(), _latching);
307 
308  {
309  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
310  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
311  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
312  }
313 
314  SubscriberPtr result =
315  transport::TopicManager::Instance()->Subscribe(ops);
316 
317  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
318 
319  return result;
320  }
321 
322 
329  SubscriberPtr Subscribe(const std::string &_topic,
330  void(*_fp)(const std::string &), bool _latching = false)
331  {
332  SubscribeOptions ops;
333  std::string decodedTopic = this->DecodeTopicName(_topic);
334  ops.Init(decodedTopic, shared_from_this(), _latching);
335 
336  {
337  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
338  this->callbacks[decodedTopic].push_back(
340  }
341 
342  SubscriberPtr result =
343  transport::TopicManager::Instance()->Subscribe(ops);
344 
345  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
346 
347  return result;
348  }
349 
354  public: bool HandleData(const std::string &_topic,
355  const std::string &_msg);
356 
361  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
362 
369  public: void InsertLatchedMsg(const std::string &_topic,
370  const std::string &_msg);
371 
378  public: void InsertLatchedMsg(const std::string &_topic,
379  MessagePtr _msg);
380 
384  public: std::string GetMsgType(const std::string &_topic) const;
385 
391  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
392 
404  private: bool PrivateInit(const std::string &_space,
405  const common::Time &_maxWait,
406  const bool _fallbackToDefault);
407 
408  private: std::string topicNamespace;
409  private: std::vector<PublisherPtr> publishers;
410  private: std::vector<PublisherPtr>::iterator publishersIter;
411  private: static unsigned int idCounter;
412  private: unsigned int id;
413 
414  private: typedef std::list<CallbackHelperPtr> Callback_L;
415  private: typedef std::map<std::string, Callback_L> Callback_M;
416  private: Callback_M callbacks;
417  private: std::map<std::string, std::list<std::string> > incomingMsgs;
418 
420  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
421 
422  private: boost::mutex publisherMutex;
423  private: boost::mutex publisherDeleteMutex;
424  private: boost::recursive_mutex incomingMutex;
425 
428  private: boost::recursive_mutex processIncomingMutex;
429 
430  private: bool initialized;
431  };
433  }
434 }
435 #endif
void ProcessIncoming()
Process incoming messages.
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:216
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:270
Node()
Constructor.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:199
Options for a subscription.
Definition: SubscribeOptions.hh:35
virtual ~Node()
Destructor.
Forward declarations for the common classes.
Definition: Animation.hh:26
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A Time class, can be used to hold wall- or sim-time. stored as sec and nano-sec.
Definition: Time.hh:47
#define NULL
Definition: CommonTypes.hh:31
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
Forward declarations for transport.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:329
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:78
unsigned int GetId() const
Get the unique ID of the node.
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
Callback helper Template.
Definition: CallbackHelper.hh:111
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
void RemoveCallback(const std::string &_topic, unsigned int _id)
void Init(const std::string &_space="")
Init the node.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
transport
Definition: ConnectionManager.hh:35
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:177
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:300
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
std::string GetTopicNamespace() const
Get the topic namespace for this node.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:241
void Fini()
Finalize the node.
bool IsInitialized() const
Check if this Node has been initialized.
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177