《Java I/O 模型》Java NIO

导读:本篇文章讲解 《Java I/O 模型》Java NIO,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

《Java I/O 模型》专栏文章索引
👉《Java I/O 模型》Java BIO
👉《Java I/O 模型》Java NIO
👉《Java I/O 模型》Java AIO

🚀1. Java NIO 介绍

🎁Java NIO(New IO) 又被称为 Java Non-Blocking IO,是在 Java 1.4 开始引入的一个新的 IO API. NIO 支持面向缓冲区的、基于通道的 IO 操作,以更高效的方式进行文件的读写操作。传统 IO 的读写操作只能阻塞执行,线程在读写期间不能干其他事情。例如,调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 可以配置 socket 为非阻塞模式

NIO 的非阻塞模式详解

  • NIO 非阻塞读,使一个线程从某个通道发送请求或读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变为可读取之前,该线程可以继续做其他的事情。
  • NIO 非阻塞写,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做其他事情

NIO 和 BIO 的比较

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,IO 块比 IO 流效率更高。
  • BIO 是阻塞的,NIO 是非阻塞的。
  • BIO 基于字节流和字符流进行操作,NIO 基于通道(Channel)和缓存区(Buffer)进行操作,数据总是从通道读取到缓存区中,或者从缓存区写入到通道中。
NIO BIO
面向缓存区(Buffer) 面向流(Stream)
非阻塞IO(Non-Blocking IO) 阻塞IO(Blocking IO)
选择器(Selector)

NIO 有三大核心组件

  • 通道(Channel):Java NIO 的通道类似流,但又有些不同,既可以从通道中读取数据,又可以写数据到通道中,但流的读写通常都是单向的。通道可以非阻塞读取和写入数据,通道支持读取或写入缓存区,也支持异步地读写
  • 缓存区(Buffer):缓存区本质上是一块可以写入数据并从中读取数据的内存,这块内存被包装成 NIO Buffer 对象,并提供了一组方法用来方便的访问这块内存。
  • 选择器(Selector):可以监听一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入,单个线程可以管理多个通道,从而管理多个网络连接提高效率。
    在这里插入图片描述

🚀2. 缓存区(Buffer)

缓存区是一个用于特定基本类型的容器,由 java.nio 包定义,所有缓存区都是 Buffer 抽象类的子类。主要用于与 NIO 通道进行交互,数据从通道读入缓存区或者从缓存区写入通道中。
在这里插入图片描述
缓存区就像一个数组,可以保存多个相同类型的数据,根据数据类型划分,有以下 Buffer 常用子类

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

虽然上述 Buffer 类各自管理的数据类型不同,但是都是采用类似的方式管理数据,通过下面的方法获取一个 Buffer 对象

static xxxBuffer allocate(int capacity):创建一个容量为capacity的xxxBuffer对象

缓存区的基本属性

  • 容量(capacity):作为一个内存块,Buffer 具有固定的大小,也成为“容量”,缓冲区容量不能为负,并且创建后不能更改。
  • 限制(limit):表示缓存区中可以操作数据的大小(limit 后数据不能进行读写),缓存区的限制不能为负,并且不能大于其容量。写入模式下,限制等于 buffer 的容量,读取模式下,limit 等于写入的数据量。
  • 位置(position)下一个要读取或写入的数据的索引,缓存区的位置不能为负,并且不能大于其限制。
  • 标记(mark)与重置(reset)标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的位置,之后通过调用 reset() 方法恢复到这个位置。标记、位置、限制、容量遵循不变式0<=mark<=position<=limit<=capacity
    在这里插入图片描述

缓存区常见方法

