单线程模型
单线程模型
所有的 IO 操作都在同一个 NIO 线程上面完成,NIO 线程需要处理客户端的 TCP 连接,同时读取客户端 Socket 的请求或者应答消息以及向客户端发送请求或者应答消息。如下图:
所有的 I/O 操作都在同一个 NIO 线程上面完成。可以看到有多个客户端连接到 Reactor,Reactor 内部有一个 dispatch(分发器)。有连接请求后,Reactor 会通过 dispatch 把请求交给 Acceptor 进行处理,有 IO 读写事件之后,又会通过 dispatch 交给具体的 Handler 进行处理。此时一个 Reactor 既然负责处理连接请求,又要负责处理读写请求,一般来说处理连接请求是很快的,但是处理具体的读写请求就要涉及到业务逻辑处理了,相对慢太多了。Reactor 正在处理读写请求的时候,其他请求只能等着,只有等处理完了,才可以处理下一个请求。
该模型的处理时序图如下:
单线程模型只适用并发量比较小的应用场景。当一个 NIO 线程同时处理上万个连接时,处理速度会变慢,会导致大量的客户端连接超时,超时之后如果进行重发,更加会加重了 NIO 线程的负载,最终会有大量的消息积压和处理超时,NIO 线程会成为系统的瓶颈。对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适。实际当中基本不会采用单线程模型。
代码案例
public class Main {
public static void main(String[] args) {
Reactor reactor = new Reactor(9090);
reactor.run();
}
}
public class Reactor implements Runnable {
ServerSocketChannel serverSocketChannel;
Selector selector;
// 在构造方法中,注册了连接事件,并且在 selectionKey 对象附加了一个 Acceptor 对象,这是用来处理连接请求的类。
public Reactor(int port) {
try {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 注册了连接事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 并且在 selectionKey 对象附加了一个 Acceptor 对象,这是用来处理连接请求的类
// 发生了连接事件后,Reactor 类的 dispatcher 方法拿到了 Acceptor 附加对象,调用了 Acceptor 的 run 方法
selectionKey.attach(new Acceptor(selector, serverSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
// Reactor类实现了Runnable接口,并且实现了run方法,在run方法中,监听各种事件,有了事件后,调用dispatcher方法
@Override
public void run() {
while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 在 dispatcher 方法中,拿到了 selectionKey 附加的对象,随后调用 run 方法,注意此时是调用 run 方法,并没有开启线程,只是一个普通的调用而已。
private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
public class Acceptor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
// 在 Acceptor 的 run 方法中又注册了读事件,然后在 selectionKey 附加了一个 WorkHandler 对象;
// Acceptor 的 run 方法执行完毕后,就会继续回到 Reactor 类中的 run 方法,负责监听事件。
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
// 当客户端写事件发生后,Reactor 又会调用 dispatcher 方法,此时拿到的附加对象是WorkHandler,所以又跑到了 WorkHandler 中的run方法。
selectionKey.attach(new WorkerHandlerThreadPool(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
// WorkHandler就是真正负责处理客户端写事件的了
public class WorkHandler implements Runnable {
private SocketChannel socketChannel;
public WorkHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
//System.out.println("sleep 10s");
//Thread.sleep(10000);
socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
}
对应的测试用客户端代码如下:
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 9090));
new Thread(() -> {
while (true) {
try {
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
inputStream.read(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
socket.getOutputStream().write(s.getBytes());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*
有客户端连接上来了,/127.0.0.1:64571
有客户端连接上来了,/127.0.0.1:64577
/127.0.0.1:64571发来的消息是:123
/127.0.0.1:64577发来的消息是:456
/127.0.0.1:64571发来的消息是:我是第一个人
/127.0.0.1:64577发来的消息是:我是第二个人
*/