跳到主要内容

Java nio 手记

NIO 即 non-blocking io ,非阻塞IO。

1. nio 的三大组件

nio 的三大核心组件:

  • Channel 管道
  • Buffer 缓冲区
  • Selector 多路复用器

1.1 Channel

Channel 是数据传输的双向通道,既可以从外部将数据读入(read)到内存中,也可以从内存写出(write)数据到外部。

常见的 Channel 有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

SocketChannelServerSocketChannel 应用于网络编程中。


1.2 Buffer

Buffer 内存缓冲区,用来暂存从 Channel 中读入的数据,或者是将要写出到外部的数据。

常见的 Buffer 有:

  • ByteBuffer 字节缓冲区(抽象类),实现:

    • MappedByteBuffer
    • DirectByteBuffer 通过 ByteBuffer 的静态方法 allocateDirect() 获取,使用直接内存
    • HeapByteBuffer 通过 ByteBuffer 的静态方法 allocate() 获取,使用Jvm的堆内存
  • ShortBuffer

  • IntBuffer

  • LongBuffer

  • FloatBuffer

  • DoubleBuffer

  • CharBuffer


1.3 Selector

多路复用器,用来管理多个 Channel,监听 Channel 上发生的事件。Channel 工作在非阻塞模式下,不会让线程吊死在一个 Channel 上。适合连接数多,但流量低的场景(low traffic)。


2. ByteBuffer 字节缓冲区

2.1 基本使用

缓冲区手绘:

Buffer 有三个重要的属性:

  • capacity 缓冲区容量(大小)
  • limit
  • position 当前指针

如图所示,在初始化一个缓冲区后,指针(position)默认指向起始位置 0,limit = capacity = 缓冲区的大小。

当像缓冲区读入数据时,读入一次,指针 +1。

看下面一段代码,是从 text.txt 文本文件中读取数据到缓冲区。

public static void main(String[] args) {
// 通过文件输入流获取 FileChannel 对象,读取 text.txt 中的内容。
// 这里使用了 try-with-resource 的形式,用来自动关闭 channel 释放资源。
try (FileChannel channel = new FileInputStream("text.txt").getChannel()) {
// 创建一个5个字节大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(5);
// 通过 Channel 的 read() 方法将数据读入到缓冲区中
while (channel.read(buffer) != -1){
// 切换读模式
buffer.flip();
// 遍历打印每个字节
while (buffer.hasRemaining()){
char c = (char) buffer.get();
System.out.println(c);
}
// 切换写模式
buffer.clear();
}
}
}

这里面包含两个需要知道的方法,切换读模式和切换写模式。

// 切换读模式
buffer.flip();

切换读模式从方法名上可以看出,是翻转的意思,翻转了什么呢?是改变了缓冲区的指针和limit,看下方法的源代码:

public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

上手绘图:

切换读模式后,指针重新指向起始位置,我们可以从缓冲区的起始位置开始读取,每读取一次指针加1。如果没有切换读模式,指针指向的是缓冲区最后一个数据的下一个索引,所以是无法读取数据的。

切换到写模式:

buffer.clear();

源代码:

public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

切换到写模式之后,指针重新指向缓冲区的起始位置,limit = capacity,写入时开始从缓冲区的起始位置写入,缓冲区内的数据将被覆盖掉。

假设缓冲区中存在上次未读完的数据,即切换到写模式之前,position < limit,说明有数据未读,此时通过 clear() 切换写模式后再次写入数据,可能会把上一次未读的数据覆盖掉。如果不想覆盖掉上一次未读的数据,可以使用 compact() 方法切换写模式。

compact() 方法是将上次未读的数据拷贝到缓冲区起始,指针指向到拷贝后的最后一个数据的下一个索引,再次写入数据时不会覆盖掉之前未读完的数据。


2.2 与 String 的互相转换

String to ByteBuffer

