patterncMinor
Async safe threadpool
Viewed 0 times
asyncthreadpoolsafe
Problem
I'm trying the write a threadpool that can safely be added inside a signal handler or in code forked by multithreaded code. Are there any corner cases that would cause this code to fail? What could be done to improve this code?
#include
#include
#include
#include
#include
#include
#include
#define MAX_WORKERS 16
struct threadpool
{
_Atomic(int) idle;
void *(*func)(void*);
sem_t sem;
void * arg;
int id;
pthread_t thread;
};
struct threadpool threads[MAX_WORKERS] = {0};
void *Worker(void *arg)
{
int * t = (int*)arg;
x:
sem_wait(&threads[*t].sem);
__sync_synchronize();
threads[*t].func(threads[*t].arg);
threads[*t].idle = false;
goto x;
}
bool ThreadPoolNew()
{
for (int i = 0; i < MAX_WORKERS; i++) {
threads[i].id = i;
sem_init(&threads[i].sem, 0, 0);
pthread_create(&threads[i].thread, NULL, Worker, &threads[i].id);
}
return true;
}
bool ThreadPoolAddTask(void *(*entry)(void*), void * arg, bool retry)
{
_Atomic(bool) found = false;
x:
for (size_t i = 0; i < MAX_WORKERS; i++) {
if (__sync_val_compare_and_swap(&threads[i].idle, 0, 1) == 0) {
threads[i].func = entry;
threads[i].arg = arg;
__sync_synchronize();
sem_post(&threads[threads[i].id].sem);
found = true;
break;
}
}
if (found == false && retry == true) {
goto x;
}
return found;
}
void * t1(void *a)
{
int i = 0;
while (true) {
i += 2;
printf("%i\n", i);
sleep(5);
}
}
void * t2(void *a)
{
int i = 0;
while (true) {
i += 3;
printf("%i\n", i);
sleep(5);
}
}
//TESTER CODE
//gcc -pthread list.c
int main(void)
{
ThreadPoolNew();
ThreadPoolAddTask(t2, NULL, 1);
ThreadPoolAddTask(t1, NULL, 1);
sleep(60);
}Solution
-
idle
As coded,
-
Control flow
Unless I am missing something crucial, there is no need to
Similarly,
As a side note,
-
Worker argument
I recommend passing a pointer to a worker instead of pointer to its index. It simplifies both
and removes the need for
-
You may want to
idle
As coded,
idle is false when thread is in fact idle. Either rename it to working/running/whatever active or swap the true/false values.-
Control flow
Unless I am missing something crucial, there is no need to
goto neither in Worker nor in ThreadPoolAddTask.Similarly,
found seems redundant:do {
for (i = 0, i < MAX_WORKERS; i++) {
if (...) {
....;
return true;
}
}
} while (retry);
return false;As a side note,
retry shouldn't be handled here. It is a caller business logic when and how to retry. I don't see any value added.-
Worker argument
I recommend passing a pointer to a worker instead of pointer to its index. It simplifies both
ThreadPoolNew and Worker to:void *Worker(void *arg)
{
struct threadpool * t = (int*)arg;
x:
sem_wait(t->sem);
__sync_synchronize();
t->func(t->arg);
t->idle = false;
goto x;
}
bool ThreadPoolNew()
{
for (int i = 0; i < MAX_WORKERS; i++) {
sem_init(&threads[i].sem, 0, 0);
pthread_create(&threads[i].thread, NULL, Worker, &threads[i]);
}
return true;
}and removes the need for
thread.id. BTW, notice that assert(threads[i].id == i) always holds, so you may post your semaphore assem_post(&threads[i].sem);-
You may want to
__sync_synchronize() after announcing the thread idle (there is no barrier between setting idle and subsequent sem_wait).Code Snippets
do {
for (i = 0, i < MAX_WORKERS; i++) {
if (...) {
....;
return true;
}
}
} while (retry);
return false;void *Worker(void *arg)
{
struct threadpool * t = (int*)arg;
x:
sem_wait(t->sem);
__sync_synchronize();
t->func(t->arg);
t->idle = false;
goto x;
}
bool ThreadPoolNew()
{
for (int i = 0; i < MAX_WORKERS; i++) {
sem_init(&threads[i].sem, 0, 0);
pthread_create(&threads[i].thread, NULL, Worker, &threads[i]);
}
return true;
}sem_post(&threads[i].sem);Context
StackExchange Code Review Q#114832, answer score: 3
Revisions (0)
No revisions yet.