Buffer clear() //清空缓存区并返回对缓存区的引用
Buffer flip() //为将缓存区的界限设置为当前位置,并将当前位置重置为0
int capacity() //返回Buffer的capacity大小 
boolean hasRemaining(//判断缓存区中是否还有元素 
int limit() //返回缓存区的界限(limit)的位置 
Buffer limit(int n)//将设置缓存区界限为n,并返回一个具有新limit的缓存区对象 
Buffer mark() //对缓存区设置标记 
int position() //返回缓存区的当前位置position 
Buffer position(int n)//将设置缓存区的当前位置为n,并返回修改后的缓存区对象 
int remaining() //返回position和limit之间的元素个数 
Buffer reset() //将位置position转到以前设置的mark所在的位置 
Buffer rewind() //将位置设为为0.取消设置的mark

缓存区的数据操作

Buffer  所有子类提供了两个用于数据操作的方法:get() put() 方法获取 Buffer 中的数据
get(): 读取单个字节
get(byte[] dst): 批量读取多个字节到 dst 中
get(int index): 读取指定索引位置的字节(不会移动position)

放入数据到 Bufferput(byte b): 将单个字节写入缓存区的当前位置
put(byte[] src): 将src中的字节写入缓存区的当前位置
put(int index, byte b): 将指定字节写入缓存区的索引位置(不会移动position)

使用缓存区读写数据一般遵循以下四个步骤

  1. 写入数据到缓存区
  2. 调用 buffer.flip() 方法,转换为读取模式
  3. 从缓存区中读取数据
  4. 调用 buffer.clear() 方法或 buffer.compact() 方法清除缓存区

案例演示

public class BufferTest {
    @Test
    public void test01() {
        //1.分配一个缓存区,容量设置为10
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());   //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        //put往缓存区中添加数据
        String name = "java nio";
        buffer.put(name.getBytes());
        System.out.println(buffer.position()); //8
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        //3.flip()为将缓存区的界限设置为当前位置,并将当前位置设置为0 可读模式
        buffer.flip();
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());    //8
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        //4.get数据的获取
        char ch = (char) buffer.get();
        System.out.println(ch);  //j
        System.out.println(buffer.position()); //1
        System.out.println(buffer.limit());    //8
        System.out.println(buffer.capacity()); //10
    }

    @Test
    public void test02() {
        //分配一个缓存区,容量设置为10
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());   //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        //put往缓存区中添加数据
        String name = "java nio";
        buffer.put(name.getBytes());
        System.out.println(buffer.position()); //8
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        //clear清除缓存区中的数据
        buffer.clear();
        System.out.println(buffer.position()); //0
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println((char)buffer.get()); //j, 数据并没有被恢复,只是恢复了position的位置
        System.out.println("----------------------");

        //定义一个缓存区
        ByteBuffer buf = ByteBuffer.allocate(10);
        String n = "javanio";
        buf.put(n.getBytes());
        buf.flip();

        //读取数据
        byte[] b = new byte[2];
        buf.get(b);
        String rs = new String(b);
        System.out.println(rs); //ja

        System.out.println(buffer.position()); //1
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        buf.mark(); //标记此刻这个位置:2

        byte[] b2 = new byte[3];
        buf.get(b2);
        System.out.println(new String(b2)); //van
        System.out.println(buffer.position()); //1
        System.out.println(buffer.limit());    //10
        System.out.println(buffer.capacity()); //10
        System.out.println("----------------------");

        buf.reset();  //回到标记位置
        if (buf.hasRemaining()) {
            System.out.println(buf.remaining()); //5
        }
    }
}

直接内存与非直接内存

byte buffer 可以是两种类型,一种是基于直接内存(也就是非堆内存),另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM 将会在 IO 操作上具有更高的性能,因为它是直接作用于本地系统的 IO 操作;而非直接内存,如果要进行 IO 操作,会先从本进程内存复制到直接内存,再利用本地 IO 处理

从数据流的角度,非直接内存是下面这样的作用链

本地IO---->直接内存---->非直接内存---->直接内存---->本地IO

而直接内存的作用链为

本地IO---->直接内存---->本地IO

在做 IO 处理时,例如通过网络发送大量数据,直接内存具有更高的效率,因为直接内存使用 allocateDirtect 创建。虽然它比申请普通的堆内存需要耗费更高的性能,但是这部分的数据是在 JVM 之外的,它不会占用应用的内存。

因此,如果有很大的数据需要缓存,并且数据生命周期很长,那么使用直接内存比较合适。如果不能带来很明显的性能提升,还是推荐直接使用堆内存,字节缓冲区是直接缓冲区还是非字节缓冲区,可以通过调用 isDirect() 方法确定。

@Test
 public void test03() {
     //创建一个非直接内存的缓存区
     ByteBuffer buffer = ByteBuffer.allocate(1024);
     //buffer.isDirect()用于判断是否为直接内存
     System.out.println(buffer.isDirect()); //false
     System.out.println("----------------------");
     //创建一个直接内存的缓存区
     ByteBuffer buffer2 = ByteBuffer.allocateDirect(1024);
     System.out.println(buffer2.isDirect()); //true
 }

直接内存使用场景

  • 数据量很大并且这些数据的生命周期很长
  • 频繁的 IO 操作,例如网络并发场景

🚀3. 通道(Channel)

