Newbie Map/Reduce word frequency counter
Problem
I present my first ever module, written to learn some Erlang, for your scrutiny. It does a word frequency count using map/reduce. I'm an Erlang noob, so I would very much like feedback about:
Unneeded verbosity / code
Erlang 'style' mistakes
Things that can be done more elegantly
```
-module(main).
-export([map/1]).
-export([main/0]).
-export([reducer_wrapper/1]).
-export([reducer/2]).

-define(NUMBER_OF_PARTITIONS, 100).

main() ->
    Lines = readlines('pg20417.txt'),
    lists:foreach( fun main:map/1, Lines ),
    start_reduce().

%% signals all reducer_wrappers that all emits are received, and the actual reduce can be started
start_reduce() ->
    lists:foreach( fun(PartitionNr) -> get_reducer(PartitionNr) ! start_reduce end,
                   lists:seq(1, ?NUMBER_OF_PARTITIONS) ).

%% emits every word with cnt = 1
map(Line) ->
    Words = string:tokens( string:to_lower(Line), " .,;:\n()?" ),
    lists:foreach( fun(Word) -> get_reducer( Word, ?NUMBER_OF_PARTITIONS ) ! {emit, {Word, [1]}} end, Words ).

%% returns the reducer Pid
get_reducer( PartitionNr ) ->
    ProcessName = list_to_atom( lists:concat( [PartitionNr, '_reducer'] ) ),
    case whereis( ProcessName ) of
        undefined ->
            Pid = spawn(?MODULE, reducer_wrapper, [[]]),
            register( ProcessName, Pid ),
            Pid;
        Pid ->
            Pid
    end.

get_reducer( Key, NumberOfPartitions ) ->
    get_reducer( erlang:phash2(Key) rem NumberOfPartitions + 1 ).

%% the reducer wrapper process
reducer_wrapper(KeyValueTuples) ->
    receive
        { emit, KeyValueTuple } ->
            reducer_wrapper( [KeyValueTuple] ++ KeyValueTuples );
        start_reduce ->
            Dictionary = lists:foldl( fun main:reducer/2, dict:new(), KeyValueTuples ),
            show_all( dict:to_list(Dictionary) )
    end.

reducer( { Key, [Cnt] }, Dictionary ) ->
    case dict:is_key( Key, Dictionary ) of
        true ->
            dict:store( Key, dict:fetch( Key, Dictionary ) + Cnt, Dictionary );
        false ->
            dict:store( Key, Cnt, Dictionary )
    end.

%% reads the input file and returns its lines
readlines(FileName) ->
    {ok, Binary} = file:read_file(FileName),
    string:tokens( binary_to_list(Binary), "\n" ).

%% prints every {Word, Count} tuple
show_all(KeyValueList) ->
    lists:foreach( fun({Key, Cnt}) -> io:format("~s: ~w~n", [Key, Cnt]) end, KeyValueList ).
```
Solution
A few initial points:
- Your mapping phase is all done on one process, so you're not getting any parallelism. You should fire off `M` mappers to do the mapping for you, and give them bits of the data to map over. They can pass this on to one of `R` reducers who do the reduction.
- `[KeyValueTuple] ++ KeyValueTuples` is slow as you're creating lists and concatenating them together. Instead, just prepend values to the list in O(1) time: `[KeyValueTuple|KeyValueTuples]`.
- The way you're spawning/creating the reducer process is a bit unnecessary. If I were you I'd spawn `R` reducers up front and dish out the work in a round-robin fashion. This will give you better control of load distribution.
- Instead of calling `dict:is_key` then, if `true`, calling `dict:fetch`, do both of them in one step by calling `dict:find`, then matching against `{ok, Value}` or `error`. This saves on the double-lookup.
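Putting the last two of those suggestions together, the reducer side might look something like this (just a sketch, reusing the names from the question; behaviour is otherwise unchanged):

```erlang
%% Sketch only: same names as the question's code, with the O(1) list
%% prepend and the single dict:find/2 lookup applied.
reducer_wrapper(KeyValueTuples) ->
    receive
        { emit, KeyValueTuple } ->
            %% prepend instead of [KeyValueTuple] ++ KeyValueTuples
            reducer_wrapper( [KeyValueTuple | KeyValueTuples] );
        start_reduce ->
            Dictionary = lists:foldl( fun reducer/2, dict:new(), KeyValueTuples ),
            show_all( dict:to_list(Dictionary) )
    end.

%% one lookup with dict:find/2 instead of dict:is_key/2 followed by dict:fetch/2
reducer( { Key, [Cnt] }, Dictionary ) ->
    case dict:find( Key, Dictionary ) of
        {ok, Value} -> dict:store( Key, Value + Cnt, Dictionary );
        error       -> dict:store( Key, Cnt, Dictionary )
    end.
```

For this particular shape of reducer, `dict:update_counter(Key, Cnt, Dictionary)` would even collapse the whole `case` into a single call.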
As a general comment, you've really tied together both map/reduce and word counting. Ideally what you'd want to do is have a structure/module which knows about how to do map/reduce by itself, and be ignorant of the work that's going on. You would then have another module which is able to invoke a map/reduce job by passing in the Map function, the Reduce function, the number of mappers and reducers and the data that's being processed.
I realise you're learning, so you don't need to do all this. But down the track I'd recommend attempting to abstract away the map/reduce stuff into a reusable module.
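For illustration only, such a reusable module might expose an API like this (`mapreduce:run/4` and every name in it are invented for this sketch):

```erlang
%% Invented sketch of a reusable map/reduce module: the word-count
%% specifics are passed in as funs, so this module knows nothing
%% about the actual work being done.
-module(mapreduce).
-export([run/4]).

%% MapFun(Item) -> [{Key, Value}]
%% ReduceFun(Key, [Value]) -> FinalValue
run(MapFun, ReduceFun, NumReducers, Data) ->
    Parent = self(),
    %% spawn the reducers up front instead of on demand
    Reducers = [ spawn(fun() -> reduce_loop(ReduceFun, Parent, dict:new()) end)
                 || _ <- lists:seq(1, NumReducers) ],
    %% map phase (kept sequential for brevity; it could be farmed out
    %% to M mapper processes in the same way)
    lists:foreach(
        fun(Item) ->
            [ route(Pair, Reducers, NumReducers) || Pair <- MapFun(Item) ]
        end, Data),
    [ R ! start_reduce || R <- Reducers ],
    %% gather one partial result per reducer
    lists:append([ receive {result, Part} -> Part end || _ <- Reducers ]).

%% partition by key hash so equal keys always reach the same reducer
route({Key, _Value} = Pair, Reducers, NumReducers) ->
    lists:nth(erlang:phash2(Key, NumReducers) + 1, Reducers) ! {emit, Pair}.

reduce_loop(ReduceFun, Parent, Acc) ->
    receive
        {emit, {Key, Value}} ->
            reduce_loop(ReduceFun, Parent, dict:append(Key, Value, Acc));
        start_reduce ->
            Parent ! {result, [ {K, ReduceFun(K, Vs)}
                                || {K, Vs} <- dict:to_list(Acc) ]}
    end.
```

Word counting would then be just one caller of this hypothetical API, e.g. `mapreduce:run(fun(Line) -> [{W, 1} || W <- string:tokens(string:to_lower(Line), " .,;:\n()?")] end, fun(_Word, Counts) -> lists:sum(Counts) end, 8, Lines)`.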
Hope that helps.
OJ
PS. were you hoping for someone to show you how they'd code it up? I'm new to this code review site and I'm not sure whether I should be posting "fixed" code or not :)
Context
StackExchange Code Review Q#1869, answer score: 6