knight_ka | 生活及学习笔记

BIO、NIO、AIO编程详解(中)

BIO、NIO、AIO编程详解(中)

@(IO)[BIO|NIO|AIO]
接下来看Java NIO。

NIO概述理解

NIO是同步非阻塞IO.
我们知道,阻塞IO是在调用inputStream.read()和outputStream.write();两个方法时是阻塞的,在调用serverSocket.accept()的时候也是阻塞的。
那么NIO就要解决这两个问题:1.服务器启动后不用阻塞去等待客户端连接。2.在进行数据交互时不会阻塞(读取和写入)。

NIO:
有人称为New I/O,因为它是jdk1.4之后java新增的I/O类库,这是官方叫法。
因为jdk1.4之前,所有的I/O类库都是阻塞性I/O,而NIO的出现就是为了解决I/O的阻塞问题的。所以NIO还有一种叫法就是非阻塞I/O(Non-block I/O)

NIO核心组件

通道
nio通道
缓冲区buffer
nio buffer
多路复用器Selector

NIO服务端

NIO服务端序列图
nio序列图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package com.designPattern.Reactor.NIO2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* 因为jdk1.4之前,所有的I/O类库都是阻塞性I/O,而NIO的出现就是为了解决I/O的阻塞问题的。所以NIO还有一种叫法就是非阻塞I/O(Non-block I/O)
* NIO服务器的启动:
1.打开服务端通道:ServerSocketChannel
2.设置通道监听的地址及连接数:InetSocketAddress
3.打开一个多路复用器:selector
4.将服务端通道注册到多路复用器中,监听客户端的连接:OP_ACCEPT事件
5.多路复用器轮询就绪的key:selectionKey
> selectionKey对象代表了当前通道的的所有信息。
- interest集合(注册的事件)
- ready集合(是否有事件就绪)
- Channel (所属的通道)
- Selector (所属的复用器)
- 附加的对象(可选)
6.如果有OP_ACCEPT事件就绪 --> 获取就绪的socketChannel,并注册到复用器监听读事件:OP_READ
7.如果有OP_READ事件事件就绪 --> 读取通道中的数据 --> 把写入到通道中
* <p>
* Created by Tnp.
*/
public class NIOServer {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//1.创建服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2.给该通道设置端口及阻塞方式
serverSocketChannel.configureBlocking(false); //非阻塞
/**
* 1)backlog 用于在TCP层接收链接的缓冲池的最大个数,这个个数可在应用层中的listen函数里设置,当客户链接请求大于这个个数
* (缓冲池满),其它的未进入链接缓冲池的客户端在tcp层上tcp模块会自动重新链接,直到超时(大约57秒后)
2)应用层链接(connect)完成时,要从tcp层的链接缓冲池中移出一个(accept函数实现)
3).backlog是连接请求队列的最大长度
* */
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); //backlog 1024
//3.把该通道注册到多路复用器中,并设置监听的事件
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 此处应该用一个死循环
while (true) {
//4.多路复用器轮询监听的事件
selector.select(1000); //设置1000ms唤醒一次selector
//5.如果有事件就绪则进行处理
Set<SelectionKey> selectionKeys = selector.selectedKeys(); //访问“已选择键集(selected key set)”中的就绪通道
//查找处于就绪状态的channel
Iterator<SelectionKey> iterator = selectionKeys.iterator(); //获取到所有已经就绪的channel
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
System.out.println(key);
/**
* 注意每次迭代的iterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。
* 下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
* */
iterator.remove();
try {
//6.设置连接的socket参数
/**
* isValid( )方法来检查它是否仍然表示一种有效的关系,
* 当键被取消时,它将被放在相关的选择器的已取消的键的集合里。
* 注销不会立即被取消,但键会立即失效。当再次调用select( )方法时,
* 已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。
*
* */
if (key.isValid()) {
if (key.isAcceptable()) { //判断是否有accept事件就绪
//获取通道,并转型成自己想要操作的通道类型
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = ssc.accept(); //监听有没有新链接到达,会一直阻塞到有新连接,
// 这个操作完成相当于完成了TCP的三次握手,物理链路正式建立
//将新创建的SocketChannel设置为异步非阻塞,同时可以配置TCP缓冲区大小等配置
socketChannel.configureBlocking(false);
//7.向多路复用器监听read操作
socketChannel.register(selector, SelectionKey.OP_READ); //监听此通道是否有可读事件
}
//8.如果通道有可读事件,异步处理消息到bytebuffer。
if (key.isReadable()) {
//不能把它放在if(key.isAcceptable())中,因为通道第一次连接的时候会是Accpet事件,新连接的时候只注册读事件的监听,"
// 然后会把这个key从迭代器中remove掉。当下一次添加到set集合中的时候就是有读事件发生的时候才会添加进来,
// 所以下一次进来的时候key.isAcceptable()是false,而key.readable()可能是true.
//1.当前通道可读时,获取当前通道
SocketChannel sc = (SocketChannel) key.channel();
//2.创建一个ByteBuffer 用来存储读取的消息
ByteBuffer readBuffer = ByteBuffer.allocate(1024);//设置大小为1024个字节
//3.将通道中的内容读取到bytebuffer中
int read = sc.read(readBuffer); //返回读取的字节数
System.out.println("当前线程数:"+Thread.activeCount());
System.out.println("当前线程数:"+Thread.activeCount());
System.out.println("当前线程数:"+Thread.activeCount());
if (read != -1) {
/**
* flip方法将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成之前position的值。
* 换句话说,position现在用于标记读的位置,
* limit表示之前写进了多少个byte、char等 —— 现在能读取多少个byte、char等。
* 设置本次可以读取的内容空间
* */
readBuffer.flip();
//remaining方法的具体实现:return limit - position
//返回可以读取的内容大小
int remaining = readBuffer.remaining();
//创建该大小的byte数组
byte[] bytes = new byte[remaining];
readBuffer.get(bytes); //把buffer中的内容读取到byte数组中
//将byte数组转换成字符串操作
String requestStr = new String(bytes, "UTF-8");
System.out.println("接受到的内容为:" + requestStr);
String responseStr = requestStr.contains("time") ? new java.util.Date(System
.currentTimeMillis()).toString() : "错误的请求";
//响应消息到一个新的byteBuffer中
byte[] responseBytes = responseStr.getBytes();
ByteBuffer writerBuffer = ByteBuffer.allocate(responseBytes.length);
writerBuffer.put(responseBytes); //将数组中的元素put到缓存中
writerBuffer.flip(); //初始化buffer指针
sc.write(writerBuffer); //由于它是异步非阻塞,所以可能出现“写半包情况” 需要循环读取写事件,进行把buffer中的数据写入
}
}
}
} catch (Exception e) {
e.printStackTrace();
//在需要结束通道的地方释放资源,如果当前连接有问题,则关闭当前连接
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
//在需要的地方关闭多路复用器
// if(selector != null){
// selector.close(); //关闭多路复用器
// }
} catch (IOException e) {
e.printStackTrace();
}
}
}