通道(Channel)由 java.nio.channels 包定义,表示 IO 源与目标打开的连接,它类似于传统的“流”,只不过通道(Channel)本身不能直接访问数据,只能与 Buffer 进行交互。

NIO 的通道类似于流,但有些区别如下

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据,流只能同步读或同步写
  • 通道可以从缓存读数据,也可以写数据到缓存

通道(Channel)在 NIO 中是一个接口

public interface Channel extends Closeable()

常用的 Channel 实现类

  • FileChannel:用于读取、写入、映射和操作文件的通道
  • DatagramChannel:通过 UDP 读写网络中的数据通道
  • SocketChannel:通过 TCP 读写网络中的数据通道
  • ServerSocketChannel:可以监听新建立的 TCP 连接,对每一个新建立的连接都会创建一个 SocketChannel

对于 FileChannel 类,获取通道的一种方式是对支持通道的对象调用 getChannel() 方法,支持通道的类如下

  • FileInputStream
  • FileOutputStream
  • RandomAccessFile
  • DatagramSocket
  • Socket
  • ServerSocket

获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道或通过通道的静态方法 open() 打开并返回指定通道。

FileChannel 的常用方法

int read(ByteBuffer dst)Channel当中读取数据至ByteBuffer
long read(ByteBuffer[] dsts)Channel当中的数据“分散”至ByteBuffer[]
int write(ByteBuffer src)ByteBuffer当中的数据写入到Channel
long write(ByteBuffer[] srcs)ByteBuffer[]当中的数据“聚集”到Channel
long position()   返回此通道的文件位置
FileChannel position(long p)  设置此通道的文件位置
long size()   返回此通道的文件的当前大小
FileChannel truncate(long s)   将此通道的文件截取为给定大小
void force(boolean meteData)   强制将所有对此通道的文件更新写入到存储设备中

案例1-本地文件写数据