方式一,将 String 转为字节数组,再调用 ByteBuffer 的 put(byte[] bytes) 方法,传入字节数组。

ByteBuffer buffer1 = ByteBuffer.allocate(10);
buffer1.put("Hello".getBytes());

方式二:StandardCharsets.UTF_8.encode(),注意:获取到 buffer 处于读模式。

ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("Hello");

方式三:ByteBuffer.wrap(byte[] array) 方法,注意:获取到 buffer 处于读模式。

ByteBuffer buffer3 = ByteBuffer.wrap("Hello".getBytes());

ByteBuffer to String

传入的 buffer 需要处于读模式。

String s  = StandardCharsets.UTF_8.decode(buffer).toString();

2.3 分散读和集中写

Java nio 支持 Scatter Reads(分散读) 和 Gather Writers(集中写),即可以将数据读入到不同的 buffer 中,或者将多个 buffer 中的数据集中写出。

ScatteringByteChannel 接口里的 read() 方法支持将数据读入到多个 buffer 中;

GatheringByteChannel 接口里的 write() 方法支持将多个 buffer 中的数据写出到外部。

重点是 要理解分散读和集中写的思想。

分散读:

/**
* Scattering Reads 分散读
* @author liyan
* @since 2021-11-02 15:45
*/
public class ScatteringReadsDemo {
public static void main(String[] args) {
// try-with-resource 写法,创建一个 FileChannel 对象,读取 text.txt,文本:HelloWorld
try (FileChannel channel = new RandomAccessFile("text.txt", "r").getChannel()) {
ByteBuffer bbf1 = ByteBuffer.allocate(3);
ByteBuffer bbf2 = ByteBuffer.allocate(3);
ByteBuffer bbf3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{bbf1, bbf2, bbf3}); // read()方法,支持将数据读入到多个 buffer 中
bbf1.flip(); // 切换读模式
bbf2.flip();
bbf3.flip();
System.out.println(StandardCharsets.UTF_8.decode(bbf1)); // Hel
System.out.println(StandardCharsets.UTF_8.decode(bbf2)); // loW
System.out.println(StandardCharsets.UTF_8.decode(bbf3)); // orld
} catch (IOException e) {
e.printStackTrace();
}
}
}

集中写:

/**
* Gathering Writes 集中写
* @author liyan
* @since 2021-11-02 16:39
*/
public class GatheringWritesDemo {

public static void main(String[] args) {
ByteBuffer bbf1 = StandardCharsets.UTF_8.encode("Hello ");
ByteBuffer bbf2 = StandardCharsets.UTF_8.encode("Netty ");
ByteBuffer bbf3 = StandardCharsets.UTF_8.encode("~");

try (FileChannel channel = new RandomAccessFile("text1.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{bbf1, bbf2, bbf3}); // write() 方法,可以将多个buffer中的数据集中写出到文件中
} catch (IOException e) {
e.printStackTrace();
}
}
}

2.4 黏包和半包

所谓黏包,是在网络传输过程中为了提高网络传输的效率,将多个数据包黏在一起一次传输,导致接收时多个数据黏在一起。

半包一般是由于服务器缓冲区大小不足导致的数据被截断。


3. 文件编程

3.1 FileChannel

FileChannel 是用来操作文件的 channel,它工作在阻塞模式下。

FileChannel 的创建方式:

  1. 通过 FileInputStream 创建,这种方式创建的 FileChannel 只能读入数据。
FileChannel channel = new FileInputStream("text.txt").getChannel();

  1. 通过 FileOutputStream 创建,这种方式创建的 FileChannel 只能写出数据。
FileChannel channel = new FileOutputStream("text.txt").getChannel();

  1. 通过 RandomAccessFile 创建,这种方式创建的 FileChannel 根据构建 RandomAccessFile 时指定的读写模式决定可读可写。
FileChannel channel = new RandomAccessFile("text1.txt", "rw").getChannel();

Channel 之间传输数据:

使用 FileChannel 的 transferTo() 方法将一个通道中的数据传输到另一个通道中。

transferTo() 方法的三个参数:

  • position 源文件传输的起始位置,从哪开始传
  • count 传输数据的字节数,从起始位置开始要传多少数据
  • target 目标 channel

方法返回值是本次传输的字节数。

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

如果需要拷贝文件,可以使用 transferTo() 方法,它的底层使用了操作系统的零拷贝进行优化,效率比较高。但一次只能传输 2GB 大小的数据,传输大文件时要做处理。

示例代码:

/**
* @author liyan
* @since 2021-11-02 17:37
*/
public class FileChannelTransferToDemo {

public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("from.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel()
) {
// transferTo() 一次只能传输2G数据,如果传输的文件大于2G,需要多次传输
long size = from.size(); // 源文件大小
for (long left = size; left > 0; ) { // left 定义源文件的剩余待传大小,初始值等于源文件的初始大小
System.out.printf("当前文件大小:%s, 剩余待传:%s\n", size, left);
// left = left - 本次传输的数据量
// size - left 表示传输的起始指针,为了多次传输时,接着上次的位置
left -= from.transferTo((size - left), left, to); // transferTo() 的传输效率高,底层使用了操作系统的零拷贝进行优化,transferTo传输上限:一次传2g
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

3.2 Path 和 Paths

PathPaths 是在 Java 1.7 时引入的,可用于在文件系统中定位文件的对象,它通常表示依赖于系统的文件路径。

Path 用来表示文件或者目录的路径,Paths 是一个工具类,用来创建 Path 对象。


Paths 中只包含两个 get 方法用来创建 Path 对象:

字符串路径:

public static Path get(String first, String... more) {
return FileSystems.getDefault().getPath(first, more);
}

URI对象:

public static Path get(URI uri) {
// ...
}

3.3 Files

Files 是用来操作文件和目录的类库,常用的 Api:

  1. 判断文件、目录是否存在。
static boolean exists(Path path, LinkOption... options)

  1. 创建目录,只能创建一级目录。
static Path createDirectory(Path dir, FileAttribute<?>... attrs)

  1. 创建多级目录。
static Path createDirectories(Path dir, FileAttribute<?>... attrs)

  1. 拷贝文件,效率和 FileChannel 的 transferTo() 方法效率差不多,但底层实现不同。如果目标文件已经存在,则会抛出 FileAlreadyExistsException 异常。如果希望覆盖掉目标,可以在方法第三个参数传入 StandardCopyOption.REPLACE_EXISTING
static Path copy(Path source, Path target, CopyOption... options)

  1. 移动文件。
static Path move(Path source, Path target, CopyOption... options)

  1. 删除文件和空目录(文件或目录不存在会抛异常,删除非空目录时会抛异常)。
static void delete(Path path)

  1. 如果存在要删除的文件或者目录则执行删除操作。
static boolean deleteIfExists(Path path)

  1. 遍历目录和文件。
static Path walkFileTree(Path start, FileVisitor<? super Path> visitor)

  1. 遍历目录和文件。
public static Stream<Path> walk(Path start, FileVisitOption... options) throws IOException {
return walk(start, Integer.MAX_VALUE, options);
}

代码:(利用 walkFileTree() 遍历目录并复制)

/**
* 利用 walkFileTree 遍历目录并复制
* @author liyan
* @since 2021-11-18 16:40
*/
public class WalkFileTreeDemo1 {

public static void main(String[] args) throws IOException {
String source = "C:\\Users\\liyan\\Downloads\\";
String target = "D:\\";
Path sourcePath = Paths.get(source, "redis-6.2.6");

Files.walkFileTree(sourcePath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
Path to = Paths.get(dir.toString().replace(source, target));
Files.copy(dir, to);
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Path to = Paths.get(file.toString().replace(source, target));
Files.copy(file, to);
return super.visitFile(file, attrs);
}
});
}
}

4. 网络编程

4.1 ServerSocketChannel

ServerSocketChannel 是面向流的侦听套接字的可选通道,用来监听 Socket。

常用Api:

  1. 创建 ServerSocketChannel 对象。
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}

  1. 将通道的 Socket 绑定到本地地址,并将 Socket 配置为侦听连接。
public final ServerSocketChannel bind(SocketAddress local)
throws IOException
{
return bind(local, 0);
}

  1. 接受 Socket 连接,并返回一个 SocketChannel 对象。(在阻塞模式下,在没有连接时会阻塞当前线程)
public abstract SocketChannel accept() throws IOException;

  1. 设置 channel 是否工作在阻塞模式。(通道默认是工作在阻塞模式下的,通过该方法传入 false 可以设置通道为非阻塞模式)
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
// ...
}

4.2 SocketChannel

SocketChannel 是面向流连接套接字的可选通道。

常用Api:

  1. 将通道内的数据读入到 buffer 中。(在没有可读数据时)
public abstract int read(ByteBuffer dst) throws IOException;

  1. 设置 channel 是否工作在阻塞模式。(通道默认是工作在阻塞模式下的,通过该方法传入 false 可以设置通道为非阻塞模式)
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
// ...
}

4.3 阻塞IO

阻塞模式下,accept()read() 这些阻塞方法都会导致当前线程被阻塞,虽然阻塞的线程不会占用 CPU,但单线程情况下,阻塞方法之间相互影响,几乎没办法同时工作,需要多线程的支持。

在多线程下,又会有新的问题:

  1. 32位的 JVM 一个线程占用 320K 内存,64位 JVM 一个线程占用 1024K 内存,如果连接数过多,必定会引发内存溢出,并且线程数过多,频繁的上下文切换也会导致系统性能降低;
  2. 可以使用线程池技术减少线程数和线程间的上下文切换,但如果建立很多连接,长时间的连接会阻塞线程池中的所有线程,因此这种方式只适合短连接,不适合长连接。

测试代码:(服务端)

/**
* 阻塞IO 服务端
* @author liyan
* @since 2021-11-18 17:48
*/
public class Server {

public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通过 open() 方法,创建一个 ServerSocketChannel
ssc.bind(new InetSocketAddress(8080)); // 将 Socket 绑定到本地 8080 端口并监听
List<SocketChannel> channels = new ArrayList<>(); // 存放 SocketChannel 的集合,处理多个连接
while (true) {
System.out.println("等待连接...");
SocketChannel sc = ssc.accept(); // accept() 是阻塞方法,会阻塞线程,当有连接请求时线程会继续执行
System.out.printf("建立连接:%s\n", sc);
channels.add(sc);
System.out.printf("当前集合 size: %s,元素:%s\n", channels.size(), channels);

ByteBuffer buffer = ByteBuffer.allocate(16);
for (SocketChannel channel : channels) { // 遍历集合中的所有 SocketChannel,当有可读数据时读入到 buffer
System.out.printf("当前遍历的channel:%s\n", channel);
channel.read(buffer); // read() 也是阻塞方法,当有可读数据时才会执行
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}
}
}
}

4.4 非阻塞IO

非阻塞模式下,accept()read() 方法在没有连接或者可读数据时也不会阻塞线程,通过修改上面的示例代码,非阻塞模式下可以处理多个连接和读取数据,方法之间不会相互影响。但是这种情况程序在一直遍历,线程需要一直工作。

非阻塞模式下:

  1. ServerSocketChannel 在没有连接时,accept() 方法返回 null;
  2. SocketChannel 在没有可读数据时,read() 方法返回 0;
  3. 在写出数据时,线程只需等待数据写出到 channel 即可,无需等待 channel 通过网络把数据发出去。
  4. 在没有连接和可读数据时,现在仍在不断运行,占用 CPU 资源;
  5. 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

