Java nio 手记
1. nio 的三大组件
nio 的三大核心组件:
- Channel 管道
- Buffer 缓冲区
- Selector 多路复用器
1.1 Channel
Channel
是数据传输的双向通道,既可以从外部将数据读入(read)到内存中,也可以从内存写出(write)数据到外部。
常见的 Channel 有:
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
SocketChannel
和 ServerSocketChannel
应用于网络编程中。
1.2 Buffer
Buffer
内存缓冲区,用来暂存从 Channel 中读入的数据,或者是将要写出到外部的数据。
常见的 Buffer 有:
ByteBuffer 字节缓冲区(抽象类),实现:
- MappedByteBuffer
- DirectByteBuffer
通过 ByteBuffer 的静态方法 allocateDirect() 获取,使用直接内存
- HeapByteBuffer
通过 ByteBuffer 的静态方法 allocate() 获取,使用Jvm的堆内存
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
CharBuffer
1.3 Selector
多路复用器,用来管理多个 Channel,监听 Channel 上发生的事件。Channel 工作在非阻塞模式下,不会让线程吊死在一个 Channel 上。适合连接数多,但流量低的场景(low traffic)。
2. ByteBuffer 字节缓冲区
2.1 基本使用
缓冲区手绘:
Buffer 有三个重要的属性:
- capacity 缓冲区容量(大小)
- limit
- position 当前指针
如图所示,在初始化一个缓冲区后,指针(position)默认指向起始位置 0,limit = capacity = 缓冲区的大小。
当像缓冲区读入数据时,读入一次,指针 +1。
看下面一段代码,是从 text.txt 文本文件中读取数据到缓冲区。
public static void main(String[] args) {
// 通过文件输入流获取 FileChannel 对象,读取 text.txt 中的内容。
// 这里使用了 try-with-resource 的形式,用来自动关闭 channel 释放资源。
try (FileChannel channel = new FileInputStream("text.txt").getChannel()) {
// 创建一个5个字节大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(5);
// 通过 Channel 的 read() 方法将数据读入到缓冲区中
while (channel.read(buffer) != -1){
// 切换读模式
buffer.flip();
// 遍历打印每个字节
while (buffer.hasRemaining()){
char c = (char) buffer.get();
System.out.println(c);
}
// 切换写模式
buffer.clear();
}
}
}
这里面包含两个需要知道的方法,切换读模式和切换写模式。
// 切换读模式
buffer.flip();
切换读模式从方法名上可以看出,是翻转的意思,翻转了什么呢?是改变了缓冲区的指针和limit,看下方法的源代码:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
上手绘图:
切换读模式后,指针重新指向起始位置,我们可以从缓冲区的起始位置开始读取,每读取一次指针加1。如果没有切换读模式,指针指向的是缓冲区最后一个数据的下一个索引,所以是无法读取数据的。
切换到写模式:
buffer.clear();
源代码:
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
切换到写模式之后,指针重新指向缓冲区的起始位置,limit = capacity,写入时开始从缓冲区的起始位置写入,缓冲区内的数据将被覆盖掉。
假设缓冲区中存在上次未读完的数据,即切换到写模式之前,position < limit,说明有数据未读,此时通过 clear()
切换写模式后再次写入数据,可能会把上一次未读的数据覆盖掉。如果不想覆盖掉上一次未读的数据,可以使用 compact()
方法切换写模式。
compact()
方法是将上次未读的数据拷贝到缓冲区起始,指针指向到拷贝后的最后一个数据的下一个索引,再次写入数据时不会覆盖掉之前未读完的数据。
2.2 与 String 的互相转换
String to ByteBuffer
方式一,将 String 转为字节数组,再调用 ByteBuffer 的 put(byte[] bytes)
方法,传入字节数组。
ByteBuffer buffer1 = ByteBuffer.allocate(10);
buffer1.put("Hello".getBytes());
方式二:StandardCharsets.UTF_8.encode()
,注意:获取到 buffer 处于读模式。
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("Hello");
方式三:ByteBuffer.wrap(byte[] array)
方法,注意:获取到 buffer 处于读模式。
ByteBuffer buffer3 = ByteBuffer.wrap("Hello".getBytes());
ByteBuffer to String
传入的 buffer 需要处于读模式。
String s = StandardCharsets.UTF_8.decode(buffer).toString();
2.3 分散读和集中写
Java nio 支持 Scatter Reads(分散读) 和 Gather Writers(集中写),即可以将数据读入到不同的 buffer 中,或者将多个 buffer 中的数据集中写出。
ScatteringByteChannel
接口里的 read() 方法支持将数据读入到多个 buffer 中;
GatheringByteChannel
接口里的 write() 方法支持将多个 buffer 中的数据写出到外部。
重点是 要理解分散读和集中写的思想。
分散读:
/**
* Scattering Reads 分散读
* @author liyan
* @since 2021-11-02 15:45
*/
public class ScatteringReadsDemo {
public static void main(String[] args) {
// try-with-resource 写法,创建一个 FileChannel 对象,读取 text.txt,文本:HelloWorld
try (FileChannel channel = new RandomAccessFile("text.txt", "r").getChannel()) {
ByteBuffer bbf1 = ByteBuffer.allocate(3);
ByteBuffer bbf2 = ByteBuffer.allocate(3);
ByteBuffer bbf3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{bbf1, bbf2, bbf3}); // read()方法,支持将数据读入到多个 buffer 中
bbf1.flip(); // 切换读模式
bbf2.flip();
bbf3.flip();
System.out.println(StandardCharsets.UTF_8.decode(bbf1)); // Hel
System.out.println(StandardCharsets.UTF_8.decode(bbf2)); // loW
System.out.println(StandardCharsets.UTF_8.decode(bbf3)); // orld
} catch (IOException e) {
e.printStackTrace();
}
}
}
集中写:
/**
* Gathering Writes 集中写
* @author liyan
* @since 2021-11-02 16:39
*/
public class GatheringWritesDemo {
public static void main(String[] args) {
ByteBuffer bbf1 = StandardCharsets.UTF_8.encode("Hello ");
ByteBuffer bbf2 = StandardCharsets.UTF_8.encode("Netty ");
ByteBuffer bbf3 = StandardCharsets.UTF_8.encode("~");
try (FileChannel channel = new RandomAccessFile("text1.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{bbf1, bbf2, bbf3}); // write() 方法,可以将多个buffer中的数据集中写出到文件中
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.4 黏包和半包
所谓黏包,是在网络传输过程中为了提高网络传输的效率,将多个数据包黏在一起一次传输,导致接收时多个数据黏在一起。
半包一般是由于服务器缓冲区大小不足导致的数据被截断。
3. 文件编程
3.1 FileChannel
FileChannel
是用来操作文件的 channel,它工作在阻塞模式下。
FileChannel 的创建方式:
- 通过 FileInputStream 创建,这种方式创建的 FileChannel 只能读入数据。
FileChannel channel = new FileInputStream("text.txt").getChannel();
- 通过 FileOutputStream 创建,这种方式创建的 FileChannel 只能写出数据。
FileChannel channel = new FileOutputStream("text.txt").getChannel();
- 通过 RandomAccessFile 创建,这种方式创建的 FileChannel 根据构建 RandomAccessFile 时指定的读写模式决定可读可写。
FileChannel channel = new RandomAccessFile("text1.txt", "rw").getChannel();
Channel 之间传输数据:
使用 FileChannel 的 transferTo() 方法将一个通道中的数据传输到另一个通道中。
transferTo() 方法的三个参数:
- position 源文件传输的起始位置,从哪开始传
- count 传输数据的字节数,从起始位置开始要传多少数据
- target 目标 channel
方法返回值是本次传输的字节数。
public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;
如果需要拷贝文件,可以使用 transferTo() 方法,它的底层使用了操作系统的零拷贝进行优化,效率比较高。但一次只能传输 2GB 大小的数据,传输大文件时要做处理。
示例代码:
/**
* @author liyan
* @since 2021-11-02 17:37
*/
public class FileChannelTransferToDemo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("from.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel()
) {
// transferTo() 一次只能传输2G数据,如果传输的文件大于2G,需要多次传输
long size = from.size(); // 源文件大小
for (long left = size; left > 0; ) { // left 定义源文件的剩余待传大小,初始值等于源文件的初始大小
System.out.printf("当前文件大小:%s, 剩余待传:%s\n", size, left);
// left = left - 本次传输的数据量
// size - left 表示传输的起始指针,为了多次传输时,接着上次的位置
left -= from.transferTo((size - left), left, to); // transferTo() 的传输效率高,底层使用了操作系统的零拷贝进行优化,transferTo传输上限:一次传2g
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.2 Path 和 Paths
Path
和 Paths
是在 Java 1.7 时引入的,可用于在文件系统中定位文件的对象,它通常表示依赖于系统的文件路径。
Path 用来表示文件或者目录的路径,Paths 是一个工具类,用来创建 Path 对象。
Paths 中只包含两个 get 方法用来创建 Path 对象:
字符串路径:
public static Path get(String first, String... more) {
return FileSystems.getDefault().getPath(first, more);
}
URI对象:
public static Path get(URI uri) {
// ...
}
3.3 Files
Files 是用来操作文件和目录的类库,常用的 Api:
- 判断文件、目录是否存在。
static boolean exists(Path path, LinkOption... options)
- 创建目录,只能创建一级目录。
static Path createDirectory(Path dir, FileAttribute<?>... attrs)
- 创建多级目录。
static Path createDirectories(Path dir, FileAttribute<?>... attrs)
- 拷贝文件,效率和 FileChannel 的 transferTo() 方法效率差不多,但底层实现不同。如果目标文件已经存在,则会抛出
FileAlreadyExistsException
异常。如果希望覆盖掉目标,可以在方法第三个参数传入StandardCopyOption.REPLACE_EXISTING
。
static Path copy(Path source, Path target, CopyOption... options)
- 移动文件。
static Path move(Path source, Path target, CopyOption... options)
- 删除文件和空目录(文件或目录不存在会抛异常,删除非空目录时会抛异常)。
static void delete(Path path)
- 如果存在要删除的文件或者目录则执行删除操作。
static boolean deleteIfExists(Path path)
- 遍历目录和文件。
static Path walkFileTree(Path start, FileVisitor<? super Path> visitor)
- 遍历目录和文件。
public static Stream<Path> walk(Path start, FileVisitOption... options) throws IOException {
return walk(start, Integer.MAX_VALUE, options);
}
代码:(利用 walkFileTree() 遍历目录并复制)
/**
* 利用 walkFileTree 遍历目录并复制
* @author liyan
* @since 2021-11-18 16:40
*/
public class WalkFileTreeDemo1 {
public static void main(String[] args) throws IOException {
String source = "C:\\Users\\liyan\\Downloads\\";
String target = "D:\\";
Path sourcePath = Paths.get(source, "redis-6.2.6");
Files.walkFileTree(sourcePath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
Path to = Paths.get(dir.toString().replace(source, target));
Files.copy(dir, to);
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Path to = Paths.get(file.toString().replace(source, target));
Files.copy(file, to);
return super.visitFile(file, attrs);
}
});
}
}
4. 网络编程
4.1 ServerSocketChannel
ServerSocketChannel 是面向流的侦听套接字的可选通道,用来监听 Socket。
常用Api:
- 创建 ServerSocketChannel 对象。
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
- 将通道的 Socket 绑定到本地地址,并将 Socket 配置为侦听连接。
public final ServerSocketChannel bind(SocketAddress local)
throws IOException
{
return bind(local, 0);
}
- 接受 Socket 连接,并返回一个 SocketChannel 对象。(在阻塞模式下,在没有连接时会阻塞当前线程)
public abstract SocketChannel accept() throws IOException;
- 设置 channel 是否工作在阻塞模式。(通道默认是工作在阻塞模式下的,通过该方法传入 false 可以设置通道为非阻塞模式)
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
// ...
}
4.2 SocketChannel
SocketChannel 是面向流连接套接字的可选通道。
常用Api:
- 将通道内的数据读入到 buffer 中。(在没有可读数据时)
public abstract int read(ByteBuffer dst) throws IOException;
- 设置 channel 是否工作在阻塞模式。(通道默认是工作在阻塞模式下的,通过该方法传入 false 可以设置通道为非阻塞模式)
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
// ...
}
4.3 阻塞IO
阻塞模式下,accept()
和 read()
这些阻塞方法都会导致当前线程被阻塞,虽然阻塞的线程不会占用 CPU,但单线程情况下,阻塞方法之间相互影响,几乎没办法同时工作,需要多线程的支持。
在多线程下,又会有新的问题:
- 32位的 JVM 一个线程占用 320K 内存,64位 JVM 一个线程占用 1024K 内存,如果连接数过多,必定会引发内存溢出,并且线程数过多,频繁的上下文切换也会导致系统性能降低;
- 可以使用线程池技术减少线程数和线程间的上下文切换,但如果建立很多连接,长时间的连接会阻塞线程池中的所有线程,因此这种方式只适合短连接,不适合长连接。
测试代码:(服务端)
/**
* 阻塞IO 服务端
* @author liyan
* @since 2021-11-18 17:48
*/
public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通过 open() 方法,创建一个 ServerSocketChannel
ssc.bind(new InetSocketAddress(8080)); // 将 Socket 绑定到本地 8080 端口并监听
List<SocketChannel> channels = new ArrayList<>(); // 存放 SocketChannel 的集合,处理多个连接
while (true) {
System.out.println("等待连接...");
SocketChannel sc = ssc.accept(); // accept() 是阻塞方法,会阻塞线程,当有连接请求时线程会继续执行
System.out.printf("建立连接:%s\n", sc);
channels.add(sc);
System.out.printf("当前集合 size: %s,元素:%s\n", channels.size(), channels);
ByteBuffer buffer = ByteBuffer.allocate(16);
for (SocketChannel channel : channels) { // 遍历集合中的所有 SocketChannel,当有可读数据时读入到 buffer
System.out.printf("当前遍历的channel:%s\n", channel);
channel.read(buffer); // read() 也是阻塞方法,当有可读数据时才会执行
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}
}
}
}
4.4 非阻塞IO
非阻塞模式下,accept()
和 read()
方法在没有连接或者可读数据时也不会阻塞线程,通过修改上面的示例代码,非阻塞模式下可以处理多个连接和读取数据,方法之间不会相互影响。但是这种情况程序在一直遍历,线程需要一直工作。
非阻塞模式下:
- ServerSocketChannel 在没有连接时,accept() 方法返回 null;
- SocketChannel 在没有可读数据时,read() 方法返回 0;
- 在写出数据时,线程只需等待数据写出到 channel 即可,无需等待 channel 通过网络把数据发出去。
- 在没有连接和可读数据时,现在仍在不断运行,占用 CPU 资源;
- 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)
测试代码:(服务端)
/**
* 非阻塞IO 服务端
* @author liyan
* @since 2021-11-19 09:40
*/
public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通过 open() 方法,创建一个 ServerSocketChannel
ssc.configureBlocking(false); // 设置 ServerSocketChannel 工作在非阻塞模式下
ssc.bind(new InetSocketAddress(8080)); // 将 Socket 绑定到本地 8080 端口并监听
List<SocketChannel> channels = new ArrayList<>(); // 存放 SocketChannel 的集合,处理多个连接
while (true) {
SocketChannel sc = ssc.accept(); // accept() 是阻塞方法,会阻塞线程,当有连接请求时线程会继续执行
if (sc != null) {
sc.configureBlocking(false); // 设置 SocketChannel 工作在非阻塞模式下
System.out.printf("建立连接:%s\n", sc);
channels.add(sc);
System.out.printf("当前集合 size: %s,元素:%s\n", channels.size(), channels);
}
ByteBuffer buffer = ByteBuffer.allocate(16);
for (SocketChannel channel : channels) { // 遍历集合中的所有 SocketChannel,当有可读数据时读入到 buffer
int read = channel.read(buffer);// read() 也是阻塞方法,当有可读数据时才会执行
if (read > 0) {
System.out.printf("当前读取的channel:%s\n", channel);
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}
}
}
}
}
4.5 多路复用器
Selector(多路复用器)解决了在非阻塞模式下,在无可用连接和数据时线程仍不断运行,占用 CPU 资源的问题。单线程情况下,配合多路复用器 Selector 可以完成对 Channel 不同事件(accept/connect/read/write)的监听,在有事件触发时线程执行,在没有事件触发时,线程可以一直阻塞,避免资源浪费。
多路复用器 Selector 只适用于网络 IO(如ServerSocketChannel,SocketChannel),不适用于文件 IO(FileChannel)。
多路复用器的基本使用:
通过 Selector 的
open()
方法创建 Selector 对象。public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
将 Channel 注册到 Selector 中,调用的 Channel 中的 register() 方法,该方法会返回一个
SelectionKey
对象,这个对象与注册的 Channel 一一对应,可以通过该对象获取 Channel,获取关注的事件类型,或者附件对象等。// 两个参数:(多路复用器对象,关注的事件)
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException
{
return register(sel, ops, null);
}// 三个参数:(多路复用器对象,关注的事件,附件对象)
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException
{
// ...
}注册到 Selector 的 Channel 必须设置为非阻塞模式,否则会抛出异常。
四种事件:(SelectionKey 类中定义的四个数值常量)
OP_ACCEPT
有连接请求OP_CONNECT
客户端连接建立后(客户端侧)OP_READ
可读OP_WRITE
可写
ServerSocketChannel 只能关注
OP_ACCEPT
事件,SocketChannel 只能关注OP_READ
、OP_WRITE
、OP_CONNECT
事件。
关注事件,可以在注册时关注事件,也可以通过
SelectionKey
调用interestOps()
方法关注事件。public abstract SelectionKey interestOps(int ops);
这个方法在使用时要注意,假设 Channel 关注了一个
OP_READ
事件,下面该 Channel 还想同时关注OP_WRITE
事件,在通过 interestOps() 关注事件时要两个事件相加,如果只单独写一个只会替换原来关注的事件。selectionKey.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
监听 Channel 事件。
可以通过下面三种方法来监听是否有事件发生,方法返回值代表有多少 Channel 发生了事件。
- 一直阻塞线程,直到有事件被触发。(常用)
int count = selector.select();
- 阻塞线程,直到有事件被触发,或者到超时时间。
int count = selector.select(long time);
- 不阻塞线程,该方法会返回发生事件的 Channel 数量,根据返回值判断是否有事件发生。
int count = selector.selectNow();
select() 方法何时不阻塞线程:
- 事件发生时。
- 客户端发起连接,会出发 OP_ACCEPT;
- 客户端发送数据过来,客户端正常、异常关闭都会出发 OP_READ,如果发送的时间大于缓冲区大小,会出发多次 OP_READ,直到把数据读完;
- channel 可写时会出发多次 OP_WRITE,例如发送较大数据,一次没传完会再次出发 OP_WRITE;
- 在 Linux 系统中,发生 nio bug时。
- 调用 selector.weakup();
- 调用 selector.close();
- selector 所在线程 interrupt。
代码:
/**
* 非阻塞IO 多路复用器 服务端
*
* @author liyan
* @since 2021-11-19 11:19
*/
public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通过 open() 方法,创建一个 ServerSocketChannel
ssc.configureBlocking(false); // 设置 ServerSocketChannel 工作在非阻塞模式下
ssc.bind(new InetSocketAddress(8080)); // 将 Socket 绑定到本地 8080 端口并监听
Selector selector = Selector.open(); // 创建多路复用器
SelectionKey sscKey = ssc.register(selector, 0, null); // 注册 ServerSocketChannel 到 Selector
sscKey.interestOps(SelectionKey.OP_ACCEPT); // 关注 accept 事件
while (true) {
selector.select(); // 阻塞线程,等到 Channel 有事件触发
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 触发事件的 selectionKey 集合
Iterator<SelectionKey> iterator = selectionKeys.iterator(); // 获取迭代器,通过迭代器遍历,后续会通过迭代器在遍历过程中移除处理过的 selectionKey
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // selector 会把发生事件的 selectionKey 添加到集合中,但不会主动移除,需要我们手动移除处理过的 key
if (key.isAcceptable()) { // 如果是 accept 事件
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 拿到 ServerSocketChannel 对象
SocketChannel sc = serverSocketChannel.accept(); // 建立连接,获取 SocketChannel 对象
sc.configureBlocking(false); // 设置非阻塞模式
sc.register(selector, SelectionKey.OP_READ); // 将 SocketChannel 注册到 Selector,并关注 read 事件
}
if (key.isReadable()) { // 如果是 read 事件
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
try {
int read = sc.read(buffer); // 客户端关闭时会触发 read 事件,正常关闭时 read() 返回-1,异常关闭时 read() 会抛出 IO异常
if (read == -1) { // 客户端正常关闭,read 返回 -1
key.cancel(); // Selector 取消注册该 Channel
}
buffer.flip(); // buffer 切换读模式
System.out.println(StandardCharsets.UTF_8.decode(buffer));
} catch (Exception e) {
e.printStackTrace();
key.cancel();
}
}
}
}
}
}
4.6 配合多线程
单线程模式引入多路复用器 Selector 可以同时管理多个 Channel,处理多个事件,但如果其中某一个事件处理时间较长,还是会影响其它事件的处理。并且对于多核 CPU 来说,也是白白浪费资源。
为了有效利用起 CPU 的资源,可以使用多个 Selector 分工处理不同事件(设计思路):
- Boss 线程,配置一个 Selector,专门用来处理 OP_ACCEPT 事件;
- Worker 线程,配置一个 Selector,用来处理 OP_READ 事件;(根据 CPU 的核心数适当配置 Worker 的个数)
示例代码(服务端):
/**
* 多线程服务端
* @author liyan
* @since 2021-11-19 13:10
*/
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss"); // 设置线程名称
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建 ServerSocketChannel
ssc.configureBlocking(false); // 设置非阻塞模式
ssc.bind(new InetSocketAddress(8080)); // 绑定并监听本地端口
Selector boss = Selector.open(); // 创建多路复用器 Selector
ssc.register(boss, SelectionKey.OP_ACCEPT); // 将 ServerSocketChannel 注册到 Selector,并关注 OP_ACCEPT 事件
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 使用多个 Worker,数组长度是物理机的可用线程数
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger counter = new AtomicInteger(); // 计数器,用来轮询 Worker
while (true) {
log.debug("等待连接...");
boss.select(); // 阻塞线程,监听连接事件
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); // 获取发生事件的 SelectedKeys 集合的迭代器对象
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 从集合中移除当前处理的 key
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept(); // 建立连接,并获取 SocketChannel 对象
log.debug("已建立连接:{}", sc.getRemoteAddress());
sc.configureBlocking(false); // 设置 SocketChannel 为非阻塞模式
workers[counter.getAndIncrement() % workers.length].register(sc); // 轮询注册 SocketChannel
}
}
}
}
static class Worker implements Runnable {
private String name;
private Selector selector; // 多路复用器
private Thread thread;
private ConcurrentLinkedQueue<Runnable> queue; // 用于线程间通信
private boolean first = true; // Worker register 是否是首次调用
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws IOException {
if (first) { // 首次调用时初始化对象,启动线程
selector = Selector.open();
thread = new Thread(this, name);
queue = new ConcurrentLinkedQueue<>();
thread.start();
first = false;
}
Runnable task = () -> { // 创建任务,将 SocketChannel 注册到 Worker 的 Selector
try {
sc.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
};
queue.add(task); // 添加任务到队列
selector.wakeup(); // 唤醒线程,让 worker 线程执行 SocketChannel 注册任务
}
@Override
public void run() {
while (true) {
try {
selector.select(); // 阻塞线程,等待事件发生
Runnable task = queue.poll(); // 弹出第一个元素(并移除)
if (task != null) {
task.run(); // 执行注册任务
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(64);
try {
int read = sc.read(buffer);// 读入数据
if (read == -1) {
log.debug("客户端断开连接:{}", sc.getRemoteAddress());
key.cancel(); // 从 Selector 移除注册
}
buffer.flip(); // 可读模式
log.debug("{}:{}", sc.getRemoteAddress().toString().split(":")[1], Charset.defaultCharset().decode(buffer));
} catch (IOException e) {
log.debug("客户端异常关闭:{}", sc.getRemoteAddress());
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端:
/**
* 客户端
* @author liyan
* @since 2021-11-19 19:56
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.write(Charset.defaultCharset().encode("测试连接 -- " + sc.getLocalAddress()));
try (Scanner scanner = new Scanner(System.in)) {
while (true) {
System.out.print("输入:");
String input = scanner.next();
sc.write(Charset.defaultCharset().encode(input));
}
}
}
}
根据上面的代码写了一个简单的聊天室:
/**
* 服务端(多线程版本)
* @author liyan
* @since 2021-11-19 13:10
*/
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss"); // 设置 main 线程的名称
ServerSocketChannel ssc = ServerSocketChannel.open(); // 打开 ServerSocketChannel
ssc.configureBlocking(false); // 设置阻塞模式
ssc.bind(new InetSocketAddress(8080)); // 绑定本地地址并监听 8080 端口
Selector boss = Selector.open(); // 打开多路复用器 Selector
ssc.register(boss, SelectionKey.OP_ACCEPT); // 将 ServerSocketChannel 注册到 Selector
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 使用多个 Worker,数组长度是物理机的可用线程数
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
Set<SocketChannel> clients = Collections.synchronizedSet(new HashSet<>()); // 当前已连接的客户端集合
AtomicInteger counter = new AtomicInteger(); // 计数器
while (true) {
log.debug("等待连接...");
boss.select(); // 监听事件
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) { // 连接请求
SocketChannel sc = ssc.accept();
log.debug("已建立连接:{}", sc.getRemoteAddress());
clients.add(sc);
sc.configureBlocking(false);
workers[counter.getAndIncrement() % workers.length].register(sc, clients); // 将客户端平均分配到 workers 中(轮询)
}
}
}
}
static class Worker implements Runnable {
private String name;
private Selector selector;
private Thread thread;
private ConcurrentLinkedQueue<Runnable> queue; // 用于线程间通信,向线程中添加要执行的任务
private boolean first = true; // Worker register 是否是首次调用
private Set<SocketChannel> clients;
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc, Set<SocketChannel> clients) throws IOException {
if (first) {
selector = Selector.open(); // 创建 Selector
thread = new Thread(this, name); // 创建线程
queue = new ConcurrentLinkedQueue<>(); // 初始化队列
this.clients = clients;
thread.start(); // 启动线程
first = false;
}
clients.add(sc); // 将当前连接添加进集合
Runnable runnable = () -> { // 创建任务,将 SocketChannel 注册到 Worker 的 Selector
try {
sc.register(selector, SelectionKey.OP_READ);
sendInGroup(sc, "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:上线了");
} catch (IOException e) {
e.printStackTrace();
}
};
queue.add(runnable); // 添加任务
selector.wakeup(); // 唤醒线程,worker 线程中执行 SocketChannel 注册任务
}
@Override
public void run() {
while (true) {
try {
selector.select(); // 阻塞线程,等待事件发生
Runnable runnable = queue.poll(); // 弹出第一个元素(并移除)
if (runnable != null) {
runnable.run(); // 执行注册任务
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) { // 可读事件
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(64);
try {
int read = sc.read(buffer);// 读入数据
if (read == -1) {
log.debug("客户端断开连接:{}", sc.getRemoteAddress());
key.cancel(); // 从 Selector 移除注册
sendInGroup(sc, "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:下线了");
clients.remove(sc); // 从客户端集合中移除
continue;
}
buffer.flip(); // 可读模式
String received = "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:" + Charset.defaultCharset().decode(buffer);
log.debug(received);
sendInGroup(sc, received); // 群发消息
} catch (IOException e) {
log.debug("客户端异常关闭:{}", sc.getRemoteAddress());
key.cancel();
sendInGroup(sc, "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:下线了");
clients.remove(sc);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void sendInGroup(SocketChannel sc, String message) {
clients.stream()
.filter(socketChannel -> socketChannel != sc) // 过滤掉当前的 SocketChannel
.forEach(socketChannel -> { // 将消息发送给在线的其它客户端
try {
socketChannel.write(Charset.defaultCharset().encode(message));
} catch (IOException e) {
log.error("写出数据时发生异常", e);
}
});
}
}
}
**
* 客户端
* @author liyan
* @since 2021-11-19 19:56
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
Selector selector = Selector.open(); // 创建 Selector
sc.configureBlocking(false);
SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ);
new Thread(() -> { // 控制台输入单独起一个线程
try {
System.out.printf("服务器已连接,本机端口号:%s\n", sc.getLocalAddress());
} catch (IOException e) {
e.printStackTrace();
}
try (Scanner scanner = new Scanner(System.in)) {
while (true) {
String input = scanner.nextLine();
SocketChannel channel = (SocketChannel) selectionKey.channel();
channel.write(Charset.defaultCharset().encode(input));
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
while (true) {
try {
selector.select(); // 阻塞线程,等待可读事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(64);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
log.debug("服务器已关闭");
}
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer).toString());
} catch (IOException e) {
key.cancel();
log.debug("服务器已关闭");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}