HiveBrain v1.2.0

Using Java 8 parallel streams

Submitted by: @import:stackexchange-codereview

Problem

I'm trying to get more familiar with the new Java 8 features, so I am rewriting one of my earlier projects. It includes a class that keeps track of the best entry seen so far:

import java.util.List;

public class Maxinator<T> {

    private final QualityFunction<T> qualityFunction;
    private T best;
    private double bestQuality;

    public Maxinator(QualityFunction<T> qualityFunction) {
        this.qualityFunction = qualityFunction;
        reset();
    }

    public void reset() {
        best = null;
        bestQuality = Double.NEGATIVE_INFINITY;
    }

    public T getBest() {
        return best;
    }

    public void updateBest(List<T> population) {
        population.parallelStream()
                .forEach(i -> {
                    double quality = qualityFunction.computeQuality(i);
                    if (quality > bestQuality) {
                        best = i;
                        bestQuality = quality;
                    }
                });
    }
}


Where QualityFunction is simply the following interface:

public interface QualityFunction<T> {
    public abstract double computeQuality(T individual);
}


My questions:

  • Are there any concurrency issues with the code in the forEach modifying best and bestQuality? Or are these automatically taken care of by the parallel stream processing?
  • Is there a more idiomatic way to write this, using the new APIs (collect, reduce, etc.)?
Note: the quality function could be very expensive, so I want to make sure that it is only called once per element.

Solution

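Your code is not thread-safe, and the parallel stream does not protect it for you. Several worker threads read and write best and bestQuality at the same time without any synchronization, so the test-then-update in your lambda is a race: two threads can both pass the quality > bestQuality check and overwrite each other's result, best and bestQuality can end up describing different elements, and a write made by one thread is not guaranteed to be visible to the others.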

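Your lambda is, in essence, modifying state that lives outside the stream. Side effects like this are an anti-pattern for streams; the java.util.stream package documentation explicitly discourages side effects in behavioral parameters because they easily lead to thread-safety hazards.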
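For comparison, here is a minimal sketch of what it would take to make the question's stateful version correct: the test and the two assignments must happen atomically. The class name SynchronizedMaxinator and the lock field are hypothetical, and the nested QualityFunction only mirrors the question's interface so that the block compiles on its own.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical lock-based variant of the question's Maxinator (not from the answer).
public class SynchronizedMaxinator<T> {

    // Mirrors the question's QualityFunction<T> so the example is self-contained.
    public interface QualityFunction<T> {
        double computeQuality(T individual);
    }

    private final QualityFunction<T> qualityFunction;
    private final Object lock = new Object();
    private T best;              // guarded by lock
    private double bestQuality;  // guarded by lock

    public SynchronizedMaxinator(QualityFunction<T> qualityFunction) {
        this.qualityFunction = qualityFunction;
        reset();
    }

    public void reset() {
        synchronized (lock) {
            best = null;
            bestQuality = Double.NEGATIVE_INFINITY;
        }
    }

    public T getBest() {
        synchronized (lock) {
            return best;
        }
    }

    public void updateBest(List<T> population) {
        population.parallelStream().forEach(i -> {
            // The expensive call stays outside the lock, so scoring still runs in parallel.
            double quality = qualityFunction.computeQuality(i);
            // Test and update happen atomically, so best and bestQuality always
            // describe the same element and no update is lost.
            synchronized (lock) {
                if (quality > bestQuality) {
                    best = i;
                    bestQuality = quality;
                }
            }
        });
    }

    public static void main(String[] args) {
        SynchronizedMaxinator<String> m = new SynchronizedMaxinator<>(String::length);
        m.updateBest(Arrays.asList("a", "abcd", "ab", "abc"));
        System.out.println(m.getBest()); // prints abcd
    }
}
```

This is correct, but it keeps the side effect that makes the stream harder to reason about, and every element contends for the same lock. A reduction avoids both problems.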

Instead, express the computation as a reduction, using collect (a mutable reduction) or reduce. There are a few ways to do it; the Reduction lesson in the Java Tutorials is a good guide.
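As one illustration of a side-effect-free reduction (a sketch, not taken from the answer): pair each element with its score once, then let Stream.max reduce over the cached scores. Calling max(Comparator.comparingDouble(...)) directly on the elements would be shorter, but that comparator calls the key extractor on both arguments of every comparison, so the expensive quality function would run more than once per element. The names MaxByQuality and getBest are hypothetical, and the JDK's ToDoubleFunction stands in for QualityFunction (an existing QualityFunction can be adapted with qualityFunction::computeQuality).

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToDoubleFunction;

public class MaxByQuality {

    // Hypothetical helper: returns the highest-scoring element, or Optional.empty()
    // for an empty list. Each element is scored exactly once in map(); max() then
    // compares the cached scores instead of calling the quality function again.
    public static <T> Optional<T> getBest(ToDoubleFunction<? super T> quality, List<T> population) {
        return population.parallelStream()
                .<Map.Entry<T, Double>>map(t -> new SimpleImmutableEntry<>(t, quality.applyAsDouble(t)))
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        List<Integer> population = Arrays.asList(3, -7, 5, -1);
        // Quality is the negated absolute value, so the element closest to zero wins.
        Optional<Integer> best = getBest(x -> -Math.abs(x), population);
        System.out.println(best.get()); // prints -1
    }
}
```

The price of this version is one small entry object per element; in exchange there is no custom accumulator class, and the empty case is reported through Optional rather than null.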

Some notes about the suggestion that follows:

  • I turned the method into a static method and added a private static nested accumulator class.
  • You may want to adapt the class structure to suit your needs.
Putting these observations together, I would suggest something like:

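import java.util.List;
import java.util.stream.Collector;

public class Maxinator {

    @FunctionalInterface
    public interface QualityFunction<V> {
        double computeQuality(V value);
    }

    private Maxinator() {
        // no instances: only the static getBest method is exposed
    }

    // Mutable accumulator; the parallel stream creates one per chunk and merges them.
    private static final class AccumulateResult<T> {
        T bestItem = null;
        // NEGATIVE_INFINITY, not Double.MIN_VALUE: MIN_VALUE is the smallest
        // positive double, so items scoring zero or below would never be selected.
        double bestScore = Double.NEGATIVE_INFINITY;

        public T getBestItem() {
            return bestItem;
        }

        // Accumulator: scores each element exactly once.
        public void accept(QualityFunction<T> function, T item) {
            double score = function.computeQuality(item);
            if (score > bestScore) {
                bestScore = score;
                bestItem = item;
            }
        }

        // Combiner: keeps the better of two partial results.
        public AccumulateResult<T> combine(AccumulateResult<T> r) {
            if (r.bestScore > bestScore) {
                bestScore = r.bestScore;
                bestItem = r.bestItem;
            }
            return this;
        }

    }

    // Returns the highest-scoring element (null for an empty list).
    public static <T> T getBest(final QualityFunction<T> qualityFunction, final List<T> population) {
        return population.parallelStream().collect(Collector.of(
                    AccumulateResult<T>::new,
                    (a, t) -> a.accept(qualityFunction, t),
                    (a, b) -> a.combine(b))
               ).getBestItem();
    }
}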
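The answer invites refining the class structure. One possible refinement, sketched here with hypothetical names (BestCollector, maxByQuality and Acc are not from the answer), packages the same accumulate/combine logic as a reusable Collector whose finisher returns an Optional instead of null. It uses the JDK's ToDoubleFunction in place of QualityFunction, and the demo counts calls to confirm the quality function runs once per element.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collector;

public class BestCollector {

    // Per-chunk mutable state; the parallel stream creates one per chunk.
    private static final class Acc<T> {
        T bestItem = null;
        double bestScore = Double.NEGATIVE_INFINITY;
    }

    // Hypothetical factory: a reusable Collector that scores each element once and
    // yields Optional.empty() when nothing was selected (for example, an empty stream).
    public static <T> Collector<T, ?, Optional<T>> maxByQuality(ToDoubleFunction<? super T> quality) {
        return Collector.<T, Acc<T>, Optional<T>>of(
                Acc::new,
                (acc, item) -> {                              // accumulator
                    double score = quality.applyAsDouble(item);
                    if (score > acc.bestScore) {
                        acc.bestScore = score;
                        acc.bestItem = item;
                    }
                },
                (a, b) -> b.bestScore > a.bestScore ? b : a,  // combiner: keep the better chunk
                acc -> Optional.ofNullable(acc.bestItem));    // finisher
    }

    public static void main(String[] args) {
        List<Double> population = Arrays.asList(-3.0, -1.5, -2.0);
        AtomicInteger calls = new AtomicInteger();
        Optional<Double> best = population.parallelStream()
                .collect(maxByQuality(x -> { calls.incrementAndGet(); return x; }));
        System.out.println(best.get());   // prints -1.5 (negative scores are handled)
        System.out.println(calls.get());  // prints 3 (one call per element)
    }
}
```

Returning the Collector rather than the result lets callers use it with any stream, sequential or parallel.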


Context

StackExchange Code Review Q#60401, answer score: 14