测试代码:(服务端)

/**
* 非阻塞IO 服务端
* @author liyan
* @since 2021-11-19 09:40
*/
public class Server {

public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通过 open() 方法,创建一个 ServerSocketChannel
ssc.configureBlocking(false); // 设置 ServerSocketChannel 工作在非阻塞模式下
ssc.bind(new InetSocketAddress(8080)); // 将 Socket 绑定到本地 8080 端口并监听
List<SocketChannel> channels = new ArrayList<>(); // 存放 SocketChannel 的集合,处理多个连接
while (true) {
SocketChannel sc = ssc.accept(); // accept() 是阻塞方法,会阻塞线程,当有连接请求时线程会继续执行
if (sc != null) {
sc.configureBlocking(false); // 设置 SocketChannel 工作在非阻塞模式下
System.out.printf("建立连接:%s\n", sc);
channels.add(sc);
System.out.printf("当前集合 size: %s,元素:%s\n", channels.size(), channels);
}
ByteBuffer buffer = ByteBuffer.allocate(16);
for (SocketChannel channel : channels) { // 遍历集合中的所有 SocketChannel,当有可读数据时读入到 buffer
int read = channel.read(buffer);// read() 也是阻塞方法,当有可读数据时才会执行
if (read > 0) {
System.out.printf("当前读取的channel:%s\n", channel);
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}
}
}
}
}

4.5 多路复用器

Selector(多路复用器)解决了在非阻塞模式下,在无可用连接和数据时线程仍不断运行,占用 CPU 资源的问题。单线程情况下,配合多路复用器 Selector 可以完成对 Channel 不同事件(accept/connect/read/write)的监听,在有事件触发时线程执行,在没有事件触发时,线程可以一直阻塞,避免资源浪费。

多路复用器 Selector 只适用于网络 IO(如ServerSocketChannel,SocketChannel),不适用于文件 IO(FileChannel)。


多路复用器的基本使用:

  1. 通过 Selector 的 open() 方法创建 Selector 对象。

    public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
    }

  1. 将 Channel 注册到 Selector 中,调用的 Channel 中的 register() 方法,该方法会返回一个 SelectionKey 对象,这个对象与注册的 Channel 一一对应,可以通过该对象获取 Channel,获取关注的事件类型,或者附件对象等。

    // 两个参数:(多路复用器对象,关注的事件)
    public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException
    {
    return register(sel, ops, null);
    }
    // 三个参数:(多路复用器对象,关注的事件,附件对象)
    public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException
    {
    // ...
    }

    注册到 Selector 的 Channel 必须设置为非阻塞模式,否则会抛出异常。

    四种事件:(SelectionKey 类中定义的四个数值常量)

    1. OP_ACCEPT 有连接请求
    2. OP_CONNECT 客户端连接建立后(客户端侧)
    3. OP_READ 可读
    4. OP_WRITE 可写

    ServerSocketChannel 只能关注 OP_ACCEPT 事件,SocketChannel 只能关注 OP_READOP_WRITEOP_CONNECT 事件。


  1. 关注事件,可以在注册时关注事件,也可以通过 SelectionKey 调用 interestOps() 方法关注事件。

    public abstract SelectionKey interestOps(int ops);

    这个方法在使用时要注意,假设 Channel 关注了一个 OP_READ 事件,下面该 Channel 还想同时关注 OP_WRITE 事件,在通过 interestOps() 关注事件时要两个事件相加,如果只单独写一个只会替换原来关注的事件。

    selectionKey.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);

  1. 监听 Channel 事件。

    可以通过下面三种方法来监听是否有事件发生,方法返回值代表有多少 Channel 发生了事件。

    1. 一直阻塞线程,直到有事件被触发。(常用)
    int count = selector.select();

    1. 阻塞线程,直到有事件被触发,或者到超时时间。
    int count = selector.select(long time);

    1. 不阻塞线程,该方法会返回发生事件的 Channel 数量,根据返回值判断是否有事件发生。
    int count = selector.selectNow();

    select() 方法何时不阻塞线程:

    1. 事件发生时。
      • 客户端发起连接,会出发 OP_ACCEPT;
      • 客户端发送数据过来,客户端正常、异常关闭都会出发 OP_READ,如果发送的时间大于缓冲区大小,会出发多次 OP_READ,直到把数据读完;
      • channel 可写时会出发多次 OP_WRITE,例如发送较大数据,一次没传完会再次出发 OP_WRITE;
      • 在 Linux 系统中,发生 nio bug时。
    2. 调用 selector.weakup();
    3. 调用 selector.close();
    4. selector 所在线程 interrupt。

