多线程模型
多线程模型
多线程模型与单线程模型最大的区别是有专门的一组 NIO 线程处理 IO 操作,一般使用线程池的方式实现。一个 NIO 线程同时处理多条连接,一个连接只能属于 1 个 NIO 线程,这样能够防止并发操作问题。
多线程模型中有一个专门的线程负责监听和处理所有的客户端连接,网络 IO 操作由一个 NIO 线程池负责,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。1 个 NIO 线程可以同时处理 N 条链路,但是 1 个链路只对应 1 个 NIO 线程,防止发生并发操作问题。在绝大多数场景下,Reactor 多线程模型都可以满足性能需求。但是在更高并发连接的场景(如百万客户端并发连接),它的性能似乎不能满足需求,于是便出现了下面的多 Reactor(主从 Reactor)模型。
代码案例
public class Acceptor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
// 修改此处就可以切换成多线程模型了
selectionKey.attach(new WorkerHandlerThreadPool(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class WorkerHandlerThreadPool implements Runnable {
static ExecutorService pool = Executors.newFixedThreadPool(2);
private SocketChannel socketChannel;
public WorkerHandlerThreadPool(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
System.out.println("WorkerHandlerThreadPool thread :" + Thread.currentThread().getName());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
pool.execute(new Process(socketChannel, buffer));
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Process implements Runnable {
private SocketChannel socketChannel;
private ByteBuffer byteBuffer;
public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
System.out.println("sleep 10s");
Thread.sleep(10000);
System.out.println("process thread:" + Thread.currentThread().getName());
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
有客户端连接上来了,/127.0.0.1:64797
有客户端连接上来了,/127.0.0.1:64800
WorkerHandlerThreadPool thread :main
sleep 10s
process thread:pool-1-thread-1
/127.0.0.1:64797发来的消息是:I am one
WorkerHandlerThreadPool thread :main
sleep 10s
WorkerHandlerThreadPool thread :main
sleep 10s
process thread:pool-1-thread-2
/127.0.0.1:64800发来的消息是:I am two
process thread:pool-1-thread-1
/127.0.0.1:64797发来的消息是:hah
*/
单 Reactor 多线程模型看起来是很不错了,但是还是有缺点:一个 Reactor 还是既然负责连接请求,又要负责读写请求,连接请求是很快的,而且一个客户端一般只要连接一次就可以了,但是会发生很多次写请求,如果可以有多个 Reactor,其中一个 Reactor 负责处理连接事件,多个 Reactor 负责处理客户端的写事件就好了,这样更符合单一职责,所以主从 Reactor 模型诞生了。