HiveBrain v1.2.0
Get Started
← Back to all entries
patterncMinor

Async safe threadpool

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
asyncthreadpoolsafe

Problem

I'm trying the write a threadpool that can safely be added inside a signal handler or in code forked by multithreaded code. Are there any corner cases that would cause this code to fail? What could be done to improve this code?

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#define MAX_WORKERS 16

struct threadpool
{
    _Atomic(int) idle;
    void *(*func)(void*);
    sem_t sem;
    void * arg;
    int id;
    pthread_t thread;
};

struct threadpool threads[MAX_WORKERS] = {0};

void *Worker(void *arg)
{
    int * t = (int*)arg;
x:
    sem_wait(&threads[*t].sem);
    __sync_synchronize();
    threads[*t].func(threads[*t].arg);
    threads[*t].idle = false;
    goto x;
}

bool ThreadPoolNew()
{
    for (int i = 0; i < MAX_WORKERS; i++) {
        threads[i].id = i;
        sem_init(&threads[i].sem, 0, 0);
        pthread_create(&threads[i].thread, NULL, Worker, &threads[i].id);
    }
    return true;
}

bool ThreadPoolAddTask(void *(*entry)(void*), void * arg, bool retry)
{
    _Atomic(bool) found = false;
x:
    for (size_t i = 0; i < MAX_WORKERS; i++) {
        if (__sync_val_compare_and_swap(&threads[i].idle, 0, 1) == 0) {
            threads[i].func = entry;
            threads[i].arg = arg;
            __sync_synchronize();
            sem_post(&threads[threads[i].id].sem);
            found = true;
            break;
        }
    }

    if (found == false && retry == true) {
        goto x;
    }

    return found;
}

void * t1(void *a)
{
    int i = 0;
    while (true) {
        i += 2;     
        printf("%i\n", i);
        sleep(5);
    }
}

void * t2(void *a)
{
    int i = 0;
    while (true) {
        i += 3;
        printf("%i\n", i);
        sleep(5);
    }
}

//TESTER CODE
//gcc -pthread list.c

int main(void)  
{
    ThreadPoolNew();
    ThreadPoolAddTask(t2, NULL, 1);
    ThreadPoolAddTask(t1, NULL, 1);
    sleep(60);
}

Solution

-
idle

As coded, idle is false when thread is in fact idle. Either rename it to working/running/whatever active or swap the true/false values.

-
Control flow

Unless I am missing something crucial, there is no need to goto neither in Worker nor in ThreadPoolAddTask.

Similarly, found seems redundant:

do {
    for (i = 0, i < MAX_WORKERS; i++) {
        if (...) {
            ....;
            return true;
        }
    }
} while (retry);
return false;


As a side note, retry shouldn't be handled here. It is a caller business logic when and how to retry. I don't see any value added.

-
Worker argument

I recommend passing a pointer to a worker instead of pointer to its index. It simplifies both ThreadPoolNew and Worker to:

void *Worker(void *arg)
{
    struct threadpool * t = (int*)arg;
x:
    sem_wait(t->sem);
    __sync_synchronize();
    t->func(t->arg);
    t->idle = false;
    goto x;
}

bool ThreadPoolNew()
{
    for (int i = 0; i < MAX_WORKERS; i++) {
        sem_init(&threads[i].sem, 0, 0);
        pthread_create(&threads[i].thread, NULL, Worker, &threads[i]);
    }
    return true;
}


and removes the need for thread.id. BTW, notice that assert(threads[i].id == i) always holds, so you may post your semaphore as

sem_post(&threads[i].sem);


-
You may want to __sync_synchronize() after announcing the thread idle (there is no barrier between setting idle and subsequent sem_wait).

Code Snippets

do {
    for (i = 0, i < MAX_WORKERS; i++) {
        if (...) {
            ....;
            return true;
        }
    }
} while (retry);
return false;
void *Worker(void *arg)
{
    struct threadpool * t = (int*)arg;
x:
    sem_wait(t->sem);
    __sync_synchronize();
    t->func(t->arg);
    t->idle = false;
    goto x;
}

bool ThreadPoolNew()
{
    for (int i = 0; i < MAX_WORKERS; i++) {
        sem_init(&threads[i].sem, 0, 0);
        pthread_create(&threads[i].thread, NULL, Worker, &threads[i]);
    }
    return true;
}
sem_post(&threads[i].sem);

Context

StackExchange Code Review Q#114832, answer score: 3

Revisions (0)

No revisions yet.