patterncppMinor
Reuseable C++11 Thread
Viewed 0 times
reuseablethreadstackoverflow
Problem
I'm learning about threading support in C++. I've got a basic understanding of thread-pools and why creating and destroying threads on some systems can be expensive. As far as I'm aware C++11 doesn't have any built in support for worker/background threads.
At the moment I have no use for a fully fledged thread-pool but I would like to be able to reuse a thread for different tasks after I create it. I've created a simple class that will accept any function or method wrapped in a lambda and will then execute it exactly once. The reusable thread object should not destruct until the work function is fully complete.
This is the implementation:
And this is some example usage:
```
int main()
{
Reusable_Thread thread;
auto f1 = [&] () { / do some work / };
thread
std::thread is only designed to perform one task which you set in the constructor; it then destroys the thread after the task is complete.At the moment I have no use for a fully fledged thread-pool but I would like to be able to reuse a thread for different tasks after I create it. I've created a simple class that will accept any function or method wrapped in a lambda and will then execute it exactly once. The reusable thread object should not destruct until the work function is fully complete.
This is the implementation:
class Reusable_Thread
{
public:
Reusable_Thread()
: m_thread_pause(true), m_thread_quit(false),
m_thread(&Reusable_Thread::thread_worker, this)
{ }
~Reusable_Thread()
{
m_thread_quit = true;
m_thread.join();
}
bool get_readiness() const { return m_thread_pause; }
bool set_work(const std::function& work_func)
{
if (get_readiness())
{
m_work_func = work_func;
return true;
}
else
{
return false;
}
}
private:
std::atomic m_thread_pause;
std::atomic m_thread_quit;
std::thread m_thread;
std::function m_work_func;
void thread_worker()
{
while (!m_thread_quit)
{
if (!m_thread_pause)
{
m_work_func();
m_thread_pause = true;
}
}
}
};And this is some example usage:
```
int main()
{
Reusable_Thread thread;
auto f1 = [&] () { / do some work / };
thread
Solution
A nice way to warm your cup of tea
Are there any problems with implementing a reusable thread this way?
Yes. Let's have a look at a slightly different main:
This should just wait for 10 seconds and then exit, right?
Nope. It actually runs full throttle. This is due to your choice of
This isn't the fault of
howdoiturnthison
Your class is currently missing features to toggle
Furthermore, the name
Race condition
Assume we have our
It's certainly possible that the result is
-
So depending on where the threads get interrupted, you end up with either:
bar
foo
bar
foo
So, how does one get rid of this? Usually, you would use
But that doesn't work either, because
Third wheel to the rescue
We can decouple this by using a third
Remember, only one thread will see
```
void thread_worker()
{
while (!m_thread_quit)
{
if (!m_thread_pause && m_function_ready)
{
m_work_func();
m_thread_pause
Are there any problems with implementing a reusable thread this way?
Yes. Let's have a look at a slightly different main:
int main()
{
Reusable_Thread thread;
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}This should just wait for 10 seconds and then exit, right?
$ g++ reusablethread.cc -std=c++11 -pthread -o demo
$ /usr/bin/time ./demo
10.00user 0.00system 0:10.00elapsed 99%CPU (0avgtext+0avgdata 3148maxresident)k
0inputs+0outputs (0major+129minor)pagefaults 0swaps
Nope. It actually runs full throttle. This is due to your choice of
void thread_worker()
{
while (!m_thread_quit)
{
if (!m_thread_pause)
{
/* ... */
}
}
}This isn't the fault of
std::atomic<>, but your design. If we don't have something to do, we shouldn't check immediately afterwards. Instead, we should wait. More on that later.howdoiturnthison
Your class is currently missing features to toggle
m_thread_pause, which makes it hard to reason about other behaviour. I guess you're just missing m_thread_pause = false in set_work, e.g.bool set_work(const std::function& work_func)
{
if (get_readiness())
{
m_work_func = work_func;
m_thread_pause = false; // here, but wrong, see below
return true;
}
else
{
return false;
}
}Furthermore, the name
get_readiness is misleading. What is ready? The internal thread? Is the work slot free? is_paused is much better.Race condition
m_work_func and m_thread_pauseAssume we have our
Reusable_Thread rthread and it's currently paused. We have access on our rthread from two threads A and B. Both call set_work. What happens?void foo(){ puts("foo"); }
void bar(){ puts("bar"); }
void thread_Alice(Reusable_Thread & rthread) {
rthread.set_work(foo);
}
void thread_Bob(Reusable_Thread & rthread) {
rthread.set_work(bar);
}
int main(){
Reusable_Thread rthread;
std::thread A(thread_Alice, std::ref(rthread));
std::thread B(thread_Alice, std::ref(rthread));
A.join();
B.join();
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}It's certainly possible that the result is
foo. It's also possible that the result is bar. But it's also possible that it's neither and your program crashes:- We create the
std::threadinrthread, we call itR
- We create the
std::threadsAandB. Neither run yet.
Agets scheduled. It callsset_workand:
- checks
m_thread_pause(get_readiness())
- sets the worker to
foo
- yields due to scheduling
Bgets scheduled. It callsset_workand
- checks
m_thread_pause(get_readiness(), stilltrue)
- yields
-
A gets scheduled. It resumes the execution and- sets
m_thread_pausetofalse
returns
A's thread of execution has ended.Rgets scheduled. It
- checks
m_thread_quit(isfalse)
- checks
m_thread_pause(isfalsedue toA)
- starts reading the
std::function<>m_work_fun.
- yields while reading
Bgets scheduled. It finishes its call toset_work:
- sets the worker to
bar, althoughRcurrently reads from it
- yields
Rgets scheduled. It finishes readingm_work_funand ends up with a corruptstd::functionand usesoperator(). Your program crashes.
- Even if your program didn't crash, after
Rsetsm_thread_pausetotrue,Bgets scheduled and finishesset_work, thus settingm_thread_pausetofalseand the corrupt function gets called again.
So depending on where the threads get interrupted, you end up with either:
- bar
- foo
- bar
bar
- foo
foo
- foo
bar
- bar
foo
- crash
So, how does one get rid of this? Usually, you would use
std::atomic::exchange or std::atomic::compare_exchange on m_thread_pause:bool set_work(const std::function& work_func)
{
if (m_thread_pause.exchange(false)) // old value was true
{
m_work_func = work_func;
return true;
}
else
{
return false;
}
}But that doesn't work either, because
R could start using m_work_func before it's actually set. The underlying problem here is that m_work_func and m_thread_pause are intertwined. Third wheel to the rescue
We can decouple this by using a third
atomic, that checks whether the function can be set safely:bool set_work(const std::function& work_func)
{
if (!m_function_ready && m_thread_pause.exchange(false))
{
m_work_func = work_func;
m_function_ready = true;
return true;
}
...
}Remember, only one thread will see
m_thread_pause as true, due to the atomic exchange, so we're save from the previous A/B race.```
void thread_worker()
{
while (!m_thread_quit)
{
if (!m_thread_pause && m_function_ready)
{
m_work_func();
m_thread_pause
Code Snippets
int main()
{
Reusable_Thread thread;
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}void thread_worker()
{
while (!m_thread_quit)
{
if (!m_thread_pause)
{
/* ... */
}
}
}bool set_work(const std::function<void()>& work_func)
{
if (get_readiness())
{
m_work_func = work_func;
m_thread_pause = false; // here, but wrong, see below
return true;
}
else
{
return false;
}
}void foo(){ puts("foo"); }
void bar(){ puts("bar"); }
void thread_Alice(Reusable_Thread & rthread) {
rthread.set_work(foo);
}
void thread_Bob(Reusable_Thread & rthread) {
rthread.set_work(bar);
}
int main(){
Reusable_Thread rthread;
std::thread A(thread_Alice, std::ref(rthread));
std::thread B(thread_Alice, std::ref(rthread));
A.join();
B.join();
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}bool set_work(const std::function<void()>& work_func)
{
if (m_thread_pause.exchange(false)) // old value was true
{
m_work_func = work_func;
return true;
}
else
{
return false;
}
}Context
StackExchange Code Review Q#134214, answer score: 7
Revisions (0)
No revisions yet.