Using Java 8 parallel streams
Problem
I'm trying to get more familiar with the new Java 8 features, so I am rewriting one of my earlier projects. It includes a class that keeps track of the best entry seen so far:
import java.util.List;

public class Maxinator<T> {
    private final QualityFunction<T> qualityFunction;
    private T best;
    private double bestQuality;

    public Maxinator(QualityFunction<T> qualityFunction) {
        this.qualityFunction = qualityFunction;
        reset();
    }

    public void reset() {
        best = null;
        bestQuality = Double.NEGATIVE_INFINITY;
    }

    public T getBest() {
        return best;
    }

    public void updateBest(List<T> population) {
        population.parallelStream()
            .forEach(i -> {
                double quality = qualityFunction.computeQuality(i);
                if (quality > bestQuality) {
                    best = i;
                    bestQuality = quality;
                }
            });
    }
}
Where QualityFunction is simply the following interface:
public interface QualityFunction<T> {
    public abstract double computeQuality(T individual);
}
My questions:
- Are there any concurrency issues with the code in the forEach modifying best and bestQuality? Or are these automatically taken care of by the parallel stream processing?
- Is there a more idiomatic way to write this, using the new APIs (collect, reduce, etc.)?
Note: the quality function could be very expensive, so I want to make sure that it is only called once per element.
Solution
Your code is not thread-safe. Each of the threads will, in parallel, be accessing both the best and the bestQuality variables. The parallel stream does not synchronize that access for you: the comparison and the two assignments are not atomic, so one thread can overwrite a better result found by another, and best and bestQuality can end up describing different elements.
Your lambda is, in essence, modifying external data from within the stream, and this is an anti-pattern for streams. It has side effects.
You should change your code to use the collect mechanism. There are a few ways to do it, but you should look at this example for guidance: Reduction
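As one illustration of those "few ways", here is a minimal sketch that uses map followed by reduce instead of collect. It is not the approach this answer goes on to suggest. The names ReduceSketch, Scored and best are hypothetical, and java.util.function.ToDoubleFunction stands in for the question's QualityFunction so that the snippet is self-contained. Each element is scored once in the map step, and the reduce step only compares scores that were already computed.

```java
import java.util.List;
import java.util.function.ToDoubleFunction;

// Hypothetical sketch, not part of the original answer.
class ReduceSketch {

    // Immutable pair of an item and its score, so the (possibly expensive)
    // quality function never has to be re-evaluated during the reduction.
    private static final class Scored<T> {
        final T item;
        final double score;

        Scored(T item, double score) {
            this.item = item;
            this.score = score;
        }
    }

    // Returns the highest-scoring item, or null for an empty population.
    static <T> T best(ToDoubleFunction<T> quality, List<T> population) {
        return population.parallelStream()
                // Score each element exactly once.
                .map(item -> new Scored<>(item, quality.applyAsDouble(item)))
                // Stateless, associative "keep the larger score" reduction;
                // ties keep the left-hand element.
                .reduce((a, b) -> b.score > a.score ? b : a)
                .map(scored -> scored.item)
                .orElse(null);
    }
}
```

A call such as ReduceSketch.best(String::length, words) would return the longest string in words. The trade-off compared with a mutable accumulator is one small Scored allocation per element, which is normally negligible when the quality function itself is expensive.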
Some notes about the suggestion below:
- I converted the method to a static method, and created an inner accumulator class.
- You may want to adapt the class structure to better suit your needs.
Putting these observations together, I would suggest something like:
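import java.util.List;
import java.util.stream.Collector;

public class Maxinator {

    @FunctionalInterface
    public interface QualityFunction<V> {
        double computeQuality(V value);
    }

    private Maxinator() {
        // no public instances
    }

    // Mutable result container. The stream gives each parallel subtask its own
    // instance, so no locking is needed.
    private static final class AccumulateResult<T> {
        T bestItem = null;
        // Start below every finite score. Double.MIN_VALUE is the smallest
        // positive double, so it would reject all scores <= 0.
        double bestScore = Double.NEGATIVE_INFINITY;

        public T getBestItem() {
            return bestItem;
        }

        // Scores the item exactly once and keeps it if it beats the current best.
        public void accept(QualityFunction<T> function, T item) {
            double score = function.computeQuality(item);
            if (score > bestScore) {
                bestScore = score;
                bestItem = item;
            }
        }

        // Merges another subtask's partial result into this one.
        public AccumulateResult<T> combine(AccumulateResult<T> r) {
            if (r.bestScore > bestScore) {
                bestScore = r.bestScore;
                bestItem = r.bestItem;
            }
            return this;
        }
    }

    public static <T> T getBest(final QualityFunction<T> qualityFunction, final List<T> population) {
        return population.parallelStream().collect(Collector.of(
                AccumulateResult<T>::new,
                (a, t) -> a.accept(qualityFunction, t),
                (a, b) -> a.combine(b))
            ).getBestItem();
    }
}
Code Snippets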
import java.util.List;
import java.util.stream.Collector;

public class Maxinator {

    @FunctionalInterface
    public interface QualityFunction<V> {
        double computeQuality(V value);
    }

    private Maxinator() {
        // no public instances
    }

    // Mutable result container. The stream gives each parallel subtask its own
    // instance, so no locking is needed.
    private static final class AccumulateResult<T> {
        T bestItem = null;
        // Start below every finite score. Double.MIN_VALUE is the smallest
        // positive double, so it would reject all scores <= 0.
        double bestScore = Double.NEGATIVE_INFINITY;

        public T getBestItem() {
            return bestItem;
        }

        // Scores the item exactly once and keeps it if it beats the current best.
        public void accept(QualityFunction<T> function, T item) {
            double score = function.computeQuality(item);
            if (score > bestScore) {
                bestScore = score;
                bestItem = item;
            }
        }

        // Merges another subtask's partial result into this one.
        public AccumulateResult<T> combine(AccumulateResult<T> r) {
            if (r.bestScore > bestScore) {
                bestScore = r.bestScore;
                bestItem = r.bestItem;
            }
            return this;
        }
    }

    public static <T> T getBest(final QualityFunction<T> qualityFunction, final List<T> population) {
        return population.parallelStream().collect(Collector.of(
                AccumulateResult<T>::new,
                (a, t) -> a.accept(qualityFunction, t),
                (a, b) -> a.combine(b))
            ).getBestItem();
    }
}
Context
StackExchange Code Review Q#60401, answer score: 14