Producing a sorted wordcount with Spark
Problem
I'm currently learning how to use Apache Spark. In order to do so, I implemented a simple wordcount (not really original, I know). There is already an example in the documentation providing the needed code. However, I tried to go one step further and produce the result as a list sorted by frequency, using the Spark modules.
A use case would be:

sortedWordcount("I fish a fish") -> [(fish, 2), (I, 1), (a, 1)]

Here is how I proceed:

```
public List<String> sortedWordcount() {
    SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.textFile("myFile.txt");
    // classical wordcount, nothing new there
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });
    // to enable sorting by value (count) and not key -> value-to-key conversion pattern
    JavaPairRDD<Tuple2<Integer, String>, Integer> countInKey =
            counts.mapToPair(a -> new Tuple2<>(new Tuple2<>(a._2, a._1), null)); // setting value to null, since it won't be used anymore
    List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
            countInKey.sortByKey(new TupleComparator(), false).collect();
    List<String> result = new ArrayList<>();
    IntStream.range(0, wantedSize).forEach(i -> result.add(wordSortedByCount.get(i)._1._2));
    return result;
}

private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
```
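As a quick illustration, the value-to-key conversion pattern used above can be tried without Spark: each (word, count) pair is swapped into (count, word), so that sorting "by key" actually orders by frequency. This is a minimal sketch of my own; it uses `Map.Entry` as a stand-in for `Tuple2`, and the class name `ValueToKeyDemo` is hypothetical:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class ValueToKeyDemo {
    public static void main(String[] args) {
        // (word, count) pairs, as produced by the wordcount stage
        List<Map.Entry<String, Integer>> counts = List.of(
                new SimpleEntry<>("fish", 2),
                new SimpleEntry<>("I", 1),
                new SimpleEntry<>("a", 1));

        // value-to-key conversion: swap each pair to (count, word),
        // so that the count becomes the key we can sort on
        List<Map.Entry<Integer, String>> countInKey = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts) {
            countInKey.add(new SimpleEntry<>(e.getValue(), e.getKey()));
        }

        // sort by the new key (the count), descending
        countInKey.sort(Comparator.comparing((Map.Entry<Integer, String> e) -> e.getKey()).reversed());

        System.out.println(countInKey.get(0).getValue()); // fish
    }
}
```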
Solution
I'll review this code keeping in mind that you want to leave Apache Spark in place, as you intend to use it in the future for what it is intended to do.
I strongly believe that your current code can be written in pure Java 8, though that is not the point of the question.

Use lambdas wherever you can!

This will greatly enhance the readability of your code. As an example:

```
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
    }
});
```

could be rewritten as the following (I checked, and FlatMapFunction is indeed an interface that qualifies as a functional interface):

```
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
```

Similarly it holds that:
```
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});
```

can be rewritten as:

```
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
```

If type inference is not strong enough, then you need to write new Tuple2<String, Integer>(s, 1).

Lastly, the same holds for:
```
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
});
```

which can be written as:

```
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
```

or even better:

```
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum);
```

Use the new way to write comparators
You can write comparators much more easily in Java 8; it works via key extraction. You can turn this code:

```
private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
```

into this code:

```
Comparator<Tuple2<Integer, String>> tupleComparator = Comparator.comparing(tuple2 -> tuple2._1);
```

This will automatically create a working comparator that compares the _1 field of tuple2, which is inferred to be a Tuple2<Integer, String>. Now this line:

```
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(new TupleComparator(), false).collect();
```

can become:

```
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(Comparator.comparing(tuple2 -> tuple2._1), false).collect();
```

Normally one would use a method reference with Comparator.comparing: if Tuple2 had a get1() method, you would write it as Comparator.comparing(Tuple2::get1).

Upon further inspection of your code I see that you have, possibly intentionally, made your comparator serializable. If you need that for Apache Spark, then you can do that as well, though the solution does get a fair bit uglier then:
```
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(
                (Comparator<Tuple2<Integer, String>> & Serializable)
                        Comparator.comparing((Tuple2<Integer, String> tuple2) -> tuple2._1),
                false).collect();
```

Via this mechanism it is possible to make a type implement a wrapper interface without writing boilerplate. It is more common in the form (Runnable & Serializable) someRunnable than in this form, though.

Final words
Lastly, your code formatting needs some improvement with respect to indenting the anonymous classes, but that may be an oversight; the rest of the code looks great.

When I have some time I'd be eager to show you a pure Java 8 solution, as I strongly believe that Apache Spark is currently making a mess of your code. I do understand, however, that you need to operate with Apache Spark at some point, and for that you may be forced to use Apache Spark's classes.
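To illustrate the intersection-type cast outside of Spark, here is a minimal sketch of my own (the class name and the string comparators are hypothetical examples, not from the reviewed code). Casting a lambda to `Comparator<...> & Serializable` makes the generated class implement Serializable, while the same lambda without the cast does not:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SerializableComparatorDemo {
    public static void main(String[] args) {
        // a plain lambda: its generated class implements only Comparator
        Comparator<String> plain = (a, b) -> Integer.compare(a.length(), b.length());

        // the same lambda with an intersection-type cast: the generated class
        // now also implements Serializable, without any boilerplate wrapper
        Comparator<String> serializable =
                (Comparator<String> & Serializable) (a, b) -> Integer.compare(a.length(), b.length());

        System.out.println(plain instanceof Serializable);        // false
        System.out.println(serializable instanceof Serializable); // true

        // both comparators order the same way
        List<String> words = new ArrayList<>(List.of("fish", "I", "a"));
        words.sort(serializable);
        System.out.println(words); // [I, a, fish]
    }
}
```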
My method using Java 8

As an addendum, I'll show how I would frame your problem and how I would solve it.
Input: An input file, consisting of words.
Output: A list of the words, sorted by the frequency with which they occur.
```
Map<String, Long> occurrenceMap = Files.readAllLines(Paths.get("myFile.txt"))
        .stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))
        .collect(Collectors.groupingBy(i -> i, Collectors.counting()));
List<String> sortedWords = occurrenceMap.entrySet()
        .stream()
        .sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed())
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
```

This will do the following steps:
- Read all lines into a List<String> (careful with large files!).
- Turn it into a Stream<String>.
- Turn that into a Stream<String> of words by flat-mapping every String to a Stream<String>, splitting on the blanks.
- Collect all elements into a Map<String, Long>, grouping by the identity (i -> i) and using Collectors.counting() as the downstream collector, such that the map value is the count.
- Get a Set<Map.Entry<String, Long>> from the map.
- Turn it into a Stream<Map.Entry<String, Long>>.
- Sort by the reverse order of the value of the entry.
- Map the results to a Stream<String>; you lose the frequency information here.
- Collect the stream into a List<String>.
Beware that the line `.sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed())` needs the explicit (Map.Entry<String, Long> entry) parameter type: because of the chained .reversed(), type inference cannot work out the lambda's argument type on its own.
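To make the pipeline easy to try without a myFile.txt, here is the same sequence of steps over an in-memory list of lines (the input line and the class name are my own example, standing in for Files.readAllLines):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordFrequencyDemo {
    public static void main(String[] args) {
        // stands in for Files.readAllLines(Paths.get("myFile.txt"))
        List<String> lines = List.of("I fish a fish");

        // group every word by itself and count the occurrences
        Map<String, Long> occurrenceMap = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(i -> i, Collectors.counting()));

        // sort the entries by descending count, then keep only the words
        List<String> sortedWords = occurrenceMap.entrySet()
                .stream()
                .sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

        System.out.println(sortedWords.get(0)); // fish
    }
}
```

Note that the order of the two words with equal count ("I" and "a") depends on the map's iteration order, so only the most frequent word is guaranteed to come first.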
StackExchange Code Review Q#56641, answer score: 11