Send multiple broadcast to multiple listeners

Submitted by: @import:stackexchange-codereview

Problem

I want to send multiple broadcasts to multiple listeners in Go.

I can't simply send a value on a channel, because only one listener receives each value. The exception is closing the channel, which every listener observes (the second "multiple" in my title).

However, once a channel is closed, I can't send any further signal on it (the first "multiple" in my title).
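The difference between the two cases can be seen in a small sketch. The helper names countValueReceivers and countCloseReceivers are made up for this illustration and are not part of the code under review; each one starts n listeners and reports how many of them were woken.

```go
package main

import (
	"fmt"
	"sync"
)

// countValueReceivers (hypothetical helper) sends ONE value to n waiting
// listeners and reports how many of them received it.
func countValueReceivers(n int) int {
	ch := make(chan struct{})
	stop := make(chan struct{})
	got := make(chan struct{}, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-ch:
				got <- struct{}{}
			case <-stop:
			}
		}()
	}
	ch <- struct{}{} // exactly one listener takes this value
	close(stop)      // release the listeners that got nothing
	wg.Wait()
	return len(got)
}

// countCloseReceivers (hypothetical helper) closes the channel instead and
// reports how many of the n listeners observed the close.
func countCloseReceivers(n int) int {
	ch := make(chan struct{})
	got := make(chan struct{}, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ch // a receive on a closed channel returns immediately
			got <- struct{}{}
		}()
	}
	close(ch)
	wg.Wait()
	return len(got)
}

func main() {
	fmt.Println("woken by one value:", countValueReceivers(3)) // 1
	fmt.Println("woken by close:", countCloseReceivers(3))     // 3
}
```

A single value reaches one listener, while a close reaches all of them, but a closed channel cannot be reused for a second signal.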

Hence I wrote the following code (explanation at the end of this post):

package main

import (
    "sync"
    "time" // needed by time.After in the usage example
)

// Broadcast lets a sender signal all current listeners
type Broadcast struct {
    lock sync.RWMutex
    ch   chan struct{}
}

// NewBroadcast creates a new broadcast
func NewBroadcast() *Broadcast {
    return &Broadcast{
        lock: sync.RWMutex{},
        ch:   make(chan struct{}),
    }
}

// Receive returns a channel that is closed on the next Send
func (b *Broadcast) Receive() <-chan struct{} {
    b.lock.RLock()
    defer b.lock.RUnlock()
    return b.ch
}

// Send a signal to all listeners
func (b *Broadcast) Send() {
    b.lock.Lock()
    defer b.lock.Unlock()
    close(b.ch)
    b.ch = make(chan struct{})
}


Usage example:

func main() {

    b := NewBroadcast()

    done := make(chan bool)
    quit := make(chan struct{})

    go func() {
        <-b.Receive()
        done <- true
    }()

    go func() {
        <-b.Receive()
        <-b.Receive()
        <-b.Receive()
        <-b.Receive()
        done <- true
    }()

    go func() {
        <-done
        <-done
        close(done)
        close(quit)
    }()

    func() {
        for {
            select {
            case <-quit:
                return
            case <-time.After(1 * time.Millisecond):
                b.Send()
            }
        }
    }()

}


To broadcast to all listeners, I simply close the "current" channel and then create a new channel for the next Send. A sync.RWMutex guards the channel field to prevent data races between Receive and Send.

I would appreciate any comment regarding the code, the style and anything else!

Solution

Concept

The basic concept of handing out one channel to all receivers, and then closing that channel as the signal, is a good one. The way you use that concept in your example usage is a bit contrived, and I don't think it represents a real use case, but it's good enough to show how it could be used.

Communication

In Go, the best practice is to prefer "communication" over "locks". The sync package documentation puts it this way:

Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.

What does this mean for your code? The Mutex.Lock() approach is not ideal, and you should look for a way to achieve the same result with communication instead.

Further, there's no need for an RWMutex in your code. Each critical section is only a line or two long, so a plain Mutex would be simpler.
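As a rough sketch of that smaller change, the lock-based type could look like this with a plain sync.Mutex. The field name mu is an assumption made for this example; the rest follows the code in the question.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcast is the lock-based version from the question, using a plain
// Mutex. The field name mu is an assumption made for this sketch.
type Broadcast struct {
	mu sync.Mutex // the zero value is ready to use
	ch chan struct{}
}

// NewBroadcast creates a new broadcast.
func NewBroadcast() *Broadcast {
	return &Broadcast{ch: make(chan struct{})}
}

// Receive returns the channel that the next Send will close.
func (b *Broadcast) Receive() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.ch
}

// Send closes the current channel and installs a fresh one.
func (b *Broadcast) Send() {
	b.mu.Lock()
	defer b.mu.Unlock()
	close(b.ch)
	b.ch = make(chan struct{})
}

func main() {
	b := NewBroadcast()
	first := b.Receive()
	b.Send()
	<-first // already closed by Send, so this returns immediately

	select {
	case <-b.Receive():
		fmt.Println("unexpected: the new channel is already closed")
	default:
		fmt.Println("the next channel is still open")
	}
}
```

This keeps the locking design and only swaps the lock type; it does not address the broader point about preferring communication.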

For "communication" in Go, you need a goroutine to communicate with. That goroutine is tied to the Broadcast instance: it is the only goroutine that knows the Broadcast state, and the only one that can change it.

Now you just need to communicate state changes to and from that goroutine using channels.

In code, it would look like:

// Broadcast lets a sender signal all current listeners
type Broadcast struct {
    poke      chan bool
    receivers chan chan struct{}
    done      chan bool
}

// NewBroadcast creates a new broadcast
func NewBroadcast() *Broadcast {
    bc := &Broadcast{
        poke:      make(chan bool),
        receivers: make(chan chan struct{}),
        done:      make(chan bool),
    }
    go bc.listen()
    return bc
}

func (b *Broadcast) listen() {

    defer close(b.done)

    notify := make(chan struct{})
    for {
        select {
        case ok := <-b.poke:
            if !ok {
                return
            }
            // all current receivers get a closed channel
            close(notify)
            // set up next batch of receivers.
            notify = make(chan struct{})
        case b.receivers <- notify:
            // great. A Receiver has our channel
        }
    }
}

// Close makes Broadcast implement the Closer interface
func (b *Broadcast) Close() error {
    select {
    case b.poke <- false:
    case <-b.done:
    }
    return nil
}

// Receive returns a channel that is closed on the next Send
func (b *Broadcast) Receive() <-chan struct{} {
    select {
    case r := <-b.receivers:
        return r
    case <-b.done:
        // FIXME - should probably return an error.
        return nil
    }
}

// Send a signal to all current Receivers
func (b *Broadcast) Send() {
    select {
    case b.poke <- true:
    case <-b.done:
        // FIXME - attempt to notify on a closed broadcaster... oops. Should probably return an error
    }
}


What's important in the above code? It does not use sync at all. Instead, the Broadcast has a goroutine, listen(), that manages the state. It listens for new receivers and for notification events. Because only that one goroutine touches the state, no locking is needed.

State changes are communicated via the receivers, poke and done channels.
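To show how those three channels cooperate, here is one possible usage sketch. The Broadcast code repeats the listing above so the example compiles on its own; wakeListeners is a hypothetical helper added for illustration. It makes sure every listener has fetched the current channel before Send is called, so all of them are woken by the same broadcast.

```go
package main

import (
	"fmt"
	"sync"
)

// Broadcast and its methods repeat the code above unchanged.
type Broadcast struct {
	poke      chan bool
	receivers chan chan struct{}
	done      chan bool
}

func NewBroadcast() *Broadcast {
	bc := &Broadcast{
		poke:      make(chan bool),
		receivers: make(chan chan struct{}),
		done:      make(chan bool),
	}
	go bc.listen()
	return bc
}

func (b *Broadcast) listen() {
	defer close(b.done)
	notify := make(chan struct{})
	for {
		select {
		case ok := <-b.poke:
			if !ok {
				return
			}
			close(notify)
			notify = make(chan struct{})
		case b.receivers <- notify:
		}
	}
}

func (b *Broadcast) Close() error {
	select {
	case b.poke <- false:
	case <-b.done:
	}
	return nil
}

func (b *Broadcast) Receive() <-chan struct{} {
	select {
	case r := <-b.receivers:
		return r
	case <-b.done:
		return nil
	}
}

func (b *Broadcast) Send() {
	select {
	case b.poke <- true:
	case <-b.done:
	}
}

// wakeListeners (hypothetical helper) starts n listeners, broadcasts once,
// and reports how many listeners were woken.
func wakeListeners(n int) int {
	b := NewBroadcast()
	defer b.Close()
	var ready, finished sync.WaitGroup
	woken := make(chan struct{}, n)
	for i := 0; i < n; i++ {
		ready.Add(1)
		finished.Add(1)
		go func() {
			defer finished.Done()
			ch := b.Receive() // take the current channel before Send runs
			ready.Done()
			<-ch // unblocks when listen() closes the channel
			woken <- struct{}{}
		}()
	}
	ready.Wait() // every listener now holds the current channel
	b.Send()
	finished.Wait()
	return len(woken)
}

func main() {
	fmt.Println(wakeListeners(3)) // prints 3
}
```

A listener that calls Receive only after Send has run gets the next channel and waits for the following broadcast, which is why the sketch waits on ready before sending.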

Note that the type now manages resources beyond the simple struct it used to be: it owns a goroutine that you don't want to leak. It should therefore implement a Close() method, and you would use it like this:

broadcaster := NewBroadcast()
defer broadcaster.Close()
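The FIXME comments in Receive and Send suggest returning an error once the broadcaster is closed. One way that might look is sketched below. ErrClosed and the changed method signatures are assumptions made for this example and are not part of the code above; the remaining methods are repeated unchanged so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrClosed is a hypothetical sentinel error introduced for this sketch.
var ErrClosed = errors.New("broadcast: closed")

type Broadcast struct {
	poke      chan bool
	receivers chan chan struct{}
	done      chan bool
}

func NewBroadcast() *Broadcast {
	bc := &Broadcast{
		poke:      make(chan bool),
		receivers: make(chan chan struct{}),
		done:      make(chan bool),
	}
	go bc.listen()
	return bc
}

func (b *Broadcast) listen() {
	defer close(b.done)
	notify := make(chan struct{})
	for {
		select {
		case ok := <-b.poke:
			if !ok {
				return
			}
			close(notify)
			notify = make(chan struct{})
		case b.receivers <- notify:
		}
	}
}

func (b *Broadcast) Close() error {
	select {
	case b.poke <- false:
	case <-b.done:
	}
	return nil
}

// Receive returns the channel closed by the next Send, or ErrClosed if the
// broadcaster has been shut down (assumed signature).
func (b *Broadcast) Receive() (<-chan struct{}, error) {
	select {
	case r := <-b.receivers:
		return r, nil
	case <-b.done:
		return nil, ErrClosed
	}
}

// Send signals all current receivers, or returns ErrClosed if the
// broadcaster has been shut down (assumed signature).
func (b *Broadcast) Send() error {
	select {
	case b.poke <- true:
		return nil
	case <-b.done:
		return ErrClosed
	}
}

func main() {
	b := NewBroadcast()
	fmt.Println(b.Send()) // <nil>
	b.Close()
	_, err := b.Receive()
	fmt.Println(err)      // broadcast: closed
	fmt.Println(b.Send()) // broadcast: closed
}
```

Returning an error avoids handing callers a nil channel, on which a receive would block forever.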


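It's very important to note that all the channels are unbuffered. This gives a clean and reliable communication path. For example, you can't have stale receivers: with a buffered receivers channel, a notify channel could still be sitting in the buffer after Send had closed it, and the next caller of Receive would get a channel that is already closed.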

Context

StackExchange Code Review Q#159692, answer score: 6
