主要内容
IO复用的目的
在常规的IO操作过程中,例如文件的读写操作等,都是阻塞式的。当然,这些问题可以通过线程来解决;然而创建线程的成本是比较高的,而且可能会造成一系列的问题,例如线程之间的切换和数据共享,死锁等。虽然现代cpu的计算速度日新月异,但是,如果有一种方案可以不使用线程,而且又可以进行非阻塞式IO操作,那可乐而不为呢。selector就是这样一个解决方案。
Selector的作用
简单来说,Selector可以对多个已注册的SelectableChannel
的状态进行监控,例如某一个Channel是否可写、可读。一旦这些Channel的状态发生变化,我们就可以在程序中针对特定的状态进行操作。
SelectionKey
SelectionKey的作用是对SelectableChannel的状态进行维护和反馈,channel对象的状态有以下四种:
- readable 通道是否可读
- writable 通道是否可写
- connectable 通道是否可连接
- acceptable 通道是否可被接受
另外,SelectionKey里面还维护了一个和此对象绑定的channel对象,通过SelectionKey的channel()
方法可以获得。
Interest Set
Intereset set是指我们感兴趣的操作集合,这个集合和上面的SelectionKey对象的状态互相对应,它们分别是:
- OP_READ
- OP_WRITE
- OP_CONNECT
- OP_ACCEPT
调用SelectableChannel的register方法的时候,就可以指定这些集合。例如,如果想知道一个通道是否准备好读取数据,就可以这样做:
1 2 |
client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); |
第一行代码是必须的,channel的blocking模式必须设置为false才能被selector使用。一次还可以设置多个集合:
1 2 |
client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); |
Ready Operations
Ready Operations指那些已经就绪的通道状态,通过检测SelectionKey的Interest Set,就可以知道通道是否可执行操作,例如:
1 |
int readable = selectionKey.readyOps() & SelectionKey.OP_READ |
如果readable为真,表示通道可读。SelectionKey提供了一些快捷方法专门来检测通道的状态,它们是:
- isReadable() 相当于selectionKey.readyOps() & SelectionKey.OP_READ
- isWritable() 相当于selectionKey.readyOps() & SelectionKey.OP_WRITE
- isConnectable() 相当于selectionKey.readyOps() & SelectionKey.OP_CONNECT
- isAcceptable() 相当于selectionKey.readyOps() & SelectionKey.OP_ACCEPT
创建Selector对象
可以通过两个方法创建Selector的对象实例。第一种是使用Selector对象提供的静态方法open()
;第二种是通过SelectorProvider对象的openSelector()
方法。不过,当你查看Selector的open方法的源码,发现它实际上是使用了第二种方法返回的selector对象。
1 2 3 4 |
Selector selector = Selector.open(); //or SelectorProvider provider = SelectorProvider.provider(); Selector selector = provider.openSelector(); |
根据不同的平台,open方法返回的实现不同,在windows平台下,open返回的selector对象是WindowsSelectorImpl类型,而在Linux平台,返回的则是EPollSelectorImpl类型。
使用Selector对象
使用Selector对象的时候,首先注册channel,然后就可以调用Selector对象的select()
方法开始工作。
1 2 3 4 5 6 7 |
//这里省略一些初始化操作 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true){ selector.select(); } |
select()
方法返回的是已就绪的通道个数。
selectedKeys集合
Selector对象内部维护一个selectedKyes集合属性,一旦已注册通道有就绪条件,这个集合就会被更新。可以通过遍历这个集合来对每个就绪的通道进行操作。例如:
1 2 3 4 5 6 7 8 9 10 11 12 |
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ //取出绑定的SelectionKey SelectionKey clientKey = iterator.next(); if (clientKey.isAcceptable()) { //do someting }else if (clientKey.isReadable()){ //read data from or write data to channel } iterator.remove(); } |
Selector的工作流程图
Selector的工作流程分两部走,第一步是注册过程,第二步是监听过程;
channel的注册过程
通过查看Selector的源码实现,简单总结一下Selector的工作流程,先来看Channel的注册过程:
在注册之前,selector对象必须存在,因此把selector对象的创建放在第一位。在channel的register方法中,实际上是调用了selector的register方法,并把自己(this)作为其中一个参数传递过去,如下代码所示:
当selector的register方法执行完毕,Selector类中的keys字段也被初始化完成,keys字段包含所有被注册的channel对象的信息。
selector的监听过程
当selector对象的select()
方法被执行,selector的监听过程就开始进行。select方法是selector对象的核心,当select方法发现有channel对象就绪时,就会更新它内部的selectedKeys
属性,这个属性表示当前就绪通道的对象集合。
完整代码
下面的代码中selector分别对两个通道进行监控,一个ServerSocketChannel服务器通道,用于接受客户端连接;另一个是SocketChannel,它是一个客户端连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
@Test private static void testNioServer() throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocket = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(3333); serverSocket.bind(address); serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT); while (true) { System.out.println("等待客户端建立连接..."); int clients = selector.select(); if (clients <= 0){ continue; //没有客户端 } //获取迭代器,检查是否有已经就绪的通道 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //循环检查每一个通道 SelectionKey key = iterator.next(); //通道准备好接受客户端的连接 if (key.isAcceptable()) { SocketChannel client = serverSocket.accept(); //注册客户端 client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { //通道有数据可读 //获取绑定的通道,这是一个客户端连接 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(8192); try{ int nread = client.read(buffer); if (nread != -1) { String output = new String(buffer.array()).trim(); System.out.println("从客户端接收到的信息 : " + output); } else { System.out.println("客户端正常关闭..."); client.close(); //客户端正常退出 } } catch (Exception e){ /** * 处理客户端异常关闭的情况, * 例如程序崩溃或客户端主机崩溃 */ client.close(); System.out.println(e.getMessage()); } } //每一个已被处理的通道都要从迭代器中移除, //且必须在进入下一次循环之前被移除 iterator.remove(); } } } |