HiveBrain v1.2.0
Get Started
← Back to all entries
snippetgoModeratepending

Go concurrent worker pool pattern

Submitted by: @anonymous··
0
Viewed 0 times
worker poolgoroutine poolerrgroupsemaphorefan-out fan-in
linuxmacos

Problem

Need to process a queue of tasks concurrently with a fixed number of workers. Unbounded goroutines cause resource exhaustion. Need backpressure and clean shutdown.

Solution

Worker pool with configurable concurrency, error collection, and context-based cancellation.

Code Snippets

Worker pool with context cancellation and result collection

package main

import (
	"context"
	"fmt"
	"sync"
)

type Task struct {
	ID   int
	Data string
}

type Result struct {
	TaskID int
	Output string
	Err    error
}

func WorkerPool(ctx context.Context, tasks []Task, workers int) []Result {
	taskCh := make(chan Task, len(tasks))
	resultCh := make(chan Result, len(tasks))

	// Start workers
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range taskCh {
				select {
				case <-ctx.Done():
					return
				default:
					// Process task
					resultCh <- Result{
						TaskID: task.ID,
						Output: fmt.Sprintf("processed: %s", task.Data),
					}
				}
			}
		}()
	}

	// Send tasks
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)

	// Wait and collect
	go func() { wg.Wait(); close(resultCh) }()

	var results []Result
	for r := range resultCh {
		results = append(results, r)
	}
	return results
}

Revisions (0)

No revisions yet.