public class ChannelTest {
    @Test
    public void write() {
        try {
            //1.字节输出流通向目标文件
            FileOutputStream fos = new FileOutputStream("data01.txt");
            //2.得到字节输出流对应的通道Channel
            FileChannel channel = fos.getChannel();
            //3.分配缓存区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("hello, friends".getBytes());
            //4.把缓存区切换为写模式
            buffer.flip();
            channel.write(buffer);
            channel.close();
            System.out.println("写数据到文件中!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果截图
在这里插入图片描述
案例2-本地文件读数据

@Test
public void read() throws Exception {
    //1.定义一个文件字节输入流与源文件接通
    FileInputStream is = new FileInputStream("data01_txt");
    //2.需要得到文件字节输入流的文件通道
    FileChannel channel = is.getChannel();
    //3.定义一个缓存区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //4.读取数据到缓存区
    channel.read(buffer);
    buffer.flip(); //归位
    //5.读取出缓存区中的数据并输出即可
    String rs = new String(buffer.array(), 0, buffer.remaining());
    System.out.println(rs);
}

运行结果如下

hello, friends

案例3-使用 Buffer 完成文件复制

@Test
public void copy() throws Exception {
//源文件
File srcFile = new File("C:\\Users\\Desktop\\1.jpg");
File destFile = new File("C:\\Users\\Desktop\\1_copy.jpg");
//得到一个字节输出流、字节输入流
FileInputStream fis = new FileInputStream(srcFile);
FileOutputStream fos = new FileOutputStream(destFile);
//得到文件通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
//分配缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
    //必须先清空缓存区然后再写入数据到缓存区
    buffer.clear();
    //开始读取一次数据
    int flag = fisChannel.read(buffer);
    if (flag == -1) {
        break;
    }
    //已经读取了数据,把缓存区的模式切换为可读模式
    buffer.flip();
    fosChannel.write(buffer);
}
fisChannel.close();
fosChannel.close();
System.out.println("复制完成");
}

案例4-分散(Scatter)和聚集(Gatter)

  • 分散读取(Scatter):把 Channel 通道的数据读取到多个缓存区中
  • 聚集写入(Gathering):是指将多个 Buffer 中的数据聚集到 Channel
@Test
public void test() throws Exception {
   //1.字节输入管道
   FileInputStream is = new FileInputStream("data01.txt");
   FileChannel isChannel = is.getChannel();
   //2.字节输出管道
   FileOutputStream os = new FileOutputStream("data02.txt");
   FileChannel osChannel = os.getChannel();
   //3.定义多个缓存区做数据分散
   ByteBuffer buffer1 = ByteBuffer.allocate(4);
   ByteBuffer buffer2 = ByteBuffer.allocate(1024);
   ByteBuffer[] buffers = {buffer1, buffer2};
   //4.从通道中读取数据分散到多个缓存区
   isChannel.read(buffers);
   //5.从每个缓存区中查询是否有数据读取到
   for (ByteBuffer buffer : buffers) {
       buffer.flip(); //切换到读数据模式
       System.out.println(new String(buffer.array(), 0, buffer.remaining()));
   }
   //6.聚集写入到通道
   osChannel.write(buffers);
   isChannel.close();
   osChannel.close();
   System.out.println("文件复制");
}

运行结果截图如下
在这里插入图片描述
案例5-从目标通道中去复制原通道数据

@Test
public void test02() throws Exception {
    //1.字节输入通道
    FileInputStream is = new FileInputStream("data01.txt");
    FileChannel isChannel = is.getChannel();
    //2.字节输出管道
    FileOutputStream os = new FileOutputStream("data03.txt");
    FileChannel osChannel = os.getChannel(); //目标通道
    //3.复制数据
    osChannel.transferFrom(isChannel, isChannel.position(), isChannel.size());
    isChannel.close();
    osChannel.close();
    System.out.println("复制完成");
}

运行结果截图如下
在这里插入图片描述
案例6-把原通道数据复制到目标通道

@Test
public void test03() throws Exception{
    //1.字节输入通道
    FileInputStream is = new FileInputStream("data01.txt");
    FileChannel isChannel = is.getChannel();
    //2.字节输出管道
    FileOutputStream os = new FileOutputStream("data04.txt");
    FileChannel osChannel = os.getChannel();
    //3.复制数据
    isChannel.transferTo(isChannel.position(), isChannel.size(), osChannel);
    isChannel.close();
    osChannel.close();
    System.out.println("复制完成");
}

运行结果截图如下
在这里插入图片描述

🚀4. 选择器(Selector)

选择器(Selector)是 SelectableChannel 对象的多路复用器,它可以同时监控多个 SelectableChannel 的 IO 状况,利用选择器(Selector)可以使一个单独的线程管理多个通道(Channel).

Selector 是非阻塞 IO 的核心

  • Java 的 NIO 使用非阻塞的 IO 方式,可以用一个线程处理多个客户端的连接,就会使用到选择器。
  • 选择器能够检测到多个注册的通道,通道上若有事件发生便获取事件然后针对每个事件进行相应的处理,这样便可以只用一个单线程去管理多个通道,即管理多个连接和请求。
  • 只有在连接的通道真正有读写事件发生时,才会进行读写,而不用为每个连接都创建一个线程,不用去维护多个线程避免了多线程之间的上下文切换带来的开销

在这里插入图片描述

创建 Selector:通过 Selector.open() 方法创建一个 Selector

Selector selector = Selector.open()

向选择器注册通道SelectableChannel.register(Selector sel, int ops)

 //1.获取通道
 ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2.切换非阻塞模式
 ssChannel.configureBlocking(false);
 //3.绑定连接
 ssChannel.bind(new InetSocketAddress(9898));
 //4.获取选择器
 Selector selector = Selector.open();
 //5.将通道注册到选择器上,并指定"监听接收事件"
 ssChannel.register(selector, SelectionKey.OP_ACCEPT);

当调用 register(Selector sel, mt ops) 向通道注册选择器时,通过第二个参数 ops 指定选择器对通道的监听事件,可以监听的事件类型有

  • 读:SelectionKey.OP_READ(1)
  • 写:SelectionKey.OP_WRITE(4)
  • 连接:SelectionKey.OP_CONNECT(8)
  • 接收:SelectionKey.OP_ACCEPT(16)

若注册时不止监听一个事件,则可以使用“位或”操作符连接

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

🚀5. NIO 非阻塞式网络通信原理分析

Java NIO 的组件—–选择器(Selector)能够实现一个 IO 线程可以并发处理 N 个客户端连接和读写操作,从根本上解决了传统同步阻塞 IO 的一个连接一个线程的模型在性能上的缺陷问题,提高了架构的性能、弹性伸缩能力和可靠性。
在这里插入图片描述
服务端流程

1、当客户端连接服务端时,服务端会通过 ServerSocketChannel 得到 SocketChannel

ServerSocketChannel ssChannel = ServerSocketChannel.open();

2、切换阻塞模式

ss.Channel.configureBlocking(false);

3、绑定连接

ssChannel.bind(new InetSocketAddress(9999));

4、获取选择器

Selector selector = Selector.open();

5、将通道注册到选择器上,并且指定“监听接收事件”

ssChannel.register(seletor, SelectionKey.OP_ACCEPT);

6、轮询式的获取选择器上已经“准备就绪”的事件

//轮询式的获取选择器上已经"准备就绪"的事件
while (selector.select() > 0) {
 System.out.println("轮询");
 //获取当前选择器中所有注册的"选择键(已就绪的监听事件)"
 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
 while (it.hasNext()) {
     //获取准备“就绪”的事件
     SelectionKey sk = it.next();
     //判断具体是什么事件准备就绪
     if (sk.isAcceptable()) {
         //若“接收就绪”,获取客户端连接
         SocketChannel sChannel = ssChannel.accept();
         //切换非阻塞模式
         sChannel.configureBlocking(false);
         //将该通道注册到选择器上
         sChannel.register(selector, SelectionKey.OP_READ);
     } else if (sk.isReadable()) {
         //获取当前选择器上"读就绪"状态的通道
         SocketChannel sChannel = (SocketChannel) sk.channel();
         //读取数据
         ByteBuffer buf = ByteBuffer.allocate(1024);
         int len = 0;
         while ((len = sChannel.read(buf)) > 0) {
             buf.flip();
             System.out.println(new String(buf.array(), 0, len));
             System.out.println();
             buf.clear();
         }
     }
 }
 //取消选择键SelectionKey
 it.remove();

客户端流程

1、获取通道

SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));

2、切换非阻塞模式

sChannel.configureBlocking(false);

3、分配指定大小的缓存区

ByteBuffer buf = ByteBuffer.allocate(1024);

4、发送数据给服务端

Scanner scan = new Scanner(System.in);
while (scan.hasNext()) {
    String str = scan.nextLine();
    buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis())
    + "\n" + str).getBytes());
    buf.flip();
    sChannel.write(buf);
    buf.clear();
}
//关闭通道
sChannel.close();

