HiveBrain v1.2.0

C++14 thread pool executor design

Submitted by: @import:stackexchange-codereview

Problem

With some experience in Python and Go, I tried to write a (simple) thread pool executor. Tasks submitted to the executor must be copy-constructible std::function objects, and the results of the tasks are needed in the main thread.

My Header and Implementation File:

```
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <thread>
#include <vector>

class PoolExecutor
{
public:
    explicit PoolExecutor(const unsigned int pool_size)
        : stop_(false)
        , working_(0)
    {
        for(unsigned int i = 0; i < pool_size; ++i)
        {
            threads_.emplace_back(
                std::thread(std::bind(&PoolExecutor::worker, this)));
        }
    }

    virtual ~PoolExecutor()
    {
        this->stop_ = true;
        for(auto& thread : this->threads_)
        {
            thread.join();
        }
    }

    void submit(std::function<void()> func)
    {
        {
            std::lock_guard<std::mutex> guard(this->mu2_);
            this->work_queue_.push(func);
        }
        this->cv_.notify_all();
    }

    bool finished()
    {
        std::lock_guard<std::mutex> guard(this->mu2_);
        return this->working_ == 0 && this->work_queue_.empty();
    }

    void stop()
    {
        this->stop_ = true;
    }

private:
    std::vector<std::thread> threads_;
    std::shared_timed_mutex mu_;
    std::mutex mu2_;
    std::atomic_bool stop_;
    std::atomic_int working_;
    std::condition_variable_any cv_;
    std::queue<std::function<void()>> work_queue_;

    void worker()
    {
        std::function<void()> func;
        while(!this->stop_)
        {
            using namespace std::literals::chrono_literals;
            std::shared_lock<std::shared_timed_mutex> lock(this->mu_, std::try_to_lock);
            this->cv_.wait_for(lock, 10ms);

            bool accquired = false;
            {
                std::lock_guard<std::mutex> guard(this->mu2_);
                if(!this->work_queue_.empty())
                {
                    func = this->work_queue_.front();
                    this->work_queue_.pop();
                    accquired = true;
                    this->working_++;
                }
            }

            if(accquired)
            {
                func();
                this->working_--;
            }
        }
    }
};
```

Solution

this->threads_.emplace_back(
    std::thread(std::bind(&PoolExecutor::worker, this)));


Creative use of std::bind, but a lambda would have sufficed:

this->threads_.emplace_back(
    std::thread([this] () { this->worker(); }));


void submit(std::function<void()> func)
{
    this->cv_.notify_all();
}


Why bother waking all worker threads when only a single piece of work was added? std::condition_variable_any::notify_one() is more appropriate.

void stop()
{
    this->stop_ = true;
}


Here, on the other hand, std::condition_variable_any::notify_all() should have been used, to wake the threads currently sleeping.

void stop()
{
    this->stop_ = true;
    this->cv_.notify_all();
}


std::shared_lock<std::shared_timed_mutex> lock(this->mu_, std::try_to_lock);
this->cv_.wait_for(lock, 10ms);


This part only works as expected for the first thread, as it actually gets the lock. The second thread fails to obtain the lock.

See the documentation for std::condition_variable_any::wait_for. The lock must be locked before entering wait_for.

That means you can't use std::try_to_lock; the lock actually has to be acquired (the default std::shared_lock constructor locks the mutex).

While at it, you should get used to using the optional 3rd parameter on wait to pass in a predicate in order to filter spurious wakes:

std::shared_lock<std::shared_timed_mutex> lock(this->mu_);
this->cv_.wait_for(lock, 10ms, [this] () { return this->stop_ || !this->work_queue_.empty(); });


That predicate is executed before waiting for the first time, so if e.g. this->stop_ is already set by the time the lock is acquired, wait_for() with the predicate returns immediately without requiring another wake-up call.

At this point you can do away with the "timed" stuff as well. You no longer need it. You can now also replace your std::condition_variable_any by the simpler std::condition_variable.

In fact, you do not need the 2nd mutex either:

std::mutex mu_;
std::condition_variable cv_;

...

bool finished()
{
    std::lock_guard<std::mutex> guard(this->mu_);
    return this->working_ == 0 && this->work_queue_.empty();
}

void worker()
{
    while(!this->stop_)
    {
        std::function<void()> func;
        {
            std::unique_lock<std::mutex> lock(this->mu_);
            this->cv_.wait(lock, [this] () { return this->stop_ || !this->work_queue_.empty(); });
            if(this->stop_) break;
            if(!this->work_queue_.empty())
            {
                func = std::move(this->work_queue_.front());
                this->work_queue_.pop();
                this->working_++;
            }
        }
        if(func)
        {
            func();
            --this->working_;
        }
    }
}


Q1. Am I using mutex and lock correctly?

No. Even though it might have worked, you violated the contract by attempting to wait on an unlocked mutex.

Q2. Is it possible to use a single mutex in the PoolExecutor class?

As seen above, yes.

Q3. Is there any way to wait for the executor until it has finished?

If by "finished" you mean the executor is ready to be destroyed? That is already the case.

Otherwise if you just mean to wait until the pool runs empty? Well, you do have std::atomic_int working_ in there.

All you really need now, is a second condition variable:

void wait() {
    std::unique_lock<std::mutex> lock(this->mu_);
    this->cv2_.wait(lock, [this] () {
        return ( this->working_ == 0 && this->work_queue_.empty() ) || this->stop_;
    });
}


Now update the worker to invoke this->cv2_.notify_all() after decrementing this->working_ and you are done.


Context

StackExchange Code Review Q#144545, answer score: 5