代码:

/**
* 非阻塞IO 多路复用器 服务端
*
* @author liyan
* @since 2021-11-19 11:19
*/
public class Server {

public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通过 open() 方法,创建一个 ServerSocketChannel
ssc.configureBlocking(false); // 设置 ServerSocketChannel 工作在非阻塞模式下
ssc.bind(new InetSocketAddress(8080)); // 将 Socket 绑定到本地 8080 端口并监听

Selector selector = Selector.open(); // 创建多路复用器
SelectionKey sscKey = ssc.register(selector, 0, null); // 注册 ServerSocketChannel 到 Selector
sscKey.interestOps(SelectionKey.OP_ACCEPT); // 关注 accept 事件

while (true) {
selector.select(); // 阻塞线程,等到 Channel 有事件触发
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 触发事件的 selectionKey 集合
Iterator<SelectionKey> iterator = selectionKeys.iterator(); // 获取迭代器,通过迭代器遍历,后续会通过迭代器在遍历过程中移除处理过的 selectionKey
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // selector 会把发生事件的 selectionKey 添加到集合中,但不会主动移除,需要我们手动移除处理过的 key

if (key.isAcceptable()) { // 如果是 accept 事件
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 拿到 ServerSocketChannel 对象
SocketChannel sc = serverSocketChannel.accept(); // 建立连接,获取 SocketChannel 对象
sc.configureBlocking(false); // 设置非阻塞模式
sc.register(selector, SelectionKey.OP_READ); // 将 SocketChannel 注册到 Selector,并关注 read 事件
}

if (key.isReadable()) { // 如果是 read 事件
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
try {
int read = sc.read(buffer); // 客户端关闭时会触发 read 事件,正常关闭时 read() 返回-1,异常关闭时 read() 会抛出 IO异常
if (read == -1) { // 客户端正常关闭,read 返回 -1
key.cancel(); // Selector 取消注册该 Channel
}
buffer.flip(); // buffer 切换读模式
System.out.println(StandardCharsets.UTF_8.decode(buffer));
} catch (Exception e) {
e.printStackTrace();
key.cancel();
}
}
}
}
}
}

4.6 配合多线程

单线程模式引入多路复用器 Selector 可以同时管理多个 Channel,处理多个事件,但如果其中某一个事件处理时间较长,还是会影响其它事件的处理。并且对于多核 CPU 来说,也是白白浪费资源。

为了有效利用起 CPU 的资源,可以使用多个 Selector 分工处理不同事件(设计思路):

  1. Boss 线程,配置一个 Selector,专门用来处理 OP_ACCEPT 事件;
  2. Worker 线程,配置一个 Selector,用来处理 OP_READ 事件;(根据 CPU 的核心数适当配置 Worker 的个数)


示例代码(服务端):

