Map reduce tester ported from bash to Python
Problem
My MapReduce tester is clearly ported from Shell. Short of `args=None` and `for line in args or read_input()`, what's a better way of importing and testing the function outside of subprocess?

Or does it not matter, i.e. is my "hack" fine?

test_mapreduce.py
    from unittest import TestCase, main as unittest_main
    from subprocess import check_output as run
    from os import path


    class TestMapReduce(TestCase):
        top_path = ''
        map_reduce = lambda self, mapper_name, reducer_name, datafile_name: run(
            ['python', path.join(self.top_path, reducer_name),  # Reduce
             run(['sort',  # Shuffle, could be replaced with python `sorted`
                  run(['python', path.join(self.top_path, mapper_name),  # Map
                       path.join(self.top_path, 'data', datafile_name)])])])

        @classmethod
        def setUpClass(cls):
            if not path.isfile('setup.py'):
                cls.top_path = path.join('..', '..')
                if not path.isfile(path.join(cls.top_path, 'setup.py')):
                    raise AssertionError("Haven't found right directory to `cd` into")

        def test_with_student_test_posts(self):
            print self.map_reduce('mapper.py', 'reducer.py', 'student_test_posts.csv')


    if __name__ == '__main__':
        unittest_main()

mapper.py
    #!/usr/bin/env python
    from fileinput import input as read_input


    def mapper():
        for line in read_input():
            data = line.strip().split('\t')
            if len(data) != 6:
                continue
            date, time, store, item, cost, payment = data
            print "{0}\t{1}".format(store, cost)


    if __name__ == '__main__':
        mapper()

PS: Should I refactor to use the map and reduce inbuilt functions?

Solution
It's strange to use `map_reduce = lambda ...` to define a method. This is the same, written the common way:

    def map_reduce(self, mapper_name, reducer_name, datafile_name):
        # `return` is needed here to match what the lambda does
        return run(
            ['python', path.join(self.top_path, reducer_name),  # Reduce
             run(['sort',  # Shuffle, could be replaced with python `sorted`
                  run(['python', path.join(self.top_path, mapper_name),  # Map
                       path.join(self.top_path, 'data', datafile_name)])])])

And this hack of calling python -> sort -> python is not fine at all. Python can certainly sort. Then your pipeline would become python -> python -> python, and at that point it's beyond silly to call subprocesses for this. You should do the whole thing in a single Python process, instead of 3 different processes.

Refactoring with map and reduce

Here's one way to refactor mapper to use Python's map function:

    def line2cols(line):
        return line.strip().split('\t')

    def has6cols(cols):
        return len(cols) == 6

    def cols2out(cols):
        # Columns are: date, time, store, item, cost, payment
        return '{}\t{}'.format(cols[2], cols[4])

    def mapper():
        return map(cols2out, filter(has6cols, map(line2cols, read_input())))

And here's an example reducer using Python's reduce:

    from functools import reduce  # builtin in Python 2, must be imported in Python 3

    def reducer(seq):
        def f(a, b):
            if len(a) > len(b):
                return a
            return b
        return reduce(f, seq, '')

This is quite stupid; it just finds the longest string in the sequence.
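To make the "single Python process" suggestion concrete, here is a minimal sketch of map -> sort -> reduce without any subprocess. It assumes the six tab-separated columns from the question's mapper.py. The reducer is a guess: the question never shows reducer.py, so summing the cost per store is only an assumed behaviour. The names `map_lines`, `reduce_pairs` and `map_reduce` are made up for this illustration.

```python
from itertools import groupby
from operator import itemgetter


def map_lines(lines):
    """Map: yield (store, cost) for every well-formed 6-column line."""
    for line in lines:
        cols = line.strip().split('\t')
        if len(cols) != 6:
            continue  # skip malformed lines, as the original mapper does
        date, time, store, item, cost, payment = cols
        yield store, float(cost)


def reduce_pairs(pairs):
    """Reduce (assumed behaviour): total cost per store.

    Expects the pairs to be sorted by store, because groupby only
    groups adjacent items with the same key.
    """
    for store, group in groupby(pairs, key=itemgetter(0)):
        yield store, sum(cost for _, cost in group)


def map_reduce(lines):
    """Map, shuffle with sorted() instead of the `sort` command, then reduce."""
    return list(reduce_pairs(sorted(map_lines(lines))))
```

Because `map_reduce` takes any iterable of lines, a test can pass it a plain list, while a script can pass it an open file or `fileinput.input()`.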
I hope this helps.
UPDATE
It's a bit difficult to understand what you're trying to do.
"My MapReduce tester is clearly ported from Shell, short of `args=None` and `for line in args or read_input()`, what's a better way of importing->testing the function outside of subprocess?"
When I read this, I didn't quite get what you meant about shell. In the code I saw, you're calling Python twice, which is clearly not fine.
Let me try again, to guess what you're trying to do. Maybe you have a Python mapper script, and you have a Python reducer script, which you use in some framework? And you want to write some unit tests to check that these scripts in fact work? I mean the scripts as black boxes, as in, you want to test the complete scripts, rather than the underlying Python functions / classes? I'm really just guessing here, maybe I'm completely wrong.
If this is indeed what you want, then don't. Don't try to test the scripts; test the underlying implementation. If the implementation passes, the scripts should print correct output too. If you tested the script outputs in addition to the underlying implementation, you'd just be testing the basic ability to print, which seems rather pointless.
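Testing the implementation directly could look like the sketch below. It assumes the mapper is refactored to take an iterable of lines and to return its output instead of printing it. That signature is hypothetical, not the question's current `mapper()`, and the sample rows are invented. With it, the test needs neither data files nor subprocesses.

```python
import unittest


def mapper(lines):
    """Hypothetical refactoring: return 'store<TAB>cost' strings for valid lines."""
    out = []
    for line in lines:
        data = line.strip().split('\t')
        if len(data) != 6:
            continue  # ignore malformed rows
        date, time, store, item, cost, payment = data
        out.append('{0}\t{1}'.format(store, cost))
    return out


class TestMapper(unittest.TestCase):
    def test_emits_store_and_cost(self):
        lines = ['2012-01-01\t09:00\tMiami\tBooks\t10.50\tVisa\n']
        self.assertEqual(mapper(lines), ['Miami\t10.50'])

    def test_skips_malformed_lines(self):
        self.assertEqual(mapper(['only\ttwo\n', '']), [])
```

Saved as a test module, this runs with `python -m unittest`. The script's `if __name__ == '__main__':` block would then only need to print each line returned by `mapper(read_input())`.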
Context
StackExchange Code Review Q#60278, answer score: 4