Producer/consumer problem with four priority levels
Problem
Here you can see a mediocre piece of code of mine where I have fun with four deques, each with a different priority.
Here is how it works: in a while loop I generate a random number between 1 (highest priority) and 4 (lowest priority), and this number tells me which deque the new element goes into (via insert(int num, int fd)).
Before the while loop, a remover thread is detached with the remove(void) method: this thread removes from the highest-priority deque, maxQ (if it has any elements), then from medQ, and so on down to the lowest one, unknownQ.
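The scheme described above can be sketched in a few lines. This is only an illustration, not the code from the post: it is single-threaded and omits all locking, and the names Queues, insertRandom and removeHighest are hypothetical.

```cpp
#include <array>
#include <deque>
#include <random>

// Hypothetical stand-in for the four deques:
// index 0 holds priority 1 (highest), index 3 holds priority 4 (lowest).
using Queues = std::array<std::deque<int>, 4>;

// Producer side: pick a priority in [1, 4] at random and append fd there.
// Returns the priority that was chosen.
int insertRandom(Queues& qs, int fd, std::mt19937& rng) {
    std::uniform_int_distribution<int> pick(1, 4);
    int prio = pick(rng);
    qs[prio - 1].push_back(fd);
    return prio;
}

// Consumer side: take from the highest-priority deque that is not empty.
// Returns false when every deque is empty.
bool removeHighest(Queues& qs, int& fd) {
    for (auto& q : qs) {
        if (!q.empty()) {
            fd = q.front();
            q.pop_front();
            return true;
        }
    }
    return false;
}
```

Calling removeHighest repeatedly drains everything at priority 1 before it touches priority 2, and so on, which is the behaviour the remover thread is meant to have.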
Apparently it works. I know starvation is an issue, since the thread always removes higher-priority elements first: what if maxQ is always full and minQ holds only a few elements? Yep, the lower-priority deques would never be served.
For now the elements to insert are randomly generated and this assures me that, sooner or later, all elements from all deques will be removed.
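For what it's worth, one common way to soften the starvation described above is to give strict priority a budget. The sketch below is an assumption of mine, not part of the posted code: FairPicker and its burst parameter are hypothetical names, and locking is left out. After burst consecutive strict-priority removals it serves the levels in rotation once.

```cpp
#include <array>
#include <cstddef>
#include <deque>

using Queues = std::array<std::deque<int>, 4>;  // index 0 = highest priority

class FairPicker {
public:
    explicit FairPicker(std::size_t burst) : burst_(burst) {}

    // Removes one element into fd; returns false if all deques are empty.
    bool remove(Queues& qs, int& fd) {
        // Strict-priority candidate: first non-empty level.
        std::size_t first = 0;
        while (first < qs.size() && qs[first].empty()) ++first;
        if (first == qs.size()) return false;

        std::size_t chosen = first;
        if (streak_ >= burst_) {
            // Budget used up: scan the levels starting at the rotating cursor,
            // so every non-empty deque is reached after a bounded number of turns.
            for (std::size_t k = 0; k < qs.size(); ++k) {
                std::size_t i = (cursor_ + k) % qs.size();
                if (!qs[i].empty()) { chosen = i; break; }
            }
            cursor_ = (chosen + 1) % qs.size();
            streak_ = 0;
        } else {
            ++streak_;
        }
        fd = qs[chosen].front();
        qs[chosen].pop_front();
        return true;
    }

private:
    std::size_t burst_;       // strict-priority removals allowed in a row
    std::size_t streak_ = 0;  // strict-priority removals since the last forced turn
    std::size_t cursor_ = 1;  // where the next forced scan starts
};
```

With burst set to 2, a full maxQ and a single element in unknownQ, the third removal comes from unknownQ instead of maxQ. Whether that trade-off is acceptable depends on how strict the priorities need to be.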
But this is not my problem: I just hate the way insert and remove methods are designed. They are (to me) ugly and redundant: the code is repeated, almost identical, for each deque.
I come from C and I still don't know the true power of C++: what suggestions do you provide to improve insert and remove?
Compiled with:
g++ -std=c++11 -o funwithmultideque funwithmultideque.cpp -pthread
The code:
#include
#include
#include
#include
#include
#include
#include
#include
#define MAX_QUEUE_SIZE 100
#define DEFAULTCOLOR "\033[0m"
#define RED "\033[22;31m"
#define YELLOW "\033[1;33m"
#define GREEN "\033[0;0;32m"
#define debug_default(...) std::cout start;
} info_conn;
class QueuesManager {
public:
void initWorkerThread(void);
void remove(void);
void insert(int num, int fd);
// for debug
void printQueues(int flag);
private:
std::thread threadRead; // remover thread
std::mute
Solution
Encapsulate individual queues
There's a lot of identical code shared between your queues, which is an indication to encapsulate them. Since they're stateful, a class is appropriate.
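class QueueEntry {
public:
    info_conn remove();
    void insert(int fd);
    // Takes the lock: size() is called while another thread may be inserting or removing.
    size_t size() const {
        std::lock_guard<std::mutex> lck(m);
        return q.size();
    }
private:
    mutable std::mutex m;          // mutable so that size() can stay const
    std::condition_variable w, r;  // w: room to write, r: something to read
    std::deque<info_conn> q;
};

// Blocks until an element is available, then pops it from the front.
info_conn QueueEntry::remove() {
    std::unique_lock<std::mutex> lck(m);
    r.wait(lck, [this]() { return q.size() > 0; });
    auto i = std::move(q.front());
    q.pop_front();
    w.notify_one();
    return i;
}

// Blocks while the queue is full, then appends a timestamped entry.
void QueueEntry::insert(int fd) {
    std::unique_lock<std::mutex> locker(m);
    w.wait(locker, [this]() { return q.size() < MAX_QUEUE_SIZE; });
    info_conn i { fd, std::chrono::system_clock::now() };
    q.push_back(i);
    r.notify_one();
}

This lets us significantly simplify the remaining code in the queue manager: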
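class QueuesManager {
public:
    void initWorkerThread(void);
    void remove(void);
    void insert(int num, int fd);
    // for debug
    void printQueues(bool flag);
private:
    std::thread threadRead; // remover thread
    QueueEntry maxQ, medQ, minQ, unknownQ;
};

void QueuesManager::printQueues(bool flag) {
    // show deques after inserting
    if (flag) {
        debug_green(maxQ.size() << ' ' << medQ.size() << ' ' <<
                    minQ.size() << ' ' << unknownQ.size());
    }
    // show deques after removing
    else {
        debug_yellow(maxQ.size() << ' ' << medQ.size() << ' ' <<
                     minQ.size() << ' ' << unknownQ.size());
    }
}

void QueuesManager::insert(int num, int fd) {
    switch (num) {
    case 1: maxQ.insert(fd); break;
    case 2: medQ.insert(fd); break;
    case 3: minQ.insert(fd); break;
    case 4: unknownQ.insert(fd); break;
    default: std::cout << "You shouldn't be here" << std::endl; break;
    }
    printQueues(true);
}

void QueuesManager::remove(void) {
    // Note: this loop spins (and keeps printing) while every queue is empty.
    while (true) {
        printQueues(false);
        info_conn ic;
        if (maxQ.size() > 0) {
            ic = maxQ.remove();
            continue;
        }
        if (medQ.size() > 0) {
            ic = medQ.remove();
            continue;
        }
        if (minQ.size() > 0) {
            ic = minQ.remove();
            continue;
        }
        if (unknownQ.size() > 0) {
            ic = unknownQ.remove();
            continue;
        }
    }
}

Note that we've rewritten printQueues to take a bool rather than a bool disguised as an int. If the individual queues are not that interesting and only their index matters, we can simplify our methods further: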
class QueuesManager {
    ...
    std::array<QueueEntry, 4> qs;
};

void QueuesManager::insert(int num, int fd) {
    // num runs from 1 to 4, the array from 0 to 3; at() throws on anything else
    QueueEntry &q = qs.at(num - 1);
    q.insert(fd);
    printQueues(true);
}

void QueuesManager::remove(void) {
    while (true) {
        printQueues(false);
        for (auto &q : qs) {
            if (q.size() > 0) {
                info_conn ic = q.remove();
                break;
            }
        }
    }
}

Rewriting printQueues is left as an exercise.
Code Snippets
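As one possible take on the printQueues exercise, here is a self-contained sketch of the array-based layout. Several things in it are assumptions: the fields of info_conn are inferred from how the answer initialises it, kMaxQueueSize stands in for MAX_QUEUE_SIZE, and removeOne and sizes are hypothetical helpers. Since the debug_green and debug_yellow macros did not survive in the listing, sizes returns the line as a string instead of printing it.

```cpp
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <sstream>
#include <string>

constexpr std::size_t kMaxQueueSize = 100;  // stands in for MAX_QUEUE_SIZE

// Assumed layout, inferred from `{ fd, std::chrono::system_clock::now() }`.
struct info_conn {
    int fd;
    std::chrono::system_clock::time_point start;
};

class QueueEntry {
public:
    info_conn remove() {
        std::unique_lock<std::mutex> lck(m);
        r.wait(lck, [this] { return !q.empty(); });
        info_conn i = q.front();
        q.pop_front();
        w.notify_one();
        return i;
    }
    void insert(int fd) {
        std::unique_lock<std::mutex> lck(m);
        w.wait(lck, [this] { return q.size() < kMaxQueueSize; });
        q.push_back({fd, std::chrono::system_clock::now()});
        r.notify_one();
    }
    std::size_t size() const {
        std::lock_guard<std::mutex> lck(m);  // avoid racing with insert/remove
        return q.size();
    }
private:
    mutable std::mutex m;
    std::condition_variable w, r;
    std::deque<info_conn> q;
};

class QueuesManager {
public:
    // num is the priority in [1, 4]; at() throws std::out_of_range otherwise.
    void insert(int num, int fd) { qs.at(num - 1).insert(fd); }

    // Removes from the highest-priority non-empty queue; false if all are empty.
    // With a single consumer, remove() cannot block after size() > 0.
    bool removeOne(info_conn& out) {
        for (auto& q : qs) {
            if (q.size() > 0) { out = q.remove(); return true; }
        }
        return false;
    }

    // The exercise: build the size line with a loop instead of naming each queue.
    std::string sizes() const {
        std::ostringstream os;
        for (std::size_t i = 0; i < qs.size(); ++i) {
            if (i != 0) os << ' ';
            os << qs[i].size();
        }
        return os.str();
    }
private:
    std::array<QueueEntry, 4> qs;
};
```

printQueues(bool flag) could then pass sizes() to whichever colour macro the flag selects, and the remover thread could call removeOne in its loop.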
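class QueueEntry {
public:
    info_conn remove();
    void insert(int fd);
    // Takes the lock: size() is called while another thread may be inserting or removing.
    size_t size() const {
        std::lock_guard<std::mutex> lck(m);
        return q.size();
    }
private:
    mutable std::mutex m;          // mutable so that size() can stay const
    std::condition_variable w, r;  // w: room to write, r: something to read
    std::deque<info_conn> q;
};

// Blocks until an element is available, then pops it from the front.
info_conn QueueEntry::remove() {
    std::unique_lock<std::mutex> lck(m);
    r.wait(lck, [this]() { return q.size() > 0; });
    auto i = std::move(q.front());
    q.pop_front();
    w.notify_one();
    return i;
}

// Blocks while the queue is full, then appends a timestamped entry.
void QueueEntry::insert(int fd) {
    std::unique_lock<std::mutex> locker(m);
    w.wait(locker, [this]() { return q.size() < MAX_QUEUE_SIZE; });
    info_conn i { fd, std::chrono::system_clock::now() };
    q.push_back(i);
    r.notify_one();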
}

class QueuesManager {
public:
    void initWorkerThread(void);
    void remove(void);
    void insert(int num, int fd);
    // for debug
    void printQueues(bool flag);
private:
    std::thread threadRead; // remover thread
    QueueEntry maxQ, medQ, minQ, unknownQ;
};

void QueuesManager::printQueues(bool flag) {
    // show deques after inserting
    if (flag) {
        debug_green(maxQ.size() << ' ' << medQ.size() << ' ' <<
                    minQ.size() << ' ' << unknownQ.size());
    }
    // show deques after removing
    else {
        debug_yellow(maxQ.size() << ' ' << medQ.size() << ' ' <<
                     minQ.size() << ' ' << unknownQ.size());
    }
}

void QueuesManager::insert(int num, int fd) {
    switch (num) {
    case 1: maxQ.insert(fd); break;
    case 2: medQ.insert(fd); break;
    case 3: minQ.insert(fd); break;
    case 4: unknownQ.insert(fd); break;
    default: std::cout << "You shouldn't be here" << std::endl; break;
    }
    printQueues(true);
}

void QueuesManager::remove(void) {
    // Note: this loop spins (and keeps printing) while every queue is empty.
    while (true) {
        printQueues(false);
        info_conn ic;
        if (maxQ.size() > 0) {
            ic = maxQ.remove();
            continue;
        }
        if (medQ.size() > 0) {
            ic = medQ.remove();
            continue;
        }
        if (minQ.size() > 0) {
            ic = minQ.remove();
            continue;
        }
        if (unknownQ.size() > 0) {
            ic = unknownQ.remove();
            continue;
        }
    }
}

class QueuesManager {
    ...
    std::array<QueueEntry, 4> qs;
};

void QueuesManager::insert(int num, int fd) {
    // num runs from 1 to 4, the array from 0 to 3; at() throws on anything else
    QueueEntry &q = qs.at(num - 1);
    q.insert(fd);
    printQueues(true);
}

void QueuesManager::remove(void) {
    while (true) {
        printQueues(false);
        for (auto &q : qs) {
            if (q.size() > 0) {
                info_conn ic = q.remove();
                break;
            }
        }
    }
}
Context
StackExchange Code Review Q#132938, answer score: 3