HiveBrain v1.2.0

Reading and processing a big CSV file

Submitted by: @import:stackexchange-codereview

Problem

I am trying to write a script that reads a CSV file containing 1 million domain names, looks up each domain, and stores the results in another CSV file. With the code below, the number of records I can process in a given amount of time is quite low. I want to optimize it so it can process more records in less time, while also keeping CPU utilization under control. What should I be focusing on, and where can I get help with this?

```
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"time"
)

func main() {

	resc, errc := make(chan string), make(chan error)

	fmt.Println("start time", time.Now())

	concurrency := 1000
	sem := make(chan bool, concurrency)

	csvfile, err := os.Open("1-million-rows.csv")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer csvfile.Close()

	reader := csv.NewReader(csvfile)
	reader.FieldsPerRecord = -1 // see the Reader struct information below
	row_count := 0
	for {
		sem <- true
		record, err := reader.Read()
		// Stop at EOF.
		if err == io.EOF {
			break
		}
		row_count += 1
		domain_name := record[1]
		go func(domain_name string) {
			defer func() { <-sem }()
			body, err := lookup(domain_name)
			if err != nil {
				errc <- err
				return
			}
			resc <- string(body)
		}(domain_name)
	}

	for i := 0; i < cap(sem); i++ {
		sem <- true
	}

	for i := 0; i < row_count; i++ {
		select {
		case res := <-resc:
			fmt.Println(res)
		case err := <-errc:
			fmt.Println(err)
		}
	}
}

func lookup(domain_name string) (string, error) {
	ip, err := net.LookupIP(domain_name)
	if err != nil {
		return "", err
	}
	var ip_addresses []string
	for i := range ip {
		address := ip[i]
		ip_addresses = append(ip_addresses, address.String())
	}
	return domain_name + "," + strings.Join(ip_addresses, "|"), nil
}
```

Solution

Bugs

Your code does have a problem with the concurrency, and it's a big one: it deadlocks, so the program can never complete. The culprit is the pair of unbuffered resc and errc channels:

    resc, errc := make(chan string), make(chan error)


Your code attempts to loop through a million input records, but your concurrency is capped at 1000. After you have looped a thousand times, you have 1000 goroutines all waiting to write a result to either the resc or errc channel... but, because those channels are unbuffered, something must already be reading from them before any write can complete...

... yet you only start reading from those channels after queuing up all 1,000,000 CSV records.

Because the goroutines never complete (each is blocked writing to the resc channel), they never release a slot in sem either... so the main loop can never make progress.

Your code is buggy.

A solution would be to put the entire CSV-reading code into a separate goroutine, so that you can start reading from the resc/errc channels immediately.

Use a struct instead

Instead of having 2 return channels errc and resc, it would be better to return a simple struct:

type dnsLookup struct {
    domain string
    ips    []string
    err    error
}


then you can monitor just a single channel, and you can also correlate each result, and each error, back to the domain name it belongs to.
Concurrency

The use of the sem channel is OK for limiting concurrency, but for a job like this I would prefer a more explicit mechanism. Have a channel that the CSV parser pushes domain names onto, and then have X concurrent goroutines reading from that. Closing the channel indicates there is no more data to process. Use wait-groups to monitor completion....

func lookupRoutine(source <-chan string, wg *sync.WaitGroup, results chan<- dnsLookup) {
    defer wg.Done()
    for name := range source {
        // assumes lookup has been reworked to return a dnsLookup value
        results <- lookup(name)
    }
}


The above loop will process any and all available names from the source (until it is closed), sending each result to the results channel as it completes.

Then, in your main loop, you can have a channel to send CSV parse results to:

names := make(chan string, 1000)
results := make(chan dnsLookup, 1000)

// parse names in a goroutine
go parseCSVData(csvfile, names)

wg := new(sync.WaitGroup)
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
    // parallel routine for lookups
    go lookupRoutine(names, wg, results)
}

// close the results when all lookup routines complete:
go func() {
    wg.Wait()
    close(results)
}()

for r := range results {
    // print the results out here
    ...
}


Context

StackExchange Code Review Q#126765, answer score: 6
