Yet another event dispatcher in c++11
Problem
I've written an event dispatcher for a private project whose task is to collect events from different threads and dispatch them in a single thread:
```
namespace events
{
namespace
{
// unfortunately make_unique is not part of c++11 so we use our own
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args &&... args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
}
// Dispatcher for events posted from arbitrary threads
// Users can register functions that listen for certain events. When such events are posted from
// arbitrary threads, a single dispatcher thread will sequentially call the registered callbacks.
template
class dispatcher
{
public:
    dispatcher() : running_(false), ready_(false) {}
    // non copyable
    dispatcher(dispatcher const &) = delete;
    dispatcher &operator=(dispatcher) = delete;
    // but movable
    dispatcher(dispatcher &&) = default;
    // Needs to be called once for dispatching to start
    // It is possible to register callbacks and post events before this function has been called
    void start()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        running_ = true;
        thread_ = make_unique<std::thread>(&dispatcher::worker, this);
        // do not return until thread is ready for processing events
        condition_.wait(lock, [this]() { return !ready_; });
    }
    // Stops processing of events and joins dispatching thread
    // Depending on timing, events already queued may still be executed. The queue is not cleared
    // after stopping, so it is possible to use stop()/start() sequences to pause execution
    // After stop() returns it is ensured that no callbacks will be executed
    void stop()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            running_ = false;
        }
        condition_.notify_one();
        thread_->join();
    }
    using callback_t = std::function;
    // Register a function as an event listener
    // It is allowed to register multiple callbacks for the same event id
    void register_event(int event_id, callback_t function)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        c
```
Solution
Start/Stop
You should consider calling these from your constructor/destructor so that the thread is ready when constructed and automatically stopped when destroyed. It would make the class easier to use. Ask yourself this: would you ever create a dispatcher without the intent of calling start()/stop() on it? If not, they should be called from your ctor/dtor.
ready_ is redundant
In start() you can simply wait for running_ to be true and remove ready_. That said, I don't really think there is a need to wait for the thread to have started. You could just return without waiting, as the events will be executed as soon as the thread is up. Yes, you may have a very slight delay before your events start executing, but you may have that even if the thread is already up, because the OS doesn't guarantee that your thread will be scheduled immediately.
Skip unique_ptr on thread
As has already been mentioned, since you have disallowed copy/move I would recommend that you just have the thread as a member of the class without the unique_ptr. A default-constructed std::thread is an empty thread object without an associated thread.
Better naming
As mutex_ is protecting events_, it should probably be called events_lock_ or something similar. The same goes for condition_.
Simplify loop logic
The loop in your worker seems a bit convoluted. I would probably structure it something like this (caveat emptor: I might have changed behaviour on shutdown):
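```
while (running_) {
    auto next_event = std::chrono::high_resolution_clock::time_point::max();
    if (!events_.empty()) {
        next_event = events_.begin()->first;
    }
    // Will wait until:
    // 1) the time of the next event OR
    // 2) the thread was notified AND
    //    a) the thread is set to terminate OR
    //    b) an event with a closer time point was detected,
    // at which point we re-calculate the sleep time.
    condition_.wait_until(lock, next_event, [&]() {
        return !running_ ||
               (!events_.empty() && events_.begin()->first < next_event);
    });
    // Woken up either through notify of new events, or because it is time
    // for the next event.
    if (next_event <= std::chrono::high_resolution_clock::now()) {
        // Note that here events_ is never empty because otherwise
        // next_event would be end of time and we would not enter here.
        // The only place where events are removed is below.
        auto it = events_.begin();
        auto func = it->second;
        events_.erase(it);
        func(lock); // Temporarily releases lock
    }
}
```
Use standard functions/methods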
I would change:
```
auto range = callbacks_.equal_range(event_id);
std::vector<callback_t> functions;
for (auto it = range.first; it != range.second; ++it)
{
    functions.push_back(it->second);
}
```
to:
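```
auto range = callbacks_.equal_range(event_id);
std::vector<callback_t> functions;
// p is a key/value pair, not an iterator, so use p.second.
// Generic lambdas need C++14; in C++11, spell out the pair type instead of auto.
std::transform(range.first, range.second, std::back_inserter(functions),
               [](const auto &p) { return p.second; });
```
Use std::atomic
Signalling variables such as running_ and ready_ are typically implemented using atomic types. This means that you do not need to take a mutex lock to read/write them. One caveat: a flag that a condition-variable predicate checks should still be changed while holding the mutex, otherwise the waiting thread can miss the notification.
Suffix _t is reserved for use by POSIX
If you're not on a POSIX system then you're safe. For more info, see here.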
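To make the Start/Stop, unique_ptr and std::atomic suggestions above concrete, here is a minimal sketch rather than a drop-in replacement. The class name mini_dispatcher, its post() method, the running() accessor and the plain FIFO queue are all assumptions made for the example; the dispatcher in the question keys callbacks by event id and does more.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical, simplified dispatcher; not the class from the question.
class mini_dispatcher
{
public:
    using callback = std::function<void()>; // alias without the _t suffix

    // The worker thread is started by the constructor...
    mini_dispatcher() : running_(true), thread_(&mini_dispatcher::worker, this) {}

    // ...and stopped and joined by the destructor.
    ~mini_dispatcher()
    {
        {
            // The flag is atomic, but it is still changed under the mutex so the
            // worker cannot miss the notification between its check and its wait.
            std::lock_guard<std::mutex> lock(queue_lock_);
            running_ = false;
        }
        queue_ready_.notify_one();
        thread_.join();
    }

    mini_dispatcher(mini_dispatcher const &) = delete;
    mini_dispatcher &operator=(mini_dispatcher const &) = delete;

    // Queue a task from any thread.
    void post(callback task)
    {
        {
            std::lock_guard<std::mutex> lock(queue_lock_);
            queue_.push_back(std::move(task));
        }
        queue_ready_.notify_one();
    }

    // Reading the atomic flag needs no lock.
    bool running() const { return running_; }

private:
    void worker()
    {
        std::unique_lock<std::mutex> lock(queue_lock_);
        for (;;)
        {
            queue_ready_.wait(lock, [this] { return !running_ || !queue_.empty(); });
            if (queue_.empty())
                return; // reached only when running_ is false and nothing is queued
            callback task = std::move(queue_.front());
            queue_.pop_front();
            lock.unlock(); // do not hold the lock while running user code
            task();
            lock.lock();
        }
    }

    std::mutex queue_lock_;
    std::condition_variable queue_ready_;
    std::deque<callback> queue_;
    std::atomic<bool> running_;
    std::thread thread_; // plain member, declared last so it starts after the others exist
};
```

With this shape, `{ mini_dispatcher d; d.post(task); }` is a complete use: there is no start()/stop() pair to forget. In this sketch the destructor lets the worker drain whatever is still queued before joining; whether pending events should be drained or dropped on shutdown is a design choice, and the original code may make it differently.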
Code Snippets
```
while (running_) {
    auto next_event = std::chrono::high_resolution_clock::time_point::max();
    if (!events_.empty()) {
        next_event = events_.begin()->first;
    }
    // Will wait until:
    // 1) the time of the next event OR
    // 2) the thread was notified AND
    //    a) the thread is set to terminate OR
    //    b) an event with a closer time point was detected,
    // at which point we re-calculate the sleep time.
    condition_.wait_until(lock, next_event, [&]() {
        return !running_ ||
               (!events_.empty() && events_.begin()->first < next_event);
    });
    // Woken up either through notify of new events, or because it is time
    // for the next event.
    if (next_event <= std::chrono::high_resolution_clock::now()) {
        // Note that here events_ is never empty because otherwise
        // next_event would be end of time and we would not enter here.
        // The only place where events are removed is below.
        auto it = events_.begin();
        auto func = it->second;
        events_.erase(it);
        func(lock); // Temporarily releases lock
    }
}
```
```
auto range = callbacks_.equal_range(event_id);
std::vector<callback_t> functions;
for (auto it = range.first; it != range.second; ++it)
{
    functions.push_back(it->second);
}
```
```
auto range = callbacks_.equal_range(event_id);
std::vector<callback_t> functions;
// p is a key/value pair, not an iterator, so use p.second.
// Generic lambdas need C++14; in C++11, spell out the pair type instead of auto.
std::transform(range.first, range.second, std::back_inserter(functions),
               [](const auto &p) { return p.second; });
```
Context
StackExchange Code Review Q#123296, answer score: 8