NIO客户端

NIO服务端序列图
nio序列图client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.designPattern.Reactor.NIO2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客户端状态:
* 1.获取服务端连接,不管有没有获取成功都会继续向下执行,程序不会停止不前。(异步)
* 2.NIO客户端会通过事件驱动,利用selector的轮询机制处理网络通信。
* Created by Tnp.
*/
public class NIOClient {
public static void main(String[] args) {
try {
//1.打开一个SocketChannel 套接字通道
SocketChannel socketChannel = SocketChannel.open();
//2.给通道设置参数,并配置要连接的Server地址和端口
socketChannel.configureBlocking(false); //非阻塞
//3.异步连接到服务器 -- 采用死循环
boolean b = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
Selector selector =selector = Selector.open(); //获取selector
if(b){ //连接成功
//3.1 将通道注册到selector中,并注册可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
//3.2向服务器发送消息
doWriter(socketChannel);
}else{
//4.判断连接结果,如果连接失败则享Reactor线程中的多路复用器注册Connect事件
//因为是异步连接,可能会有没有连接成功的现象,如果没有连接成功,那么向selector中注册该通道的连接事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
boolean stop = false;
while(!stop){
//5.启动多路复用器
int select = selector.select(1000); //阻塞到至少有一个通道注册的事件就绪了。
System.out.println("阻塞状态:"+select); //阻塞则是0,非阻塞则是1
Set<SelectionKey> selectionKeys = selector.selectedKeys();//拿到已经就绪的通道
//6.轮询已经就绪的channel
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key;
while(iterator.hasNext()){
key = iterator.next();
/**
* 注意每次迭代的iterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。
* 下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
* */
iterator.remove();
//7.处理数据
try{
//7.1 检查该键是不是仍然有效
if(key.isValid()){
//7.2判断是否连接成功,还可以通过key.readyOps();获取所有准备就绪的事件集合
if(key.isConnectable()){
/**
* 因为SocketChannel在非阻塞模式下调用connect()可能连接没有建立完成就已经返回了。
* 为了确定连接是否建立,可以调用finishConnect()的方法。
* */
SocketChannel sc = (SocketChannel)key.channel();
if(sc.finishConnect()){ //如果建立连接已完成
//向selector中注册读取事件
sc.register(selector,SelectionKey.OP_READ);
doWriter(sc);
}
}
//7.3 如果该key是可读事件就绪
if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//7.4 把通道中的内容读取到buffer中
int i = socketChannel.read(byteBuffer);
if(i > 0){ //读取到了内容0
byteBuffer.flip(); //初始化buffer中的指针,准备把buffer中的数据复制出来
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String s = new String(bytes, "UTF-8");
System.out.println("接收到服务端的消息为:"+s);
// 通讯结束,停止多路复用器轮询
stop = true;
}else if(i < 0){
//说明链路已经关闭,需要关闭通道释放资源
key.cancel();
socketChannel.close();
}else{
//读取的0字节 ..
}
}
}
}catch (IOException e){
// 如果有异常,则释放资源
e.printStackTrace();
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}
/**
* 多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动关闭,所以不必重复释放资源。
* */
if(selector != null){
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 向管道中写入消息
* */
private static void doWriter(SocketChannel socketChannel) throws IOException {
byte[] requestStr = "Query time".getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(requestStr.length);
byteBuffer.put(requestStr);
byteBuffer.flip(); //初始化缓存中的指针,方便下次的读取或者写入
socketChannel.write(byteBuffer);
System.out.println("byteBuffer中的可用内容空间为:"+byteBuffer.remaining());
/**
* 如果limit = position 则byteBuffer.hasRemaining()同样返回false
* 所以用remaining的大小来判断
* */
if(byteBuffer.remaining()< 0){ //limit > postion 如果当前位置小于limit,则说明byteBuffer中的数据已经全部发送出去了。
System.out.println("没有发送完毕,需要二次发送");
}
}
}

nio通讯分析

nio2