38 #ifndef GSLAM_MESSENGER_H 39 #define GSLAM_MESSENGER_H 48 #include <unordered_map> 55 #include <condition_variable> 64 #define messenger GSLAM::Messenger::instance() 68 #ifndef DOXYGEN_IGNORE_INTERNAL 75 explicit ThreadPool(
const int num_threads) : stop(false) {
76 workerRunning.resize(num_threads, 0);
78 for (
size_t i = 0; i < num_threads; ++i) {
79 workers.emplace_back(&ThreadPool::threadFunc,
this, i);
85 std::unique_lock<std::mutex> lock(queue_mutex);
88 condition.notify_all();
89 for (std::thread& worker : workers) worker.join();
92 void threadFunc(
int tid) {
94 std::function<void()> task;
97 std::unique_lock<std::mutex> lock(this->queue_mutex);
99 lock, [
this] {
return this->stop || !this->tasks.empty(); });
100 if (this->stop && this->tasks.empty())
return;
102 task = std::move(this->tasks.front());
106 workerRunning[tid] = 1;
108 workerRunning[tid] = 0;
113 template <
class F,
class... Args>
114 auto Add(F&& f, Args&&... args)
115 -> std::future<
typename std::result_of<F(Args...)>::type>;
117 size_t taskNumLeft() {
return tasks.size(); }
120 std::unique_lock<std::mutex> lock(queue_mutex);
124 int isRunning(
void) {
125 int nr = 0, nt = workerRunning.size();
126 for(
int i=0; i<nt; i++) nr += workerRunning[i];
128 if( nr == 0 )
return 0;
134 std::vector<std::thread> workers;
136 std::vector<int> workerRunning;
138 std::queue<std::function<void()> > tasks;
141 std::mutex queue_mutex;
142 std::condition_variable condition;
147 template <
class F,
class... Args>
148 auto ThreadPool::Add(F&& f, Args&&... args)
149 -> std::future<
typename std::result_of<F(Args...)>::type> {
150 using return_type =
typename std::result_of<F(Args...)>::type;
152 auto task = std::make_shared<std::packaged_task<return_type()> >(
153 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
155 std::future<return_type> res = task->get_future();
157 std::unique_lock<std::mutex> lock(queue_mutex);
161 std::cerr <<
"The ThreadPool object has been destroyed! Cannot add more " 162 "tasks to the ThreadPool!";
164 tasks.emplace([task]() { (*task)(); });
166 condition.notify_one();
180 Subscriber(
const std::string& topic,
const SvarFunction& callback,
size_t queue_size = 0)
181 : impl_(
new Impl(topic,callback,queue_size)) {}
183 if(impl_.use_count()==2)
202 std::string getTopic()
const {
203 if (impl_)
return impl_->topic_;
207 std::string getTypeName()
const {
208 if (impl_)
return impl_->type_.as<SvarClass>().name();
215 uint32_t getNumPublishers()
const ;
217 operator void*()
const {
return (impl_) ? (
void*)1 : (
void*)0; }
219 bool operator<(
const Subscriber& rhs)
const {
return impl_ < rhs.impl_; }
221 bool operator==(
const Subscriber& rhs)
const {
return impl_ == rhs.impl_; }
223 bool operator!=(
const Subscriber& rhs)
const {
return impl_ != rhs.impl_; }
229 Impl(
const std::string& topic,
const SvarFunction& callback,
size_t queue_size = 0)
232 unsubscribed_(
false),
233 queue_size_(queue_size),
234 workthread_(queue_size ?
new msg::ThreadPool(1) :
nullptr) {
235 assert(callback.arg_types.size()==1);
236 type_=callback.arg_types[0];
239 ~
Impl(){workthread_.reset();}
241 void publish(
Svar message)
const {
242 if (unsubscribed_)
return;
244 if(workthread_->taskNumLeft() >= queue_size_)
245 workthread_->popTask();
246 workthread_->Add([
this, message]() {
247 if (unsubscribed_)
return;
248 callback_.call(message);
252 callback_.call(message);
257 SvarFunction callback_;
259 std::shared_ptr<PubSubSpace> space_;
261 std::shared_ptr<msg::ThreadPool> workthread_;
264 Subscriber(std::shared_ptr<Subscriber::Impl> impl) : impl_(impl) {}
266 virtual void publish(
const Svar& message)
const {
268 impl_->publish(message);
270 std::string key()
const{
return getTopic()+
"#"+getTypeName();}
272 std::shared_ptr<Impl> impl_;
279 size_t queue_size = 0)
280 :impl_(
new Impl(topic,type,queue_size)){}
283 if(impl_.use_count()==2)
292 void publish(
const Svar& message)
const;
313 if (impl_)
return impl_->topic_;
321 if (impl_)
return impl_->type_.as<SvarClass>().name();
329 uint32_t getNumSubscribers()
const;
331 operator void*()
const {
return (impl_) ? (
void*)1 : (
void*)0; }
333 bool operator<(
const Publisher& rhs)
const {
return impl_ < rhs.impl_; }
335 bool operator==(
const Publisher& rhs)
const {
return impl_ == rhs.impl_; }
337 bool operator!=(
const Publisher& rhs)
const {
return impl_ != rhs.impl_; }
345 Impl(
const std::string& topic,
const Svar& type,
346 size_t queue_size = 0)
349 queue_size_(queue_size),
350 workthread_(queue_size ?
new msg::ThreadPool(1) :
nullptr) {}
354 std::shared_ptr<PubSubSpace> space_;
356 std::shared_ptr<msg::ThreadPool> workthread_;
360 std::string key()
const{
return getTopic()+
"#"+getTypeName();}
363 std::shared_ptr<Impl> impl_;
366 #ifndef DOXYGEN_IGNORE_INTERNAL 370 std::set<Subscriber> subs_;
371 std::set<Publisher> pubs_;
414 d->pubNewSub=advertise<Publisher>(
"messenger/newsub");
415 d->pubNewPub=advertise<Subscriber>(
"messenger/newpub");
421 static std::shared_ptr<Messenger> inst(
new Messenger());
432 Publisher pub(topic, SvarClass::instance<M>(),queue_size);
434 d->pubNewPub.publish(pub);
442 void publish(
const std::string& topic,
const M& message){
443 std::set<Subscriber> subs;
445 std::unique_lock<std::mutex> lockSpaces(d->mutex_);
446 std::shared_ptr<PubSubSpace> space=d->spaces_[topic];
448 if(space->subs_.empty())
return;
449 std::unique_lock<std::mutex> lockSpace(space->mtx_);
453 sub.publish(message);
461 const std::string& topic, uint32_t queue_size,
462 SvarFunction callback) {
466 d->pubNewSub.publish(sub);
472 uint32_t queue_size=0){
476 d->pubNewSub.publish(sub);
481 template <
class T,
class M>
483 void (T::*fp)(
const std::shared_ptr<M>&), T* obj) {
484 std::function<void(const std::shared_ptr<M>&)> cbk =
485 std::bind(fp, obj, std::placeholders::_1);
486 return subscribe(topic, queue_size, cbk);
492 void (*fp)(
const M&)){
493 return subscribe(topic,queue_size,SvarFunction(fp));
497 template <
class T,
class M>
499 void (T::*fp)(
const M&), T* obj) {
500 std::function<void (const M&)> func=std::bind(fp, obj, std::placeholders::_1);
501 return subscribe(topic, queue_size, [func](
const M& m){
return func(m);});
506 std::unique_lock<std::mutex> lock(d->mutex_);
507 std::vector<Publisher> pubs;
508 for(
auto it1:d->spaces_)
510 std::shared_ptr<PubSubSpace> space=it1.second;
512 std::unique_lock<std::mutex> lock(space->mtx_);
513 pubs.insert(pubs.end(),space->pubs_.begin(),space->pubs_.end());
520 std::unique_lock<std::mutex> lock(d->mutex_);
521 std::vector<Subscriber> subs;
522 for(
auto it1:d->spaces_)
524 std::shared_ptr<PubSubSpace> space=it1.second;
526 std::unique_lock<std::mutex> lock(space->mtx_);
527 subs.insert(subs.end(),space->subs_.begin(),space->subs_.end());
534 if(getPublishers().size()+getSubscribers().size()==0)
536 std::stringstream sst;
537 sst<<
"Publisher and Subscriber lists:\n";
538 sst<<printTable({{width/5-1,
"Type"},
539 {width*2/5-1,
"Topic"},
540 {width*2/5,
"Payload"}});
542 for(
int i=0;i<width;i++)
546 std::unique_lock<std::mutex> lock(d->mutex_);
547 for(
auto it1:d->spaces_)
549 std::shared_ptr<PubSubSpace> space=it1.second;
551 std::unique_lock<std::mutex> lock(space->mtx_);
552 if(!space->pubs_.empty())
553 sst<<printTable({{width/5-1,
"Publisher*"+std::to_string(space->pubs_.size())},
554 {width*2/5-1,space->pubs_.begin()->getTopic()},
555 {width*2/5,space->pubs_.begin()->getTypeName()}});
557 if(!space->subs_.empty())
558 sst<<printTable({{width/5-1,
"Subscriber*"+std::to_string(space->subs_.size())},
559 {width*2/5-1,space->subs_.begin()->getTopic()},
560 {width*2/5,space->subs_.begin()->getTypeName()}});
567 std::unique_lock<std::mutex> lock(d->mutex_);
568 std::shared_ptr<PubSubSpace>& space=d->spaces_[pub.
getTopic()];
569 if(!space) space=std::shared_ptr<PubSubSpace>(
new PubSubSpace());
570 std::unique_lock<std::mutex> lock1(space->mtx_);
571 space->pubs_.insert(pub);
572 pub.impl_->space_=space;
577 std::unique_lock<std::mutex> lock(d->mutex_);
578 std::shared_ptr<PubSubSpace>& space=d->spaces_[sub.getTopic()];
579 if(!space) space=std::shared_ptr<PubSubSpace>(
new PubSubSpace());
580 std::unique_lock<std::mutex> lock1(space->mtx_);
581 space->subs_.insert(sub);
582 sub.impl_->space_=space;
597 std::promise<bool> stopSig;
598 Subscriber subStop=instance().subscribe(
"messenger/stop",0,[&stopSig](
bool b){
599 stopSig.set_value(
true);
601 signal(SIGINT, [](
int sig){
602 instance().publish(
"messenger/stop",
true);
604 stopSig.get_future().wait();
609 static std::string printTable(std::vector<std::pair<int,std::string> > line){
610 std::stringstream sst;
614 size_t width=it.first;
615 std::string& str=it.second;
616 if(str.size()<=width){
617 sst<< std::setw(width)
618 <<std::setiosflags(std::ios::left)
623 sst<<str.substr(0,width)<<
" ";
624 str=str.substr(width);
628 if(emptyCount==line.size())
break;
635 std::unordered_map<std::string,std::shared_ptr<PubSubSpace>> spaces_;
638 std::shared_ptr<Data> d;
644 auto space=impl_->space_;
645 std::unique_lock<std::mutex> lock(space->mtx_);
646 auto it=space->pubs_.find(*
this);
648 space->pubs_.erase(it);
652 if (impl_)
return impl_->space_->subs_.size();
657 if (!impl_)
return 0;
658 return impl_->space_->pubs_.size();
665 impl_->unsubscribed_ =
true;
666 auto space=impl_->space_;
667 std::unique_lock<std::mutex> lock(space->mtx_);
668 auto it=space->subs_.find(*
this);
670 space->subs_.erase(it);
676 std::set<Subscriber> subscribers;
678 std::unique_lock<std::mutex> lock(impl_->space_->mtx_);
679 subscribers = impl_->space_->subs_;
682 if (subscribers.empty()) {
686 if (impl_->workthread_ ) {
688 if(impl_->workthread_->taskNumLeft() >= impl_->queue_size_)
689 impl_->workthread_->popTask();
690 impl_->workthread_->Add([subscribers, message]() {
Publisher advertise(const std::string &topic, uint32_t queue_size=0)
Create a Publisher.
Definition: Messenger.h:430
Definition: Messenger.h:177
std::vector< Publisher > getPublishers() const
get all publishers
Definition: Messenger.h:505
void publish(const Svar &message) const
Publish a message without a copy!
Definition: Messenger.h:673
void join(const Subscriber &sub)
Let the Subscriber join this Messenger space.
Definition: Messenger.h:576
std::vector< Subscriber > getSubscribers() const
get all subscribers
Definition: Messenger.h:519
Subscriber subscribe(const std::string &topic, int queue_size, void(*fp)(const M &))
subscribe a topic with callback function
Definition: Messenger.h:491
uint32_t getNumPublishers() const
Returns the number of publishers this subscriber is connected to.
Definition: Messenger.h:656
uint32_t getNumSubscribers() const
Returns the number of subscribers that are currently connected to this Publisher. ...
Definition: Messenger.h:651
Subscriber subscribe(const std::string &topic, SvarFunction callback, uint32_t queue_size=0)
subscribe a topic with callback function
Definition: Messenger.h:471
void shutdown()
Unsubscribe the callback associated with this Subscriber.
Definition: Messenger.h:661
void join(const Publisher &pub)
Let the Publisher join this Messenger space.
Definition: Messenger.h:566
The Svar class: a tiny, modern C++ header that provides a unified interface for different languages.
Definition: Svar.h:561
A tiny class implementing ROS-like Pub/Sub messaging.
Definition: Messenger.h:410
std::string getTopic() const
Returns the topic that this Publisher will publish on.
Definition: Messenger.h:312
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(const std::shared_ptr< M > &), T *obj)
subscribe a topic with callback member function
Definition: Messenger.h:482
Definition: Messenger.h:344
static Messenger & instance()
Definition: Messenger.h:420
Definition: Messenger.h:228
std::string getTypeName() const
Returns the name of the message type that this Publisher publishes.
Definition: Messenger.h:320
void join(Messenger another)
Let all publishers and subscribers leave the old space.
Definition: Messenger.h:586
std::string introduction(int width=80) const
list all publishers and subscribers
Definition: Messenger.h:533
void shutdown()
Shutdown the advertisement associated with this Publisher.
Definition: Messenger.h:641
Subscriber subscribe(const std::string &topic, uint32_t queue_size, SvarFunction callback)
subscribe a topic with callback function
Definition: Messenger.h:460
Definition: Messenger.h:275
void publish(const std::string &topic, const M &message)
publish a message without creating a publisher
Definition: Messenger.h:442
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(const M &), T *obj)
subscribe a topic with callback member function
Definition: Messenger.h:498
static int exec()
Notify Subscribers and wait for them to shut down.
Definition: Messenger.h:596