🚀6. NIO 非阻塞式网络通信案例

服务端接收客户端的连接请求,并接收多个客户端发送过来的事件。

客户端代码如下

public class Client {
    public static void main(String[] args) throws IOException {
        //1.获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
        //2.切换为非阻塞模式
        sChannel.configureBlocking(false);
        //3.分配指定大小缓存区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //4.发送数据给服务端
        Scanner sc = new Scanner(System.in);
        while (true) {
            System.out.print("请输入:");
            String msg = sc.nextLine();
            buffer.put(("华仔仔:"+msg).getBytes());
            buffer.flip();
            sChannel.write(buffer);
            buffer.clear();
        }

    }
}

服务端代码如下

public class Server {
    public static void main(String[] args) throws IOException {
        System.out.println("---------服务端启动-----------");
        //1.获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        //2.切换为非阻塞模式
        ssChannel.configureBlocking(false);
        //3.绑定连接的端口
        ssChannel.bind(new InetSocketAddress(9999));
        //4.获取选择器
        Selector selector = Selector.open();
        //5.将通道都注册到选择器上去,并且开始指定监听接收事件
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        //6.使用Selector选择器轮询已经就绪好的事件
        while (selector.select() > 0) {
            System.out.println("开始一轮事件处理...");
            //7.获取选择器中的所有注册的通道中已经就绪好的事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            //8.开始遍历这些准备好的事件
            while (it.hasNext()) {
                //提取当前这个事件
                SelectionKey sk = it.next();
                //9.判断这个事件具体是什么事件
                if (sk.isAcceptable()) {
                    //10.直接获取当前接入的客户端通道
                    SocketChannel sChannel = ssChannel.accept();
                    //11.将客户端通道也设置为非阻塞式的
                    sChannel.configureBlocking(false);
                    //12.将客户端通道也注册到选择器Selector上
                    sChannel.register(selector, SelectionKey.OP_READ);
                } else if (sk.isReadable()) {
                    //13.获取当前选择器上的"读就绪事件"
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    //14.开始读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = 0;
                    while ((len = sChannel.read(buffer)) > 0) {
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, len));
                        buffer.clear();  //清除之前的数据
                    }
                }
                //处理完毕当前事件后,需要移除当前事件,否则会重复处理
                it.remove();
            }
        }
    }
}

运行结果如下

---------服务端启动-----------
开始一轮事件处理...
开始一轮事件处理...
开始一轮事件处理...
开始一轮事件处理...
华仔仔:你好,我是client1
开始一轮事件处理...
华仔仔:你好,我是client2
开始一轮事件处理...
华仔仔:你好,我是client3

