Java 8 CompletableFuture - fan out implementation
Problem
I was wondering what the best way is to implement fan-out functionality with Java 8's CompletableFuture. I recently rewrote a function that created a bunch of old Future instances and then called get in a loop, blocking on each one, into a somewhat cleaner variant using CompletableFuture. However, I am seeing about a 2x drop in performance, so I assume something is not quite right in the way I'm using the new API. The code looks something like this:
```
if (!client.login()) {
    throw new LoginException("There was a login error");
}
CompletableFuture<List<String>> smths = CompletableFuture
        .supplyAsync(client::getSmth);
CompletableFuture<List<Data>> smths2 = smths.thenApply(client::getInformation)
        .thenApplyAsync((list) -> list.stream().map(obj -> mapper.map(obj, Data.class)).collect(toList()));
List<CompletableFuture<Map<String, AnotherData>>> waitGroup = new ArrayList<>();
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentileM12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentileM6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getIvPercentile2M24M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentileM6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M6M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentileM12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M12M));
waitGroup.add(notablesFuture.thenComposeAsync(clientb::getHvPercentile2M24M));
CompletableFuture
        .allOf(waitGroup.toArray(new CompletableFuture[waitGroup.size()]));
List<Data> data = smths2.join();
Map<String, Set<AnotherData>> volPercent = waitGroup.stream()
        .map(CompletableFuture::join)
        .flatMap((e) -> e.entrySet().stream())
        .collect(groupingBy
```
Solution
CompletableFuture<List<String>> smths = CompletableFuture
        .supplyAsync(client::getSmth);
CompletableFuture<List<Data>> smths2 = smths.thenApply(client::getInformation)
        .thenApplyAsync((list) -> list.stream().map(obj -> mapper.map(obj, Data.class))
                .collect(toList()));

Since smths is only used on the very next line, you can consider combining both statements:

CompletableFuture<List<Data>> smths2 = CompletableFuture
        .supplyAsync(client::getSmth).thenApply(client::getInformation)
        .thenApplyAsync(list -> list.stream().map(v -> mapper.map(v, Data.class))
                .collect(toList()));

On the same note, further down you process smths2 as follows:

List<Data> data = smths2.join();
// Map something...
data.forEach((d) -> {
    Set<AnotherData> asdasd = volPercent.get(d.getSymbol());
    if (asdasd != null) {
        d.add(asdasd);
    }
});

That can probably be done in a more functional way:

// Map something...
List<Data> data = smths2.join();
data.forEach(v -> Optional.ofNullable(volPercent.get(v.getSymbol()))
        .ifPresent(/* add this to v? */));

Your code actually wouldn't work, as a List<Data> can't add() a Set<AnotherData>; at best it could addAll() one if Data is the parent class of AnotherData. Please review this part.

CompletableFuture
        .allOf(waitGroup.toArray(new CompletableFuture[waitGroup.size()]));

The return value from the above doesn't seem to be used at all; is this a copy-and-paste error? It looks like this is probably OK, though.
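
As a rough illustration of how the return value of allOf() could actually be put to use, here is a minimal, self-contained sketch. The AllOfSketch class and its fetch() method are hypothetical stand-ins for your remote calls, not part of your code. The only points being made are that every future is created before anything is joined, and that the join() calls run only after allOf() has completed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Hypothetical stand-in for one asynchronous remote call (an assumption for this sketch).
    static CompletableFuture<Integer> fetch(int id) {
        return CompletableFuture.supplyAsync(() -> id * 10);
    }

    // Starts every call up front, then gathers the results once all of them are done.
    static CompletableFuture<List<Integer>> fanOut(List<Integer> ids) {
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int id : ids) {
            pending.add(fetch(id)); // nothing is joined while the calls are being started
        }
        CompletableFuture<Void> all =
                CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
        // The join() calls below do not block: 'all' completes only after every future has.
        return all.thenApply(ignored -> pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(fanOut(Arrays.asList(1, 2, 3)).join()); // prints [10, 20, 30]
    }
}
```

The result list keeps the order in which the futures were created, regardless of the order in which they finish. Whether this is any faster than joining each future directly depends on your clients, so treat it as a pattern to measure rather than a guaranteed fix.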

I'd like to review the chunk of waitGroup.add(...) calls, but it isn't clear what notablesFuture is, unless one assumes it's the equivalent of doing a List<String> symbols = client.block().get() based on your original code... oh.

Now, it looks like notablesFuture is actually smths, and that changes the answer significantly...

Take #2

First, you can probably use better method names than just getIvPercentileM12M, getHvPercentile2M24M etc. Second, instead of manually creating the List of CompletableFutures, you can probably Stream it too, using a small helper method so that method references can be used in-place:

private static <T, R> Function<T, R> ref(Function<T, R> ref) {
    return ref;
}

private static Map<String, Set<AnotherData>> getMap(Client client,
        CompletableFuture<List<String>> notablesFuture) {
    return Stream.of(
            ref(client::getIvPercentileM12M),
            ref(client::getIvPercentileM6M),
            ref(client::getIvPercentile2M6M),
            ref(client::getIvPercentile2M12M),
            ref(client::getIvPercentile2M24M),
            ref(client::getHvPercentileM6M),
            ref(client::getHvPercentile2M6M),
            ref(client::getHvPercentileM12M),
            ref(client::getHvPercentile2M12M),
            ref(client::getHvPercentile2M24M))
            .map(notablesFuture::thenComposeAsync)
            // Collect first so that every call is started before the first join() blocks;
            // a lazy stream would otherwise start and join the futures one at a time.
            .collect(Collectors.toList()).stream()
            .map(CompletableFuture::join)
            .flatMap(e -> e.entrySet().stream())
            .collect(Collectors.groupingBy(Map.Entry::getKey,
                    Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
}

Your code in your current method then becomes:

CompletableFuture<List<String>> notablesFuture = CompletableFuture
        .supplyAsync(client::getSmth);
Map<String, Set<AnotherData>> volPercent = getMap(clientb, notablesFuture);
List<Data> data = notablesFuture.thenApply(client::getInformation)
        .thenApplyAsync(list -> list.stream().map(v -> mapper.map(v, Data.class))
                .collect(Collectors.toList()))
        .join();
// use volPercent and data

Alternatively, you can consider making use of the CompletableFuture semantics completely (pun unintended):

// modify getMap's return value as such:
private static CompletableFuture<Map<String, Set<AnotherData>>> getMap(Client client,
        CompletableFuture<List<String>> notablesFuture) {
    return CompletableFuture.completedFuture(Stream.of(
            ref(client::getIvPercentileM12M),
            /* remaining method references... */)
            .map(notablesFuture::thenComposeAsync)
            // collect first so that all calls are started before any join() blocks
            .collect(Collectors.toList()).stream()
            .map(CompletableFuture::join)
            /* remaining flatMap() and collect() steps... */ );
}

And the code in your current method can then be modified into:

CompletableFuture<List<String>> notablesFuture = CompletableFuture
        .supplyAsync(client::getSmth);
List<AnotherData> result = getMap(clientb, notablesFuture).thenCombineAsync(
        notablesFuture.thenApply(client::getInformation).thenApplyAsync(
                list -> list.stream().map(v -> mapper.map(v, Data.class))
                        .collect(Collectors.toList())), lookup()).join();

The lookup() method processes your Map and List results together into the result you require. Since, as mentioned above, a Set can't be add()-ed to a List, this will have to be open-ended... With that said, here's a sample implementation for reference:

private static BiFunction<Map<String, Set<AnotherData>>, List<Data>, List<AnotherData>> lookup() {
    return (map, list) -> list.stream().map(v -> map.get(v.getSymbol()))
            .filter(Objects::nonNull).flatMap(Set::stream)
            .collect(Collectors.toList());
}

Code Snippets
CompletableFuture<List<String>> smths = CompletableFuture
        .supplyAsync(client::getSmth);
CompletableFuture<List<Data>> smths2 = smths.thenApply(client::getInformation)
        .thenApplyAsync((list) -> list.stream().map(obj -> mapper.map(obj, Data.class))
                .collect(toList()));

CompletableFuture<List<Data>> smths2 = CompletableFuture
        .supplyAsync(client::getSmth).thenApply(client::getInformation)
        .thenApplyAsync(list -> list.stream().map(v -> mapper.map(v, Data.class))
                .collect(toList()));

List<Data> data = smths2.join();
// Map something...
data.forEach((d) -> {
    Set<AnotherData> asdasd = volPercent.get(d.getSymbol());
    if (asdasd != null) {
        d.add(asdasd);
    }
});

// Map something...
List<Data> data = smths2.join();
data.forEach(v -> Optional.ofNullable(volPercent.get(v.getSymbol()))
        .ifPresent(/* add this to v? */));

CompletableFuture
        .allOf(waitGroup.toArray(new CompletableFuture[waitGroup.size()]));

Context
StackExchange Code Review Q#82122, answer score: 3