pattern | go | Moderate | pending | Canonical
Go Channel Patterns - Fan-out, Fan-in, and Pipeline
go channels, fan-out, fan-in, pipeline, worker pool, goroutines, concurrency
Problem
You need to process data concurrently in Go, but it is unclear how to structure goroutines and channels for each concurrency pattern.
Solution
Common channel patterns:
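package main

import (
	"fmt"
	"sync"
)

// Pipeline: chain processing stages

// generate sends each number on a new channel and closes it when done.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// square reads from in, sends each value squared, and closes its
// output once in has been drained.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// Usage: pipeline
func main() {
	for v := range square(generate(1, 2, 3, 4)) {
		fmt.Println(v) // prints 1, 4, 9, 16 (one per line)
	}
}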
// Fan-out: multiple goroutines read from the same channel
// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	merged := make(chan int)
	// One forwarding goroutine per input channel.
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(ch)
	}
	// Close merged only after every forwarder has finished sending.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
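// Worker pool with bounded concurrency
// Job, Result, and process are application-defined and not shown here.
// workerPool returns immediately; results is closed once jobs has been
// closed and drained and every worker has finished.
func workerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- process(job)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
}
Why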
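Channels are Go's idiomatic mechanism for communicating and synchronizing between goroutines. Each pattern here is a function that takes and returns channels, so the stages compose into larger concurrent systems while staying simple and individually testable.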
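As a rough sketch of how the stages compose, the program below fans one source channel out to several square stages and merges them back with fanIn. The helper sumSquares is a hypothetical name introduced only for this illustration; generate, square, and fanIn are repeated from the Solution so the example is self-contained. Results arrive in no fixed order once work is fanned out, so the sketch sums them rather than printing them one by one.

```go
package main

import (
	"fmt"
	"sync"
)

// generate sends each number on a new channel and closes it when done.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square sends the square of every value read from in.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// fanIn merges any number of channels into one.
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	merged := make(chan int)
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// sumSquares is a hypothetical helper for this sketch. It fans the source
// out to `workers` square stages (workers must be at least 1), fans the
// stage outputs back in, and returns the sum of all squares.
func sumSquares(workers int, nums ...int) int {
	src := generate(nums...)
	stages := make([]<-chan int, workers)
	for i := range stages {
		stages[i] = square(src) // every stage competes for values from src
	}
	total := 0
	for v := range fanIn(stages...) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(3, 1, 2, 3, 4)) // 30
}
```

Each value from src is received by exactly one square stage, so every input is processed once regardless of how many stages are running.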
Gotchas
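- Always ensure channels get closed by the sender, not the receiver: sending on a closed channel panics, and only the sender knows when no more values are coming. With several senders, close once after all of them have finished (as fanIn does with its WaitGroup)
- An unbuffered channel blocks each send until a receiver is ready, and each receive until a sender is ready - use a buffered channel when the sender should be able to run ahead by a bounded number of items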
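Both gotchas show up when driving the worker pool, which the following sketch tries to illustrate. Job, Result, process, and runJobs are hypothetical placeholders chosen for this example (process simply doubles its input); workerPool is repeated from the Solution. The jobs channel is buffered so the producer can enqueue everything without waiting, and the producer, as the sender, is the one that closes it.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Job and Result are hypothetical placeholder types for this sketch.
type Job int
type Result int

// process is a hypothetical stand-in for real work: it doubles the job value.
func process(j Job) Result { return Result(j * 2) }

// workerPool starts numWorkers goroutines that drain jobs, and closes
// results once all of them have finished.
func workerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- process(job)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
}

// runJobs is a hypothetical driver: it submits inputs, collects every
// result, and returns them sorted because completion order is not fixed.
func runJobs(inputs []int, numWorkers int) []int {
	// Buffered: the producer can enqueue every job without a ready receiver.
	jobs := make(chan Job, len(inputs))
	// Unbuffered: each worker waits until the collector takes its result.
	results := make(chan Result)
	workerPool(jobs, results, numWorkers)

	// This function is the only sender on jobs, so it closes jobs.
	for _, n := range inputs {
		jobs <- Job(n)
	}
	close(jobs)

	// workerPool is the sender side of results and closes it for us,
	// which is what ends this range loop.
	var out []int
	for r := range results {
		out = append(out, int(r))
	}
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(runJobs([]int{3, 1, 2}, 2)) // [2 4 6]
}
```

If jobs were unbuffered here, the producer loop would block as soon as every worker was waiting to hand off a result, since nothing reads results until after close(jobs); that would deadlock. Sizing the buffer to the input, or producing from a separate goroutine, avoids it.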
Context
Building concurrent Go applications