HiveBrain v1.2.0
Get Started
← Back to all entries
patterncMinor

C Threadsafe Priority Queue O(log n) push, O(1) pop

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
prioritythreadsafelogpushqueuepop

Problem

I wrote this for an A* implementation:

(pqueue.c)

```
#include "pqueue.h"
#include
#include
#include
#include
#include

struct pqueue* new_pqueue(size_t capacity, int mt) {
struct pqueue* pqueue = malloc(sizeof(struct pqueue));
pqueue->capacity = capacity;
pqueue->data = malloc((capacity == 0 ? 1 : capacity) * sizeof(struct __pqueue_entry));
pqueue->rc = capacity == 0 ? 1 : 0;
pqueue->start = 0;
pqueue->end = 0;
pqueue->size = 0;
pqueue->mt = mt;
if (mt) {
if (pthread_mutex_init(&pqueue->data_mutex, NULL)) {
free(pqueue->data);
pqueue->data = NULL;
free(pqueue);
return NULL;
}
if (pthread_mutex_init(&pqueue->out_mutex, NULL)) {
free(pqueue->data);
pqueue->data = NULL;
free(pqueue);
pthread_mutex_destroy(&pqueue->data_mutex);
return NULL;
}
if (pthread_mutex_init(&pqueue->in_mutex, NULL)) {
free(pqueue->data);
pqueue->data = NULL;
free(pqueue);
pthread_mutex_destroy(&pqueue->data_mutex);
pthread_mutex_destroy(&pqueue->out_mutex);
return NULL;
}
if (pthread_cond_init(&pqueue->out_cond, NULL)) {
free(pqueue->data);
pqueue->data = NULL;
free(pqueue);
pthread_mutex_destroy(&pqueue->data_mutex);
pthread_mutex_destroy(&pqueue->out_mutex);
pthread_mutex_destroy(&pqueue->in_mutex);
return NULL;
}
if (pthread_cond_init(&pqueue->in_cond, NULL)) {
free(pqueue->data);
pqueue->data = NULL;
free(pqueue);
pthread_mutex_destroy(&pqueue->data_mutex);
pthread_mutex_destroy(&pqueue->out_mutex);
pthread_mutex_destroy(&pqueue->in_mutex);
pthread_cond_destroy(&pqueue->out_cond);
return NULL;
}
}
return pque

Solution

Doesn't work if mt == 0

If looks like the code is supposed to support both multithreaded and non-multithreaded cases. However, in add_pqueue(), there is this code:

} else {
    if (!pqueue->mt) return 1;


The effect of this is to always abort if multithreading is off. I think you meant to do this instead:

} else if (pqueue->mt) {


Mutexes and conditions not used properly

Consider this code in pop_queue():

if (pqueue->mt) {
    pthread_mutex_lock(&pqueue->out_mutex);
    while (pqueue->size == 0) {
        pthread_cond_wait(&pqueue->out_cond, &pqueue->out_mutex);
    }
    pthread_mutex_unlock(&pqueue->out_mutex);
    pthread_mutex_lock(&pqueue->data_mutex);


The goal of out_mutex and out_cond is to wait for the queue to be non-empty before proceeding to lock data_mutex and actually popping from the queue. However, consider the case where size == 1 initially. What could happen is:

  • Two threads call pop_queue().



  • Each thread gets past the size check and arrives at the line which is about to lock data_mutex.



  • Thread 0 locks data_mutex, pops an element from the queue, and unlocks data_mutex. This empties the queue, setting size to 0.



  • Thread 1 locks data_mutex, and then tries to pop from an empty queue, resulting in an error.



The same problem also happens in add_queue() but with the size full check.

What I suggest is:

  • Only use one mutex, data_mutex.



  • If you get past the size check while holding data_mutex, then just proceed.



The code from above becomes:

if (pqueue->mt) {
    pthread_mutex_lock(&pqueue->data_mutex);
    while (pqueue->size == 0) {
        pthread_cond_wait(&pqueue->out_cond, &pqueue->data_mutex);
    }


Add isn't \$O(\log n)\$

I took a closer look at your add function and it appears to do a binary search to find an insertion point followed by using one or more calls to memmove() to make an open slot in the queue to insert the new element. The memory moving takes \$O(n)\$ time. If you used a heap data structure, you could achieve \$O(\log n)\$.

Also, as a member of the Floating Point Police™, I disapprove of using floating point to do the binary search. This can be done using only integer arithmetic.

Code Snippets

} else {
    if (!pqueue->mt) return 1;
} else if (pqueue->mt) {
if (pqueue->mt) {
    pthread_mutex_lock(&pqueue->out_mutex);
    while (pqueue->size == 0) {
        pthread_cond_wait(&pqueue->out_cond, &pqueue->out_mutex);
    }
    pthread_mutex_unlock(&pqueue->out_mutex);
    pthread_mutex_lock(&pqueue->data_mutex);
if (pqueue->mt) {
    pthread_mutex_lock(&pqueue->data_mutex);
    while (pqueue->size == 0) {
        pthread_cond_wait(&pqueue->out_cond, &pqueue->data_mutex);
    }

Context

StackExchange Code Review Q#135650, answer score: 4

Revisions (0)

No revisions yet.