HiveBrain v1.2.0

Producing a sorted wordcount with Spark

Submitted by: @import:stackexchange-codereview

Problem

I'm currently learning how to use Apache Spark. To do so, I implemented a simple wordcount (not really original, I know). The documentation already provides an example with the needed code. However, I tried to go one step further and produce the result as a list sorted by frequency, using the Spark modules.

A use case would be sortedWordcount("I fish a fish") -> [(fish, 2), (I, 1), (a, 1)]

Here is how I proceed:

```
public List<String> sortedWordcount() {
    SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> lines = sc.textFile("myFile.txt");

    // classical wordcount, nothing new there
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });

    // to enable sorting by value (count) and not key -> value-to-key conversion pattern
    JavaPairRDD<Tuple2<Integer, String>, Integer> countInKey = counts.mapToPair(
            a -> new Tuple2<>(new Tuple2<>(a._2, a._1), null)); // setting value to null, since it won't be used anymore

    List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
            countInKey.sortByKey(new TupleComparator(), false).collect();

    List<String> result = new ArrayList<>();
    IntStream.range(0, wantedSize).forEach(i -> result.add(wordSortedByCount.get(i)._1._2));
    return result;
}

private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
```

Solution

I'll review this code keeping in mind that you want to keep using Apache Spark, as you intend to rely on it in the future for the purposes it is intended for.

I strongly believe that your current code can be written in pure Java 8, though that is not the point of the question.

Use lambdas wherever you can!

This will greatly enhance readability of your code.

As an example:

```
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
    }
});
```


could be rewritten as the following (I checked, and FlatMapFunction is indeed an interface that qualifies as a functional interface):

```
JavaRDD<String> words = lines.flatMap(str -> Arrays.asList(str.split(" ")));
```


Similarly it holds that:

```
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});
```


can be rewritten as:

```
JavaPairRDD<String, Integer> pairs = words.mapToPair(str -> new Tuple2<>(str, 1));
```


If type inference is not strong enough, then you need to write `new Tuple2<String, Integer>(str, 1)` instead.

Lastly it also holds for:

```
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
});
```


can be written as:

```
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
```


or even better:

```
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum);
```
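
The same `Integer::sum` method reference works anywhere a two-argument reducer is expected. As a quick way to see it outside Spark (a plain-Java sketch of my own, not part of the original code), `Map.merge` uses it the same way `reduceByKey` does:

```java
import java.util.HashMap;
import java.util.Map;

public class SumMerge {
    public static void main(String[] args) {
        // merge(key, 1, Integer::sum) stores 1 the first time a key appears,
        // and otherwise combines the existing count with 1 via Integer::sum --
        // the same reducer passed to Spark's reduceByKey above.
        Map<String, Integer> counts = new HashMap<>();
        for (String word : "I fish a fish".split(" ")) {
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts.get("fish")); // prints 2
    }
}
```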


Use the new way to write comparators

You can write comparators much more easily in Java 8; it works via key extraction. You can turn this code:

```
private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
```


into this code:

```
Comparator<Tuple2<Integer, String>> tupleComparator = Comparator.comparing(tuple2 -> tuple2._1);
```


This will automatically and automagically create a working comparator that compares the `_1` field of `tuple2`, which is inferred to be `Tuple2<Integer, String>`.
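
As a standalone illustration of key extraction (using plain strings rather than Spark's `Tuple2`, so it runs without any dependency; this example is mine, not from the original answer):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class KeyExtraction {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("fish", "a", "tuna");

        // Key extraction: compare strings by their length instead of alphabetically.
        words.sort(Comparator.comparing(String::length));
        System.out.println(words); // [a, fish, tuna]

        // When reversed() is chained, the lambda parameter needs an explicit type,
        // because inference cannot flow through the chained call.
        words.sort(Comparator.comparing((String s) -> s.length()).reversed());
        System.out.println(words); // [fish, tuna, a]
    }
}
```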

Now this line:

```
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(new TupleComparator(), false).collect();
```


can become:

```
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(Comparator.comparing(tuple2 -> tuple2._1), false).collect();
```


Normally one would use a method reference with Comparator.comparing; if Tuple2 had a get1() method, you would write it as Comparator.comparing(Tuple2::get1).

Upon further inspection of your code I see that you have, possibly intentionally, made your comparator serializable. If you need that for Apache Spark, you can do that as well, though the solution gets a fair bit uglier:

```
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(
                (Comparator<Tuple2<Integer, String>> & Serializable) Comparator.comparing(tuple2 -> tuple2._1),
                false).collect();
```


Via this mechanism it is possible to make a type implement a marker interface without writing boilerplate. It is more common in the form of `(Runnable & Serializable) () -> ...` than in this form, though.
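
A minimal runnable sketch of the intersection-cast mechanism (my own example, applied directly to a lambda, which is the case where the cast is guaranteed to take effect at the compile site):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class SerializableLambda {
    public static void main(String[] args) throws IOException {
        // The intersection cast gives the lambda the target type Comparator<String>
        // AND makes the generated class implement Serializable.
        Comparator<String> cmp =
                (Comparator<String> & Serializable) (a, b) -> a.length() - b.length();

        // Without the cast, this write would throw NotSerializableException.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(cmp);
        }
        System.out.println("serialized ok");
    }
}
```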

Final words

Lastly, your code formatting needs some improvement with respect to indenting the anonymous classes, but that may be an oversight; the rest of the code looks great.

When I have some time I'd be eager to show you a pure Java 8 solution, as I strongly believe that Apache Spark is currently making a mess of your code. I do understand, however, that you need to work with Apache Spark at some point, and for that you may be forced to use Apache Spark's classes.

My method using Java 8

As an addendum, I'll show how I would identify your problem and how I would solve it.

Input: An input file, consisting of words.
Output: A list of the words sorted by frequency in which they occur.

```
Map<String, Long> occurenceMap = Files.readAllLines(Paths.get("myFile.txt"))
        .stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))
        .collect(Collectors.groupingBy(i -> i, Collectors.counting()));
List<String> sortedWords = occurenceMap.entrySet()
        .stream()
        .sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed())
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
```


This will do the following steps:

  • Read all lines into a List<String> (care with large files!).
  • Turn it into a Stream<String>.
  • Turn that into a Stream<String> of words by flat mapping every String to a Stream<String>, splitting on the blanks.
  • Collect all elements into a Map<String, Long>, grouping by the identity (i -> i) and using Collectors.counting() as the downstream collector, such that the map value will be the count.
  • Get a Set<Map.Entry<String, Long>> from the map.
  • Turn it into a Stream<Map.Entry<String, Long>>.
  • Sort by the reverse order of the value of the entry.
  • Map the results to a Stream<String>; you lose the frequency information here.
  • Collect the stream into a List<String>.



Beware that the line `.sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed())` needs the explicit `(Map.Entry<String, Long> entry)` parameter type: once `reversed()` is chained onto `comparing`, type inference can no longer work out the lambda's argument type on its own.
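
The whole pipeline above can be tried without a file by streaming over an in-memory string (a small self-contained variant I've adapted from the code above; only the input source differs):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SortedWordcount {
    public static void main(String[] args) {
        // Same pipeline as above, but fed from a string instead of Files.readAllLines.
        Map<String, Long> occurrences = Arrays.stream("I fish a fish".split(" "))
                .collect(Collectors.groupingBy(i -> i, Collectors.counting()));

        List<String> sortedWords = occurrences.entrySet()
                .stream()
                .sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue())
                        .reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

        // Words with equal counts keep no particular order (HashMap iteration),
        // but the most frequent word is guaranteed to come first.
        System.out.println(sortedWords.get(0));
    }
}
```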


Context

StackExchange Code Review Q#56641, answer score: 11