/**
* 多线程服务端
* @author liyan
* @since 2021-11-19 13:10
*/
@Slf4j
public class MultiThreadServer {

public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss"); // 设置线程名称
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建 ServerSocketChannel
ssc.configureBlocking(false); // 设置非阻塞模式
ssc.bind(new InetSocketAddress(8080)); // 绑定并监听本地端口

Selector boss = Selector.open(); // 创建多路复用器 Selector
ssc.register(boss, SelectionKey.OP_ACCEPT); // 将 ServerSocketChannel 注册到 Selector,并关注 OP_ACCEPT 事件
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 使用多个 Worker,数组长度是物理机的可用线程数
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger counter = new AtomicInteger(); // 计数器,用来轮询 Worker
while (true) {
log.debug("等待连接...");
boss.select(); // 阻塞线程,监听连接事件
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); // 获取发生事件的 SelectedKeys 集合的迭代器对象
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 从集合中移除当前处理的 key
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept(); // 建立连接,并获取 SocketChannel 对象
log.debug("已建立连接:{}", sc.getRemoteAddress());
sc.configureBlocking(false); // 设置 SocketChannel 为非阻塞模式
workers[counter.getAndIncrement() % workers.length].register(sc); // 轮询注册 SocketChannel
}
}
}
}

