HiveBrain v1.2.0
Get Started
← Back to all entries
patterncppMinor

Producer/consumer problem with four priority levels

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
priorityproblemwithfourconsumerlevelsproducer

Problem

Here you can see a mediocre code of mine where I have fun with four deques, each with a different priority.

Here is how it works: in a while loop I generate a random number between 1 (highest priority) and 4 (lowest priority), and this number tells me in which deque I will insert the new element (by insert(int num, int fd)).

Before the while loop, a remover thread is detached with remove(void) method: this thread will remove from the highest priority maxQ deque (if there are any elements), then from medQ and so on to the lowest one, unknownQ'.

Apparently it works: I know there is a matter of starvation since it will always remove elements with higher priorities; what if
deque maxQ is always full of elements and minQ has few elements? Yep, deques with lower priorities will never be updated.
For now the elements to insert are randomly generated and this assures me that, sooner or later, all elements from all
deques will be removed.

But this is not my problem: I just hate the way
insert and remove methods are designed. They are (to me) ugly and redundant: the code is repeated, almost identical, for each deque.
I come from C and I still don't know the true power of C++: what suggestions do you provide to improve
insert and remove?

Compiled with:


g++ -std=c++11 -o funwithmultideque funwithmultideque.cpp -pthread

The code:

``
#include
#include
#include
#include
#include
#include
#include
#include

#define MAX_QUEUE_SIZE 100

#define DEFAULTCOLOR "\033[0m"
#define RED "\033[22;31m"
#define YELLOW "\033[1;33m"
#define GREEN "\033[0;0;32m"

#define debug_default(...) std::cout start;
} info_conn;

class QueuesManager {
public:
void initWorkerThread(void);
void remove(void);
void insert(int num, int fd);
// for debug
void printQueues(int flag);
private:
std::thread threadRead; // remover thread
std::mute

Solution

Encapsulate individual queues

There's a lot of identical code shared between your queues, which is an indication to encapsulate them. Since they're stateful, a class is appropriate.

class QueueEntry {
public:

    info_connection remove();

    void insert(int fd);

    size_t size() const { return q.size(); }

private:
    std::mutex m;
    std::condition_variable w, r;
    std::deque q;
};

info_connection QueueEntry::remove(){
    std::unique_lock lck(m);
    r.wait(lck, [this]() { return q.size() > 0; });
    auto i = std::move(q.front());
    q.pop_front();
    w.notify_one();
    return i;
}

void QueueEntry::insert(int fd){
    std::unique_lock locker(m);
    w.wait(locker, [this]() { return q.size() < MAX_QUEUE_SIZE; });
    info_connection i { fd, std::chrono::system_clock::now() };
    q.push_back(i);
    r.notify_one();
}


This let's us significantly simplify the remaining code in the queue manager:

class QueuesManager {
public:
    void initWorkerThread(void);

    void remove(void);

    void insert(int num, int fd);

    // for debug
    void printQueues(bool flag);

private:
    std::thread threadRead; // remover thread
    QueueEntry maxQ, medQ, minQ, unknownQ;
};

void QueuesManager::printQueues(bool flag) {
    // show deques after inserting
    if (flag) {
        debug_green(maxQ.size()  0) {
            ic = maxQ.remove();
            continue;
        }
        if (medQ.size() > 0) {
            ic = medQ.remove();
            continue;
        }
        if (minQ.size() > 0) {
            ic = minQ.remove();
            continue;
        }
        if (unknownQ.size() > 0) {
            ic = unknownQ.remove();
            continue;
        }
    }
}


Note that we've rewritten printQueues to take a bool rather than a bool disguised as an int. If the specific queues are not that interesting, and merely the indexing we can simplify our methods further:

class QueuesManager {
    ...
    std::array qs;
};

void QueuesManager::insert(int num, int fd) {
    QueueEntry &q = qs.at(num);
    q.insert(fd);
    printQueues(true);
}

void QueuesManager::remove(void) {
    while (true) {
        printQueues(false);
        for(auto &q : qs){
            if(q.size() > 0){
                info_conn ic = q.remove();
                break;
            }
        }
    }
}


Rewriting printQueues is left as an exercise.

Code Snippets

class QueueEntry {
public:

    info_connection remove();

    void insert(int fd);

    size_t size() const { return q.size(); }

private:
    std::mutex m;
    std::condition_variable w, r;
    std::deque<info_conn> q;
};

info_connection QueueEntry::remove(){
    std::unique_lock<std::mutex> lck(m);
    r.wait(lck, [this]() { return q.size() > 0; });
    auto i = std::move(q.front());
    q.pop_front();
    w.notify_one();
    return i;
}

void QueueEntry::insert(int fd){
    std::unique_lock<std::mutex> locker(m);
    w.wait(locker, [this]() { return q.size() < MAX_QUEUE_SIZE; });
    info_connection i { fd, std::chrono::system_clock::now() };
    q.push_back(i);
    r.notify_one();
}
class QueuesManager {
public:
    void initWorkerThread(void);

    void remove(void);

    void insert(int num, int fd);

    // for debug
    void printQueues(bool flag);

private:
    std::thread threadRead; // remover thread
    QueueEntry maxQ, medQ, minQ, unknownQ;
};

void QueuesManager::printQueues(bool flag) {
    // show deques after inserting
    if (flag) {
        debug_green(maxQ.size() << ' ' << medQ.size() << ' ' <<
                    minQ.size() << ' ' << unknownQ.size());
    }
        // show deques after removing
    else {
        debug_yellow(maxQ.size() << ' ' << medQ.size() << ' ' <<
                     minQ.size() << ' ' << unknownQ.size());
    }
}

void QueuesManager::insert(int num, int fd) {
    switch (num) {
        case 1: {
            maxQ.insert(fd);
            break;
        }
        case 2: {
            medQ.insert(fd);
            break;
        }
        case 3: {
            minQ.insert(fd);
            break;
        }
        case 4: {
            unknownQ.insert(fd);
            break;
        }
        default: {
            std::cout << "You shouldn't be here" << std::endl;
            break;
        }
    }
    printQueues(true);
}

void QueuesManager::remove(void) {
    while (true) {
        printQueues(false);
        info_conn ic;
        if (maxQ.size() > 0) {
            ic = maxQ.remove();
            continue;
        }
        if (medQ.size() > 0) {
            ic = medQ.remove();
            continue;
        }
        if (minQ.size() > 0) {
            ic = minQ.remove();
            continue;
        }
        if (unknownQ.size() > 0) {
            ic = unknownQ.remove();
            continue;
        }
    }
}
class QueuesManager {
    ...
    std::array<QueueEntry, 4> qs;
};

void QueuesManager::insert(int num, int fd) {
    QueueEntry &q = qs.at(num);
    q.insert(fd);
    printQueues(true);
}

void QueuesManager::remove(void) {
    while (true) {
        printQueues(false);
        for(auto &q : qs){
            if(q.size() > 0){
                info_conn ic = q.remove();
                break;
            }
        }
    }
}

Context

StackExchange Code Review Q#132938, answer score: 3

Revisions (0)

No revisions yet.