Queue for distributing tasks between threads
Problem
I implemented the following class to dispatch std::function objects to a thread pool. Multiple threads will block on the pop call until a task is available.
I am using the C++14 standard.
```
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
namespace alpha {
template<typename T>
class AsyncQueue {
private:
    std::shared_ptr<std::queue<T>> queue;
    std::shared_ptr<std::mutex> mutex;
    std::shared_ptr<std::condition_variable> cond;
public:
    /* Big 5 */
    AsyncQueue(const AsyncQueue& other) = default;
    AsyncQueue(AsyncQueue&& other) = default;
    AsyncQueue& operator=(const AsyncQueue& other) = default;
    AsyncQueue& operator=(AsyncQueue&& other) = default;
    virtual ~AsyncQueue() = default;
    /**
     * @brief Default constructor
     */
    AsyncQueue() :
        queue(new std::queue<T>()),
        mutex(new std::mutex),
        cond(new std::condition_variable()) {
    }
    /**
     * @brief Push a value to the async queue
     */
    void push(const T& object) {
        {
            std::lock_guard<std::mutex> lock(*mutex);
            queue->push(object);
        }
        cond->notify_one();
    }
    /**
     * @brief Push a value to the async queue (move variant)
     */
    void push(T&& object) {
        {
            std::lock_guard<std::mutex> lock(*mutex);
            queue->push(object);
        }
        cond->notify_one();
    }
    /**
     * @brief Pop a value from the queue
     */
    T&& pop() {
        std::unique_lock<std::mutex> lock(*mutex);
        while(queue->empty()) {
            cond->wait(lock);
        }
        T object = std::move(queue->front());
        queue->pop();
        return std::move(object);
    }
    // ...
};
}
```
Solution
Separation of concerns
You have the members of your class as shared_ptrs, which indicates that you want shallow copy construction and assignment. This is not something you should concern yourself with when designing the class.
I would recommend that you design your queue as non-copyable and non-assignable, because assigning a queue makes no sense. If the user needs to pass a queue around, it is their responsibility to make a shared_ptr to it. This makes your implementation cleaner, and it also performs better because your member variables get better cache locality.
The big 5
Following the above, the big 5 should now be:

    AsyncQueue(const AsyncQueue& other) = delete;
    AsyncQueue(AsyncQueue&& other) = delete;
    AsyncQueue& operator=(const AsyncQueue& other) = delete;
    AsyncQueue& operator=(AsyncQueue&& other) = delete;
    virtual ~AsyncQueue() = default;

(Note delete instead of default.) The move constructor and move assignment cannot be implemented, as moving a mutex is not supported (for good reason).
Use make_shared
If you still insist on using shared_ptr, you should use make_shared to construct them, because this co-allocates the reference count with the object data. This saves some memory but, more importantly, it improves data locality.
Forwarding references
You're better off using forwarding references (also known as universal references, but forwarding reference is the new preferred name) for the push method. In fact, I would go as far as to not provide push at all and instead provide emplace, which constructs the object in place like this:

    template<typename... Args>
    void emplace(Args&&... args){
        {
            std::lock_guard<std::mutex> lock(*mutex);
            queue->emplace(std::forward<Args>(args)...);
        }
        cond->notify_one();
    }

emplace() constructs the queued object in place, using the arguments passed to emplace() as constructor arguments, to avoid a copy. This way you leave the decision whether or not to move to your user.

    Obj o;
    asyncqueue.emplace(o); // Will copy construct.
    asyncqueue.emplace(std::move(o)); // Will move construct.
    // Will evaluate o*3 into a temporary and move from it.
    // Possibly eliding the copy.
    asyncqueue.emplace( o * 3);
    asyncqueue.emplace("whatevz"); // Will call constructor taking c-string.

Returning by move in pop
The reason you're crashing is that pop returns an r-value reference to a local object, and that object is destroyed when you leave the function scope. The caller is left with a dangling reference, so you're moving from rubbish. Returning a local by r-value reference: just don't. Return by value (T pop()) instead. The compiler will likely perform (N)RVO, and where it cannot, the local is moved rather than copied.
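The recommendations up to this point (plain members instead of shared_ptrs, deleted copy operations, emplace instead of push, and pop returning by value) can be put together in a minimal sketch. The names ValueQueue and run_value_queue_demo are hypothetical and chosen only for illustration; this is not the original AsyncQueue, just one way the advice might look in C++14.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical class name, used only for this sketch.
template <typename T>
class ValueQueue {
public:
    ValueQueue() = default;
    // Non-copyable; declaring the copy operations also suppresses
    // the implicit move operations.
    ValueQueue(const ValueQueue&) = delete;
    ValueQueue& operator=(const ValueQueue&) = delete;

    // Construct the element in place from the forwarded arguments.
    template <typename... Args>
    void emplace(Args&&... args) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.emplace(std::forward<Args>(args)...);
        }
        cond_.notify_one();
    }

    // Return by value: the local is moved (or elided), never dangling.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T object = std::move(queue_.front());
        queue_.pop();
        return object;
    }

private:
    // Direct members, no shared_ptr indirection.
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// Illustrative usage: the caller decides to share the queue.
inline std::string run_value_queue_demo() {
    auto queue = std::make_shared<ValueQueue<std::string>>();
    // The lambda copies the shared_ptr, not the queue.
    std::thread producer([queue] { queue->emplace("task"); });
    std::string task = queue->pop();  // blocks until the producer has emplaced
    producer.join();
    assert(task == "task");
    return task;
}
```

Sharing is now the caller's decision: the queue itself stays a simple value type, and std::make_shared is used only at the point where two threads need the same instance.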
Conditional wait
You have an infinite conditional wait. It may be useful to have a way to "sever" the queue and release all threads waiting on jobs.
Typically you would have:

    std::atomic<bool> still_running{true};
    void stop(){ still_running = false; cond->notify_all();}
    void push(T&&){
        if(!still_running) throw error("severed pipe");
        ...
    }
    void pop(){
        ...
        while(queue->empty() && still_running) {
            cond->wait(lock);
        }
        if(queue->empty()){
            // Check for queue empty here, as still_running could
            // have been changed after we returned but we could still
            // return useful data.
            throw error("severed pipe");
        }
        ...
    }

Also prefer the more concise way of waiting on a condition variable:

    while(queue->empty()) {
        cond->wait(lock);
    }

could be:

    cond->wait(lock, [this](){ return !queue->empty();});

For reference, you might be interested in: my implementation of a similar class
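To make the "severed" queue concrete, here is a minimal self-contained sketch that combines stop() with the predicate form of wait. ClosableQueue and drain_until_stopped are hypothetical names, and std::runtime_error stands in for the unspecified error type; both are assumptions. One deliberate difference from the fragment above: the flag is a plain bool changed under the mutex rather than an atomic, because changing it without the lock can let a consumer miss the notify_all between checking the condition and going to sleep.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical class name; std::runtime_error is an assumed error type.
template <typename T>
class ClosableQueue {
public:
    ClosableQueue() = default;
    ClosableQueue(const ClosableQueue&) = delete;
    ClosableQueue& operator=(const ClosableQueue&) = delete;

    // Reject further pushes and wake every waiting consumer.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        cond_.notify_all();
    }

    template <typename... Args>
    void emplace(Args&&... args) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!running_) throw std::runtime_error("severed pipe");
            queue_.emplace(std::forward<Args>(args)...);
        }
        cond_.notify_one();
    }

    // Blocks until an element is available or the queue was stopped.
    // Elements queued before stop() are still handed out.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty() || !running_; });
        if (queue_.empty()) throw std::runtime_error("severed pipe");
        T object = std::move(queue_.front());
        queue_.pop();
        return object;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool running_ = true;  // guarded by mutex_
};

// Illustrative driver: one consumer drains the queue until it is severed.
inline int drain_until_stopped() {
    ClosableQueue<int> queue;
    int sum = 0;
    std::thread consumer([&queue, &sum] {
        try {
            for (;;) sum += queue.pop();
        } catch (const std::runtime_error&) {
            // stop() was called and nothing is left to process.
        }
    });
    for (int i = 1; i <= 3; ++i) queue.emplace(i);
    queue.stop();
    consumer.join();  // after join, reading sum is safe
    assert(sum == 6);
    return sum;
}
```

Because pop only throws once the queue is both stopped and empty, the consumer in this sketch always processes the three queued values before it exits, regardless of how the threads are scheduled. Whether a stopped queue should drain or drop pending work is a design choice; draining is assumed here.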
Code Snippets
AsyncQueue(const AsyncQueue& other) = delete;
AsyncQueue(AsyncQueue&& other) = delete;
AsyncQueue& operator=(const AsyncQueue& other) = delete;
AsyncQueue& operator=(AsyncQueue&& other) = delete;
virtual ~AsyncQueue() = default;

template<typename... Args>
void emplace(Args&&... args){
    {
        std::lock_guard<std::mutex> lock(*mutex);
        queue->emplace(std::forward<Args>(args)...);
    }
    cond->notify_one();
}

Obj o;
asyncqueue.emplace(o); // Will copy construct.
asyncqueue.emplace(std::move(o)); // Will move construct.
// Will evaluate o*3 into a temporary and move from it.
// Possibly eliding the copy.
asyncqueue.emplace( o * 3);
asyncqueue.emplace("whatevz"); // Will call constructor taking c-string.

std::atomic<bool> still_running{true};
void stop(){ still_running = false; cond->notify_all();}
void push(T&&){
    if(!still_running) throw error("severed pipe");
    ...
}
void pop(){
    ...
    while(queue->empty() && still_running) {
        cond->wait(lock);
    }
    if(queue->empty()){
        // Check for queue empty here, as still_running could
        // have been changed after we returned but we could still
        // return useful data.
        throw error("severed pipe");
    }
    ...
}

while(queue->empty()) {
    cond->wait(lock);
}

Context
StackExchange Code Review Q#97441, answer score: 7