static class Worker implements Runnable {
private String name;
private Selector selector; // 多路复用器
private Thread thread;
private ConcurrentLinkedQueue<Runnable> queue; // 用于线程间通信
private boolean first = true; // Worker register 是否是首次调用

public Worker(String name) {
this.name = name;
}

public void register(SocketChannel sc) throws IOException {
if (first) { // 首次调用时初始化对象,启动线程
selector = Selector.open();
thread = new Thread(this, name);
queue = new ConcurrentLinkedQueue<>();
thread.start();
first = false;
}
Runnable task = () -> { // 创建任务,将 SocketChannel 注册到 Worker 的 Selector
try {
sc.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
};
queue.add(task); // 添加任务到队列
selector.wakeup(); // 唤醒线程,让 worker 线程执行 SocketChannel 注册任务
}

@Override
public void run() {
while (true) {
try {
selector.select(); // 阻塞线程,等待事件发生
Runnable task = queue.poll(); // 弹出第一个元素(并移除)
if (task != null) {
task.run(); // 执行注册任务
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(64);
try {
int read = sc.read(buffer);// 读入数据
if (read == -1) {
log.debug("客户端断开连接:{}", sc.getRemoteAddress());
key.cancel(); // 从 Selector 移除注册
}
buffer.flip(); // 可读模式
log.debug("{}:{}", sc.getRemoteAddress().toString().split(":")[1], Charset.defaultCharset().decode(buffer));
} catch (IOException e) {
log.debug("客户端异常关闭:{}", sc.getRemoteAddress());
key.cancel();
}
}

}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

客户端:

/**
* 客户端
* @author liyan
* @since 2021-11-19 19:56
*/
public class Client {

public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.write(Charset.defaultCharset().encode("测试连接 -- " + sc.getLocalAddress()));
try (Scanner scanner = new Scanner(System.in)) {
while (true) {
System.out.print("输入:");
String input = scanner.next();
sc.write(Charset.defaultCharset().encode(input));
}
}
}
}

根据上面的代码写了一个简单的聊天室:

/**
* 服务端(多线程版本)
* @author liyan
* @since 2021-11-19 13:10
*/
@Slf4j
public class MultiThreadServer {

public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss"); // 设置 main 线程的名称
ServerSocketChannel ssc = ServerSocketChannel.open(); // 打开 ServerSocketChannel
ssc.configureBlocking(false); // 设置阻塞模式
ssc.bind(new InetSocketAddress(8080)); // 绑定本地地址并监听 8080 端口

Selector boss = Selector.open(); // 打开多路复用器 Selector
ssc.register(boss, SelectionKey.OP_ACCEPT); // 将 ServerSocketChannel 注册到 Selector
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; // 使用多个 Worker,数组长度是物理机的可用线程数
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
Set<SocketChannel> clients = Collections.synchronizedSet(new HashSet<>()); // 当前已连接的客户端集合
AtomicInteger counter = new AtomicInteger(); // 计数器
while (true) {
log.debug("等待连接...");
boss.select(); // 监听事件
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) { // 连接请求
SocketChannel sc = ssc.accept();
log.debug("已建立连接:{}", sc.getRemoteAddress());
clients.add(sc);
sc.configureBlocking(false);
workers[counter.getAndIncrement() % workers.length].register(sc, clients); // 将客户端平均分配到 workers 中(轮询)
}
}
}
}

static class Worker implements Runnable {
private String name;
private Selector selector;
private Thread thread;
private ConcurrentLinkedQueue<Runnable> queue; // 用于线程间通信,向线程中添加要执行的任务
private boolean first = true; // Worker register 是否是首次调用
private Set<SocketChannel> clients;

public Worker(String name) {
this.name = name;
}

public void register(SocketChannel sc, Set<SocketChannel> clients) throws IOException {
if (first) {
selector = Selector.open(); // 创建 Selector
thread = new Thread(this, name); // 创建线程
queue = new ConcurrentLinkedQueue<>(); // 初始化队列
this.clients = clients;
thread.start(); // 启动线程
first = false;
}
clients.add(sc); // 将当前连接添加进集合
Runnable runnable = () -> { // 创建任务,将 SocketChannel 注册到 Worker 的 Selector
try {
sc.register(selector, SelectionKey.OP_READ);
sendInGroup(sc, "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:上线了");
} catch (IOException e) {
e.printStackTrace();
}
};
queue.add(runnable); // 添加任务
selector.wakeup(); // 唤醒线程,worker 线程中执行 SocketChannel 注册任务
}

@Override
public void run() {
while (true) {
try {
selector.select(); // 阻塞线程,等待事件发生
Runnable runnable = queue.poll(); // 弹出第一个元素(并移除)
if (runnable != null) {
runnable.run(); // 执行注册任务
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {

SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) { // 可读事件
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(64);
try {
int read = sc.read(buffer);// 读入数据
if (read == -1) {
log.debug("客户端断开连接:{}", sc.getRemoteAddress());
key.cancel(); // 从 Selector 移除注册
sendInGroup(sc, "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:下线了");
clients.remove(sc); // 从客户端集合中移除
continue;
}
buffer.flip(); // 可读模式
String received = "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:" + Charset.defaultCharset().decode(buffer);
log.debug(received);
sendInGroup(sc, received); // 群发消息
} catch (IOException e) {
log.debug("客户端异常关闭:{}", sc.getRemoteAddress());
key.cancel();
sendInGroup(sc, "【" + sc.getRemoteAddress().toString().split(":")[1] + "】:下线了");
clients.remove(sc);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private void sendInGroup(SocketChannel sc, String message) {
clients.stream()
.filter(socketChannel -> socketChannel != sc) // 过滤掉当前的 SocketChannel
.forEach(socketChannel -> { // 将消息发送给在线的其它客户端
try {
socketChannel.write(Charset.defaultCharset().encode(message));
} catch (IOException e) {
log.error("写出数据时发生异常", e);
}
});
}
}
}

**
* 客户端
* @author liyan
* @since 2021-11-19 19:56
*/
public class Client {

public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));

Selector selector = Selector.open(); // 创建 Selector
sc.configureBlocking(false);
SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ);

new Thread(() -> { // 控制台输入单独起一个线程
try {
System.out.printf("服务器已连接,本机端口号:%s\n", sc.getLocalAddress());
} catch (IOException e) {
e.printStackTrace();
}
try (Scanner scanner = new Scanner(System.in)) {
while (true) {
String input = scanner.nextLine();
SocketChannel channel = (SocketChannel) selectionKey.channel();
channel.write(Charset.defaultCharset().encode(input));
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();

while (true) {
try {
selector.select(); // 阻塞线程,等待可读事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(64);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
log.debug("服务器已关闭");
}
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer).toString());
} catch (IOException e) {
key.cancel();
log.debug("服务器已关闭");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}