HiveBrain v1.2.0
Get Started
← Back to all entries
patternjavaMinor

Multi-threaded socket server high load

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
multihighloadserversocketthreaded

Problem

I'm trying to make a backend for QuizUp like application: user connects to a server, sends credentials and gets paired up with another user. After that server handles each pair, periodicaly sending server messages to each user in a pair and also redirecting user's mesages between them.

Server class:

```
private static class Server{

private static final int NUM_THREADS = 2400;

private ExecutorService executorService;

private ServerSocket serverSocket;

private int listeningPort;

public volatile boolean isRunning;

private Thread mainThread;

private volatile Map playRequests;

public Server(int port){

try {
executorService = Executors.newFixedThreadPool(NUM_THREADS);
listeningPort = port;
serverSocket = new ServerSocket(listeningPort);
isRunning = true;
playRequests = new ConcurrentHashMap();
mainThread = new Thread(new Runnable(){

@Override
public void run() {
handleIncomingConnections();
}
});

} catch (IOException e) {
System.out.println(e.toString());
}
}

public void run(){
mainThread.start();
}

private void handleIncomingConnections(){
while(isRunning){
try {
final Socket client = serverSocket.accept();
Runnable gameRunnable = new Runnable(){

@Override
public void run() {
try{
BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
PrintWriter writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(client.getOutputStream())), true);

String read = null;
String id = null;
boolean isRequesting = f

Solution

Blocking I/O doesn't scale well

Blocking I/O usually requires a 1:1 coupling between threads and streams. Thin clients can get away with using blocking I/O, because they're not going to have 100+ connections open. For servers with long-lasting connections, it's not workable.

You've already noticed this with your memory requirements spiking. Consider that, if a thread uses just 256KB (or even 1MB) in stack space, and you run 2400 threads, you're running 600MB (or 2400MB) just for your executor.

Enter non-blocking I/O

Java 1.4 introduced non-blocking I/O (NIO) to get around the 1:1 thread-to-stream coupling. NIO works pretty much like event handling in a GUI: you attach streams to a selector, you poll that selector for interesting events (e.g. stream has unread data, connection is ready), and then you do something with that event. Just like a GUI doesn't need a separate thread per component, NIO allows you to run a server with basically a single worker thread on your end.

Here's a quick & dirty example of something running under NIO, minus the executor and clean-up for brevity:

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Repeater {
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.socket().bind(null);
    server.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
    System.out.println("Listening on " + server.socket().getInetAddress() + " @ " + server.socket().getLocalPort());

    while ( selector.select() > 0 ) {
      Iterator keys = selector.selectedKeys().iterator();
      while ( keys.hasNext() ) {
        SelectionKey key = keys.next();
        try {
          ((Handler) key.attachment()).handle(selector, key);
        } catch (Exception ex) {
          ex.printStackTrace();
          continue;
        } finally {
          keys.remove(); // [!]
        }
      }
    }
  }

  static interface Handler {
    void handle(Selector selector, SelectionKey key) throws IOException;
  }

  static final int BUFFER_SIZE_IN_BYTES = 140;
  static class AcceptHandler implements Handler {
    public void handle(Selector selector, SelectionKey key) throws IOException {
      if ( key.isAcceptable() ) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        client.write(ByteBuffer.wrap(("Please leave a message no longer than " + BUFFER_SIZE_IN_BYTES + " bytes.\r\n").getBytes()));

        client.register(selector, SelectionKey.OP_READ, new ReadHandler());
      }
    }
  }

  static class ReadHandler implements Handler { 
    private final ByteBuffer myStorage = ByteBuffer.allocate(BUFFER_SIZE_IN_BYTES); 
    public void handle(Selector selector, SelectionKey key) throws IOException {
      if ( key.isReadable() ) {
        SocketChannel client = (SocketChannel) key.channel();
        client.read(myStorage);
        if ( !myStorage.hasRemaining() || new String(myStorage.array(), 0, myStorage.position()).endsWith("\n") ) {
          myStorage.flip();
          client.write(myStorage);
          myStorage.clear();
        } else {
          client.register(selector, SelectionKey.OP_READ, this);
        }
      }
    }
  }
}


A quick test on a 64-bit JVM had process size on about 22MB for 4000 connections, and 36MB for 10000 connections. Actual use will be a bit more because this example is simplistic, but you get a sense of how it scales.

If you need to do serious processing before sending an answer, you can still delegate the channel handling to an executor.

...or consider asynchronous I/O

Java 7 introduced asynchronous I/O under the name NIO.2, which looks like it can do away with manually controlling the selector. I haven't used it in any serious manner yet, so I can't comment on how well it handles things or its ease of use, but it's worth taking a look to see which approach better floats your boat.

Networking libraries

I'm assuming you're also doing this as an exercise to yourself. If not, consider using Apache MINA to shield you from some of the complexity.

Code Snippets

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Repeater {
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.socket().bind(null);
    server.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
    System.out.println("Listening on " + server.socket().getInetAddress() + " @ " + server.socket().getLocalPort());

    while ( selector.select() > 0 ) {
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
      while ( keys.hasNext() ) {
        SelectionKey key = keys.next();
        try {
          ((Handler) key.attachment()).handle(selector, key);
        } catch (Exception ex) {
          ex.printStackTrace();
          continue;
        } finally {
          keys.remove(); // [!]
        }
      }
    }
  }

  static interface Handler {
    void handle(Selector selector, SelectionKey key) throws IOException;
  }

  static final int BUFFER_SIZE_IN_BYTES = 140;
  static class AcceptHandler implements Handler {
    public void handle(Selector selector, SelectionKey key) throws IOException {
      if ( key.isAcceptable() ) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        client.write(ByteBuffer.wrap(("Please leave a message no longer than " + BUFFER_SIZE_IN_BYTES + " bytes.\r\n").getBytes()));

        client.register(selector, SelectionKey.OP_READ, new ReadHandler());
      }
    }
  }

  static class ReadHandler implements Handler { 
    private final ByteBuffer myStorage = ByteBuffer.allocate(BUFFER_SIZE_IN_BYTES); 
    public void handle(Selector selector, SelectionKey key) throws IOException {
      if ( key.isReadable() ) {
        SocketChannel client = (SocketChannel) key.channel();
        client.read(myStorage);
        if ( !myStorage.hasRemaining() || new String(myStorage.array(), 0, myStorage.position()).endsWith("\n") ) {
          myStorage.flip();
          client.write(myStorage);
          myStorage.clear();
        } else {
          client.register(selector, SelectionKey.OP_READ, this);
        }
      }
    }
  }
}

Context

StackExchange Code Review Q#47851, answer score: 8

Revisions (0)

No revisions yet.