patterngoMinor
Concurrent task pool
Viewed 0 times
pool, concurrent, task
Problem
My use case is to dispatch multiple long-running tasks to execute concurrently. The expectation is that the tasks will be IO-bound (e.g. network requests), but importantly each task is different. This is not a question about data parallelism.
I've written a simple pool type that wraps a
Here is an example of how it might be used:
This utility seems pretty
I've written a simple pool type that wraps a `sync.WaitGroup` and collects any errors that occur.
package gopool
import "sync"
// GoPool runs functions on their own goroutines and gathers the non-nil
// errors they return. Construct it with New: the zero value has a nil errors
// channel and is not usable.
type GoPool struct {
	// errors carries each non-nil task error to Wait. New makes it
	// unbuffered, so a failing task blocks until Wait receives its error.
	errors chan error

	// waitGroup counts tasks started by Run that have not yet finished.
	waitGroup sync.WaitGroup
}
// New returns a ready-to-use *GoPool whose error channel is unbuffered.
func New() *GoPool {
	pool := GoPool{
		errors:    make(chan error),
		waitGroup: sync.WaitGroup{},
	}
	return &pool
}
// Run starts every function in goFuncs on its own goroutine and returns
// without waiting for them. A non-nil error returned by a function is sent on
// the pool's errors channel; with the unbuffered channel made by New, that
// send blocks until Wait is receiving.
func (p *GoPool) Run(goFuncs ...func() error) {
	// Register the whole batch before any goroutine is started.
	p.waitGroup.Add(len(goFuncs))
	for i := range goFuncs {
		// The task is passed as an argument so each goroutine owns its copy.
		go func(task func() error) {
			defer p.waitGroup.Done()
			if err := task(); err != nil {
				p.errors <- err
			}
		}(goFuncs[i])
	}
}
// Wait waits for all specified tasks in the GoPoo to complete, and returns any collected errors that occurred.
func (p *GoPool) Wait() (errors []error) {
go func() {
p.waitGroup.Wait()
close(p.errors)
}()
for err := range p.errors {
errors = append(errors, err)
}
return errors
}Here is an example of how it might be used:
func DoThings() {
var thingA int
var thingB string
var thingC *Thing
pool := gopool.New()
pool.Run(func() (err error) {
thingA, err = FetchThingA()
return err
})
pool.Run(func() (err error) {
thingB = FetchThingB()
return nil
})
pool.Run(func() (err error) {
thingC, err = FetchThingC()
return err
})
errs := pool.Wait()
if len(errs) > 0 {
// Handle errs
}
// Use each of the fetched things
}This utility seems pretty
Solution
Your concept is an interesting one, but the implementation raises a number of concerns.
The most significant issue I can see is related to the errors that can be returned. There's no way to match an error to the function that caused it.
Another concern is that your error channel is unbuffered. Every goroutine whose function returns an error blocks on the send until `Wait` drains the channel, so if `Wait` is never called, those goroutines leak.
Even if that's acceptable, there's still a usability issue: the pool has a relatively sensitive life-cycle that must be managed outside the pool code. Your user has to know to create a new pool, then call `Run` for a few items, and then call `Wait`. The order cannot be changed, and a pool cannot be reused after `Wait` has been called. A second `Wait` closes an already-closed channel and panics, as does a failing task started by a later `Run`.
My feeling is that a much simpler design would be a single function that "does it all". The user's code would look something like:
There's no notion in the above of the internal implementation of the pool. That's a good thing. You use basic Go mechanisms: a slice to aggregate the functions to run, and variadic-argument expansion (`toRun...`) to pass that slice in.
I played around a bit, and settled on this function:
The `Run` function thus encapsulates all the sequencing logic. There's no need for an external struct or an error channel. The errors are indexed relative to the input functions, so you can associate each error with the function that returned it. The function also returns a boolean "OK" flag, in the style of Go's comma-ok idiom.
The Use-Cases I tested were things like:
Note that you can see the above all running in the playground: https://play.golang.org/p/9_MrR6gvj2
The most significant issue I can see is related to the errors that can be returned. There's no way to match an error to the function that caused it.
Another concern is that your error channel is unbuffered. Every goroutine whose function returns an error blocks on the send until `Wait` drains the channel, so if `Wait` is never called, those goroutines leak.
Even if that's acceptable, there's still a usability issue: the pool has a relatively sensitive life-cycle that must be managed outside the pool code. Your user has to know to create a new pool, then call `Run` for a few items, and then call `Wait`. The order cannot be changed, and a pool cannot be reused after `Wait` has been called. A second `Wait` closes an already-closed channel and panics, as does a failing task started by a later `Run`.
My feeling is that a much simpler design would be a single function that "does it all". The user's code would look something like:
var toRun []func() error
toRun = append(toRun, func() error { fmt.Println("one"); return nil })
// ... append further tasks ...
// Run returns ([]error, bool): errs[i] is what toRun[i] returned, ok is true
// when every function succeeded. Naming the slice errs avoids shadowing the
// errors package.
errs, ok := pool.Run(toRun...)
There's no notion in the above of the internal implementation of the pool. That's a good thing. You use basic Go mechanisms like slices to aggregate functions to run, and you use basic go parameter manipulation to expand the slice for entry.
I played around a bit, and settled on this function:
// Run will run all the supplied functions in separate Go-routines, and return any errors in the resulting slice,
// and also return a boolen ok indicator which will be true if all goroutines succeeded.
func Run(toRun ...func() error) ([]error, bool) {
count := len(toRun)
var wg sync.WaitGroup
wg.Add(count)
errors := make([]error, count, count)
ok := true
for i, fn := range toRun {
// create a closure for the error index and the function
go func(index int, fn func() error) {
defer wg.Done()
errors[index] = fn()
}(i, fn)
}
wg.Wait()
for _, e := range errors {
if e != nil {
ok = false
}
}
return errors, ok
}The Run method thus encapsulates all the sequencing logic. There's no need for an external struct. There's no need for an error channel, and the errors are indexed relative to the input functions (so you can associate an error to a function). Also, the function returns a boolean "OK" flag to be used in a similar way as other Go-like functions.
The Use-Cases I tested were things like:
func main() {
runs := []func() error{}
runs = append(runs, func() error {
fmt.Println("one")
return nil
})
runs = append(runs, func() error {
fmt.Println("two")
return nil
})
runs = append(runs, func() error {
fmt.Println("three")
return nil
})
fmt.Println("Run one")
if errs, ok := Run(runs...); !ok {
fmt.Printf("Some errors: %v\n", errs)
} else {
fmt.Println("OK")
}
runs = append(runs, func() error {
fmt.Println("fail")
return errors.New("Failed")
})
fmt.Println("Run Two")
if errs, ok := Run(runs...); !ok {
fmt.Printf("Some errors: %v\n", errs)
} else {
fmt.Println("OK")
}
}Note that you can see the above all running in the playground: https://play.golang.org/p/9_MrR6gvj2
Code Snippets
var toRun []func() error
toRun = append(toRun, func() error { fmt.Println("one"); return nil })
// ... append further tasks ...
// Run returns ([]error, bool): errs[i] is what toRun[i] returned, ok is true
// when every function succeeded. Naming the slice errs avoids shadowing the
// errors package.
errs, ok := pool.Run(toRun...)

