Send multiple broadcast to multiple listeners
Problem
I want to send multiple broadcasts to multiple listeners in Go.
I can't send data through a channel, because only one listener will get it; the exception is the close signal, which every listener sees (second "multiple" of my title). However, once a channel is closed, I can't send any other signal (first "multiple" of my title).
Hence I wrote the following code (explanation at the end of this post):
package main

import (
	"sync"
	"time" // needed by the usage example below
)

// Broadcast allows to send a signal to all listeners
type Broadcast struct {
	lock sync.RWMutex
	ch   chan struct{}
}

// NewBroadcast creates a new broadcast
func NewBroadcast() *Broadcast {
	return &Broadcast{
		lock: sync.RWMutex{},
		ch:   make(chan struct{}),
	}
}

// Receive a channel on which the next (close) signal will be sent
func (b *Broadcast) Receive() <-chan struct{} {
	b.lock.RLock()
	defer b.lock.RUnlock()
	return b.ch
}

// Send a signal to all listeners
func (b *Broadcast) Send() {
	b.lock.Lock()
	defer b.lock.Unlock()
	close(b.ch)
	b.ch = make(chan struct{})
}

Usage example:
func main() {
	b := NewBroadcast()
	done := make(chan bool)
	quit := make(chan struct{})
	go func() {
		<-b.Receive()
		done <- true
	}()
	go func() {
		<-b.Receive()
		<-b.Receive()
		<-b.Receive()
		<-b.Receive()
		done <- true
	}()
	go func() {
		<-done
		<-done
		close(done)
		close(quit)
	}()
	func() {
		for {
			select {
			case <-quit:
				return
			case <-time.After(1 * time.Millisecond):
				b.Send()
			}
		}
	}()
}

I simply close the "current" channel to send a broadcast to all listeners, and I create a new channel for the next Send. A sync.RWMutex is added to prevent concurrency issues.
I would appreciate any comments regarding the code, the style and anything else!
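The premise of the question (a sent value reaches exactly one listener, while a close reaches every listener) can be checked in isolation. The following is only a minimal sketch, separate from the Broadcast type; the helper name deliver is hypothetical and exists purely for illustration.

```go
package main

import "fmt"

// deliver is a hypothetical helper for illustration only. It starts n
// listeners on one unbuffered channel, sends a single value, then closes the
// channel. It reports how many listeners were woken by the value and how many
// by the close.
func deliver(n int) (byValue, byClose int) {
	ch := make(chan struct{})
	results := make(chan bool, n)
	for i := 0; i < n; i++ {
		go func() {
			_, ok := <-ch // ok is true for a sent value, false for a close
			results <- ok
		}()
	}
	ch <- struct{}{} // blocks until exactly one listener receives it
	close(ch)        // releases every remaining listener
	for i := 0; i < n; i++ {
		if <-results {
			byValue++
		} else {
			byClose++
		}
	}
	return byValue, byClose
}

func main() {
	byValue, byClose := deliver(3)
	fmt.Println(byValue, byClose) // prints "1 2"
}
```

With three listeners, one is woken by the value and the other two only by the close, which is why the Broadcast above closes the channel and then replaces it.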
Solution
Concept
The basic concept of handing out a channel to all receivers and then closing that channel as a signal is a good one. The way you use that concept in your example usage is a bit contrived, and I don't think it represents a real use case, but it's good enough to give an idea of how it could be used.
Communication
In Go, the best practice is to prefer "communication" over "locks". This is documented in the sync package as:
"Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication."
In your code, what does this mean? It means the Mutex.Lock() code is not great, and that you should find a way to accomplish this with communication instead.
Further, there's no need for an RWMutex in your code; a simpler Mutex would be better...
For "communication" in Go, you need a goroutine to communicate with. That goroutine is linked to the Broadcast instance: it is the only goroutine that knows the Broadcast state, and the only one that can change that state.
Now, you just need to communicate state changes to/from that goroutine using channels.
In code, it would look like:
// Broadcast allows to send a signal to all listeners
type Broadcast struct {
	poke      chan bool
	receivers chan chan struct{}
	done      chan bool
}

// NewBroadcast creates a new broadcast
func NewBroadcast() *Broadcast {
	bc := &Broadcast{
		poke:      make(chan bool),
		receivers: make(chan chan struct{}),
		done:      make(chan bool),
	}
	go bc.listen()
	return bc
}

func (b *Broadcast) listen() {
	defer close(b.done)
	notify := make(chan struct{})
	for {
		select {
		case ok := <-b.poke:
			if !ok {
				return
			}
			// all current receivers get a closed channel
			close(notify)
			// set up next batch of receivers.
			notify = make(chan struct{})
		case b.receivers <- notify:
			// great. A Receiver has our channel
		}
	}
}

// Close makes Broadcast implement the Closer interface
func (b *Broadcast) Close() error {
	select {
	case b.poke <- false:
	case <-b.done:
	}
	return nil
}

// Receive a channel on which the next (close) signal will be sent
func (b *Broadcast) Receive() <-chan struct{} {
	select {
	case r := <-b.receivers:
		return r
	case <-b.done:
		// FIXME - should probably return an error.
		return nil
	}
}

// Send a signal to all current Receivers
func (b *Broadcast) Send() {
	select {
	case b.poke <- true:
	case <-b.done:
		// FIXME - attempt to notify on a closed broadcaster... oops. Should probably return an error
	}
}

What's important in the above code? There is no sync usage, and there's a goroutine, listen(), for the Broadcast that manages the state. It listens for new Receivers and it listens for notification events. Only that one goroutine ever touches the state, and as a consequence, it does not need any locking.
State changes are communicated via the receivers, poke and done channels.
Note that Broadcast now manages resources beyond the simple struct that it was: it has a goroutine you don't want to leak, so it should implement the Close() method, and you would do something like:

broadcaster := NewBroadcast()
defer broadcaster.Close()

It's very important to note that all the channels are unbuffered. This allows for a clean and reliable communication path: you can't have stale receivers, etc.
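To see the channel-based version end to end, here is a hedged, self-contained sketch that repeats the Broadcast type from above (comments trimmed) and adds a small driver. The helper runListeners is hypothetical and not part of the reviewed code; it assumes each listener obtains its channel from Receive() before Send() is called.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcast is the channel-based type from the answer, repeated so that this
// sketch compiles on its own.
type Broadcast struct {
	poke      chan bool
	receivers chan chan struct{}
	done      chan bool
}

func NewBroadcast() *Broadcast {
	bc := &Broadcast{
		poke:      make(chan bool),
		receivers: make(chan chan struct{}),
		done:      make(chan bool),
	}
	go bc.listen()
	return bc
}

func (b *Broadcast) listen() {
	defer close(b.done)
	notify := make(chan struct{})
	for {
		select {
		case ok := <-b.poke:
			if !ok {
				return
			}
			close(notify)
			notify = make(chan struct{})
		case b.receivers <- notify:
		}
	}
}

func (b *Broadcast) Close() error {
	select {
	case b.poke <- false:
	case <-b.done:
	}
	return nil
}

func (b *Broadcast) Receive() <-chan struct{} {
	select {
	case r := <-b.receivers:
		return r
	case <-b.done:
		return nil
	}
}

func (b *Broadcast) Send() {
	select {
	case b.poke <- true:
	case <-b.done:
	}
}

// runListeners is a hypothetical driver for illustration. It subscribes n
// listeners, sends one broadcast, and returns how many listeners it woke.
func runListeners(n int) int {
	b := NewBroadcast()
	defer b.Close() // stops the listen goroutine so it does not leak

	var wg sync.WaitGroup
	woken := make(chan int, n)
	for i := 0; i < n; i++ {
		ch := b.Receive() // subscribe before Send so the signal is not missed
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ch // unblocks when listen() closes the current channel
			woken <- id
		}(i)
	}
	b.Send()
	wg.Wait()
	return len(woken)
}

func main() {
	fmt.Println(runListeners(3)) // prints 3
}
```

Note the ordering in the driver: a listener that calls Receive() only after Send() gets the next channel and waits for the following broadcast, so subscribing first is what makes the count deterministic here.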
Context
StackExchange Code Review Q#159692, answer score: 6