Nonblocking I/O
Allow me to demonstrate with a very simple network server that might be used for testing routers and the like. The server accepts a connection from any client. It then sends that client a continuous stream of bytes beginning with 0 and continuing through 255, at which point it starts again with 0. The server never closes the connection; it waits for the client to close. You could use this to test the speed of the server's network connection. Example 16-1 implements this protocol using classic I/O plus threads.
Example 16-1. DataStuffer implemented with classic I/O
import java.net.*;
import java.io.*;

public class DataStuffer {

  private static byte[] data = new byte[256];

  public static void main(String[] args) throws IOException {
    int port = 9000;
    for (int i = 0; i < data.length; i++) data[i] = (byte) i;
    ServerSocket server = new ServerSocket(port);
    while (true) {
      Socket socket = server.accept( );
      Thread stuffer = new StuffThread(socket);
      stuffer.start( );
    }
  }

  private static class StuffThread extends Thread {

    private Socket socket;

    public StuffThread(Socket socket) {
      this.socket = socket;
    }

    public void run( ) {
      try {
        OutputStream out = new BufferedOutputStream(socket.getOutputStream( ));
        while (!socket.isClosed( )) {
          out.write(data);
        }
      }
      catch (IOException ex) {
        if (!socket.isClosed( )) {
          try {
            socket.close( );
          }
          catch (IOException e) {
            // Oh well. We tried.
          }
        }
      }
    }
  }
}
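To check a server like this from the client side, something along the following lines might be useful. It is only a sketch: the class name StreamChecker and the method firstMismatch are hypothetical, and the main method feeds it an in-memory stream where a real test would pass socket.getInputStream( ).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class StreamChecker {

    // Reads up to `count` bytes and returns the index of the first byte that
    // breaks the 0, 1, ..., 255, 0, 1, ... cycle, or -1 if every byte matches.
    public static long firstMismatch(InputStream in, long count) throws IOException {
        int expected = 0;
        for (long i = 0; i < count; i++) {
            int b = in.read();            // 0..255, or -1 at end of stream
            if (b != expected) return i;  // a premature end of stream also lands here
            expected = (expected + 1) % 256;
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for socket.getInputStream(): 600 bytes of the expected pattern.
        byte[] sample = new byte[600];
        for (int i = 0; i < sample.length; i++) sample[i] = (byte) i;
        System.out.println(firstMismatch(new ByteArrayInputStream(sample), sample.length));
    }
}
```

A server whose cycle is one byte short (0 through 254) would be reported at index 255, the first position where the expected and actual values diverge.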
Using channels instead of streams, we can implement this entire program in one thread and support many more clients to boot. The initial process is as follows:
- Open a ServerSocketChannel.
- Put the channel in nonblocking mode.
- Open a Selector.
- Register the ServerSocketChannel with the Selector for accept operations.
To create a nonblocking channel, open the server socket in the usual way:
ServerSocketChannel server = ServerSocketChannel.open( );
Then pass false to the configureBlocking( ) method to put it in nonblocking mode:
server.configureBlocking(false);
Next, create a Selector object:
Selector selector = Selector.open( );
This object will be responsible for managing all the different channels and deciding which one is ready to be read or written. Initially, you just have one channel, the server socket channel. When you register each channel with the Selector, you have to specify the kinds of operations for which you're registering. There are four kinds, each represented by a named constant in the SelectionKey class:
SelectionKey.OP_ACCEPT
Accept a connection from a client.
SelectionKey.OP_CONNECT
Open a connection to a server.
SelectionKey.OP_READ
Read data from a channel.
SelectionKey.OP_WRITE
Write data to a channel.
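These constants are int bit flags, so they can be combined with the bitwise OR operator, and each channel reports the operations it supports through validOps( ). The following sketch illustrates this; the InterestOps class and its describe( ) helper are hypothetical names introduced only for the illustration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class InterestOps {

    // Renders a set of operation flags as text, e.g. "READ|WRITE".
    public static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT|");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT|");
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ|");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE|");
        if (sb.length() > 0) sb.setLength(sb.length() - 1);  // drop the trailing '|'
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // A server socket channel supports only accept operations.
            System.out.println(describe(server.validOps()));
            // A socket channel supports connect, read, and write.
            System.out.println(describe(client.validOps()));
            // Flags combine with bitwise OR.
            System.out.println(describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        }
    }
}
```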
The ServerSocketChannel needs to be registered for accepting connections:
server.register(selector, SelectionKey.OP_ACCEPT);
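Gathered into one place, the setup steps so far might look like the sketch below. The ServerSetup class and listen( ) method are hypothetical names, and the sketch also binds the channel to a port, a step that has to happen before any connection can arrive. Port 0 is used in main( ) so that the operating system picks any free port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class ServerSetup {

    // Opens a nonblocking server channel on the given port and registers it
    // with a new Selector for accept operations. Returns the resulting key.
    public static SelectionKey listen(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);  // must precede register()
        server.socket().bind(new InetSocketAddress(port));
        Selector selector = Selector.open();
        return server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public static void main(String[] args) throws IOException {
        SelectionKey key = listen(0);  // port 0: let the system choose a free port
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        System.out.println("Listening on port " + server.socket().getLocalPort());
        System.out.println("Blocking? " + server.isBlocking());
        key.selector().close();
        server.close();
    }
}
```

The order matters in one respect: registering a channel that is still in blocking mode throws an IllegalBlockingModeException, so configureBlocking(false) has to come before register( ).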
From this point, you enter an infinite loop that selects the ready channels:
while (true) {
  selector.select( );
  Set readyKeys = selector.selectedKeys( );
  // process each ready key...
}
Initially, only one channel is registered with the Selector, so only one key can be selected. However, we're going to register more channels with the Selector inside the loop as connections are accepted. The keys themselves are processed in a finite loop, like this:
Iterator iterator = readyKeys.iterator( );
while (iterator.hasNext( )) {
  SelectionKey key = (SelectionKey) iterator.next( );
  iterator.remove( );
  // work with the key...
}
It's necessary to remove each key from the set of ready keys before processing it, because the Selector never removes keys from this set on its own. Should the key become ready again in the future, it is included in the next set returned by selectedKeys( ).
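The select-then-remove cycle can be tried out in isolation with a loopback connection. The following is a rough sketch rather than part of the server: SelectedKeysDemo and acceptOnce( ) are hypothetical names, and it assumes the loopback interface is available. It reports how many keys select( ) found and how many are left in the selected set after the loop has removed them.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectedKeysDemo {

    // Returns { keys reported by select(), keys left in the selected set afterward }.
    public static int[] acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client connects to the port the system assigned.
            try (SocketChannel client = SocketChannel.open(server.socket().getLocalSocketAddress())) {
                int selected = selector.select();  // blocks until the connection is pending
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();  // the Selector will not do this for us
                    if (key.isAcceptable()) {
                        SocketChannel accepted = server.accept();
                        if (accepted != null) accepted.close();
                    }
                }
                return new int[] { selected, selector.selectedKeys().size() };
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = acceptOnce();
        System.out.println("selected=" + result[0] + ", remaining=" + result[1]);
    }
}
```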
Different keys may be ready to do different things. Some are ready for reading, some for writing, and some for accepting. When processing a key, the first thing to do is figure out what it's ready for:
if (key.isAcceptable( )) {
  // accept the connection and register the Selector
  // with the key for this connection...
}
else if (key.isWritable( )) {
  // write to the connection...
}
This example doesn't need to read from the channel, but most applications do this as well.
The first possibility is that the Selector has found a channel ready to accept an incoming connection. In this case, we tell the server channel to accept the connection. This returns a SocketChannel that is then configured in nonblocking mode and registered with the same Selector. However, it's registered as being interested in write operations:
SocketChannel client = server.accept( );
client.configureBlocking(false);
SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
The key also needs to know what data is being written to the channel and how much of it has already been written. This requires some sort of object that contains a reference to the actual data and an index into that data. For some servers, the data is a file or a stream of some kind. In this case, it's a constant byte array. In fact, the same byte array is written to all the different channels. However, different channels are at different positions in that array at different times, so we wrap a ByteBuffer around the array just for the use of this channel. As long as every connection treats its buffer as read-only, there won't be any conflicts. This buffer is then attached to the key:
ByteBuffer source = ByteBuffer.wrap(data);
key2.attach(source);
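To see why one array can safely serve many connections, consider the sketch below, in which two buffers wrap the same array but keep separate positions. The SharedBacking class and its methods are hypothetical names. The read-only variant at the end is an optional extra not used in this chapter's example: asReadOnlyBuffer( ) lets the buffer itself enforce the read-only convention.

```java
import java.nio.ByteBuffer;

public class SharedBacking {

    // Wraps the same array twice, reads n bytes through the first buffer only,
    // and returns { first.position(), second.position() }.
    public static int[] positionsAfterRead(byte[] data, int n) {
        ByteBuffer first = ByteBuffer.wrap(data);
        ByteBuffer second = ByteBuffer.wrap(data);
        first.get(new byte[n]);  // advances only the first buffer's position
        return new int[] { first.position(), second.position() };
    }

    // Returns true if both wrappers are views of the very same array (no copy).
    public static boolean sharesArray(byte[] data) {
        return ByteBuffer.wrap(data).array() == ByteBuffer.wrap(data).array();
    }

    public static void main(String[] args) {
        byte[] data = new byte[256];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;

        int[] positions = positionsAfterRead(data, 100);
        System.out.println("first=" + positions[0] + ", second=" + positions[1]);
        System.out.println("same array: " + sharesArray(data));

        // Optional: a read-only view rejects any attempt to modify the data.
        ByteBuffer guarded = ByteBuffer.wrap(data).asReadOnlyBuffer();
        System.out.println("read-only: " + guarded.isReadOnly());
    }
}
```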
The other possibility is that the key is not ready for accepting. Instead, it's ready for writing. In this case, the key points to a previously opened SocketChannel. If so, we get the channel for the socket and write some data onto the channel:
SocketChannel client = (SocketChannel) key.channel( );
ByteBuffer output = (ByteBuffer) key.attachment( );
if (!output.hasRemaining( )) output.rewind( );
client.write(output);
Notice that the ByteBuffer that was attached to the key earlier when the channel was accepted is now retrieved.
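A nonblocking write( ) may send only part of the buffer, which is why the buffer's position has to survive between calls. The sketch below imitates this with a hypothetical ThrottledChannel, a stand-in for a SocketChannel that accepts at most a fixed number of bytes per call, and runs the same hasRemaining( )/rewind( ) logic against it. None of these class or method names come from the example itself.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public class PartialWriteDemo {

    // Hypothetical stand-in for a socket channel whose send buffer takes
    // at most `limit` bytes per write() call.
    static class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int limit;

        ThrottledChannel(int limit) { this.limit = limit; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) sink.write(src.get());
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    // Makes `calls` write attempts the way the key-handling code does and
    // returns everything the channel received, in order.
    public static byte[] pump(byte[] data, int limit, int calls) {
        ThrottledChannel channel = new ThrottledChannel(limit);
        ByteBuffer output = ByteBuffer.wrap(data);
        for (int i = 0; i < calls; i++) {
            if (!output.hasRemaining()) output.rewind();  // start the cycle over
            channel.write(output);                        // may be a partial write
        }
        return channel.sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = new byte[256];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;
        byte[] sent = pump(data, 100, 4);
        System.out.println(sent.length + " bytes sent");
    }
}
```

With a 100-byte limit, the third call sends only the 56 bytes that remain, and the fourth call rewinds and starts again at 0, so the receiver sees an unbroken 0 through 255 cycle regardless of how the writes are split.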
This whole process is put together in two nested loops, as shown in Example 16-2.
Example 16-2. DataStuffer implemented with nonblocking I/O
import java.net.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class NewDataStuffer {

  // 256 entries so that the stream runs from 0 through 255
  private static byte[] data = new byte[256];

  public static void main(String[] args) throws IOException {
    for (int i = 0; i < data.length; i++) data[i] = (byte) i;

    ServerSocketChannel server = ServerSocketChannel.open( );
    server.configureBlocking(false);
    server.socket( ).bind(new InetSocketAddress(9000));
    Selector selector = Selector.open( );
    server.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      selector.select( );
      Set readyKeys = selector.selectedKeys( );
      Iterator iterator = readyKeys.iterator( );
      while (iterator.hasNext( )) {
        SelectionKey key = (SelectionKey) iterator.next( );
        iterator.remove( );
        try {
          if (key.isAcceptable( )) {
            // new connection: register it for writing with its own buffer
            SocketChannel client = server.accept( );
            System.out.println("Accepted connection from " + client);
            client.configureBlocking(false);
            ByteBuffer source = ByteBuffer.wrap(data);
            SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
            key2.attach(source);
          }
          else if (key.isWritable( )) {
            // existing connection: send as much as the channel will take
            SocketChannel client = (SocketChannel) key.channel( );
            ByteBuffer output = (ByteBuffer) key.attachment( );
            if (!output.hasRemaining( )) {
              output.rewind( );
            }
            client.write(output);
          }
        }
        catch (IOException ex) {
          // the client went away: deregister and close its channel
          key.cancel( );
          try {
            key.channel( ).close( );
          }
          catch (IOException cex) {}
        }
      }
    }
  }
}
That, in a nutshell, is how a nonblocking server is written. Now that you've seen the big picture, let's drill down and look more closely at the individual classes involved in this system.