HiveBrain v1.2.0
Get Started
← Back to all entries
patternjavaMinor

Echo server with CompletableFuture

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
serverwithcompletablefutureecho

Problem

I recently wrote a simple echo server in Java 7 using the NIO APIs so it was asynchronous and non-blocking. Then I decided, as a learning experience, to redo it with Java 8 hoping to use a more functional style and not have nested callbacks. I'm struggling to understand how to use the new CompletableFuture class with the Futures returned by AsynchronousSocketChannel.

This code currently works but is slower than the Java 7 version. If anyone can point out ways to improve it (or maybe that I'm going about it completely wrong) it would be appreciated. The main problem, to me at least, is that I now have to call get() on three Futures, whereas before I didn't have any blocking operations.

```
try (final AsynchronousServerSocketChannel listener =
AsynchronousServerSocketChannel.open()) {

listener.setOption(StandardSocketOptions.SO_REUSEADDR, true);
listener.bind(new InetSocketAddress("localhost", 8080));

while (true) {

AsynchronousSocketChannel connection = listener.accept().get();
CompletableFuture connectionPromise =
CompletableFuture.completedFuture(connection);

CompletableFuture readerPromise = CompletableFuture.supplyAsync(() -> {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
try {
connection.read(buffer).get();
return (ByteBuffer) buffer.flip();
} catch (InterruptedException | ExecutionException e) {
connectionPromise.completeExceptionally(e);
}
return null;
});

readerPromise.thenAcceptAsync((buffer) -> {
if (buffer != null) {
try {
connection.write(buffer).get();
connection.close();
} catch (InterruptedException | ExecutionException | IOException e) {
readerPromise.completeExceptionally(e);

Solution

-
As far as I see supplyAsync and thenAcceptAsync uses ForkJoinPool.commonPool() which does not use more than three threads on my machine (and it depends on the number of available processors). It could be a bottleneck if you have more than three clients at the same time. Both methods can have an Executor argument which would be used instead of commonPool().

-
The code calls completeExceptionally in the catch block here:

readerPromise.thenAcceptAsync((buffer) -> {
    if (buffer != null) {
        try {
            connection.write(buffer).get();
            connection.close();
        } catch (InterruptedException | ExecutionException | IOException e {
            readerPromise.completeExceptionally(e);
        } 
    }
});


It seems to me that it does not have any effect. Javadoc of completeExceptionally says the following:


If not already completed, causes invocations of get() and related methods to throw the given exception.

The lambda passed to thenAcceptAsync is called when the CompleteFuture (readerPromise in this case) is already completed so it doesn't change anything. Consider the following PoC:

@Test
public void test3() throws Exception {
    final CompletableFuture reader = CompletableFuture.supplyAsync(() -> {
        return "data";
    });
    reader.thenAcceptAsync(x -> {
        System.out.println("reader.thenAcceptAsync: " + x);
        boolean transitionToCompleted = reader.completeExceptionally(new RuntimeException(
                "overridden Future result"));
        System.out.println(transitionToCompleted); // prints "false"
    });

    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
    System.out.println("reader result: " + reader.get()); // prints "data"
}


(Note that there is an obtrudeException method but I guess you don't need that either.)

-
The same is true for connectionPromise.completeExceptionally(e), furthermore, connectionPromise seems completely unused in the code above. I'd remove it.

-
thenAcceptAsync will run only when the the previous stage completes normally. The following also could be useful:


In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause.

Source: CompletionStage javadoc

According to that you might be able to change the reader lambda to the following:

final CompletableFuture readerPromise = CompletableFuture.supplyAsync(() -> {
    try {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        connection.read(buffer).get();
        buffer.flip();
        return buffer;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
});


It never returns null, so you can remove the null check from writer lambda:

final CompletableFuture writerPromise = readerPromise.thenAcceptAsync((buffer) -> {
    try {
        connection.write(buffer).get();
        connection.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});


Finally, if there's an exception, you could log that with the following:

writerPromise.exceptionally(x -> {
    x.printStackTrace(); // TODO: use logging instead
    return null;
});


(I'm not familiar with these APIs but this looks right for me.)

-
Note that I've removed the ugly casting with changing

return (ByteBuffer) buffer.flip();


to

buffer.flip();
return buffer;


-
Calling Future.get() in the lambdas does not seem to asynchronous for me too (althought they're run by another threads) but I've not idea how should it be handled. The two APIs does not seem compatible. (I guess you could get better answers on Stack Overflow with a more specific question.)

Code Snippets

readerPromise.thenAcceptAsync((buffer) -> {
    if (buffer != null) {
        try {
            connection.write(buffer).get();
            connection.close();
        } catch (InterruptedException | ExecutionException | IOException e {
            readerPromise.completeExceptionally(e);
        } 
    }
});
@Test
public void test3() throws Exception {
    final CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
        return "data";
    });
    reader.thenAcceptAsync(x -> {
        System.out.println("reader.thenAcceptAsync: " + x);
        boolean transitionToCompleted = reader.completeExceptionally(new RuntimeException(
                "overridden Future result"));
        System.out.println(transitionToCompleted); // prints "false"
    });

    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
    System.out.println("reader result: " + reader.get()); // prints "data"
}
final CompletableFuture<ByteBuffer> readerPromise = CompletableFuture.supplyAsync(() -> {
    try {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        connection.read(buffer).get();
        buffer.flip();
        return buffer;
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
});
final CompletableFuture<Void> writerPromise = readerPromise.thenAcceptAsync((buffer) -> {
    try {
        connection.write(buffer).get();
        connection.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
writerPromise.exceptionally(x -> {
    x.printStackTrace(); // TODO: use logging instead
    return null;
});

Context

StackExchange Code Review Q#47354, answer score: 3

Revisions (0)

No revisions yet.