Echo server with CompletableFuture
Problem
I recently wrote a simple echo server in Java 7 using the NIO APIs so it was asynchronous and non-blocking. Then I decided, as a learning experience, to redo it with Java 8, hoping to use a more functional style and not have nested callbacks. I'm struggling to understand how to use the new `CompletableFuture` class with the `Future`s returned by `AsynchronousSocketChannel`.
This code currently works but is slower than the Java 7 version. If anyone can point out ways to improve it (or maybe that I'm going about it completely wrong) it would be appreciated. The main problem, to me at least, is that I now have to call `get()` on three `Future`s, whereas before I didn't have any blocking operations.
```
try (final AsynchronousServerSocketChannel listener =
        AsynchronousServerSocketChannel.open()) {
    listener.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    listener.bind(new InetSocketAddress("localhost", 8080));
    while (true) {
        AsynchronousSocketChannel connection = listener.accept().get();
        CompletableFuture<AsynchronousSocketChannel> connectionPromise =
                CompletableFuture.completedFuture(connection);
        CompletableFuture<ByteBuffer> readerPromise = CompletableFuture.supplyAsync(() -> {
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            try {
                connection.read(buffer).get();
                return (ByteBuffer) buffer.flip();
            } catch (InterruptedException | ExecutionException e) {
                connectionPromise.completeExceptionally(e);
            }
            return null;
        });
        readerPromise.thenAcceptAsync((buffer) -> {
            if (buffer != null) {
                try {
                    connection.write(buffer).get();
                    connection.close();
                } catch (InterruptedException | ExecutionException | IOException e) {
                    readerPromise.completeExceptionally(e);
                }
            }
        });
    }
}
```
Solution
-
As far as I can see, `supplyAsync` and `thenAcceptAsync` use `ForkJoinPool.commonPool()`, which does not use more than three threads on my machine (the number depends on the available processors). It could be a bottleneck if you have more than three clients at the same time. Both methods have an overload with an `Executor` argument, which would be used instead of `commonPool()`.
-
The code calls `completeExceptionally` in the catch block here:
```
readerPromise.thenAcceptAsync((buffer) -> {
    if (buffer != null) {
        try {
            connection.write(buffer).get();
            connection.close();
        } catch (InterruptedException | ExecutionException | IOException e) {
            readerPromise.completeExceptionally(e);
        }
    }
});
```
It seems to me that it does not have any effect. The Javadoc of `completeExceptionally` says the following:
"If not already completed, causes invocations of `get()` and related methods to throw the given exception."
The lambda passed to `thenAcceptAsync` is called when the `CompletableFuture` (`readerPromise` in this case) is already completed, so it doesn't change anything. Consider the following PoC:
```
@Test
public void test3() throws Exception {
    final CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
        return "data";
    });
    reader.thenAcceptAsync(x -> {
        System.out.println("reader.thenAcceptAsync: " + x);
        // reader is already completed here, so this call cannot change its result
        boolean transitionToCompleted = reader.completeExceptionally(new RuntimeException(
                "overridden Future result"));
        System.out.println(transitionToCompleted); // prints "false"
    });
    // Uninterruptibles comes from Guava
    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
    System.out.println("reader result: " + reader.get()); // prints "data"
}
```
(Note that there is an `obtrudeException` method, but I guess you don't need that either.)
-
The same is true for `connectionPromise.completeExceptionally(e)`. Furthermore, `connectionPromise` seems completely unused in the code above. I'd remove it.
-
`thenAcceptAsync` will run only when the previous stage completes normally. The following could also be useful:
"In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause."
Source: CompletionStage javadoc
According to that, you might be able to change the reader lambda to the following:
```
final CompletableFuture<ByteBuffer> readerPromise = CompletableFuture.supplyAsync(() -> {
    try {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        connection.read(buffer).get();
        buffer.flip();
        return buffer;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
});
```
It never returns null, so you can remove the null check from the writer lambda:
```
final CompletableFuture<Void> writerPromise = readerPromise.thenAcceptAsync((buffer) -> {
    try {
        connection.write(buffer).get();
        connection.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
```
Finally, if there's an exception, you could log it with the following:
```
writerPromise.exceptionally(x -> {
    x.printStackTrace(); // TODO: use logging instead
    return null;
});
```
(I'm not familiar with these APIs, but this looks right to me.)
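To illustrate the propagation rule quoted above, here is a minimal, self-contained sketch. The class name `PropagationDemo`, the method `run`, and the "read failed" message are made up for illustration; the failing supplier only stands in for the reader stage and performs no I/O.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class PropagationDemo {

    /** Builds a reader -> writer chain whose reader fails and reports what was observed. */
    static String run() {
        AtomicBoolean writerRan = new AtomicBoolean(false);

        // Stand-in for the reader stage: terminates with an unchecked exception.
        CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("read failed");
        });

        // Dependent stage: skipped because the reader did not complete normally.
        CompletableFuture<Void> writer = reader.thenAccept(data -> writerRan.set(true));

        // exceptionally() sees a CompletionException whose cause is the original exception.
        CompletableFuture<String> outcome = writer
                .thenApply(done -> "completed normally")
                .exceptionally(error -> error.getClass().getSimpleName()
                        + " caused by " + error.getCause().getMessage());

        return "writerRan=" + writerRan.get() + ", outcome=" + outcome.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the failure travels down the chain on its own, a single `exceptionally` (or `handle`) at the end is enough; the intermediate stages do not need to call `completeExceptionally` themselves.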
-
Note that I've removed the ugly cast by changing
```
return (ByteBuffer) buffer.flip();
```
to
```
buffer.flip();
return buffer;
```
-
Calling `Future.get()` in the lambdas does not seem asynchronous to me either (although they're run by other threads), but I have no idea how it should be handled. The two APIs do not seem compatible. (I guess you could get better answers on Stack Overflow with a more specific question.)
Code Snippets
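On the last point of the answer (the blocking `Future.get()` calls): one possible way to reconcile the two APIs, sketched here under assumptions rather than taken from the original post, is to use the `accept`, `read` and `write` overloads that take a `CompletionHandler` and let the handler complete a `CompletableFuture`. The class `HandlerBridge` and the helpers `toFuture`, `accept`, `read`, `write`, `echoOnce` and `roundTrip` are hypothetical names. Like the question's code, the sketch does one read and one write per connection and does not retry partial writes.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class HandlerBridge {

    /** Hypothetical helper: a handler that forwards its outcome to the attached future. */
    static <T> CompletionHandler<T, CompletableFuture<T>> toFuture() {
        return new CompletionHandler<T, CompletableFuture<T>>() {
            @Override
            public void completed(T result, CompletableFuture<T> future) {
                future.complete(result);
            }

            @Override
            public void failed(Throwable exc, CompletableFuture<T> future) {
                future.completeExceptionally(exc);
            }
        };
    }

    static CompletableFuture<AsynchronousSocketChannel> accept(AsynchronousServerSocketChannel listener) {
        CompletableFuture<AsynchronousSocketChannel> future = new CompletableFuture<>();
        listener.accept(future, HandlerBridge.<AsynchronousSocketChannel>toFuture());
        return future;
    }

    static CompletableFuture<Integer> read(AsynchronousSocketChannel channel, ByteBuffer buffer) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        channel.read(buffer, future, HandlerBridge.<Integer>toFuture());
        return future;
    }

    static CompletableFuture<Integer> write(AsynchronousSocketChannel channel, ByteBuffer buffer) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        channel.write(buffer, future, HandlerBridge.<Integer>toFuture());
        return future;
    }

    /** One read followed by one write, then close; no thread blocks in get(). */
    static CompletableFuture<Integer> echoOnce(AsynchronousSocketChannel connection) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        return read(connection, buffer)
                .thenCompose(bytesRead -> {
                    buffer.flip();
                    return write(connection, buffer);
                })
                .whenComplete((bytesWritten, error) -> {
                    try {
                        connection.close();
                    } catch (IOException ignored) {
                        // nothing useful to do in this sketch
                    }
                });
    }

    /** Demo over loopback; only the client side blocks, to keep the demo short. */
    static String roundTrip(String message) throws Exception {
        try (AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open()) {
            listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            CompletableFuture<Integer> served = accept(listener).thenCompose(HandlerBridge::echoOnce);

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(listener.getLocalAddress()).get();
                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8))).get();
                ByteBuffer reply = ByteBuffer.allocate(1024);
                client.read(reply).get();
                served.get();
                reply.flip();
                return StandardCharsets.UTF_8.decode(reply).toString();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("ping"));
    }
}
```

In a real server, the accept step would be re-armed after each connection (for example from a `thenAccept` on the future returned by `accept`) instead of looping on `listener.accept().get()`. Since the stages are completed by the channel group's own threads, this variant also does not tie up `ForkJoinPool.commonPool()` threads waiting on I/O.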
```
readerPromise.thenAcceptAsync((buffer) -> {
    if (buffer != null) {
        try {
            connection.write(buffer).get();
            connection.close();
        } catch (InterruptedException | ExecutionException | IOException e) {
            readerPromise.completeExceptionally(e);
        }
    }
});
```
```
@Test
public void test3() throws Exception {
    final CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
        return "data";
    });
    reader.thenAcceptAsync(x -> {
        System.out.println("reader.thenAcceptAsync: " + x);
        boolean transitionToCompleted = reader.completeExceptionally(new RuntimeException(
                "overridden Future result"));
        System.out.println(transitionToCompleted); // prints "false"
    });
    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
    System.out.println("reader result: " + reader.get()); // prints "data"
}
```
```
final CompletableFuture<ByteBuffer> readerPromise = CompletableFuture.supplyAsync(() -> {
    try {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        connection.read(buffer).get();
        buffer.flip();
        return buffer;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
});
```
```
final CompletableFuture<Void> writerPromise = readerPromise.thenAcceptAsync((buffer) -> {
    try {
        connection.write(buffer).get();
        connection.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
```
```
writerPromise.exceptionally(x -> {
    x.printStackTrace(); // TODO: use logging instead
    return null;
});
```
Context
StackExchange Code Review Q#47354, answer score: 3