HiveBrain v1.2.0

Newbie Map/Reduce word frequency counter


Problem

I present to you my first-ever module, written to learn some Erlang, and await your scrutiny. It does a word frequency count using map/reduce. I'm an Erlang noob, so I would very much like feedback about:

Unneeded verbosity / code

Erlang 'style' mistakes

Things that can be done more elegantly

```
-module(main).

-export([map/1]).
-export([main/0]).
-export([reducer_wrapper/1]).
-export([reducer/2]).

-define(NUMBER_OF_PARTITIONS, 100).

main() ->
    Lines = readlines('pg20417.txt'),
    lists:foreach( fun main:map/1, Lines ),
    start_reduce().

%% signals all reducer_wrappers that all emits are received, and the actual reduce can be started
start_reduce() ->
    lists:foreach( fun(PartitionNr) -> get_reducer(PartitionNr) ! start_reduce end,
                   lists:seq(1, ?NUMBER_OF_PARTITIONS) ).

%% emits every word with cnt = 1
map(Line) ->
    Words = string:tokens( string:to_lower(Line), " .,;:\n()?"),
    lists:foreach( fun(Word) -> get_reducer( Word, ?NUMBER_OF_PARTITIONS ) ! {emit, {Word, [1]}} end,
                   Words ).

%% returns the reducer Pid, spawning and registering it on first use
get_reducer( PartitionNr ) ->
    ProcessName = list_to_atom( lists:concat( [PartitionNr, '_reducer'] ) ),
    case whereis( ProcessName ) of
        undefined ->
            Pid = spawn(?MODULE, reducer_wrapper, [[]]),
            register( ProcessName, Pid ),
            Pid;
        Pid ->
            Pid
    end.

get_reducer( Key, NumberOfPartitions ) ->
    get_reducer( erlang:phash2(Key) rem NumberOfPartitions + 1 ).

%% the reducer wrapper process: collects emits, then reduces on start_reduce
reducer_wrapper(KeyValueTuples) ->
    receive
        { emit, KeyValueTuple } ->
            reducer_wrapper( [KeyValueTuple] ++ KeyValueTuples );
        start_reduce ->
            Dictionary = lists:foldl( fun main:reducer/2, dict:new(), KeyValueTuples ),
            show_all( dict:to_list(Dictionary) )
    end.

reducer( { Key, [Cnt] }, Dictionary ) ->
    case dict:is_key( Key, Dictionary ) of
        true ->
            dict:store( Key, dict:fetch( Key, Dictionary ) + Cnt, Dictionary );
        false ->
            dict:store( Key, Cnt, Dictionary )
    end.

%% minimal helpers (not shown in the original post, restored so the module compiles)
readlines(FileName) ->
    {ok, Data} = file:read_file(FileName),
    string:tokens(binary_to_list(Data), "\n").

show_all(KeyValueList) ->
    lists:foreach(fun({Key, Cnt}) -> io:format("~s: ~p~n", [Key, Cnt]) end, KeyValueList).
```

Solution

A few initial points:

  • Your mapping phase is all done on one process, so you're not getting any parallelism. You should fire off M mappers to do the mapping for you, and give them bits of the data to map over. They can pass this on to one of R reducers who do the reduction.

  • [KeyValueTuple] ++ KeyValueTuples is slow, as you're creating lists and concatenating them together. Instead, just prepend values to the list in O(1) time: [KeyValueTuple|KeyValueTuples].

  • The way you're spawning/creating the reducer process is a bit unnecessary. If I were you, I'd spawn R reducers up front and dish out the work in a round-robin fashion. This will give you better control of load distribution.

  • Instead of calling dict:is_key and then, if true, calling dict:fetch, do both in one step by calling dict:find and matching against {ok, Value} or error. This saves the double lookup.
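The last two list points can be sketched together: a reducer clause that does a single dict:find lookup instead of the is_key/fetch pair (module and function names here are illustrative, not from the post):

```erlang
-module(reducer_demo).
-export([reducer/2]).

%% single-lookup reducer: dict:find/2 replaces the is_key + fetch pair
reducer({Key, [Cnt]}, Dictionary) ->
    case dict:find(Key, Dictionary) of
        {ok, Existing} -> dict:store(Key, Existing + Cnt, Dictionary);
        error          -> dict:store(Key, Cnt, Dictionary)
    end.
```

For plain counters, dict:update_counter(Key, Cnt, Dictionary) collapses even this into one call.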
As a general comment, you've really tied together both map/reduce and word counting. Ideally what you'd want to do is have a structure/module which knows about how to do map/reduce by itself, and be ignorant of the work that's going on. You would then have another module which is able to invoke a map/reduce job by passing in the Map function, the Reduce function, the number of mappers and reducers and the data that's being processed.

I realise you're learning, so you don't need to do all this. But down the track I'd recommend attempting to abstract away the map/reduce stuff into a reusable module.
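A minimal sketch of that separation, assuming a hypothetical mapreduce module (all names are illustrative): the module knows only how to spawn mappers and fold the emitted pairs, while the caller supplies the word-counting Map and Reduce functions.

```erlang
-module(mapreduce).
-export([run/4]).

%% run(MapFun, ReduceFun, Acc0, Data): spawns one mapper process per data item,
%% collects the emitted {Key, Value} pairs, and folds them with ReduceFun.
%% A sketch only -- a real job would cap the number of mappers and partition
%% the reduce step across R reducer processes, as suggested above.
run(MapFun, ReduceFun, Acc0, Data) ->
    Parent = self(),
    Refs = [begin
                Ref = make_ref(),
                spawn(fun() -> Parent ! {Ref, MapFun(Item)} end),
                Ref
            end || Item <- Data],
    Pairs = lists:flatmap(fun(Ref) -> receive {Ref, Ps} -> Ps end end, Refs),
    lists:foldl(ReduceFun, Acc0, Pairs).
```

Word counting then becomes a caller concern: mapreduce:run(fun(Line) -> [{W, 1} || W <- string:tokens(Line, " ")] end, fun({W, C}, D) -> dict:update_counter(W, C, D) end, dict:new(), Lines).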

Hope that helps.
OJ

PS. were you hoping for someone to show you how they'd code it up? I'm new to this code review site and I'm not sure whether I should be posting "fixed" code or not :)

Context

StackExchange Code Review Q#1869, answer score: 6
