C++ Task Scheduler
Problem
I needed a task scheduler that could be used something like this:
```
#include <iostream>
// To compile and run this example, include here the code listed in the second code block
void Task2()
{
    std::cout << "OK2 ! now is " << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
}
void Task3()
{
    std::cout << "--3 " << std::endl;
}
void Task1(Scheduler & sch)
{
    auto now = std::chrono::system_clock::now();
    std::cout << "OK1 ! now is " << now.time_since_epoch().count() << std::endl;
    sch.ScheduleAt(now + std::chrono::seconds(1), []{ Task2(); });
    sch.ScheduleAt(now + std::chrono::seconds(2), []{ Task2(); });
    sch.ScheduleAt(now + std::chrono::seconds(3), []{ Task2(); });
}
void main()
{
    auto now = std::chrono::system_clock::now();
    Scheduler sch;
    sch.ScheduleAt(now + std::chrono::seconds(15), [&sch]{ Task1(sch); });
    sch.ScheduleAt(now + std::chrono::seconds(20), [&sch]{ Task1(sch); });
    sch.ScheduleAt(now + std::chrono::seconds(25), [&sch]{ Task1(sch); });
    sch.ScheduleAt(now + std::chrono::seconds( 2), [&sch]{ Task2(); });
    sch.ScheduleEvery(std::chrono::seconds(1), []{ Task3(); });
    getchar();
}
```
It had to support both one-off and repeating tasks, and should stop and clean itself up gracefully on destruction, even while running. I did not care about parallelism: tasks that should run in their own threads should manage that themselves. It had to accept lambdas for simplicity.
I could not find anything similar in Boost or POCO, though I did not search very hard as I was interested in writing it myself. There are some related questions in C#, but I need C++.
```
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/noncopyable.hpp>
class Scheduler : boost::noncopyable
{
private:
    std::multimap<std::chrono::system_clock::time_point, std::function<void()>> tasks;
    std::mutex mutex;
    std::unique_ptr<std::thread> thread;
    std::condition_variable blocker;
    bool go_on;
public:
    Scheduler()
        :go_on(true)
    {
        thread.reset(new st
```
Solution
So a couple of minor issues first:
```
void main()
```
Argh, bad. Someone who can write this should know better! `main` must return `int`.
There's a problem with your `ScheduleAt` function. You're passing a temporary (an rvalue) as `time` and trying to bind it to a non-const lvalue reference. Visual Studio has some (evil) extensions that let you get away with this; however, it isn't portable, and the signature should be one of:
```
void ScheduleAt(const std::chrono::system_clock::time_point& time, std::function<void()> func)
void ScheduleAt(std::chrono::system_clock::time_point&& time, std::function<void()> func)
```
Finally, a (very) minor point: why introduce a dependency on Boost simply for
non-copyable? It's all of 2 lines in C++11:
```
Scheduler& operator=(const Scheduler& rhs) = delete;
Scheduler(const Scheduler& rhs) = delete;
```
Note that this says nothing explicit about move operations. Once the copy operations are user-declared (even as deleted), the compiler no longer generates the move operations implicitly. Hence:
```
Scheduler& operator=(Scheduler&& rhs);
Scheduler(Scheduler&& rhs);
```
are not available unless you declare them yourself. This may (or may not) be what you want.
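One caveat on the move operations is worth checking with the compiler. Under the C++11 rules, a class with user-declared copy operations (deleted ones included) does not get implicit move operations, so a "move" falls back to the deleted copy and fails to compile. The sketch below checks this with type traits; `CopyDeleted` and `MoveOnly` are hypothetical stand-ins for `Scheduler`, not part of the original code.

```cpp
#include <type_traits>

// Hypothetical stand-in for Scheduler: only the copy operations are declared (as deleted).
class CopyDeleted
{
public:
    CopyDeleted() = default;
    CopyDeleted(const CopyDeleted&) = delete;
    CopyDeleted& operator=(const CopyDeleted&) = delete;
};

// Same idea, but the move operations are declared explicitly.
class MoveOnly
{
public:
    MoveOnly() = default;
    MoveOnly(const MoveOnly&) = delete;
    MoveOnly& operator=(const MoveOnly&) = delete;
    MoveOnly(MoveOnly&&) = default;
    MoveOnly& operator=(MoveOnly&&) = default;
};

// Deleting the copy operations suppresses the implicit move operations as well.
static_assert(!std::is_copy_constructible<CopyDeleted>::value, "copy is deleted");
static_assert(!std::is_move_constructible<CopyDeleted>::value, "no implicit move constructor");
static_assert(!std::is_move_assignable<CopyDeleted>::value, "no implicit move assignment");

// Moves only become available once they are declared.
static_assert(!std::is_copy_constructible<MoveOnly>::value, "copy is deleted");
static_assert(std::is_move_constructible<MoveOnly>::value, "move constructor declared");
static_assert(std::is_move_assignable<MoveOnly>::value, "move assignment declared");
```

For a class that owns a running thread capturing `this`, leaving the moves out is probably the safer choice anyway.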
There may be a library that (sort of) does what you want, however, it is C, not C++: libevent. I've never used it, so take this recommendation with a grain of salt.
With what you have here, there won't be any race conditions, because you launch a `Scheduler` from the main thread, and this only ever creates one thread of its own. Further, this thread only performs sequential execution.
Personally, I think this implementation has some extra pieces that don't seem to be needed; the `mutex` and `condition_variable` are only really there to signal when things should happen, not for any actual locking of shared mutable state. Further (as you're probably aware), this likely won't work so well for any functions that have a non-negligible execution time. The `getchar` at the end is a bit of a hack as well; you should probably launch this in its own thread.
The implementation below doesn't fix the part where long-running functions will cause everything to get out of sync, but it doesn't rely on `mutex` or `condition_variable` where they aren't really needed. You could probably fix this by launching each `func()` call in its own thread if you wanted to. I've opted to use a `priority_queue` instead of a `multimap` as well:
```
#include <functional>
#include <chrono>
#include <future>
#include <queue>
#include <thread>
#include <memory>

struct function_timer
{
    std::function<void()> func;
    std::chrono::system_clock::time_point time;

    function_timer()
    { }

    function_timer(std::function<void()>&& f, const std::chrono::system_clock::time_point& t)
        : func(std::move(f)),
          time(t)
    { }

    //Note: we want our priority_queue to be ordered in terms of
    //smallest time to largest, hence the > in operator<. This isn't good
    //practice - it should be a separate struct - but I've done this for brevity.
    bool operator<(const function_timer& rhs) const
    {
        return time > rhs.time;
    }

    void get()
    {
        func();
    }
};

class Scheduler
{
private:
    std::priority_queue<function_timer> tasks;
    // go_on is declared before thread so it is initialized before the thread reads it.
    bool go_on;
    std::unique_ptr<std::thread> thread;

    Scheduler& operator=(const Scheduler& rhs) = delete;
    Scheduler(const Scheduler& rhs) = delete;

public:
    Scheduler()
        :go_on(true),
        thread(new std::thread([this]() { ThreadLoop(); }))
    { }

    ~Scheduler()
    {
        go_on = false;
        thread->join();
    }

    void ThreadLoop()
    {
        while(go_on)
        {
            auto now = std::chrono::system_clock::now();
            while(!tasks.empty() && tasks.top().time <= now) {
                // top() returns a const reference, so copy the task and pop it
                // before running it; the task may push new entries onto the queue.
                function_timer f = tasks.top();
                tasks.pop();
                f.get();
            }
            if(tasks.empty()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            } else {
                std::this_thread::sleep_for(tasks.top().time - std::chrono::system_clock::now());
            }
        }
    }

    void ScheduleAt(const std::chrono::system_clock::time_point& time, std::function<void()>&& func)
    {
        tasks.push(function_timer(std::move(func), time));
    }

    void ScheduleEvery(std::chrono::system_clock::duration interval, std::function<void()> func)
    {
        std::function<void()> waitFunc = [this,interval,func]()
        {
            func();
            this->ScheduleEvery(interval, func);
        };
        ScheduleAt(std::chrono::system_clock::now() + interval, std::move(waitFunc));
    }
};
```
I imagine there's also a slightly cleaner way of doing this with `std::async`, `std::promise` and `std::future`, but perhaps someone else can figure that out.
Edit: Whoops, there is shared mutable state, I'm definitely wrong about that.
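Given the retraction about shared mutable state, here is a minimal sketch of one way the queue and the stop flag could be guarded. It assumes a `std::mutex` protects both, and that a `std::condition_variable` wakes the worker when a task is added or the scheduler is destroyed. The names `SyncScheduler`, `Entry`, `Later`, `Run`, and `wakeup_` are made up for illustration and appear neither in the question nor in the answer. It is a sketch under those assumptions, not a drop-in replacement, and it leaves out `ScheduleEvery`.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical variant of the scheduler with all shared state behind one mutex.
class SyncScheduler
{
public:
    using Clock = std::chrono::system_clock;
    using Task = std::function<void()>;

    SyncScheduler() : stop_(false), worker_([this] { Run(); }) {}

    ~SyncScheduler()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        wakeup_.notify_one();
        worker_.join();   // waits for a task that is currently running
    }

    SyncScheduler(const SyncScheduler&) = delete;
    SyncScheduler& operator=(const SyncScheduler&) = delete;

    void ScheduleAt(Clock::time_point when, Task task)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(Entry{when, std::move(task)});
        }
        wakeup_.notify_one();   // the new task may be due earlier than the current head
    }

private:
    struct Entry
    {
        Clock::time_point when;
        Task task;
    };

    // Separate comparator: the entry with the earliest time ends up at top().
    struct Later
    {
        bool operator()(const Entry& a, const Entry& b) const { return a.when > b.when; }
    };

    void Run()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!stop_)
        {
            if (tasks_.empty())
            {
                wakeup_.wait(lock);               // sleep until something is scheduled or we stop
            }
            else if (tasks_.top().when > Clock::now())
            {
                const auto next = tasks_.top().when;
                wakeup_.wait_until(lock, next);   // sleep until the head is due or we are notified
            }
            else
            {
                Task task = tasks_.top().task;
                tasks_.pop();
                lock.unlock();                    // run unlocked so the task may call ScheduleAt
                task();
                lock.lock();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeup_;
    std::priority_queue<Entry, std::vector<Entry>, Later> tasks_;
    bool stop_;
    std::thread worker_;   // declared last so the members it uses are initialized first
};
```

Because the worker re-checks the queue after every wake-up, spurious wake-ups and newly inserted earlier tasks are handled by the same loop. Long-running tasks still delay later ones, as in the version above.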
Code Snippets
```
void main()
```
```
void ScheduleAt(const std::chrono::system_clock::time_point& time, std::function<void()> func)
void ScheduleAt(std::chrono::system_clock::time_point&& time, std::function<void()> func)
```
```
Scheduler& operator=(const Scheduler& rhs) = delete;
Scheduler(const Scheduler& rhs) = delete;
```
```
Scheduler& operator=(Scheduler&& rhs);
Scheduler(Scheduler&& rhs);
```
```
#include <functional>
#include <chrono>
#include <future>
#include <queue>
#include <thread>
#include <memory>

struct function_timer
{
    std::function<void()> func;
    std::chrono::system_clock::time_point time;

    function_timer()
    { }

    function_timer(std::function<void()>&& f, const std::chrono::system_clock::time_point& t)
        : func(std::move(f)),
          time(t)
    { }

    //Note: we want our priority_queue to be ordered in terms of
    //smallest time to largest, hence the > in operator<. This isn't good
    //practice - it should be a separate struct - but I've done this for brevity.
    bool operator<(const function_timer& rhs) const
    {
        return time > rhs.time;
    }

    void get()
    {
        func();
    }
};

class Scheduler
{
private:
    std::priority_queue<function_timer> tasks;
    // go_on is declared before thread so it is initialized before the thread reads it.
    bool go_on;
    std::unique_ptr<std::thread> thread;

    Scheduler& operator=(const Scheduler& rhs) = delete;
    Scheduler(const Scheduler& rhs) = delete;

public:
    Scheduler()
        :go_on(true),
        thread(new std::thread([this]() { ThreadLoop(); }))
    { }

    ~Scheduler()
    {
        go_on = false;
        thread->join();
    }

    void ThreadLoop()
    {
        while(go_on)
        {
            auto now = std::chrono::system_clock::now();
            while(!tasks.empty() && tasks.top().time <= now) {
                // top() returns a const reference, so copy the task and pop it
                // before running it; the task may push new entries onto the queue.
                function_timer f = tasks.top();
                tasks.pop();
                f.get();
            }
            if(tasks.empty()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            } else {
                std::this_thread::sleep_for(tasks.top().time - std::chrono::system_clock::now());
            }
        }
    }

    void ScheduleAt(const std::chrono::system_clock::time_point& time, std::function<void()>&& func)
    {
        tasks.push(function_timer(std::move(func), time));
    }

    void ScheduleEvery(std::chrono::system_clock::duration interval, std::function<void()> func)
    {
        std::function<void()> waitFunc = [this,interval,func]()
        {
            func();
            this->ScheduleEvery(interval, func);
        };
        ScheduleAt(std::chrono::system_clock::now() + interval, std::move(waitFunc));
    }
};
```
Context
StackExchange Code Review Q#21336, answer score: 9