Yet another event dispatcher in c++11

Submitted by: @import:stackexchange-codereview

Problem

I've written an event dispatcher for a private project. Its task is to collect events from different threads and dispatch them in a single thread:

```
namespace events
{
namespace
{
// unfortunately make_unique is not part of c++11 so we use our own
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args &&... args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
}

// Dispatcher for events posted from arbitrary threads
// Users can register functions that listen for certain events. When such events are posted from
// arbitrary threads, a single dispatcher thread will sequentially call the registered callbacks.
template <typename... Args> // assumed: the parameter list is missing in the source
class dispatcher
{
public:
    dispatcher() : running_(false), ready_(false) {}
    // non copyable
    dispatcher(dispatcher const &) = delete;
    dispatcher &operator=(dispatcher) = delete;
    // but movable
    dispatcher(dispatcher &&) = default;

    // Needs to be called once for dispatching to start
    // It is possible to register callbacks and post events before this function has been called
    void start()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        running_ = true;
        thread_ = make_unique<std::thread>(&dispatcher::worker, this);
        // do not return until thread is ready for processing events
        condition_.wait(lock, [this]() { return ready_; });
    }

    // Stops processing of events and joins dispatching thread
    // Depending on timing, events already queued may still be executed. The queue is not cleared
    // after stopping, so it is possible to use stop()/start() sequences to pause execution
    // After stop() returns it is ensured that no callbacks will be executed
    void stop()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            running_ = false;
        }
        condition_.notify_one();
        thread_->join();
    }

    using callback_t = std::function<void(Args...)>; // assumed: the signature is missing in the source

    // Register a function as an event listener
    // It is allowed to register multiple callbacks for the same event id
    void register_event(int event_id, callback_t function)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // ...
```

Solution

Start/Stop

You should consider calling these from your constructor and destructor, so that the thread is ready as soon as the object is constructed and is stopped automatically when it is destroyed. That would make the class easier to use. Ask yourself: would you ever create a dispatcher without intending to call start()/stop() on it? If not, they should be called from your ctor/dtor.
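
A minimal sketch of that idea, assuming a stripped-down stand-in for the dispatcher that models only the thread lifecycle. The names `scoped_worker` and `run_scoped_worker_once` are hypothetical and not part of the reviewed code:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the dispatcher: only the thread lifecycle is modelled.
class scoped_worker
{
public:
    // Starting the thread in the constructor makes the object usable immediately.
    explicit scoped_worker(std::atomic<int> &exit_count)
        : exit_count_(exit_count), running_(true), thread_(&scoped_worker::run, this)
    {
    }

    // The destructor signals the thread and joins it, so it can never be leaked.
    ~scoped_worker()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        condition_.notify_one();
        thread_.join();
    }

    scoped_worker(scoped_worker const &) = delete;
    scoped_worker &operator=(scoped_worker const &) = delete;

private:
    void run()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            // Sleep until the destructor clears running_.
            condition_.wait(lock, [this] { return !running_; });
        }
        ++exit_count_; // record that the thread left its wait
    }

    std::atomic<int> &exit_count_;
    std::mutex mutex_;
    std::condition_variable condition_;
    bool running_;
    std::thread thread_; // declared last: every other member exists before run() starts
};

// Usage sketch: the thread lives exactly as long as the object.
inline int run_scoped_worker_once()
{
    std::atomic<int> exit_count(0);
    {
        scoped_worker worker(exit_count); // thread is running from here on
    }                                     // destructor stops and joins it here
    return exit_count.load();
}
```

Declaring the thread as the last member matters in this sketch, because members are initialized in declaration order and the new thread touches the mutex and the flag right away.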

ready_ is redundant

In start() you can simply wait for running_ to be true and remove ready_. That said, I don't think there is any need to wait for the thread to have started. You could return without waiting, since the events will be executed as soon as the thread is up. You may see a very slight delay before your events start executing, but you can get that even if the thread is already up, because the OS doesn't guarantee that your thread will be scheduled immediately.

Skip unique_ptr on thread

As has already been mentioned, since you have disallowed copy/move, I would recommend holding the thread directly as a member of the class, without the unique_ptr. A default-constructed std::thread is an empty thread object with no associated thread of execution.
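
As an illustration, here is a small sketch of a class that holds a plain `std::thread` member and move-assigns a running thread into it on `start()`. The class name `worker_owner` and its members are hypothetical, chosen only for this example:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical owner of a worker thread, held by value instead of via unique_ptr.
class worker_owner
{
public:
    worker_owner() : runs_(0) {}
    ~worker_owner() { stop(); } // a joinable thread must be joined before destruction

    void start()
    {
        if (thread_.joinable()) {
            return; // already started; assigning over a joinable thread would terminate
        }
        // Move-assign a freshly started thread into the empty member.
        thread_ = std::thread([this] { ++runs_; });
    }

    void stop()
    {
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    bool has_thread() const { return thread_.joinable(); }
    int runs() const { return runs_.load(); }

private:
    std::atomic<int> runs_;
    std::thread thread_; // default-constructed: no associated thread of execution
};
```

`joinable()` takes over the role that a null check on the `unique_ptr` would otherwise play.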

Better naming

Since mutex_ protects events_, it should probably be called events_lock_ or something similar. The same goes for condition_.

Simplify loop logic

The loop in your worker seems a bit convoluted. I would probably structure it something like this (caveat emptor: I might have changed the behaviour on shutdown):

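while (running_) {
    // With no events queued, sleep until notified.
    auto next_event = std::chrono::high_resolution_clock::time_point::max();
    if (!events_.empty()) {
        next_event = events_.begin()->first;
    }

    // Will wait until:
    // 1) the time of the next event OR
    // 2) the thread was notified AND
    //    a) the thread is set to terminate OR
    //    b) an event with a closer time point was detected,
    //       at which point we re-calculate the sleep time.
    condition_.wait_until(lock, next_event, [&]() {
        return !running_ ||
               (!events_.empty() && events_.begin()->first < next_event);
    });

    // Woken up either through notify of new events, or because it is time
    // for the next event.
    if (next_event <= std::chrono::high_resolution_clock::now()) {
        // Note that here events_ is never empty because otherwise
        // next_event would be the end of time and we would not enter here.
        // The only place where events are removed is below.
        auto it = events_.begin();
        auto func = it->second;
        events_.erase(it);
        func(lock); // Temporarily releases lock
    }
}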
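
For readers who want to try the loop structure in isolation, the following is a self-contained sketch under several assumptions: the class `timed_dispatcher`, its `post` method and the helper `demo_order` are hypothetical names, tasks are plain `std::function<void()>` objects instead of callables that receive the lock, and `std::chrono::steady_clock` is used as the clock. It also deviates from the loop above in one respect: when the queue is empty it uses a plain `wait` instead of waiting until a deadline at the end of time.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <map>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class timed_dispatcher
{
public:
    using clock = std::chrono::steady_clock;

    timed_dispatcher() : running_(true), thread_(&timed_dispatcher::worker, this) {}

    ~timed_dispatcher()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        condition_.notify_one();
        thread_.join(); // tasks still queued at this point are dropped
    }

    // Queue a task to run at (or shortly after) `when`.
    void post(clock::time_point when, std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            events_.emplace(when, std::move(task));
        }
        condition_.notify_one();
    }

private:
    void worker()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (running_) {
            if (events_.empty()) {
                // Nothing queued: sleep until an event arrives or we are stopped.
                condition_.wait(lock, [this] { return !running_ || !events_.empty(); });
                continue;
            }
            auto const next_event = events_.begin()->first;
            // Sleep until the event is due, we are stopped, or an earlier event shows up.
            condition_.wait_until(lock, next_event, [&] {
                return !running_ ||
                       (!events_.empty() && events_.begin()->first < next_event);
            });
            if (!running_) {
                break;
            }
            auto it = events_.begin();
            if (it->first <= clock::now()) {
                auto task = std::move(it->second);
                events_.erase(it);
                lock.unlock(); // run the callback without holding the lock
                task();
                lock.lock();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable condition_;
    std::multimap<clock::time_point, std::function<void()>> events_;
    bool running_;
    std::thread thread_; // declared last so the other members exist before worker() runs
};

// Usage sketch: post two tasks out of order and record the order they run in.
inline std::vector<int> demo_order()
{
    std::vector<int> order; // written only by the dispatcher thread
    std::promise<void> done;
    auto finished = done.get_future();
    {
        timed_dispatcher d;
        auto const now = timed_dispatcher::clock::now();
        d.post(now + std::chrono::milliseconds(60), [&] { order.push_back(2); done.set_value(); });
        d.post(now + std::chrono::milliseconds(20), [&] { order.push_back(1); });
        finished.wait(); // block until the later task has run
    }
    return order;
}
```

Posting the later task first exercises branch 2b of the comment in the loop: the worker is already sleeping towards the 60 ms deadline when the 20 ms event arrives, so it has to wake up and recompute its sleep time.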


Use standard functions/methods

I would change:

auto range = callbacks_.equal_range(event_id);
std::vector<callback_t> functions;
for (auto it = range.first; it != range.second; ++it)
{
  functions.push_back(it->second);
}


to:

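auto range = callbacks_.equal_range(event_id);
std::vector<callback_t> functions;
// C++11 has no generic lambdas, so the element type is spelled out
// (the key type of callbacks_ is assumed to be int).
std::transform(range.first, range.second, std::back_inserter(functions),
    [](std::pair<const int, callback_t> const& p){ return p.second; });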
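
A self-contained version of this transformation might look as follows. It is a sketch that assumes the callbacks live in a `std::multimap` keyed by `int`; the aliases `callback_type` and `callback_map` and the function `callbacks_for` are hypothetical names, and the callback signature `int()` is chosen only so the result can be checked:

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <iterator>
#include <map>
#include <utility>
#include <vector>

using callback_type = std::function<int()>; // hypothetical callback signature
using callback_map = std::multimap<int, callback_type>;

// Copy every callback registered under event_id into a vector.
inline std::vector<callback_type> callbacks_for(callback_map const &callbacks, int event_id)
{
    auto range = callbacks.equal_range(event_id);
    std::vector<callback_type> functions;
    std::transform(range.first, range.second, std::back_inserter(functions),
                   [](callback_map::value_type const &entry) { return entry.second; });
    return functions;
}
```

Using `callback_map::value_type` in the lambda parameter avoids spelling out the pair type by hand and keeps the code valid C++11.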


Use std::atomic

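Signalling variables such as running_ and ready_ are typically implemented using atomic types, which means you do not need to take a mutex lock just to read or write them. One caveat: when such a flag is part of a condition-variable predicate, as running_ is here, the write should still be made while holding the mutex, otherwise the waiting thread can miss the notification.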
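
The following sketch shows an atomic stop flag in the simplest setting, a thread that polls the flag and involves no condition variable. The class `polling_worker` and its members are hypothetical names used only for this illustration:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical worker that polls an atomic flag instead of locking a mutex.
class polling_worker
{
public:
    polling_worker() : running_(true), iterations_(0), thread_(&polling_worker::run, this) {}
    ~polling_worker() { stop(); }

    // Safe to call more than once: the second call finds nothing left to join.
    void stop()
    {
        running_.store(false); // no mutex needed for the flag itself
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    long iterations() const { return iterations_.load(); }

private:
    void run()
    {
        // do/while guarantees at least one iteration even if stop() wins the race.
        do {
            ++iterations_;
            std::this_thread::yield();
        } while (running_.load());
    }

    std::atomic<bool> running_;
    std::atomic<long> iterations_;
    std::thread thread_; // declared last so the flags exist before run() starts
};
```

This pattern fits a busy or periodic loop. For a worker that sleeps on a condition variable, as the dispatcher does, the flag would still be changed under the mutex before notifying.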

Suffix _t is reserved for use by POSIX.

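If you're not on a POSIX system, you're safe. On POSIX systems, names ending in _t are reserved for the implementation's headers, so a name such as callback_t could in principle collide with a system type.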

Context

StackExchange Code Review Q#123296, answer score: 8
