
Thread pool using boost::thread

Submitted by: @import:stackexchange-codereview

Problem

I would like to make a portable thread pool using boost::thread. Someone will probably tell me to look at boost::thread_group and boost::asio. I know about them, but I am not familiar enough with either to tell whether they suit my purpose.

Please give me comments on what I should do here.

JobItem and JobQueue:

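class JobItem
{
private:
    uint32_t        job_type_;      //!< (comment text and any further members lost in extraction)
};

class JobQueue
{
private:
    queue<JobItem*>                 job_queue_;
public:
    // Definitely I need to use mutex and condition variable here
    JobItem* pick();
    void push(JobItem* job_item);
};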


JobWorker (worker thread):

class JobWorker
{
private:
    JobQueue* shared_job_queue_;
    boost::thread the_thread_;

public:
    JobWorker(JobQueue* shared_queue) : shared_job_queue_(shared_queue) {}

    // exec_job will be overridden by concrete subclasses
    virtual void exec_job(JobItem* job_item) = 0;
    void run()
    {
        while(true)
        {
            JobItem* job = shared_job_queue_->pick();
            exec_job(job);
            delete job;
        }
    }
    void start()
    {
        the_thread_ = boost::thread(boost::bind(&JobWorker::run, this));
    }
    void stop()
    {
        the_thread_.join();
    }
};


Concrete JobWorker subclasses:

class Task_1_Worker : public JobWorker
{
    // override exec_job
    void exec_job(JobItem* job)
    {
        // Process job item here
    }
};

class Task_2_Worker : public JobWorker
{
    // override exec_job
    void exec_job(JobItem* job)
    {
        // Process job item here
    }
};


Now, WorkerPool with template:

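```
// NOTE: everything between angle brackets was lost in extraction.
// WorkerT is a placeholder for the stripped template parameter name.
template <typename WorkerT /* placeholder name */>
class WorkerPool
{
private:
    JobQueue shared_job_queue_;
    list</* element type lost in extraction */> job_workers_;

public:
    WorkerPool(const int num_threads);
    ~WorkerPool(void) {};

    void push(JobItem* job)
    {
        shared_job_queue_.push(job);
    }
    void start();
    void stop();
};

// The out-of-class definitions of the constructor, start() and stop()
// are truncated in the source ("for(int i=0; i" and
// "for(std::list::iterator") and are not reconstructed here.
```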

Solution

Design

Not sure why you have a virtual function on the worker?

virtual void exec_job(JobItem* job_item) = 0;


The worker is simple: it picks up jobs and then does them. What the job is defines what work should be done, so I would have put the virtual function that defines the work on the JobItem.

The run() function of the worker is then simply:

void run()
{
    // You want some way for the thread to eventually exit.
    // You can make that happen by letting the queue return
    // a null pointer when the queue is being shut down.
    // (`do` is a reserved word in C++, so the virtual function
    // on JobItem is called execute() here.)
    JobItem* job;
    while ((job = shared_job_queue_->pick()) != nullptr)
    {
        job->execute();
        delete job;
    }
}
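
A minimal sketch of that design, assuming a pure virtual member on JobItem. The name execute() is an assumption (`do` is a reserved word in C++), and AppendJob, CountJob and drain() are hypothetical names used only for illustration. To stay single-threaded it uses a plain std::queue of std::unique_ptr rather than the thread-safe JobQueue.

```cpp
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Base class: each job knows how to perform its own work.
class JobItem
{
public:
    virtual ~JobItem() = default;   // jobs are destroyed through base pointers
    virtual void execute() = 0;     // hypothetical name for the work function
};

// Hypothetical concrete job: appends a line of text to a log.
class AppendJob : public JobItem
{
public:
    AppendJob(std::vector<std::string>& log, std::string text)
        : log_(log), text_(std::move(text)) {}
    void execute() override { log_.push_back(text_); }
private:
    std::vector<std::string>& log_;
    std::string text_;
};

// Hypothetical concrete job: increments a counter.
class CountJob : public JobItem
{
public:
    explicit CountJob(int& counter) : counter_(counter) {}
    void execute() override { ++counter_; }
private:
    int& counter_;
};

// The worker loop needs no knowledge of the concrete job types.
inline void drain(std::queue<std::unique_ptr<JobItem>>& jobs)
{
    while (!jobs.empty())
    {
        std::unique_ptr<JobItem> job = std::move(jobs.front());
        jobs.pop();
        job->execute();
    }   // each job is destroyed when `job` goes out of scope
}
```

With this arrangement a single worker type can serve every kind of job, so the Task_1_Worker and Task_2_Worker subclasses are no longer needed.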


Code Review

Prefer not to use pointers.

Raw pointers carry no ownership semantics, so the reader cannot tell who is responsible for deleting the object. Use a type that conveys ownership, so that anyone reading the code understands who owns what.

JobQueue* shared_job_queue_;


A worker always needs a job queue, so this member can never be nullptr. A reference is therefore a better choice than a pointer. A reference also signals that the worker does not own the queue and must not delete it.

class JobWorker
{
private:
    JobQueue& shared_job_queue_;
public:
    JobWorker(JobQueue& shared_queue)
        : shared_job_queue_(shared_queue)
    {}
    // ... remaining members as before ...
};


The same applies to JobQueue: you push and pop raw pointers, so ownership is again left unstated.

class JobQueue
{
private:
    queue<JobItem*>                 job_queue_;
public:
    // Definitely I need to use mutex and condition variable here
    JobItem* pick();
    void push(JobItem* job_item);
};


As a user of this class I could easily pass, by accident, the address of a JobItem that is actually an automatic variable. Unless I read the implementation, it is not obvious that the code will blow up. Even worse, it may not blow up for a while, since the pointer is not deleted until after the work has been done.

In this case I would use a smart pointer. The obvious one is std::unique_ptr. When you use this as a parameter you are indicating that you are accepting ownership of the object.

class JobQueue
{
private:
    // You are maintaining a queue of owned pointers.
    // You will take responsibility for deleting them when this
    // object is destroyed.
    queue<std::unique_ptr<JobItem>>      job_queue_;
public:
    // This method returns a pointer.
    // More importantly it is returning ownership of the pointer to
    // you uniquely, so you are the sole owner of the pointer and the
    // JobQueue no longer retains any pointer to this object.
    std::unique_ptr<JobItem> pick();

    // This method accepts a unique_ptr.
    // Because it accepts the unique_ptr by value you are losing
    // ownership. Once this method is called you no longer own
    // the object: the unique_ptr you passed in is left empty.
    void push(std::unique_ptr<JobItem> job_item);
};
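
One possible way to fill in that interface is sketched below, assuming C++11 and the standard mutex and condition variable the question asks about. The shutdown() member, the shutting_down_ flag, the execute() name and SetFlagJob are hypothetical additions, not part of the original code. They exist so that pick() can return an empty pointer and let a worker loop end, as suggested in the Design section.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

class JobItem
{
public:
    virtual ~JobItem() = default;
    virtual void execute() = 0;   // hypothetical name for the work function
};

// Hypothetical concrete job used only to exercise the queue.
class SetFlagJob : public JobItem
{
public:
    explicit SetFlagJob(bool& flag) : flag_(flag) {}
    void execute() override { flag_ = true; }
private:
    bool& flag_;
};

class JobQueue
{
public:
    // Takes ownership of the job and wakes one waiting worker.
    void push(std::unique_ptr<JobItem> job_item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            job_queue_.push(std::move(job_item));
        }
        not_empty_.notify_one();
    }

    // Blocks until a job is available or shutdown() has been called.
    // Returns an empty pointer once the queue is shut down and drained.
    std::unique_ptr<JobItem> pick()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] {
            return shutting_down_ || !job_queue_.empty();
        });
        if (job_queue_.empty())
            return nullptr;
        std::unique_ptr<JobItem> job = std::move(job_queue_.front());
        job_queue_.pop();
        return job;
    }

    // Hypothetical addition: releases workers blocked in pick().
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutting_down_ = true;
        }
        not_empty_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::queue<std::unique_ptr<JobItem>> job_queue_;
    bool shutting_down_ = false;
};
```

A caller hands a job over with `queue.push(std::move(job))`, after which `job` is empty. Passing the address of an automatic variable no longer compiles, because the raw-pointer constructor of std::unique_ptr is explicit.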


Use modern C++

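It's 2016. C++11 has been out for five years and C++14 for two. Both provide std::thread and the accompanying concurrency facilities (std::mutex, std::condition_variable, std::future), so a portable thread pool no longer needs boost::thread.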
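
As a rough illustration of the standard-library route, the sketch below runs a batch of jobs on several std::thread objects, with a lambda in place of boost::bind. It is deliberately simplified: all jobs are queued before the threads start, so an empty queue means the work is finished and no condition variable is required. The names execute(), AddJob and run_all() are hypothetical.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

struct JobItem
{
    virtual ~JobItem() = default;
    virtual void execute() = 0;   // hypothetical name for the work function
};

// Hypothetical job: adds a value to a shared atomic total.
struct AddJob : JobItem
{
    AddJob(std::atomic<int>& total, int value) : total_(total), value_(value) {}
    void execute() override { total_ += value_; }
    std::atomic<int>& total_;
    int value_;
};

// Runs every queued job on num_threads standard-library threads and
// returns once all of them have been joined.
inline void run_all(std::queue<std::unique_ptr<JobItem>> jobs,
                    std::size_t num_threads)
{
    std::mutex mutex;
    auto worker = [&jobs, &mutex] {
        for (;;)
        {
            std::unique_ptr<JobItem> job;
            {
                std::lock_guard<std::mutex> lock(mutex);
                if (jobs.empty())
                    return;               // nothing left: the thread exits
                job = std::move(jobs.front());
                jobs.pop();
            }
            job->execute();               // run the job outside the lock
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < num_threads; ++i)
        threads.emplace_back(worker);     // stands in for boost::thread + boost::bind
    for (std::thread& t : threads)
        t.join();
}
```

A long-lived pool that accepts jobs while running would still need a blocking queue with a shutdown signal. This version only shows that the thread, mutex and lock types come straight from the standard library.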

Context

StackExchange Code Review Q#122588, answer score: 3
