Map reduce tester ported from bash to Python

Submitted by: @import:stackexchange-codereview

Problem

My MapReduce tester is clearly ported from Shell, short of args=None for line in args or read_input(), what's a better way of importing->testing the function outside of subprocess?

Or does it not matter, i.e.: my "hack" is fine?
test_mapreduce.py

from unittest import TestCase, main as unittest_main
from subprocess import check_output as run
from os import path

class TestMapReduce(TestCase):
    top_path = ''
    map_reduce = lambda self, mapper_name, reducer_name, datafile_name: run(
        ['python', path.join(self.top_path, reducer_name),  # Reduce
         run(['sort',  # Shuffle, could be replaced with python `sorted`
              run(['python', path.join(self.top_path, mapper_name),  # Map
                   path.join(self.top_path, 'data', datafile_name)])])])

    @classmethod
    def setUpClass(cls):
        if not path.isfile('setup.py'):
            cls.top_path = path.join('..', '..')
            if not path.isfile(path.join(cls.top_path, 'setup.py')):
                raise AssertionError("Haven't found right directory to `cd` into")

    def test_with_student_test_posts(self):
        print self.map_reduce('mapper.py', 'reducer.py', 'student_test_posts.csv')

if __name__ == '__main__':
    unittest_main()


mapper.py

#!/usr/bin/env python

from fileinput import input as read_input

def mapper():
    for line in read_input():
        data = line.strip().split('\t')

        if len(data) != 6:
            continue

        date, time, store, item, cost, payment = data
        print "{0}\t{1}".format(store, cost)

if __name__ == '__main__':
    mapper()


PS: Should I refactor to use the map and reduce inbuilt functions?

Solution

It's strange to use map_reduce = lambda ... to define a method. This is the same, written the common way:

def map_reduce(self, mapper_name, reducer_name, datafile_name):
    # `return` is required here: the lambda returned the output implicitly
    return run(
        ['python', path.join(self.top_path, reducer_name),  # Reduce
         run(['sort',  # Shuffle, could be replaced with python `sorted`
              run(['python', path.join(self.top_path, mapper_name),  # Map
                   path.join(self.top_path, 'data', datafile_name)])])])


And this hack of calling python -> sort -> python is not fine at all. Python can sort on its own, which would turn your pipeline into python -> python -> python, and at that point there is no reason left to spawn subprocesses. Do the whole thing in a single Python process instead of three separate ones.
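As a rough sketch of what a single-process pipeline could look like: the function names below (`map_reduce`, `store_cost_mapper`, `count_reducer`) and the sample records are made up for illustration, and the mapper is assumed to take an iterable of lines instead of reading from `fileinput`. Your real reducer is not shown in the question, so a toy one stands in for it.

```python
def map_reduce(mapper, reducer, lines):
    """Run map -> shuffle -> reduce in the current process.

    `mapper` takes an iterable of input lines and returns an iterable of
    output lines; `reducer` takes the sorted mapper output. Both are
    hypothetical plain functions, not the scripts from the question.
    """
    mapped = mapper(lines)      # Map
    shuffled = sorted(mapped)   # Shuffle: replaces the external `sort`
    return reducer(shuffled)    # Reduce


def store_cost_mapper(lines):
    """Keep the store and cost columns of well-formed 6-column records."""
    for line in lines:
        cols = line.strip().split('\t')
        if len(cols) == 6:
            yield '{0}\t{1}'.format(cols[2], cols[4])


def count_reducer(records):
    """Toy reducer: count how many records reached the reduce step."""
    return sum(1 for _ in records)


# Invented sample input: two valid records and one malformed line.
sample = [
    '2012-01-01\t09:00\tSan Jose\tMusic\t12.50\tVisa\n',
    'malformed line\n',
    '2012-01-01\t09:05\tAustin\tBooks\t7.25\tCash\n',
]
result = map_reduce(store_cost_mapper, count_reducer, sample)
```

With this shape the test can pass a list of lines straight to `map_reduce`, so no subprocess and no working-directory lookup is needed.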

Refactoring with map and reduce

Here's one way to refactor mapper to use Python's map function:

def line2cols(line):
    return line.strip().split('\t')

def has6cols(cols):
    return len(cols) == 6

def cols2out(cols):
    # Emit store and cost (the 3rd and 5th columns), as the original mapper does
    return '{}\t{}'.format(cols[2], cols[4])

def mapper():
    return map(cols2out, filter(has6cols, map(line2cols, read_input())))


And here's an example reducer using Python's reduce:

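from functools import reduce  # built in on Python 2; this import is required on Python 3

def reducer(seq):
    def f(a, b):
        # Keep whichever string is longer; on a tie, keep the later one
        if len(a) > len(b):
            return a
        return b
    return reduce(f, seq, '')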


This reducer is deliberately trivial: it just finds the longest string in the sequence.
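For something closer to the data in the question, here is a hedged sketch of a reducer that totals the cost per store. This is only an assumption about what your reducer is meant to do, since `reducer.py` is not shown; `total_per_store` and the sample records are invented for the example.

```python
from functools import reduce  # also available as a builtin on Python 2


def total_per_store(records):
    """Sum the cost column per store from 'store<TAB>cost' lines."""
    def add_record(totals, record):
        store, cost = record.split('\t')
        totals[store] = totals.get(store, 0.0) + float(cost)
        return totals
    # The empty dict is the accumulator that add_record fills in and returns.
    return reduce(add_record, records, {})


# Invented sample records in the mapper's output format.
totals = total_per_store(['Austin\t7.25', 'San Jose\t12.50', 'Austin\t2.75'])
```

Because the accumulator is a dictionary keyed by store, this version does not depend on the input being sorted first.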

I hope this helps.

UPDATE

It's a bit difficult to understand what you're trying to do.


My MapReduce tester is clearly ported from Shell, short of args=None for line in args or read_input(), what's a better way of importing->testing the function outside of subprocess?

When I read this, I didn't quite get what you meant by "ported from Shell". In the code I saw, you're calling Python twice through subprocesses, which is clearly not fine.

Let me try again, to guess what you're trying to do. Maybe you have a Python mapper script, and you have a Python reducer script, which you use in some framework? And you want to write some unit tests to check that these scripts in fact work? I mean the scripts as black boxes, as in, you want to test the complete scripts, rather than the underlying Python functions / classes? I'm really just guessing here, maybe I'm completely wrong.

If this is indeed what you want, then don't. Don't try to test the scripts; test the underlying implementation. If the implementation passes, the scripts should print correct output too. Testing the script output on top of the underlying implementation would only test the basic ability to print, which seems rather pointless.
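To illustrate testing the implementation directly, here is a minimal sketch. It assumes the mapper is rewritten to accept an iterable of lines and return its output instead of printing it; that signature, the test names, and the sample records are all hypothetical.

```python
import unittest


def mapper(lines):
    """Hypothetical importable mapper: 'store<TAB>cost' for each 6-column line."""
    out = []
    for line in lines:
        data = line.strip().split('\t')
        if len(data) != 6:
            continue  # skip malformed records, as the original mapper does
        date, time, store, item, cost, payment = data
        out.append('{0}\t{1}'.format(store, cost))
    return out


class TestMapper(unittest.TestCase):
    def test_emits_store_and_cost(self):
        line = '2012-01-01\t09:00\tSan Jose\tMusic\t12.50\tVisa\n'
        self.assertEqual(mapper([line]), ['San Jose\t12.50'])

    def test_skips_malformed_lines(self):
        self.assertEqual(mapper(['only\ttwo\n']), [])
```

Run it with `python -m unittest` against the module that holds the tests. The script itself then shrinks to a thin wrapper that feeds `read_input()` to `mapper` and prints each returned line.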

Context

StackExchange Code Review Q#60278, answer score: 4
