HiveBrain v1.2.0

Is this a good way of managing parallel goroutines when I care about ordering of results?

Submitted by: @import:stackexchange-codereview

Problem

I have a process with a number of stages that need to be completed in sequence. Each stage is largely parallelisable, involving looping over a large data structure and processing each item independently, and collating the results into the data structure used by the next stage.

My approach is like the following:

var wg sync.WaitGroup
var control_routine sync.WaitGroup
stage_complete := make(chan bool)
stage_one_channel := make(chan map[typeFoo][]typeBar, 16)
stage_two_channel := make(chan map[typeBaz][]typeQux, 16)
stage_one_results := make(map[typeFoo][]typeBar)
stage_two_results := make(map[typeBaz][]typeQux)

// A routine to receive results as they arrive from each stage
go func() {
    for  {
        select {
        case new_stage_one_results := <- stage_one_channel:
            // merge new_stage_one_results into stage_one_results...
        case new_stage_two_results := <- stage_two_channel:
            // merge new_stage_two_results into stage_two_results...
        ...
        case stay_alive := <-stage_complete:
            control_routine.Done()
            if !stay_alive {
                return
            }
        }
    }
}()

// Begin stage One
control_routine.Add(1)
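for i, some_data := range my_data {
    wg.Add(1)
    // Pass the loop variables as arguments so each goroutine gets its own copy
    // (typeData is a placeholder for the element type of my_data)
    go func(i int, datum typeData) {
        defer wg.Done()
        some_stage_one_results := make(map[typeFoo][]typeBar)
        // do some heavy lifting with datum which populates some_stage_one_results...
        stage_one_channel <- some_stage_one_results
    }(i, some_data)
}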
}
wg.Wait()
stage_complete <- true
control_routine.Wait()
// End stage one

// More stages much like the first...

// Finally send the control routine a terminal signal and wait for it to finish
stage_complete <- false
control_routine.Wait()


This basically seems to work, but I need to be sure of the following assumption (which some behaviour is causing me to doubt).

Can I be sure that all goroutines in stage N will completely finish, and that the data they send through stage_N_channel will have been received and merged, before stage N+1 begins?

Solution

The answer to "Can I be sure...?" is no: nothing in your code ensures that. That is why results sometimes go missing.

Missing results are a sign of a data race. You can use the Go race detector (go run -race, or go test -race) to find these problems so that you can fix them. Data races are categorically dangerous: they can corrupt memory.

You might be able to find and fix the problem by adding more synchronization, but your code is already too complex. Since you asked, the pattern you should probably be using is a particular producer-consumer pattern. It has two parts: 1) a producer goroutine that sends data to a channel and closes the channel after sending the last value, and 2) a consumer goroutine with a for loop that ranges over the channel. With this pattern you should be able to dispense with the select statement, the stage_complete channel, and the control_routine wait group.

Your code that begins for i, some_data := range my_data is what I'm calling a producer. It produces data on the channel stage_one_channel. Following wg.Wait(), you should replace the line stage_complete <- true with close(stage_one_channel).
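The sketch below shows one way that change could look for stage one. It is not the question's real code. The stageOne function, the even/odd keys, and map[string][]int (standing in for map[typeFoo][]typeBar) are all hypothetical. It keeps the consumer in its own goroutine, as the question does. It also adds a done channel, closed after the range loop, so the caller waits until every result has been merged rather than merely sent.

```go
package main

import (
	"fmt"
	"sync"
)

// stageOne is a hypothetical stand-in for the question's stage one: each
// worker turns one datum into a small partial map and sends it on the channel.
func stageOne(myData []int) map[string][]int {
	stageOneChannel := make(chan map[string][]int, 16)
	stageOneResults := make(map[string][]int)

	// Consumer: merge partial results until the channel is closed and drained.
	done := make(chan struct{})
	go func() {
		for partial := range stageOneChannel {
			for k, v := range partial {
				stageOneResults[k] = append(stageOneResults[k], v...)
			}
		}
		close(done) // every value that was sent has now been merged
	}()

	// Producers: one goroutine per datum, as in the question.
	var wg sync.WaitGroup
	for _, d := range myData {
		wg.Add(1)
		go func(d int) {
			defer wg.Done()
			key := "even"
			if d%2 != 0 {
				key = "odd"
			}
			stageOneChannel <- map[string][]int{key: {d * d}}
		}(d)
	}
	wg.Wait()
	close(stageOneChannel) // replaces stage_complete <- true
	<-done                 // wait for the consumer, not just the senders
	return stageOneResults
}

func main() {
	res := stageOne([]int{1, 2, 3, 4})
	fmt.Println(len(res["odd"]), len(res["even"])) // prints 2 2
}
```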

Replace your for loop containing the select statement with a separate for loop for each stage, something like:

for new_stage_one_results := range stage_one_channel {
    ...
}


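When this for loop completes, you can be sure that all data has been received from stage_one_channel and processed by the loop body. This is the synchronization your code is missing. Your current mechanism only waits for data to be sent, not received and processed. Because stage_one_channel is buffered, a send can complete while its value is still sitting in the buffer. When several select cases are ready at once, Go picks one of them at random. So the control routine can take the stage_complete signal before it has drained stage_one_channel.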
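The sketch below reproduces that failure in miniature, using a hypothetical runTrial function. Results are already buffered and the stop signal is already pending when the select loop starts. Unlike the question's unbuffered stage_complete, stop is buffered here; this is purely a simplification so that both cases are ready at the same moment. Because select chooses randomly among ready cases, most trials return before every buffered value has been merged.

```go
package main

import "fmt"

// runTrial mimics the question's control routine: it merges from results
// until it sees the stop signal, then acknowledges and returns.
func runTrial(n int) (merged int) {
	results := make(chan int, 16)
	stop := make(chan struct{}, 1) // buffered only so the signal is ready immediately
	ack := make(chan struct{})

	for i := 0; i < n; i++ {
		results <- i // each send completes because the buffer has room
	}
	stop <- struct{}{}

	go func() {
		for {
			select {
			case <-results:
				merged++
			case <-stop: // may win even while results is non-empty
				close(ack)
				return
			}
		}
	}()
	<-ack // close(ack) happens before this receive, so reading merged is race-free
	return merged
}

func main() {
	const trials, n = 1000, 8
	lost := 0
	for t := 0; t < trials; t++ {
		if runTrial(n) < n {
			lost++
		}
	}
	fmt.Printf("%d of %d trials stopped with results still buffered\n", lost, trials)
}
```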

When the last loop completes, your program can exit. You could keep part of your mechanism for waiting for this, but it would be simpler to swap the roles. Move the "consumer" code that you currently have in that first function literal into your main goroutine, and run your producers in one or more separate goroutines. With your consumers (the last one especially) in main, you know your program is done once the last consumer has processed the last data. There is nothing else to wait for.

Finally, let me reinforce: if go run -race says you have a problem, you have a problem.


Context

StackExchange Code Review Q#57472, answer score: 5
