
Multithreaded task-scheduler

Submitted by: @import:stackexchange-codereview

Problem

This is (supposedly) a multi-threaded scheduler for one-time and/or repeating tasks. The tasks are simple std::function objects. I built it to be a crucial part of a larger project I'm working on, but I developed it stand-alone, so no context is missing for a review.

I'm making heavy use of C++11 language and library features (especially thread support and chrono stuff).

Tasks are scheduled by specifying either a start time_point or a delay (which is converted to a time_point by adding it to now()). An optional duration specifies the repeat interval for the task, if it is non-zero.
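To make the timing rules above concrete, here is a minimal sketch. The helper names (start_from_delay, is_repeating, next_start) are hypothetical and are not taken from the Scheduler itself; they only illustrate how a delay maps to a start time and how a non-zero repeat interval yields the next start.

```cpp
#include <chrono>

// Hypothetical helpers, not part of the original Scheduler.

// A delay becomes an absolute start time by adding it to "now".
template <typename Clock>
typename Clock::time_point start_from_delay(typename Clock::time_point now,
                                            typename Clock::duration delay) {
    return now + delay;
}

// A task repeats only if its repeat interval is non-zero.
template <typename Clock>
bool is_repeating(typename Clock::duration repeat) {
    return repeat != Clock::duration::zero();
}

// The next run of a repeating task is one interval after the previous start.
template <typename Clock>
typename Clock::time_point next_start(typename Clock::time_point previous,
                                      typename Clock::duration repeat) {
    return previous + repeat;
}
```

The clock type has to be named explicitly at the call site, for example start_from_delay<std::chrono::steady_clock>(now, std::chrono::seconds(5)), because it cannot be deduced from the nested time_point and duration types.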

It should be possible to de-schedule tasks, preventing them from being started for execution from then on. (Already running tasks won't be stopped, to keep things a bit simpler, and also because I couldn't figure out a clean way to do it anyway.)

I've never done anything with multithreading of this scale/complexity, and in case my brain never recovers from repeatedly being torn into 5 or more threads, I'd like to get some review/feedback from others. Specifically, race conditions/deadlocks/other threading-unpleasantness I didn't spot, lifetime issues, or really anything problematic.

Some simple code at the very bottom demonstrates how it's meant to be used. It seemed to work when compiled with clang 3.3 and libc++.

```
#include
#include
#include
#include
#include
#include
#include
#include

namespace scheduling {
template <typename Clock>
class Scheduler {
typedef Clock clock_type;
typedef typename clock_type::time_point time_point;
typedef typename clock_type::duration duration;
typedef std::function<void()> task_type;
private:
struct Task {
public:
Task (task_type&& task, const time_point& start, const duration& repeat) : task(std::move(task)), start(start), repeat(repeat) { }
task_type task;
time_point start;
duration repeat;

bool operator::iterator task_handle;
priva
```

Solution

It looks/sounds like you're trying to build a sort of thread pool. If that's the case, take a look at this StackExchange link for some background on what that is (a thread design pattern). For your purposes you essentially have a worker queue (a single-threaded thread pool); the only difference is that, as you are already doing, you use a mutex rather than a semaphore to block the running of the currently scheduled task.

Just some notes on your code that I could see:
Line 35: you have a std::deque named todo, while task_handle is defined as a std::list<Task>::iterator. If you're using a deque for performance reasons, consider switching all container types to a deque, as I didn't see any code that inherently 'needs' a list (a list is efficient at adding/removing in the middle, a deque at the beginning/end).
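One caveat before switching everything to a deque, sketched below under the assumption that task_handle values are kept around so tasks can be de-scheduled later: std::list iterators stay valid when other elements are inserted or erased, while std::deque may invalidate iterators on insertion and on erasure in the middle. The function name and the string payload are hypothetical stand-ins for the real Task entries.

```cpp
#include <list>
#include <string>

// Hypothetical stand-in for the scheduler's handle type:
// an iterator into a std::list.
typedef std::list<std::string>::iterator handle;

// Take three handles, erase the middle entry, insert a new one,
// then read through the two handles obtained before those changes.
std::string read_through_old_handles() {
    std::list<std::string> tasks;
    handle a = tasks.insert(tasks.end(), "a");
    handle b = tasks.insert(tasks.end(), "b");
    handle c = tasks.insert(tasks.end(), "c");

    tasks.erase(b);         // invalidates only b
    tasks.push_front("z");  // list insertion invalidates no iterators

    return *a + *c;         // both handles still refer to their elements
}
```

With a std::deque the same sequence would not be safe, so handles would have to become something other than iterators (an id or an index scheme, for example) if the container changes.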

There are also a lot of extra scope braces that control how long each mutex is held (similar to Java's synchronized blocks) via a scoped lock-guard object. I get the need for the braces, but more is happening under the hood than the code suggests:

void clear() {
    { // <- stack enters new location
        std::lock_guard<std::mutex> lk(mutex); // stack consumes memory for 'auto' object, then mutex.lock is called and control returned
        tasks.clear();
        handles.clear();
    } // stack destroys 'auto lock' object which then calls mutex.unlock
    tasks_updated.notify_all();
}


Consider the following instead, with the caveat that the mutex stays locked if anything between lock() and unlock() throws:

void clear() { // no guard object on the stack
    mutex.lock();
    tasks.clear();
    handles.clear();
    mutex.unlock();
    tasks_updated.notify_all();
} // no guard constructor/destructor calls, and one line fewer
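The trade-off between the two styles can be checked with a small sketch. TrackingLock and work_that_throws are hypothetical: the lock only records whether it is held, so the outcome is observable without real contention. In clear() itself the container calls are unlikely to throw, so this matters more as a general pattern than for that one function.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical BasicLockable that only records whether it is held.
struct TrackingLock {
    bool held = false;
    void lock()   { held = true; }
    void unlock() { held = false; }
};

// Stand-in for guarded work that fails.
void work_that_throws() { throw std::runtime_error("work failed"); }

// Manual style: the exception skips the unlock() call.
bool still_held_after_manual(TrackingLock& l) {
    try {
        l.lock();
        work_that_throws();
        l.unlock();  // never reached
    } catch (const std::runtime_error&) {
    }
    return l.held;
}

// RAII style: the guard's destructor unlocks during stack unwinding.
bool still_held_after_guard(TrackingLock& l) {
    try {
        std::lock_guard<TrackingLock> lk(l);
        work_that_throws();
    } catch (const std::runtime_error&) {
    }
    return l.held;
}
```

With a real std::mutex, the manual version would leave the mutex locked after the exception, and the next thread to lock it would block indefinitely.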


Line 103: time_point next_task = task_it == tasks.end() ? clock_type::time_point::max() : task_it->start;
While technically correct, a one-line conditional like this could cause confusion down the road if it ever produces a bug. It is your code, though, so as long as you can read it, it's all good.
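If readability is the concern, one option is to give the conditional a name. This is only a sketch: Entry and next_wakeup are hypothetical, and it assumes the task list is kept sorted by start time so that the front entry is the next one due.

```cpp
#include <chrono>
#include <list>

// Hypothetical, reduced task entry: only the start time matters here.
template <typename Clock>
struct Entry {
    typename Clock::time_point start;
};

// Returns the time of the next scheduled task, or time_point::max()
// when nothing is scheduled. Assumes tasks is sorted by start time.
template <typename Clock>
typename Clock::time_point next_wakeup(const std::list<Entry<Clock>>& tasks) {
    if (tasks.empty()) {
        return Clock::time_point::max();
    }
    return tasks.front().start;
}
```

The call site then reads as time_point next_task = next_wakeup<clock_type>(tasks);, which documents the intent without changing the behaviour.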

Your private loop function is written as an infinite loop that only exits through the inner return:

void loop() {
    while (true) {
        std::unique_lock<std::mutex> lk(mutex);
        while (todo.empty() && running) {
            modified.wait(lk);
        }
        if (!running) {
            return;
        }
        f = todo.front()->task;
        todo.pop_front();
    }
}


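Consider this instead (just replaced while (true) with while (running)). Note that the outer condition is evaluated without holding the mutex, so this assumes running is a std::atomic<bool> or is otherwise safe to read unlocked: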

void loop() {
    while (running) {
        std::unique_lock<std::mutex> lk(mutex);
        while (todo.empty() && running) {
            modified.wait(lk);
        }
        if (!running) {
            return;
        }
        f = todo.front()->task;
        todo.pop_front();
    }
}


Also, why is Scheduler templated on the clock type, instead of just using one explicit clock or deriving some classes from a scheduler that has a specific clock type?
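One possible benefit of the template parameter, offered as a guess rather than something the question states, is that time-dependent logic can be exercised against a clock that the test controls. ManualClock and is_due below are hypothetical names used only for this sketch.

```cpp
#include <chrono>

// Hypothetical clock whose time only moves when advance() is called.
struct ManualClock {
    typedef std::chrono::milliseconds duration;
    typedef duration::rep rep;
    typedef duration::period period;
    typedef std::chrono::time_point<ManualClock, duration> time_point;
    static const bool is_steady = false;

    static time_point now() { return current(); }
    static void advance(duration d) { current() += d; }

private:
    static time_point& current() {
        static time_point t;  // starts at the clock's epoch
        return t;
    }
};

// Generic code written against any clock type, as Scheduler<Clock> is.
template <typename Clock>
bool is_due(typename Clock::time_point start) {
    return Clock::now() >= start;
}
```

The same is_due works unchanged with std::chrono::steady_clock in production, which is harder to arrange if the clock type is hard-coded.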

You also have task_type declared as a std::function<void()>, which means a function that returns void and takes no arguments, yet your demo code passes in functions that take a std::string. That only compiles if the argument is bound first (with std::bind or a lambda), in which case the bound copy is stored inside the std::function and the stack is not at risk. Just make sure that anything bound or captured by reference outlives the task.
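A short sketch of the two usual ways to turn a function with arguments into a nullary task. The names append_greeting, make_task and make_bound_task are hypothetical; only the std::function<void()> signature comes from the review.

```cpp
#include <functional>
#include <string>

typedef std::function<void()> task_type;

// Hypothetical task body that needs an argument.
void append_greeting(std::string& out, const std::string& name) {
    out += "hello " + name;
}

// Lambda version: name is copied into the closure, out is captured
// by reference and must outlive the returned task.
task_type make_task(std::string& out, std::string name) {
    return [&out, name] { append_greeting(out, name); };
}

// std::bind version: name is copied into the bind object,
// std::ref keeps out as a reference instead of copying it.
task_type make_bound_task(std::string& out, std::string name) {
    return std::bind(append_greeting, std::ref(out), name);
}
```

In both cases the string argument lives inside the callable stored by std::function, so the task can safely run long after the scheduling call has returned.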

You also have a struct nested in a class (your Task structure) that looks like it's only used by the Scheduler class. Instead, consider just making the struct's member variables private member variables of your Scheduler class.

One last note: it's usually considered best practice to group your members by access specifier (public/private/protected), instead of scattering the keywords wherever they are needed.

I can refactor the code and post what I see if you would like; otherwise, I hope this all helps.


Context

StackExchange Code Review Q#28437, answer score: 3
