How to improve the performance of this MapReduce function (Python, mrjob)

Problem

I'm trying to get the most out of this code so that I understand what to look for in the future. The code below works fine; I just want to make it more efficient.

Any suggestions?

from mrjob.job import MRJob
import operator
import re

# append result from each reducer 
output_words = []

class MRSudo(MRJob):

    def init_mapper(self):
        # buffer (command, 1) tuples for the whole mapper task
        self.words = []

    def mapper(self, _, line):
        command = line.split()[-1]
        self.words.append((command, 1))

    def final_mapper(self):
        for word_pair in self.words:
            yield word_pair

    def reducer(self, command, count): 
        # append tuples to the list
        output_words.append((command, sum(count)))

    def final_reducer(self):
        # Sort tuples in the list by occurrence count
        map(operator.itemgetter(1), output_words)
        sorted_words = sorted(output_words, key=operator.itemgetter(1), reverse=True)
        for result in sorted_words:
            yield result

    def steps(self):
        return [self.mr(mapper_init=self.init_mapper,
                        mapper=self.mapper,
                        mapper_final=self.final_mapper,
                        reducer=self.reducer,
                        reducer_final=self.final_reducer)]

if __name__ == '__main__':
    MRSudo.run()

Solution

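Since the reduce operation in this case (summing counts) is commutative and associative, you can use a combiner to pre-aggregate the counts on the mapper side, so fewer (word, count) pairs have to be shuffled to the reducers.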
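To make the benefit of pre-aggregation concrete, here is a minimal plain-Python sketch. It does not use mrjob or Hadoop; the functions `mapper`, `combiner` and `reducer` and the two input splits are hypothetical stand-ins. It runs one combine pass per simulated mapper task and counts how many (word, count) records would reach the shuffle with and without it.

```python
from collections import defaultdict
from itertools import chain


def mapper(lines):
    # Mirrors the question's mapper: one (command, 1) pair per input line
    for line in lines:
        yield line.split()[-1], 1


def combiner(pairs):
    # Group one mapper task's output by key and sum it locally
    grouped = defaultdict(list)
    for word, count in pairs:
        grouped[word].append(count)
    for word, counts in grouped.items():
        yield word, sum(counts)


def reducer(pairs):
    # Final aggregation of everything that reached the reducer
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


# Two hypothetical input splits, each handled by its own mapper task
splits = [
    ["a : sudo ls", "a : sudo ls", "b : sudo vim"],
    ["c : sudo ls", "c : sudo ls", "c : sudo ls"],
]

without_combiner = list(chain.from_iterable(mapper(s) for s in splits))
with_combiner = list(chain.from_iterable(combiner(mapper(s)) for s in splits))

print(len(without_combiner), len(with_combiner))  # 6 3
print(reducer(without_combiner) == reducer(with_combiner))  # True
```

The totals are identical either way because addition is commutative and associative. Hadoop treats the combiner as an optimization that may run zero, one, or several times, which is why it must never change the final result.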

def combiner_count_words(self, word, counts):
    # sum the partial counts emitted locally for this word
    yield (word, sum(counts))

def steps(self):
    return [self.mr(mapper_init=self.init_mapper,
                    mapper=self.mapper,
                    mapper_final=self.final_mapper,
                    combiner=self.combiner_count_words,
                    reducer=self.reducer,
                    reducer_final=self.final_reducer)]
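A related technique, not part of the original answer, is in-mapper combining. The question's `mapper_init`/`mapper_final` hooks buffer one (command, 1) tuple per line, so they emit exactly as many records as yielding from `mapper` directly would, while also holding all of them in memory. Keeping one running count per command instead means each command is emitted once per mapper task. The sketch below simulates the three hooks in plain Python so it runs without mrjob; the class `InMapperCounter`, the helper `run_mapper` and the sample log lines are hypothetical.

```python
from collections import Counter


class InMapperCounter:
    """Hypothetical plain-Python stand-in for an mrjob mapper
    that uses in-mapper combining."""

    def mapper_init(self):
        # One counter per mapper task instead of a list of (command, 1) tuples
        self.counts = Counter()

    def mapper(self, _, line):
        # Same field extraction as the question: the last whitespace-separated token
        self.counts[line.split()[-1]] += 1

    def mapper_final(self):
        # Emit each distinct command once, with its local total
        for command, count in self.counts.items():
            yield command, count


def run_mapper(lines):
    # Call the hooks in the order a mapper task would: init, once per line, final
    m = InMapperCounter()
    m.mapper_init()
    for line in lines:
        m.mapper(None, line)
    return list(m.mapper_final())


# Hypothetical sample input
log_lines = [
    "alice : sudo ls",
    "bob : sudo ls",
    "alice : sudo vim",
]
print(sorted(run_mapper(log_lines)))  # [('ls', 2), ('vim', 1)]
```

With this pattern, mapper memory grows with the number of distinct commands rather than the number of input lines. A combiner can still be kept alongside it, but it will have less left to merge.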

Context

StackExchange Code Review Q#24776, answer score: 2