--------client1-----------
请输入:你好,我是client1
--------client2-----------
请输入:你好,我是client2
--------client3-----------
请输入:你好,我是client3

🚀7. NIO 网络编程实现群聊系统应用

需求说明

  • 编写一个 NIO 群聊系统,实现客户端与客户端的通信需求(非阻塞)
  • 服务端:可以检测用户上线、离线,并实现消息转发功能
  • 客户端:通过通道(Channel) 可以无阻塞发送信息给其他所有客户端用户,同时可以接受其他客户端用户通过服务端转发的消息

服务端代码实现

public class Server {

    private Selector selector;
    private ServerSocketChannel ssChannel;
    private static final int PORT = 9999;

    //初始化工作
    public Server() {
        try {
            //1.创建选择器
            selector = Selector.open();
            //2.获取通道
            ssChannel = ServerSocketChannel.open();
            //3.切换为非阻塞模式
            ssChannel.configureBlocking(false);
            //4.绑定连接的端口
            ssChannel.bind(new InetSocketAddress(PORT));
            //5.将通道都注册到选择器上去,并且开始指定监听接收事件
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //监听
    public void listen() {
        try {
            while (selector.select() > 0) {
                //获取选择器中所有注册通道的就绪事件
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                //开始遍历这个事件
                while (it.hasNext()) {
                    //提取这个事件
                    SelectionKey sk = it.next();
                    //判断这个事件
                    if (sk.isAcceptable()) {
                        //客户端接入请求
                        //获取当前客户端通道
                        SocketChannel schannel = ssChannel.accept();
                        //注册成非阻塞模式
                        schannel.configureBlocking(false);
                        //注册给选择器,监听读数据的事件
                        schannel.register(selector, SelectionKey.OP_READ);
                    } else if (sk.isReadable()) {
                        //处理这个客户端的消息接收它,然后实现转发逻辑
                        readClientData(sk);
                    }
                    it.remove(); //处理完毕之后,需要移除当前事件
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //接收当前客户端消息,转发给其他全部客户端通道
    private void readClientData(SelectionKey sk) {
        SocketChannel sChannel = null;
        try {
            //直接得到当前客户端通道
            sChannel = (SocketChannel) sk.channel();
            //创建缓存区对象,开始接收客户端通道的数据
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = sChannel.read(buffer);
            if (count > 0) {
                buffer.flip();
                //提取读取到的信息
                String msg = new String(buffer.array(), 0, buffer.remaining());
                System.out.println("接收到了客户端消息:"+msg);
                //把这个消息推送给全部客户端接收
                sendMsgToAllClient(msg, sChannel);
            }
        } catch (Exception e) {
            try {
                System.out.println("有人离线了:"+sChannel.getRemoteAddress());
                //当前客户端离线
                sk.cancel(); //取消注册
                sChannel.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    //把当前客户端的消息推送给当前全部在线注册的channel
    private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException {
        System.out.println("服务端开始转发这个消息,当前处理的线程"+ Thread.currentThread().getName());
        for (SelectionKey key : selector.keys()) {
            Channel channel = key.channel();
            if (channel instanceof SocketChannel && channel != sChannel) {
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                ((SocketChannel) channel).write(buffer);
            }
        }
    }

    public static void main(String[] args) {
        //创建服务端对象
        Server server = new Server();
        //开始监听客户端的各种消息事件:连接、群聊消息、离线消息
        server.listen();
    }

}

客户端代码实现

public class Client {

    private Selector selector;
    private static int PORT = 9999;
    private SocketChannel socketChannel;

    //初始化客户端信息
    public Client() {
        try {
            //创建选择器
            selector = Selector.open();
            //连接服务器
             socketChannel= SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
            //设置非阻塞模式
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("当前客户端准备完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendToServer(String s) {
        try {
            socketChannel.write(ByteBuffer.wrap(("华仔说:"+s).getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void readInfo() throws IOException {
        while (selector.select() > 0) {
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isReadable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    sc.read(buffer);
                    System.out.println(new String(buffer.array()).trim());
                    System.out.println("-dsd---------------------");
                }
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        //定义一个线程,专门负责监听服务端发送过来的读消息事件
        new Thread(() -> {
            try {
                client.readInfo();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        //发消息
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            System.out.println("-----------------------");
            String s = sc.nextLine();
            client.sendToServer(s);
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5137.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!