// Run runs every supplied function on its own goroutine and blocks until all
// of them have returned. It returns a slice with one entry per function, in
// the same order as toRun: errs[i] is whatever toRun[i] returned (nil on
// success). The boolean result is true only when every entry is nil. With no
// functions it returns an empty slice and true.
func Run(toRun ...func() error) ([]error, bool) {
	var wg sync.WaitGroup
	wg.Add(len(toRun))
	// Named errs rather than errors so the local does not shadow the standard
	// errors package.
	errs := make([]error, len(toRun))
	for i, fn := range toRun {
		// Pass i and fn as arguments so each goroutine gets its own copies
		// (required before Go 1.22's per-iteration loop variables).
		go func(index int, fn func() error) {
			defer wg.Done()
			// Each goroutine writes a distinct index, so no locking is needed;
			// wg.Wait below orders these writes before the reads.
			errs[index] = fn()
		}(i, fn)
	}
	wg.Wait()

	ok := true
	for _, err := range errs {
		if err != nil {
			ok = false
			break
		}
	}
	return errs, ok
}

func main() {
	// printer returns a task that prints word and always succeeds.
	printer := func(word string) func() error {
		return func() error {
			fmt.Println(word)
			return nil
		}
	}

	// report prints heading, executes runs concurrently via Run, then prints
	// either the collected errors or "OK".
	report := func(heading string, runs []func() error) {
		fmt.Println(heading)
		errs, ok := Run(runs...)
		if ok {
			fmt.Println("OK")
			return
		}
		fmt.Printf("Some errors: %v\n", errs)
	}

	runs := []func() error{printer("one"), printer("two"), printer("three")}
	report("Run one", runs)

	// Add a task that always fails, then run the whole set again.
	runs = append(runs, func() error {
		fmt.Println("fail")
		return errors.New("Failed")
	})
	report("Run Two", runs)
}
Context
StackExchange Code Review Q#144516, answer score: 3
Revisions (0)
No revisions yet.