HiveBrain v1.2.0
pattern · go · Minor

Concurrency limit map in Go

Submitted by: @import:stackexchange-codereview

Problem

Task: per-host concurrency limits for a web crawler (map[string]Semaphore).

I considered the chan struct{} (or chan bool) approach, but it would not make the code much simpler, because the main hurdle is deleting unused keys from the map. Also, a semaphore takes constant memory (a good property), as opposed to a channel, which grows with each "acquired limit".

I started with a Semaphore built on sync.Cond Wait/Signal (the desired behavior of the semaphore is described in the comments). Then I created a map[string]*Semaphore with a Mutex guarding all map operations; once the caller has the semaphore in hand, it calls Acquire/Release on it outside the map lock. That blocks the caller if needed, but does not block other access to the map.
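
A counting semaphore built on sync.Cond Wait/Signal, as described above, might look roughly like this. It is a hypothetical sketch, not the author's Semaphore: the field names value and max only mirror names hinted at in the listing, and runLimited is an illustrative helper that measures how many goroutines are inside the limited section at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Semaphore is a hypothetical counting semaphore built on sync.Cond.
type Semaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	value int // slots currently held
	max   int // maximum concurrent holders
}

func NewSemaphore(max int) *Semaphore {
	s := &Semaphore{max: max}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire blocks until a slot is free, then takes it.
func (s *Semaphore) Acquire() {
	s.mu.Lock()
	for s.value >= s.max { // re-check after every wakeup
		s.cond.Wait()
	}
	s.value++
	s.mu.Unlock()
}

// Release frees a slot and wakes one waiter, if any.
func (s *Semaphore) Release() {
	s.mu.Lock()
	if s.value <= 0 {
		s.mu.Unlock()
		panic("Semaphore: Release without Acquire")
	}
	s.value--
	s.mu.Unlock()
	s.cond.Signal() // one freed slot, so waking one waiter is enough
}

// runLimited starts n goroutines sharing s and returns the highest number
// observed between Acquire and Release at the same time.
func runLimited(s *Semaphore, n int) int64 {
	var active, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Acquire()
			defer s.Release()
			cur := atomic.AddInt64(&active, 1)
			for { // record a new peak with compare-and-swap
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated work
			atomic.AddInt64(&active, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(runLimited(NewSemaphore(2), 10) <= 2) // true
}
```

Calling Signal after Unlock is permitted by sync.Cond; the loop around Wait handles the case where another goroutine takes the freed slot first.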

To remove unused semaphores from the map, I use a separate counter that is stored in the semaphore struct and modified only while holding the map lock. Its value sometimes differs from semaphore.value. When this counter reaches zero, I know that no goroutine holds a pointer to the semaphore obtained from the map, except perhaps the one that is about to do the final Release(), so it is safe to delete the key from the map and free the memory.
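
The ref-count-and-delete scheme described above can be sketched roughly as follows. This is a minimal sketch under my own assumptions, not the author's code (whose listing is only partially preserved here): the method signatures and the entry type are assumed, and for brevity each key's semaphore is a buffered chan struct{} rather than the sync.Cond semaphore the author describes. What it shows is the ordering: refs changes only under the map lock, the blocking wait happens outside that lock, and the key is deleted when refs reaches zero.

```go
package main

import (
	"fmt"
	"sync"
)

// entry pairs a per-key semaphore with a reference count that is only
// read or written while LimitMap.mu is held.
type entry struct {
	sem  chan struct{} // per-key semaphore (a buffered channel in this sketch)
	refs int           // callers between Acquire and the end of Release
}

// LimitMap is a hypothetical ref-counted map of semaphores.
type LimitMap struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func NewLimitMap() *LimitMap {
	return &LimitMap{entries: make(map[string]*entry)}
}

// Acquire registers interest under the map lock, then blocks on the key's
// semaphore without holding the map lock. Assumes max >= 1; while an entry
// exists, the max passed by its creator is the one in effect.
func (m *LimitMap) Acquire(key string, max int) {
	m.mu.Lock()
	e, ok := m.entries[key]
	if !ok {
		e = &entry{sem: make(chan struct{}, max)}
		m.entries[key] = e
	}
	e.refs++
	m.mu.Unlock()

	e.sem <- struct{}{} // blocks while max holders are active
}

// Release drops the reference, deletes the key once nobody else refers to
// the entry, and then frees the slot.
func (m *LimitMap) Release(key string) {
	m.mu.Lock()
	e, ok := m.entries[key]
	if !ok {
		m.mu.Unlock()
		panic("LimitMap: Release without Acquire")
	}
	e.refs--
	if e.refs == 0 {
		delete(m.entries, key) // no other goroutine holds or waits on e
	}
	m.mu.Unlock()

	<-e.sem // still valid after delete: this caller kept its own pointer
}

func main() {
	m := NewLimitMap()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Acquire("example.com", 2)
			defer m.Release("example.com")
		}()
	}
	wg.Wait()
	fmt.Println(len(m.entries)) // 0: the key was deleted after its last Release
}
```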

It essentially works and the test passes, but I would really appreciate any feedback on this approach.

Code link: https://gist.github.com/4130335

```
// Package limitmap provides map of semaphores to limit concurrency against some string keys.
//
// Usage:
//	limits := NewLimitMap()
//	func process(url url.URL, rch chan http.Response) {
//		// At most 2 concurrent requests to each host.
//		limits.Acquire(url.Host, 2)
//		defer limits.Release(url.Host)
//		r, err := http.Get(url.String())
//		rch …
…
	… l.max {
		panic("oia")
	}
}

func (m *LimitMap) Release(key string) {
	m.lk.Lock()
	l, ok := m.limits[key]
	if !ok {
		panic("LimitMap: key not in map. Possible reason: Release without Acquire.")
	}
	l.refs--
	if l.refs …
	… l.max {
		panic("oir")
	}
	m.wg.Done()
}

// Wait until all released.
func (m *LimitMap) Wait() {
	m.wg.Wait()
}

func (m *LimitMap) Size() (keys int, …
```

Solution

For an initial implementation, I would personally use the common semaphore pattern: a buffered channel that limits the number of running goroutines. Its simplest application looks like the code below (also available in this gist):

```
package main

import (
	"fmt"
	"sync"
)

func doWork(s string, ch <-chan struct{}, wg *sync.WaitGroup) {
	defer func() {
		<-ch      // free up space in the semaphore
		wg.Done() // tell the WaitGroup we're finished
	}()

	fmt.Println(s)
}

func execute(work []string) {
	wg := &sync.WaitGroup{}
	sema := make(chan struct{}, 10) // concurrency limit of 10

	for _, url := range work {
		// if there are 10 items in flight, the channel is full and this send
		// blocks; it unblocks when a worker finishes
		sema <- struct{}{}

		wg.Add(1)
		go doWork(url, sema, wg) // pass the WaitGroup so the worker can signal completion
	}

	// close the channel as nothing else should write;
	// workers can still receive the values already buffered
	close(sema)

	// wait for all goroutines to finish
	wg.Wait()
}

func main() {
	execute([]string{"a", "b", "c"})
}
```


If you extend this example to use one channel per map key, and pass that key's channel into the worker function to be read from, I think it will work for you.
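
A minimal sketch of that per-key variant, under my own assumptions: hostLimiter, newHostLimiter and sema are hypothetical names, each key gets a buffered channel created lazily under a mutex, and the worker receives from that channel to free its slot, as in the example above. The hosts and URLs are placeholders; nothing is fetched.

```go
package main

import (
	"fmt"
	"sync"
)

// hostLimiter (hypothetical) keeps one buffered channel per key.
type hostLimiter struct {
	mu    sync.Mutex
	max   int
	chans map[string]chan struct{}
}

func newHostLimiter(max int) *hostLimiter {
	return &hostLimiter{max: max, chans: make(map[string]chan struct{})}
}

// sema returns the channel for key, creating it on first use.
// Only the map needs the mutex; channel operations are already safe.
func (h *hostLimiter) sema(key string) chan struct{} {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch, ok := h.chans[key]
	if !ok {
		ch = make(chan struct{}, h.max)
		h.chans[key] = ch
	}
	return ch
}

// doWork frees its slot on the host's channel when it returns.
func doWork(url string, ch <-chan struct{}, wg *sync.WaitGroup) {
	defer func() {
		<-ch      // free a slot for this host
		wg.Done() // tell the WaitGroup we're finished
	}()
	fmt.Println(url)
}

func main() {
	limits := newHostLimiter(2) // at most 2 in flight per host
	wg := &sync.WaitGroup{}

	work := []struct{ host, url string }{
		{"a.example", "http://a.example/1"},
		{"a.example", "http://a.example/2"},
		{"a.example", "http://a.example/3"},
		{"b.example", "http://b.example/1"},
	}

	for _, w := range work {
		ch := limits.sema(w.host)
		ch <- struct{}{} // blocks only while this host has 2 in flight
		wg.Add(1)
		go doWork(w.url, ch, wg)
	}
	wg.Wait()
}
```

Two trade-offs remain in this sketch: channels are never removed from the map (the cleanup problem raised in the question), and because the send happens in the dispatch loop, a host that is at its limit also stalls dispatch of later items for other hosts. Moving the send into the goroutine avoids the second issue.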

If you're curious why I decided to use a struct{} type for the channel, Dave Cheney has a good post explaining it here.


Context

StackExchange Code Review Q#15954, answer score: 4
