Java 8 CompletableFuture - fan out implementation

Submitted by: @import:stackexchange-codereview

Problem

I was wondering what the best way is to implement fan-out style functionality with Java 8's CompletableFuture. I recently rewrote a function that held a number of old Future instances and called get on each one in a loop, blocking every time, into a somewhat cleaner variant using CompletableFuture. However, I am seeing roughly a 2x drop in performance, so I assume something is not quite right in the way I'm using the new API. The code looks something like this:

```java
if (!client.login()) {
    throw new LoginException("There was a login error");
}
CompletableFuture<List<String>> smths = CompletableFuture
        .supplyAsync(client::getSmth);

CompletableFuture<List<Data>> smths2 = smths.thenApply(client::getInformation)
        .thenApplyAsync((list) -> list.stream().map(obj -> mapper.map(obj, Data.class)).collect(toList()));

List<CompletableFuture<Map<String, AnotherData>>> waitGroup = new ArrayList<>();
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentileM12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentileM6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M24M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentileM6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentileM12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M24M));

CompletableFuture
        .allOf(waitGroup.toArray(new CompletableFuture[waitGroup.size()]));

List<Data> data = smths2.join();
Map<String, Set<AnotherData>> volPercent = waitGroup.stream()
        .map(CompletableFuture::join)
        .flatMap((e) -> e.entrySet().stream())
        .collect(groupingBy
```

Solution

```java
CompletableFuture<List<String>> smths = CompletableFuture
        .supplyAsync(client::getSmth);
CompletableFuture<List<Data>> smths2 = smths.thenApply(client::getInformation)
        .thenApplyAsync((list) -> list.stream().map(obj -> mapper.map(obj, Data.class))
        .collect(toList()));
```


Since smths is only used on the very next line, consider combining the two statements:

```java
CompletableFuture<List<Data>> smths2 = CompletableFuture
        .supplyAsync(client::getSmth).thenApply(client::getInformation)
        .thenApplyAsync(list -> list.stream().map(v -> mapper.map(v, Data.class))
        .collect(toList()));
```


On the same note, further down you process the result of smths2 as follows:

```java
List<Data> data = smths2.join();
// Map something...
data.forEach((d) -> {
    Set<AnotherData> asdasd = volPercent.get(d.getSymbol());
    if (asdasd != null) {
        d.add(asdasd);
    }
});
```


That can probably be done in a more functional way:

```java
// Map something...
List<Data> data = smths2.join();
data.forEach(v -> Optional.ofNullable(volPercent.get(v.getSymbol()))
    .ifPresent(/* add this to v? */));
```


Your code wouldn't actually work, as a List can't add() a Set; at best it could addAll() one, and only if Data is a parent class of AnotherData. Please review this part.
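To make the Optional-based variant concrete, here is a minimal runnable sketch. The `Data` class, its `addAll` method, and the use of `String` values in place of `AnotherData` are assumptions made only so that the example compiles; they are not taken from the original code.

```java
import java.util.*;

class OptionalMergeDemo {
    // Hypothetical stand-in for the question's Data class.
    static class Data {
        private final String symbol;
        private final Set<String> extras = new TreeSet<>();
        Data(String symbol) { this.symbol = symbol; }
        String getSymbol() { return symbol; }
        // Assumed method: copies every element of the given set into this object.
        void addAll(Set<String> values) { extras.addAll(values); }
        Set<String> getExtras() { return extras; }
    }

    // Attach the looked-up set to each element; symbols without an entry are skipped.
    static void merge(List<Data> data, Map<String, Set<String>> volPercent) {
        data.forEach(d -> Optional.ofNullable(volPercent.get(d.getSymbol()))
                .ifPresent(d::addAll));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> volPercent = new HashMap<>();
        volPercent.put("AAA", new HashSet<>(Arrays.asList("iv", "hv")));
        List<Data> data = Arrays.asList(new Data("AAA"), new Data("BBB"));
        merge(data, volPercent);
        data.forEach(d -> System.out.println(d.getSymbol() + " -> " + d.getExtras()));
    }
}
```

Whether this reads better than the explicit null check is largely a matter of taste; both do the same work.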

```java
CompletableFuture
        .allOf(waitGroup.toArray(new CompletableFuture[waitGroup.size()]));
```

The return value of the call above doesn't seem to be used at all. Is that a copy-and-paste error? On second look, this is probably OK.
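If the result of `allOf` were to be used, one common idiom looks like the sketch below. `allOf` returns a `CompletableFuture<Void>` that completes once all of the given futures have completed, so joining the individual futures from a dependent stage does not block. The `gather` helper and the sample values are hypothetical.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {
    // Combine a list of futures into a single future holding all of their results.
    static <T> CompletableFuture<List<T>> gather(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]));
        // This stage runs only after every input has completed,
        // so each join() below returns immediately.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> waitGroup = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        System.out.println(gather(waitGroup).join());
    }
}
```

The results come back in the order of the input list, regardless of which future finished first.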

I'd like to review the waitGroup.add(...) chunk, but it isn't clear what notablesFuture is, unless one assumes it's the equivalent of doing a List<String> symbols = client.block().get() based on your original code... oh.

Now, it looks like notablesFuture is actually smths, and that changes the answer significantly...

Take #2

First, you can probably use better method names than getIvPercentileM12M, getHvPercentile2M24M, etc. Second, instead of manually building the List of CompletableFutures, you can probably Stream them too, using a small helper method that lets you use method references in place:

```java
// Gives a method reference an explicit Function type so it can be used in Stream.of().
private static <T, R> Function<T, R> ref(Function<T, R> ref) {
    return ref;
}

private static Map<String, Set<AnotherData>> getMap(Client client,
        CompletableFuture<List<String>> notablesFuture) {
    return Stream.of(
            ref(client::getIvPercentileM12M),
            ref(client::getIvPercentileM6M),
            ref(client::getIvPercentile2M6M),
            ref(client::getIvPercentile2M12M),
            ref(client::getIvPercentile2M24M),
            ref(client::getHvPercentileM6M),
            ref(client::getHvPercentile2M6M),
            ref(client::getHvPercentileM12M),
            ref(client::getHvPercentile2M12M),
            ref(client::getHvPercentile2M24M))
        .map(notablesFuture::thenComposeAsync).map(CompletableFuture::join)
        .flatMap(e -> e.entrySet().stream())
        .collect(Collectors.groupingBy(Map.Entry::getKey,
                    Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
}
```
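One detail worth checking in a stream-based version like this: a sequential stream pushes each element through all of its stages before moving on to the next element, so placing the `thenComposeAsync` mapping directly before `map(CompletableFuture::join)` may start and await the calls one at a time rather than fanning out. The sketch below materializes all futures first and only then joins them. It uses a hypothetical `slowCall` method and `Integer` values in place of the real client methods and `AnotherData`.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FanOutDemo {
    // Hypothetical stand-in for a remote call such as getIvPercentileM12M.
    static CompletableFuture<Map<String, Integer>> slowCall(List<String> symbols, int value) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Integer> result = new HashMap<>();
            symbols.forEach(s -> result.put(s, value));
            return result;
        });
    }

    static Map<String, Set<Integer>> fanOut(CompletableFuture<List<String>> source) {
        // Phase 1: register every dependent stage. Collecting to a list forces
        // the stream to create all futures before any of them is awaited.
        List<CompletableFuture<Map<String, Integer>>> futures = Stream
                .<Function<List<String>, CompletableFuture<Map<String, Integer>>>>of(
                        symbols -> slowCall(symbols, 1),
                        symbols -> slowCall(symbols, 2),
                        symbols -> slowCall(symbols, 3))
                .map(call -> source.thenComposeAsync(call))
                .collect(Collectors.toList());
        // Phase 2: wait for the results and merge them per key.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> source =
                CompletableFuture.supplyAsync(() -> Arrays.asList("AAA", "BBB"));
        System.out.println(fanOut(source));
    }
}
```

Whether this accounts for the slowdown described in the question is not something the posted code can confirm; it is simply a pattern to rule out.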


Your code in your current method then becomes:

```java
CompletableFuture<List<String>> notablesFuture = CompletableFuture
        .supplyAsync(client::getSmth);
Map<String, Set<AnotherData>> volPercent = getMap(clientb, notablesFuture);
List<Data> data = notablesFuture.thenApply(
            client::getInformation).thenApplyAsync(
            list -> list.stream().map(v -> mapper.map(v, Data.class))
                    .collect(Collectors.toList())).join();
// use volPercent and data
```


Alternatively, you can consider making use of the CompletableFuture semantics completely (pun unintended):

```java
// modify getMap's return value as such:
private static CompletableFuture<Map<String, Set<AnotherData>>> getMap(Client client,
        CompletableFuture<List<String>> notablesFuture) {
    return CompletableFuture.completedFuture(Stream.of(
            ref(client::getIvPercentileM12M),
            /* remaining method references... */)
        .map(notablesFuture::thenComposeAsync).map(CompletableFuture::join)
        /* remaining flatMap() and collect() steps... */ );
}
```
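Note that joining inside the method and then wrapping the result in `completedFuture` still blocks the calling thread until every call has finished. If a future that completes on its own is preferred, one possible variant folds the individual results together with `thenCombine`. This is only a sketch: the `fetch` and `merge` helpers are hypothetical, and `Integer` stands in for `AnotherData`.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class AsyncMergeDemo {
    // Hypothetical stand-in for one of the client's percentile calls.
    static CompletableFuture<Map<String, Integer>> fetch(List<String> symbols, int value) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Integer> result = new HashMap<>();
            symbols.forEach(s -> result.put(s, value));
            return result;
        });
    }

    // Merge one result map into the accumulated map of sets, returning a new map.
    static Map<String, Set<Integer>> merge(Map<String, Set<Integer>> acc,
                                           Map<String, Integer> next) {
        Map<String, Set<Integer>> merged = new HashMap<>();
        acc.forEach((k, v) -> merged.put(k, new HashSet<>(v)));
        next.forEach((k, v) -> merged.computeIfAbsent(k, key -> new HashSet<>()).add(v));
        return merged;
    }

    static CompletableFuture<Map<String, Set<Integer>>> getMap(
            CompletableFuture<List<String>> source,
            List<Function<List<String>, CompletableFuture<Map<String, Integer>>>> calls) {
        CompletableFuture<Map<String, Set<Integer>>> result =
                CompletableFuture.completedFuture(new HashMap<>());
        for (Function<List<String>, CompletableFuture<Map<String, Integer>>> call : calls) {
            // Register the call; nothing here waits for it to finish.
            CompletableFuture<Map<String, Integer>> stage = source.thenComposeAsync(call);
            // Fold its result into the accumulator once both sides are done.
            result = result.thenCombine(stage, AsyncMergeDemo::merge);
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> source =
                CompletableFuture.supplyAsync(() -> Arrays.asList("AAA", "BBB"));
        List<Function<List<String>, CompletableFuture<Map<String, Integer>>>> calls =
                Arrays.asList(s -> fetch(s, 1), s -> fetch(s, 2), s -> fetch(s, 3));
        System.out.println(getMap(source, calls).join());
    }
}
```

The only blocking call is the final `join()` in `main`; `getMap` itself returns as soon as the stages are registered.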


And the code in your current method can then be modified into:

```java
CompletableFuture<List<String>> notablesFuture = CompletableFuture
        .supplyAsync(client::getSmth);
List<AnotherData> result = getMap(clientb, notablesFuture).thenCombineAsync(
            notablesFuture.thenApply(client::getInformation).thenApplyAsync(
                list -> list.stream().map(v -> mapper.map(v, Data.class))
                        .collect(Collectors.toList())), lookup()).join();
```


The lookup() method processes your Map and List results together into whatever final result you require. Since, as mentioned above, you can't add() a Set to a List, this part has to stay open-ended... With that said, here's a sample implementation for reference:

```java
private static BiFunction<Map<String, Set<AnotherData>>, List<Data>, List<AnotherData>> lookup() {
    return (map, list) -> list.stream().map(v -> map.get(v.getSymbol()))
            .filter(Objects::nonNull).flatMap(Set::stream)
            .collect(Collectors.toList());
}
```
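As a usage illustration, the sketch below runs a `lookup()` of the same shape against two small futures. The `Data` class shown here is a hypothetical minimal version (only `getSymbol()` comes from the code above), and `String` stands in for `AnotherData`.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

class LookupDemo {
    // Hypothetical minimal Data type.
    static class Data {
        private final String symbol;
        Data(String symbol) { this.symbol = symbol; }
        String getSymbol() { return symbol; }
    }

    // Same shape as lookup() above, with String in place of AnotherData.
    static BiFunction<Map<String, Set<String>>, List<Data>, List<String>> lookup() {
        return (map, list) -> list.stream().map(v -> map.get(v.getSymbol()))
                .filter(Objects::nonNull).flatMap(Set::stream)
                .collect(Collectors.toList());
    }

    static List<String> run() {
        Map<String, Set<String>> map = new HashMap<>();
        map.put("AAA", Collections.singleton("iv-90"));
        CompletableFuture<Map<String, Set<String>>> mapFuture =
                CompletableFuture.supplyAsync(() -> map);
        CompletableFuture<List<Data>> dataFuture = CompletableFuture.supplyAsync(
                () -> Arrays.asList(new Data("AAA"), new Data("BBB")));
        // The BiFunction runs once both futures have completed.
        return mapFuture.thenCombineAsync(dataFuture, lookup()).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [iv-90]
    }
}
```

The symbol "BBB" has no entry in the map, so it is dropped by the `Objects::nonNull` filter.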

Context

StackExchange Code Review Q#82122, answer score: 3
