Concurrency limit map in Go
Problem
Task: per-host concurrency limits for a web crawler (map[string]Semaphore).

I considered the chan struct{} (chan bool) approach, but essentially it would not make the code much easier, because the main hurdle is deleting unused keys from the map. A semaphore also takes constant memory - a good property - as opposed to a channel, which grows with each "acquired limit".

I started with a Semaphore using sync.Cond Wait/Signal (the desired behavior of the semaphore is described in the comments). Then I created a map[string]*Semaphore with a Mutex around all map operations; once the semaphore is in hand, I Acquire/Release it - that blocks the caller if needed, but does not block other access to the map.

To remove unused semaphores from the map, I use a separate counter that is stored in the semaphore struct and modified only inside the map lock. Its value sometimes differs from semaphore.value. When this counter drops to zero, I know there are no goroutines holding a pointer to the semaphore from the map, except maybe one that is about to do the final Release(), so it is safe to delete the key from the map to reclaim memory.

Essentially it works and the test passes, but I would really appreciate any feedback on this approach.

Code link: https://gist.github.com/4130335

```
// Package limitmap provides a map of semaphores to limit concurrency against some string keys.
//
// Usage:
//	limits := NewLimitMap()
//	func process(url url.URL, rch chan http.Response) {
//		// At most 2 concurrent requests to each host.
//		limits.Acquire(url.Host, 2)
//		defer limits.Release(url.Host)
//		r, err := http.Get(url.String())
//		rch <- r
//	}
package limitmap

// ...

func (m *LimitMap) Acquire(key string, max uint) {
	// ...
	if l.value > l.max {
		panic("oia")
	}
}

func (m *LimitMap) Release(key string) {
	m.lk.Lock()
	l, ok := m.limits[key]
	if !ok {
		panic("LimitMap: key not in map. Possible reason: Release without Acquire.")
	}
	l.refs--
	if l.refs > l.max {
		panic("oir")
	}
	// ...
	m.wg.Done()
}

// Wait until all released.
func (m *LimitMap) Wait() {
	m.wg.Wait()
}

func (m *LimitMap) Size() (keys int, // ...
```
Solution
For my initial implementation, I would personally apply the common semaphore pattern: a buffered channel that controls the number of running goroutines. The simplest application looks like the code below (also available in this gist):

```
package main

import (
	"fmt"
	"sync"
)

func doWork(s string, ch <-chan struct{}, wg *sync.WaitGroup) {
	defer func() {
		<-ch      // free up space in the semaphore
		wg.Done() // tell the WaitGroup we're finished
	}()
	fmt.Println(s)
}

func execute(work []string) {
	wg := &sync.WaitGroup{}
	sema := make(chan struct{}, 10) // concurrency limit of 10
	for _, url := range work {
		// if there are 10 items in flight, the channel is full and this
		// send blocks; it unblocks when a worker finishes
		sema <- struct{}{}
		wg.Add(1)
		go doWork(url, sema, wg)
	}
	// close the channel as nothing else should write
	close(sema)
	// wait for all goroutines to finish
	wg.Wait()
}
```

If you enhance this example a bit to use one channel per map key, where you then pass the channel in to the worker function to be read from, I think it will work for you.
If you're curious why I decided to use a struct{} type for the channel, Dave Cheney has a good post explaining it here.
Context
StackExchange Code Review Q#15954, answer score: 4