HiveBrain v1.2.0
Get Started
← Back to all entries
patternjavaMinor

ConsumingRouter to consume based on condition

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
consumingrouterconsumeconditionbased

Problem

Started as to make a stream splitter to split stream based on condition, but finally found that I did ConsumingRouter, to consume based on condition and while have to provide consumers before using this router not as splited stream can provide consumers then , and use this router inside forEach of stream, please check it and provide me with your feedback and if there is another way to do it:

public class ConsumingRouter implements Consumer {
    private Map, Consumer> map = new ConcurrentHashMap<>();

    public void sub(Predicate p, Consumer c) {
        map.put(p, c);
    }

    @Override
    public void accept(T t) {
        map.entrySet().forEach(e -> {
            if (e.getKey().test(t))
                e.getValue().accept(t);
        });
    }
}


How to use it :

public static void main(String[] args) {
    ConsumingRouter router = new ConsumingRouter();
    Predicate p1 = (i) -> i % 2 != 0;
    Predicate p2 = (i) -> i % 2 == 0;
    Consumer c1 = i -> {
        System.out.println(Thread.currentThread().getName() + " Route 1 : "
                + i);
    };
    Consumer c2 = i -> {
        System.out.println(Thread.currentThread().getName() + " Route 2 : "
                + i);
    };
    router.sub(p1, c1);
    router.sub(p2, c2);
    IntStream.range(0, 1000).boxed().forEach(router);
}

Solution

There is a Map.forEach(BiConsumer) that you can use as well:

@Override
public void accept(T t) {
    map.forEach((k,v) -> {
        if (k.test(t)) {
            v.accept(t);
        }
    });
}


Alternatively, you can convert that if-statement in the form of an Optional too if you can ignore null values (as commented by @Misha):

@Override
public void accept(T t) {
    // answer update: switched to Optional.of() for reasons mentioned in comments
    map.forEach((k,v) -> Optional.of(t).filter(k::test).ifPresent(v));
}


You can also take a look at this StackOverflow question for some insight about splitting two Streams into two Streams... similar to yours, except that you are only concerned with multiple Consumers to terminate the source Stream.

edit: Oh yeah, you may want to consider having your sub() (subscribe?) method return the reference to itself, so that you can do method chaining as such:

router.sub(p1, c1).sub(p2, c2);


edit 2: @Misha has a good suggestion for doing away with the Map, if that suits your requirements as well.

Code Snippets

@Override
public void accept(T t) {
    map.forEach((k,v) -> {
        if (k.test(t)) {
            v.accept(t);
        }
    });
}
@Override
public void accept(T t) {
    // answer update: switched to Optional.of() for reasons mentioned in comments
    map.forEach((k,v) -> Optional.of(t).filter(k::test).ifPresent(v));
}
router.sub(p1, c1).sub(p2, c2);

Context

StackExchange Code Review Q#105301, answer score: 5

Revisions (0)

No revisions yet.