GSLAM  3.0.0
Messenger.h
1 // GSLAM - A general SLAM framework and benchmark
2 // Copyright 2018 PILAB Inc. All rights reserved.
3 // https://github.com/zdzhaoyong/GSLAM
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright notice,
11 // this list of conditions and the following disclaimer in the documentation
12 // and/or other materials provided with the distribution.
13 // * Neither the name of Google Inc. nor the names of its contributors may be
14 // used to endorse or promote products derived from this software without
15 // specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 // POSSIBILITY OF SUCH DAMAGE.
28 //
29 // Author: zd5945@126.com (Yong Zhao)
30 //
31 // Messenger: A light-weight, efficient, thread-safe message publish and
32 // subscribe tool similar with ROS, a popular robot operating system
33 // The tool has the following features:
34 // * Header only based on c++11, no extra dependency, makes it portable.
35 // * Thread safe and support multi-thread condition notify mode by setting the queue size.
36 // * Able to transfer any classes efficiently, including ROS defined messages, which means it can replace ROS messagging or work with it.
37 
38 #ifndef GSLAM_MESSENGER_H
39 #define GSLAM_MESSENGER_H
40 
41 
42 #include <signal.h>
43 #include <functional>
44 #include <iostream>
45 #include <iomanip>
46 #include <set>
47 #include <map>
48 #include <unordered_map>
49 #include <memory>
50 #include <mutex>
51 #include <string>
52 #include <sstream>
53 #include <vector>
54 #include <atomic>
55 #include <condition_variable>
56 #include <future>
57 #include <queue>
58 #include <thread>
59 #include "Svar.h"
60 
61 #ifdef messenger
62 #undef messenger
63 #endif
64 #define messenger GSLAM::Messenger::instance()
65 
66 namespace GSLAM {
67 
68 #ifndef DOXYGEN_IGNORE_INTERNAL
69 namespace msg{
70 
71 // A simple threadpool implementation.
72 class ThreadPool {
73 public:
74  // All the threads are created upon construction.
75  explicit ThreadPool(const int num_threads) : stop(false) {
76  workerRunning.resize(num_threads, 0);
77 
78  for (size_t i = 0; i < num_threads; ++i) {
79  workers.emplace_back(&ThreadPool::threadFunc, this, i);
80  }
81  }
82 
83  ~ThreadPool() {
84  {
85  std::unique_lock<std::mutex> lock(queue_mutex);
86  stop = true;
87  }
88  condition.notify_all();
89  for (std::thread& worker : workers) worker.join();
90  }
91 
92  void threadFunc(int tid) {
93  for (;;) {
94  std::function<void()> task;
95 
96  {
97  std::unique_lock<std::mutex> lock(this->queue_mutex);
98  this->condition.wait(
99  lock, [this] { return this->stop || !this->tasks.empty(); });
100  if (this->stop && this->tasks.empty()) return;
101 
102  task = std::move(this->tasks.front());
103  this->tasks.pop();
104  }
105 
106  workerRunning[tid] = 1;
107  task();
108  workerRunning[tid] = 0;
109  }
110  }
111 
112  // Adds a task to the threadpool.
113  template <class F, class... Args>
114  auto Add(F&& f, Args&&... args)
115  -> std::future<typename std::result_of<F(Args...)>::type>;
116 
117  size_t taskNumLeft() { return tasks.size(); }
118 
119  void popTask(){
120  std::unique_lock<std::mutex> lock(queue_mutex);
121  tasks.pop();
122  }
123 
124  int isRunning(void) {
125  int nr = 0, nt = workerRunning.size();
126  for(int i=0; i<nt; i++) nr += workerRunning[i];
127 
128  if( nr == 0 ) return 0;
129  else return 1;
130  }
131 
132 private:
133  // Keep track of threads so we can join them
134  std::vector<std::thread> workers;
135  // work running status (0 - idle, 1 - working)
136  std::vector<int> workerRunning;
137  // The task queue
138  std::queue<std::function<void()> > tasks;
139 
140  // Synchronization
141  std::mutex queue_mutex;
142  std::condition_variable condition;
143  bool stop;
144 };
145 
146 // add new work item to the pool
147 template <class F, class... Args>
148 auto ThreadPool::Add(F&& f, Args&&... args)
149 -> std::future<typename std::result_of<F(Args...)>::type> {
150  using return_type = typename std::result_of<F(Args...)>::type;
151 
152  auto task = std::make_shared<std::packaged_task<return_type()> >(
153  std::bind(std::forward<F>(f), std::forward<Args>(args)...));
154 
155  std::future<return_type> res = task->get_future();
156  {
157  std::unique_lock<std::mutex> lock(queue_mutex);
158 
159  // don't allow enqueueing after stopping the pool
160  if (stop)
161  std::cerr << "The ThreadPool object has been destroyed! Cannot add more "
162  "tasks to the ThreadPool!";
163 
164  tasks.emplace([task]() { (*task)(); });
165  }
166  condition.notify_one();
167  return res;
168 }
169 
170 } // end of namespace detail
171 #endif
172 
173 class Messenger;
174 class Publisher;
175 class PubSubSpace;
176 
177 class Subscriber {
178 public:
179  Subscriber() {}
180  Subscriber(const std::string& topic, const SvarFunction& callback, size_t queue_size = 0)
181  : impl_(new Impl(topic,callback,queue_size)) {}
182  ~Subscriber() {
183  if(impl_.use_count()==2)
184  {
185  shutdown();
186  }
187  }
188 
189  /**
190  * \brief Unsubscribe the callback associated with this Subscriber
191  *
192  * This method usually does not need to be explicitly called, as automatic
193  * shutdown happens when
194  * all copies of this Subscriber go out of scope
195  *
196  * This method overrides the automatic reference counted unsubscribe, and
197  * immediately
198  * unsubscribes the callback associated with this Subscriber
199  */
200  void shutdown() ;
201 
202  std::string getTopic() const {
203  if (impl_) return impl_->topic_;
204  return "";
205  }
206 
207  std::string getTypeName() const {
208  if (impl_) return impl_->type_.as<SvarClass>().name();
209  return "";
210  }
211 
212  /**
213  * \brief Returns the number of publishers this subscriber is connected to
214  */
215  uint32_t getNumPublishers() const ;
216 
217  operator void*() const { return (impl_) ? (void*)1 : (void*)0; }
218 
219  bool operator<(const Subscriber& rhs) const { return impl_ < rhs.impl_; }
220 
221  bool operator==(const Subscriber& rhs) const { return impl_ == rhs.impl_; }
222 
223  bool operator!=(const Subscriber& rhs) const { return impl_ != rhs.impl_; }
224 
225 protected:
226  friend class Messenger;
227  friend class Publisher;
228  struct Impl {
229  Impl(const std::string& topic, const SvarFunction& callback, size_t queue_size = 0)
230  : topic_(topic),
231  callback_(callback),
232  unsubscribed_(false),
233  queue_size_(queue_size),
234  workthread_(queue_size ? new msg::ThreadPool(1) : nullptr) {
235  assert(callback.arg_types.size()==1);
236  type_=callback.arg_types[0];
237  }
238 
239  ~Impl(){workthread_.reset();}
240 
241  void publish(Svar message) const {
242  if (unsubscribed_) return;
243  if (workthread_ ){
244  if(workthread_->taskNumLeft() >= queue_size_)
245  workthread_->popTask();
246  workthread_->Add([this, message]() {
247  if (unsubscribed_) return;
248  callback_.call(message);
249  });
250  return;
251  }
252  callback_.call(message);
253  }
254 
255  std::string topic_;
256  Svar type_;
257  SvarFunction callback_;
258  bool unsubscribed_;
259  std::shared_ptr<PubSubSpace> space_;
260  size_t queue_size_;
261  std::shared_ptr<msg::ThreadPool> workthread_;
262  };
263 
264  Subscriber(std::shared_ptr<Subscriber::Impl> impl) : impl_(impl) {}
265 
266  virtual void publish(const Svar& message) const {
267  if (!impl_) return;
268  impl_->publish(message);
269  }
270  std::string key()const{return getTopic()+"#"+getTypeName();}
271 
272  std::shared_ptr<Impl> impl_;
273 };
274 
275 class Publisher {
276 public:
277  Publisher() {}
278  Publisher(const std::string& topic, const Svar& type,
279  size_t queue_size = 0)
280  :impl_(new Impl(topic,type,queue_size)){}
281 
282  virtual ~Publisher() {
283  if(impl_.use_count()==2)
284  {
285  shutdown();
286  }
287  }
288 
289  /**
290  * \brief Publish a message without a copy!
291  */
292  void publish(const Svar& message) const;
293 
294  /**
295  * \brief Shutdown the advertisement associated with this Publisher
296  *
297  * This method usually does not need to be explicitly called, as automatic
298  * shutdown happens when
299  * all copies of this Publisher go out of scope
300  *
301  * This method overrides the automatic reference counted unadvertise, and does
302  * so immediately.
303  * \note Note that if multiple advertisements were made through
304  * NodeHandle::advertise(), this will
305  * only remove the one associated with this Publisher
306  */
307  void shutdown() ;
308 
309  /**
310  * \brief Returns the topic that this Publisher will publish on.
311  */
312  std::string getTopic() const {
313  if (impl_) return impl_->topic_;
314  return "";
315  }
316 
317  /**
318  * \brief Returns the topic that this Publisher will publish on.
319  */
320  std::string getTypeName() const {
321  if (impl_) return impl_->type_.as<SvarClass>().name();
322  return "";
323  }
324 
325  /**
326  * \brief Returns the number of subscribers that are currently connected to
327  * this Publisher
328  */
329  uint32_t getNumSubscribers() const;
330 
331  operator void*() const { return (impl_) ? (void*)1 : (void*)0; }
332 
333  bool operator<(const Publisher& rhs) const { return impl_ < rhs.impl_; }
334 
335  bool operator==(const Publisher& rhs) const { return impl_ == rhs.impl_; }
336 
337  bool operator!=(const Publisher& rhs) const { return impl_ != rhs.impl_; }
338 
339 
340 protected:
341  friend class Messenger;
342  friend class Subscriber;
343 
344  struct Impl {
345  Impl(const std::string& topic, const Svar& type,
346  size_t queue_size = 0)
347  : topic_(topic),
348  type_(type),
349  queue_size_(queue_size),
350  workthread_(queue_size ? new msg::ThreadPool(1) : nullptr) {}
351 
352  std::string topic_;
353  Svar type_;
354  std::shared_ptr<PubSubSpace> space_;
355  size_t queue_size_;
356  std::shared_ptr<msg::ThreadPool> workthread_;
357  std::mutex mutex_;
358  };
359  Publisher(Impl* implement) : impl_(implement) {}
360  std::string key()const{return getTopic()+"#"+getTypeName();}
361 
362 
363  std::shared_ptr<Impl> impl_;
364 };
365 
366 #ifndef DOXYGEN_IGNORE_INTERNAL
367 struct PubSubSpace
368 {
369  std::mutex mtx_;
370  std::set<Subscriber> subs_;
371  std::set<Publisher> pubs_;
372 };
373 #endif
374 
375 /**
376  @brief A tiny class implemented ROS like Pub/Sub messaging.
377 
378 Messenger: A light-weight, efficient, thread-safe message publish and
379  subscribe tool similar with ROS, a popular robot operating system.
380 
381 The tool has the following features:
382 - Header only based on c++11, no extra dependency, makes it portable.
383 - Thread safe and support multi-thread condition notify mode by setting the queue size.
384 - Able to transfer any classes efficiently, including ROS defined messages, which means it can replace ROS messagging or work with it.
385 
386 @code
387 // example.cpp
388 #include <GSLAM/core/Messenger.h>
389 
390 using namespace GSLAM;
391 
392 int main(int argc,char** argv)
393 {
394  Subscriber sub=messenger.subscribe("topic_name",[](std::string msg){
395  std::cerr<<"Received string msg "<<msg<<std::endl;
396  });
397 
398  // publish message without creating a Publisher
399  messenger.publish("topic_name","hello world!");
400 
401  Publisher pub=messenger.advertise<std::string>("topic_name");
402  pub.publish("hello Messenger!");
403 
404  return 0;
405 }
406 @endcode
407 
408 @warning The Subscriber will auto shutdown, please hold the Subscriber instance until you want to unsubscribe.
409  */
410 class Messenger {
411 public:
412 
413  Messenger():d(new Data()){
414  d->pubNewSub=advertise<Publisher>("messenger/newsub");
415  d->pubNewPub=advertise<Subscriber>("messenger/newpub");
416  }
417  virtual ~Messenger() {}
418 
419  /// @return The Messenger instance (messenger)
420  static Messenger& instance(){
421  static std::shared_ptr<Messenger> inst(new Messenger());
422  return *inst;
423  }
424 
425  /// @brief Create a Publisher
426  /// @param topic the topic name
427  /// @param queue_size when queue_size>0, messages will be sent in another thread
428  /// @return the Publisher created
429  template <class M>
430  Publisher advertise(const std::string& topic, uint32_t queue_size = 0) {
431  if(topic.empty()) return Publisher();
432  Publisher pub(topic, SvarClass::instance<M>(),queue_size);
433  join(pub);
434  d->pubNewPub.publish(pub);
435  return pub;
436  }
437 
438  /// @brief publish a message without creating a publisher
439  /// @param topic the topic name
440  /// @param message the message will be sent in this thread
441  template <class M>
442  void publish(const std::string& topic,const M& message){
443  std::set<Subscriber> subs;
444  {
445  std::unique_lock<std::mutex> lockSpaces(d->mutex_);
446  std::shared_ptr<PubSubSpace> space=d->spaces_[topic];
447  if(!space) return;
448  if(space->subs_.empty()) return;
449  std::unique_lock<std::mutex> lockSpace(space->mtx_);
450  subs=space->subs_;
451  }
452  for(Subscriber sub:subs)
453  sub.publish(message);
454  }
455 
456  /// @brief subscribe a topic with callback function
457  /// @param topic the topic name
458  /// @param queue_size when queue_size>0, messages will be handled in a separate thread
459  /// @param callback the callback function used to handle the messages
461  const std::string& topic, uint32_t queue_size,
462  SvarFunction callback) {
463  if(topic.empty()) return Subscriber();
464  Subscriber sub(topic, callback,queue_size);
465  join(sub);
466  d->pubNewSub.publish(sub);
467  return sub;
468  }
469 
470  /// @brief subscribe a topic with callback function
471  Subscriber subscribe(const std::string& topic, SvarFunction callback,
472  uint32_t queue_size=0){
473  if(topic.empty()) return Subscriber();
474  Subscriber sub(topic, callback,queue_size);
475  join(sub);
476  d->pubNewSub.publish(sub);
477  return sub;
478  }
479 
480  /// @brief subscribe a topic with callback member function
481  template <class T, class M>
482  Subscriber subscribe(const std::string& topic, uint32_t queue_size,
483  void (T::*fp)(const std::shared_ptr<M>&), T* obj) {
484  std::function<void(const std::shared_ptr<M>&)> cbk =
485  std::bind(fp, obj, std::placeholders::_1);
486  return subscribe(topic, queue_size, cbk);
487  }
488 
489  /// @brief subscribe a topic with callback function
490  template <class M>
491  Subscriber subscribe(const std::string& topic, int queue_size,
492  void (*fp)(const M&)){
493  return subscribe(topic,queue_size,SvarFunction(fp));
494  }
495 
496  /// @brief subscribe a topic with callback member function
497  template <class T, class M>
498  Subscriber subscribe(const std::string& topic, uint32_t queue_size,
499  void (T::*fp)(const M&), T* obj) {
500  std::function<void (const M&)> func=std::bind(fp, obj, std::placeholders::_1);
501  return subscribe(topic, queue_size, [func](const M& m){return func(m);});
502  }
503 
504  /// @brief get all publishers
505  std::vector<Publisher> getPublishers()const{
506  std::unique_lock<std::mutex> lock(d->mutex_);
507  std::vector<Publisher> pubs;
508  for(auto it1:d->spaces_)
509  {
510  std::shared_ptr<PubSubSpace> space=it1.second;
511  if(!space) continue;
512  std::unique_lock<std::mutex> lock(space->mtx_);
513  pubs.insert(pubs.end(),space->pubs_.begin(),space->pubs_.end());
514  }
515  return pubs;
516  }
517 
518  /// @brief get all subscribers
519  std::vector<Subscriber> getSubscribers()const{
520  std::unique_lock<std::mutex> lock(d->mutex_);
521  std::vector<Subscriber> subs;
522  for(auto it1:d->spaces_)
523  {
524  std::shared_ptr<PubSubSpace> space=it1.second;
525  if(!space) continue;
526  std::unique_lock<std::mutex> lock(space->mtx_);
527  subs.insert(subs.end(),space->subs_.begin(),space->subs_.end());
528  }
529  return subs;
530  }
531 
532  /// @brief list all publishers and subscribers
533  std::string introduction(int width=80)const{
534  if(getPublishers().size()+getSubscribers().size()==0)
535  return "";
536  std::stringstream sst;
537  sst<<"Publisher and Subscriber lists:\n";
538  sst<<printTable({{width/5-1,"Type"},
539  {width*2/5-1,"Topic"},
540  {width*2/5,"Payload"}});
541 
542  for(int i=0;i<width;i++)
543  sst<<"-";
544  sst<<std::endl;
545 
546  std::unique_lock<std::mutex> lock(d->mutex_);
547  for(auto it1:d->spaces_)
548  {
549  std::shared_ptr<PubSubSpace> space=it1.second;
550  if(!space) continue;
551  std::unique_lock<std::mutex> lock(space->mtx_);
552  if(!space->pubs_.empty())
553  sst<<printTable({{width/5-1,"Publisher*"+std::to_string(space->pubs_.size())},
554  {width*2/5-1,space->pubs_.begin()->getTopic()},
555  {width*2/5,space->pubs_.begin()->getTypeName()}});
556 
557  if(!space->subs_.empty())
558  sst<<printTable({{width/5-1,"Subscriber*"+std::to_string(space->subs_.size())},
559  {width*2/5-1,space->subs_.begin()->getTopic()},
560  {width*2/5,space->subs_.begin()->getTypeName()}});
561  }
562  return sst.str();
563  }
564 
565  /// Let the Publisher join this Messenger space
566  void join(const Publisher& pub){
567  std::unique_lock<std::mutex> lock(d->mutex_);
568  std::shared_ptr<PubSubSpace>& space=d->spaces_[pub.getTopic()];
569  if(!space) space=std::shared_ptr<PubSubSpace>(new PubSubSpace());
570  std::unique_lock<std::mutex> lock1(space->mtx_);
571  space->pubs_.insert(pub);
572  pub.impl_->space_=space;
573  }
574 
575  /// Let the Subscriber join this Messenger space
576  void join(const Subscriber& sub){
577  std::unique_lock<std::mutex> lock(d->mutex_);
578  std::shared_ptr<PubSubSpace>& space=d->spaces_[sub.getTopic()];
579  if(!space) space=std::shared_ptr<PubSubSpace>(new PubSubSpace());
580  std::unique_lock<std::mutex> lock1(space->mtx_);
581  space->subs_.insert(sub);
582  sub.impl_->space_=space;
583  }
584 
585  /// Let all publishers and subscribers left the old space
586  void join(Messenger another){
587  for(const Publisher& pub:another.getPublishers()){
588  join(pub);
589  }
590  for(const Subscriber& sub:another.getSubscribers()){
591  join(sub);
592  }
593  }
594 
595  /// Notify and wait Subscribers to shutdown
596  static int exec(){
597  std::promise<bool> stopSig;
598  Subscriber subStop=instance().subscribe("messenger/stop",0,[&stopSig](bool b){
599  stopSig.set_value(true);
600  });
601  signal(SIGINT, [](int sig){
602  instance().publish("messenger/stop",true);
603  });
604  stopSig.get_future().wait();
605  return 0;
606  }
607 
608 private:
609  static std::string printTable(std::vector<std::pair<int,std::string> > line){
610  std::stringstream sst;
611  while(true){
612  size_t emptyCount=0;
613  for(auto& it:line){
614  size_t width=it.first;
615  std::string& str=it.second;
616  if(str.size()<=width){
617  sst<< std::setw(width)
618  <<std::setiosflags(std::ios::left)
619  <<str<<" ";
620  str.clear();
621  emptyCount++;
622  }else{
623  sst<<str.substr(0,width)<<" ";
624  str=str.substr(width);
625  }
626  }
627  sst<<std::endl;
628  if(emptyCount==line.size()) break;
629  }
630  return sst.str();
631  }
632 
633  struct Data{
634  std::mutex mutex_;
635  std::unordered_map<std::string,std::shared_ptr<PubSubSpace>> spaces_;
636  Publisher pubNewPub,pubNewSub;
637  };
638  std::shared_ptr<Data> d;
639 };
640 
641 inline void Publisher::shutdown()
642 {
643  if (!impl_) return;
644  auto space=impl_->space_;
645  std::unique_lock<std::mutex> lock(space->mtx_);
646  auto it=space->pubs_.find(*this);
647  impl_.reset();
648  space->pubs_.erase(it);
649 }
650 
651 inline uint32_t Publisher::getNumSubscribers() const {
652  if (impl_) return impl_->space_->subs_.size();
653  return 0;
654 }
655 
656 inline uint32_t Subscriber::getNumPublishers() const {
657  if (!impl_) return 0;
658  return impl_->space_->pubs_.size();
659 }
660 
661 inline void Subscriber::shutdown()
662 {
663  if (!impl_) return;
664 
665  impl_->unsubscribed_ = true;
666  auto space=impl_->space_;
667  std::unique_lock<std::mutex> lock(space->mtx_);
668  auto it=space->subs_.find(*this);
669  impl_.reset();
670  space->subs_.erase(it);
671 }
672 
673 inline void Publisher::publish(const Svar& message) const {
674  if (!impl_) return;
675 
676  std::set<Subscriber> subscribers;
677  {
678  std::unique_lock<std::mutex> lock(impl_->space_->mtx_);
679  subscribers = impl_->space_->subs_;
680  }
681 
682  if (subscribers.empty()) {
683  return;
684  }
685 
686  if (impl_->workthread_ ) {
687 
688  if(impl_->workthread_->taskNumLeft() >= impl_->queue_size_)
689  impl_->workthread_->popTask();
690  impl_->workthread_->Add([subscribers, message]() {
691  for (const Subscriber& s : subscribers) {
692  s.publish(message);
693  }
694  });
695  return;
696  }
697 
698  for (Subscriber s : subscribers) {
699  s.publish(message);
700  }
701 }
702 
703 } // end of namespace GSLAM
704 
705 #endif
Publisher advertise(const std::string &topic, uint32_t queue_size=0)
Create a Publisher.
Definition: Messenger.h:430
Definition: Messenger.h:177
std::vector< Publisher > getPublishers() const
get all publishers
Definition: Messenger.h:505
void publish(const Svar &message) const
Publish a message without a copy!
Definition: Messenger.h:673
void join(const Subscriber &sub)
Let the Subscriber join this Messenger space.
Definition: Messenger.h:576
std::vector< Subscriber > getSubscribers() const
get all subscribers
Definition: Messenger.h:519
Subscriber subscribe(const std::string &topic, int queue_size, void(*fp)(const M &))
subscribe a topic with callback function
Definition: Messenger.h:491
uint32_t getNumPublishers() const
Returns the number of publishers this subscriber is connected to.
Definition: Messenger.h:656
uint32_t getNumSubscribers() const
Returns the number of subscribers that are currently connected to this Publisher. ...
Definition: Messenger.h:651
Subscriber subscribe(const std::string &topic, SvarFunction callback, uint32_t queue_size=0)
subscribe a topic with callback function
Definition: Messenger.h:471
Definition: Camera.h:45
void shutdown()
Unsubscribe the callback associated with this Subscriber.
Definition: Messenger.h:661
void join(const Publisher &pub)
Let the Publisher join this Messenger space.
Definition: Messenger.h:566
The Svar class, A Tiny Modern C++ Header Brings Unified Interface for Different Languages.
Definition: Svar.h:561
A tiny class implemented ROS like Pub/Sub messaging.
Definition: Messenger.h:410
std::string getTopic() const
Returns the topic that this Publisher will publish on.
Definition: Messenger.h:312
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(const std::shared_ptr< M > &), T *obj)
subscribe a topic with callback member function
Definition: Messenger.h:482
Definition: Messenger.h:344
static Messenger & instance()
Definition: Messenger.h:420
Definition: Messenger.h:228
std::string getTypeName() const
Returns the topic that this Publisher will publish on.
Definition: Messenger.h:320
void join(Messenger another)
Let all publishers and subscribers left the old space.
Definition: Messenger.h:586
std::string introduction(int width=80) const
list all publishers and subscribers
Definition: Messenger.h:533
void shutdown()
Shutdown the advertisement associated with this Publisher.
Definition: Messenger.h:641
Subscriber subscribe(const std::string &topic, uint32_t queue_size, SvarFunction callback)
subscribe a topic with callback function
Definition: Messenger.h:460
Definition: Messenger.h:275
void publish(const std::string &topic, const M &message)
publish a message without creating a publisher
Definition: Messenger.h:442
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(const M &), T *obj)
subscribe a topic with callback member function
Definition: Messenger.h:498
static int exec()
Notify and wait Subscribers to shutdown.
Definition: Messenger.h:596