HiveBrain v1.2.0
Get Started
← Back to all entries
patterncppMinor

Reuseable C++11 Thread

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
reuseablethreadstackoverflow

Problem

I'm learning about threading support in C++. I've got a basic understanding of thread-pools and why creating and destroying threads on some systems can be expensive. As far as I'm aware C++11 doesn't have any built in support for worker/background threads. std::thread is only designed to perform one task which you set in the constructor; it then destroys the thread after the task is complete.

At the moment I have no use for a fully fledged thread-pool but I would like to be able to reuse a thread for different tasks after I create it. I've created a simple class that will accept any function or method wrapped in a lambda and will then execute it exactly once. The reusable thread object should not destruct until the work function is fully complete.

This is the implementation:

class Reusable_Thread
{
    public:
        Reusable_Thread()
            : m_thread_pause(true), m_thread_quit(false),
              m_thread(&Reusable_Thread::thread_worker, this)
        { }

        ~Reusable_Thread()
        {
            m_thread_quit = true;
            m_thread.join();
        }

        bool get_readiness() const { return m_thread_pause; }

        bool set_work(const std::function& work_func)
        {
            if (get_readiness())
            {
                m_work_func = work_func;
                return true;
            }
            else
            {
                return false;
            }
        }

    private:
        std::atomic m_thread_pause;
        std::atomic m_thread_quit;
        std::thread m_thread;
        std::function m_work_func;

        void thread_worker()
        {
            while (!m_thread_quit)
            {
                if (!m_thread_pause)
                {
                    m_work_func();
                    m_thread_pause = true;
                }
            }
        }
};


And this is some example usage:

```
int main()
{
Reusable_Thread thread;

auto f1 = [&] () { / do some work / };
thread

Solution

A nice way to warm your cup of tea


Are there any problems with implementing a reusable thread this way?

Yes. Let's have a look at a slightly different main:

int main()
{
    Reusable_Thread thread;

    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;
}


This should just wait for 10 seconds and then exit, right?

$ g++ reusablethread.cc -std=c++11 -pthread -o demo
$ /usr/bin/time ./demo
10.00user 0.00system 0:10.00elapsed 99%CPU (0avgtext+0avgdata 3148maxresident)k
0inputs+0outputs (0major+129minor)pagefaults 0swaps


Nope. It actually runs full throttle. This is due to your choice of

void thread_worker()
{
    while (!m_thread_quit)
    {
        if (!m_thread_pause)
        {
            /* ... */
        }
    }
}


This isn't the fault of std::atomic<>, but your design. If we don't have something to do, we shouldn't check immediately afterwards. Instead, we should wait. More on that later.

howdoiturnthison

Your class is currently missing features to toggle m_thread_pause, which makes it hard to reason about other behaviour. I guess you're just missing m_thread_pause = false in set_work, e.g.

bool set_work(const std::function& work_func)
{
    if (get_readiness())
    {
        m_work_func = work_func;
        m_thread_pause = false;     // here, but wrong, see below
        return true;
    }
    else
    {
        return false;
    }
}


Furthermore, the name get_readiness is misleading. What is ready? The internal thread? Is the work slot free? is_paused is much better.

Race condition m_work_func and m_thread_pause

Assume we have our Reusable_Thread rthread and it's currently paused. We have access on our rthread from two threads A and B. Both call set_work. What happens?

void foo(){ puts("foo"); }
void bar(){ puts("bar"); }

void thread_Alice(Reusable_Thread & rthread) {
   rthread.set_work(foo);
}

void thread_Bob(Reusable_Thread & rthread) {
   rthread.set_work(bar);
}

int main(){
    Reusable_Thread rthread;
    std::thread A(thread_Alice, std::ref(rthread));
    std::thread B(thread_Alice, std::ref(rthread));
    A.join();
    B.join();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}


It's certainly possible that the result is foo. It's also possible that the result is bar. But it's also possible that it's neither and your program crashes:

  • We create the std::thread in rthread, we call it R



  • We create the std::threads A and B. Neither run yet.



  • A gets scheduled. It calls set_work and:



  • checks m_thread_pause (get_readiness())



  • sets the worker to foo



  • yields due to scheduling



  • B gets scheduled. It calls set_work and



  • checks m_thread_pause (get_readiness(), still true)



  • yields



-
A gets scheduled. It resumes the execution and

  • sets m_thread_pause to false



  • returns



A's thread of execution has ended.

  • R gets scheduled. It



  • checks m_thread_quit (is false)



  • checks m_thread_pause (is false due to A)



  • starts reading the std::function<> m_work_fun.



  • yields while reading



  • B gets scheduled. It finishes its call to set_work:



  • sets the worker to bar, although R currently reads from it



  • yields



  • R gets scheduled. It finishes reading m_work_fun and ends up with a corrupt std::function and uses operator(). Your program crashes.



  • Even if your program didn't crash, after R sets m_thread_pause to true, B gets scheduled and finishes set_work, thus setting m_thread_pause to false and the corrupt function gets called again.



So depending on where the threads get interrupted, you end up with either:

  • bar



  • foo



  • bar


bar

  • foo


foo

  • foo


bar

  • bar


foo

  • crash



So, how does one get rid of this? Usually, you would use std::atomic::exchange or std::atomic::compare_exchange on m_thread_pause:

bool set_work(const std::function& work_func)
{
    if (m_thread_pause.exchange(false)) // old value was true
    {
        m_work_func = work_func;
        return true;
    }
    else
    {
        return false;
    }
}


But that doesn't work either, because R could start using m_work_func before it's actually set. The underlying problem here is that m_work_func and m_thread_pause are intertwined.

Third wheel to the rescue

We can decouple this by using a third atomic, that checks whether the function can be set safely:

bool set_work(const std::function& work_func)
{
    if (!m_function_ready && m_thread_pause.exchange(false))
    {
        m_work_func = work_func;
        m_function_ready = true;
        return true;
    }
    ...
}


Remember, only one thread will see m_thread_pause as true, due to the atomic exchange, so we're save from the previous A/B race.

```
void thread_worker()
{
while (!m_thread_quit)
{
if (!m_thread_pause && m_function_ready)
{
m_work_func();
m_thread_pause

Code Snippets

int main()
{
    Reusable_Thread thread;

    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;
}
void thread_worker()
{
    while (!m_thread_quit)
    {
        if (!m_thread_pause)
        {
            /* ... */
        }
    }
}
bool set_work(const std::function<void()>& work_func)
{
    if (get_readiness())
    {
        m_work_func = work_func;
        m_thread_pause = false;     // here, but wrong, see below
        return true;
    }
    else
    {
        return false;
    }
}
void foo(){ puts("foo"); }
void bar(){ puts("bar"); }

void thread_Alice(Reusable_Thread & rthread) {
   rthread.set_work(foo);
}

void thread_Bob(Reusable_Thread & rthread) {
   rthread.set_work(bar);
}

int main(){
    Reusable_Thread rthread;
    std::thread A(thread_Alice, std::ref(rthread));
    std::thread B(thread_Alice, std::ref(rthread));
    A.join();
    B.join();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}
bool set_work(const std::function<void()>& work_func)
{
    if (m_thread_pause.exchange(false)) // old value was true
    {
        m_work_func = work_func;
        return true;
    }
    else
    {
        return false;
    }
}

Context

StackExchange Code Review Q#134214, answer score: 7

Revisions (0)

No revisions yet.