《Java I/O 模型》专栏文章索引
👉《Java I/O 模型》Java BIO
👉《Java I/O 模型》Java NIO
👉《Java I/O 模型》Java AIO
《Java I/O 模型》Java NIO
🚀1. Java NIO 介绍
🎁Java NIO(New IO) 又被称为 Java Non-Blocking IO,是在 Java 1.4 开始引入的一个新的 IO API. NIO 支持面向缓冲区的、基于通道的 IO 操作,以更高效的方式进行文件的读写操作。传统 IO 的读写操作只能阻塞执行,线程在读写期间不能干其他事情。例如,调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 可以配置 socket 为非阻塞模式。
NIO 的非阻塞模式详解
- NIO 非阻塞读,使一个线程从某个通道发送请求或读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变为可读取之前,该线程可以继续做其他的事情。
- NIO 非阻塞写,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做其他事情。
NIO 和 BIO 的比较
- BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,IO 块比 IO 流效率更高。
- BIO 是阻塞的,NIO 是非阻塞的。
- BIO 基于字节流和字符流进行操作,NIO 基于通道(Channel)和缓存区(Buffer)进行操作,数据总是从通道读取到缓存区中,或者从缓存区写入到通道中。
NIO | BIO |
---|---|
面向缓存区(Buffer) | 面向流(Stream) |
非阻塞IO(Non-Blocking IO) | 阻塞IO(Blocking IO) |
选择器(Selector) |
NIO 有三大核心组件
- 通道(Channel):Java NIO 的通道类似流,但又有些不同,既可以从通道中读取数据,又可以写数据到通道中,但流的读写通常都是单向的。通道可以非阻塞读取和写入数据,通道支持读取或写入缓存区,也支持异步地读写。
- 缓存区(Buffer):缓存区本质上是一块可以写入数据并从中读取数据的内存,这块内存被包装成 NIO Buffer 对象,并提供了一组方法用来方便的访问这块内存。
- 选择器(Selector):可以监听一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入,单个线程可以管理多个通道,从而管理多个网络连接提高效率。
🚀2. 缓存区(Buffer)
缓存区是一个用于特定基本类型的容器,由 java.nio 包定义,所有缓存区都是 Buffer 抽象类的子类。主要用于与 NIO 通道进行交互,数据从通道读入缓存区或者从缓存区写入通道中。
缓存区就像一个数组,可以保存多个相同类型的数据,根据数据类型划分,有以下 Buffer 常用子类
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
虽然上述 Buffer 类各自管理的数据类型不同,但是都是采用类似的方式管理数据,通过下面的方法获取一个 Buffer 对象
static xxxBuffer allocate(int capacity):创建一个容量为capacity的xxxBuffer对象
缓存区的基本属性
- 容量(capacity):作为一个内存块,Buffer 具有固定的大小,也成为“容量”,缓冲区容量不能为负,并且创建后不能更改。
- 限制(limit):表示缓存区中可以操作数据的大小(limit 后数据不能进行读写),缓存区的限制不能为负,并且不能大于其容量。写入模式下,限制等于 buffer 的容量,读取模式下,limit 等于写入的数据量。
- 位置(position):下一个要读取或写入的数据的索引,缓存区的位置不能为负,并且不能大于其限制。
- 标记(mark)与重置(reset):标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的位置,之后通过调用 reset() 方法恢复到这个位置。标记、位置、限制、容量遵循不变式
0<=mark<=position<=limit<=capacity
缓存区常见方法
Buffer clear() //清空缓存区并返回对缓存区的引用
Buffer flip() //为将缓存区的界限设置为当前位置,并将当前位置重置为0
int capacity() //返回Buffer的capacity大小
boolean hasRemaining()//判断缓存区中是否还有元素
int limit() //返回缓存区的界限(limit)的位置
Buffer limit(int n)//将设置缓存区界限为n,并返回一个具有新limit的缓存区对象
Buffer mark() //对缓存区设置标记
int position() //返回缓存区的当前位置position
Buffer position(int n)//将设置缓存区的当前位置为n,并返回修改后的缓存区对象
int remaining() //返回position和limit之间的元素个数
Buffer reset() //将位置position转到以前设置的mark所在的位置
Buffer rewind() //将位置设为为0.取消设置的mark
缓存区的数据操作
Buffer 所有子类提供了两个用于数据操作的方法:get() put() 方法获取 Buffer 中的数据
get(): 读取单个字节
get(byte[] dst): 批量读取多个字节到 dst 中
get(int index): 读取指定索引位置的字节(不会移动position)
放入数据到 Buffer 中
put(byte b): 将单个字节写入缓存区的当前位置
put(byte[] src): 将src中的字节写入缓存区的当前位置
put(int index, byte b): 将指定字节写入缓存区的索引位置(不会移动position)
使用缓存区读写数据一般遵循以下四个步骤
- 写入数据到缓存区
- 调用 buffer.flip() 方法,转换为读取模式
- 从缓存区中读取数据
- 调用 buffer.clear() 方法或 buffer.compact() 方法清除缓存区
案例演示
public class BufferTest {
@Test
public void test01() {
//1.分配一个缓存区,容量设置为10
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.position()); //0
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
//put往缓存区中添加数据
String name = "java nio";
buffer.put(name.getBytes());
System.out.println(buffer.position()); //8
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
//3.flip()为将缓存区的界限设置为当前位置,并将当前位置设置为0 可读模式
buffer.flip();
System.out.println(buffer.position()); //0
System.out.println(buffer.limit()); //8
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
//4.get数据的获取
char ch = (char) buffer.get();
System.out.println(ch); //j
System.out.println(buffer.position()); //1
System.out.println(buffer.limit()); //8
System.out.println(buffer.capacity()); //10
}
@Test
public void test02() {
//分配一个缓存区,容量设置为10
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.position()); //0
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
//put往缓存区中添加数据
String name = "java nio";
buffer.put(name.getBytes());
System.out.println(buffer.position()); //8
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
//clear清除缓存区中的数据
buffer.clear();
System.out.println(buffer.position()); //0
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println((char)buffer.get()); //j, 数据并没有被恢复,只是恢复了position的位置
System.out.println("----------------------");
//定义一个缓存区
ByteBuffer buf = ByteBuffer.allocate(10);
String n = "javanio";
buf.put(n.getBytes());
buf.flip();
//读取数据
byte[] b = new byte[2];
buf.get(b);
String rs = new String(b);
System.out.println(rs); //ja
System.out.println(buffer.position()); //1
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
buf.mark(); //标记此刻这个位置:2
byte[] b2 = new byte[3];
buf.get(b2);
System.out.println(new String(b2)); //van
System.out.println(buffer.position()); //1
System.out.println(buffer.limit()); //10
System.out.println(buffer.capacity()); //10
System.out.println("----------------------");
buf.reset(); //回到标记位置
if (buf.hasRemaining()) {
System.out.println(buf.remaining()); //5
}
}
}
直接内存与非直接内存
byte buffer 可以是两种类型,一种是基于直接内存(也就是非堆内存),另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM 将会在 IO 操作上具有更高的性能,因为它是直接作用于本地系统的 IO 操作;而非直接内存,如果要进行 IO 操作,会先从本进程内存复制到直接内存,再利用本地 IO 处理。
从数据流的角度,非直接内存是下面这样的作用链
本地IO---->直接内存---->非直接内存---->直接内存---->本地IO
而直接内存的作用链为
本地IO---->直接内存---->本地IO
在做 IO 处理时,例如通过网络发送大量数据,直接内存具有更高的效率,因为直接内存使用 allocateDirtect 创建。虽然它比申请普通的堆内存需要耗费更高的性能,但是这部分的数据是在 JVM 之外的,它不会占用应用的内存。
因此,如果有很大的数据需要缓存,并且数据生命周期很长,那么使用直接内存比较合适。如果不能带来很明显的性能提升,还是推荐直接使用堆内存,字节缓冲区是直接缓冲区还是非字节缓冲区,可以通过调用 isDirect() 方法确定。
@Test
public void test03() {
//创建一个非直接内存的缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//buffer.isDirect()用于判断是否为直接内存
System.out.println(buffer.isDirect()); //false
System.out.println("----------------------");
//创建一个直接内存的缓存区
ByteBuffer buffer2 = ByteBuffer.allocateDirect(1024);
System.out.println(buffer2.isDirect()); //true
}
直接内存使用场景
- 数据量很大并且这些数据的生命周期很长
- 频繁的 IO 操作,例如网络并发场景
🚀3. 通道(Channel)
通道(Channel)由 java.nio.channels 包定义,表示 IO 源与目标打开的连接,它类似于传统的“流”,只不过通道(Channel)本身不能直接访问数据,只能与 Buffer 进行交互。
NIO 的通道类似于流,但有些区别如下
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据,流只能同步读或同步写
- 通道可以从缓存读数据,也可以写数据到缓存
通道(Channel)在 NIO 中是一个接口
public interface Channel extends Closeable()
常用的 Channel 实现类
- FileChannel:用于读取、写入、映射和操作文件的通道
- DatagramChannel:通过 UDP 读写网络中的数据通道
- SocketChannel:通过 TCP 读写网络中的数据通道
- ServerSocketChannel:可以监听新建立的 TCP 连接,对每一个新建立的连接都会创建一个 SocketChannel
对于 FileChannel 类,获取通道的一种方式是对支持通道的对象调用 getChannel() 方法,支持通道的类如下
- FileInputStream
- FileOutputStream
- RandomAccessFile
- DatagramSocket
- Socket
- ServerSocket
获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道或通过通道的静态方法 open() 打开并返回指定通道。
FileChannel 的常用方法
int read(ByteBuffer dst) 从Channel当中读取数据至ByteBuffer
long read(ByteBuffer[] dsts) 将Channel当中的数据“分散”至ByteBuffer[]
int write(ByteBuffer src) 将ByteBuffer当中的数据写入到Channel
long write(ByteBuffer[] srcs) 将ByteBuffer[]当中的数据“聚集”到Channel
long position() 返回此通道的文件位置
FileChannel position(long p) 设置此通道的文件位置
long size() 返回此通道的文件的当前大小
FileChannel truncate(long s) 将此通道的文件截取为给定大小
void force(boolean meteData) 强制将所有对此通道的文件更新写入到存储设备中
案例1-本地文件写数据
public class ChannelTest {
@Test
public void write() {
try {
//1.字节输出流通向目标文件
FileOutputStream fos = new FileOutputStream("data01.txt");
//2.得到字节输出流对应的通道Channel
FileChannel channel = fos.getChannel();
//3.分配缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello, friends".getBytes());
//4.把缓存区切换为写模式
buffer.flip();
channel.write(buffer);
channel.close();
System.out.println("写数据到文件中!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行结果截图
案例2-本地文件读数据
@Test
public void read() throws Exception {
//1.定义一个文件字节输入流与源文件接通
FileInputStream is = new FileInputStream("data01_txt");
//2.需要得到文件字节输入流的文件通道
FileChannel channel = is.getChannel();
//3.定义一个缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//4.读取数据到缓存区
channel.read(buffer);
buffer.flip(); //归位
//5.读取出缓存区中的数据并输出即可
String rs = new String(buffer.array(), 0, buffer.remaining());
System.out.println(rs);
}
运行结果如下
hello, friends
案例3-使用 Buffer 完成文件复制
@Test
public void copy() throws Exception {
//源文件
File srcFile = new File("C:\\Users\\Desktop\\1.jpg");
File destFile = new File("C:\\Users\\Desktop\\1_copy.jpg");
//得到一个字节输出流、字节输入流
FileInputStream fis = new FileInputStream(srcFile);
FileOutputStream fos = new FileOutputStream(destFile);
//得到文件通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
//分配缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
//必须先清空缓存区然后再写入数据到缓存区
buffer.clear();
//开始读取一次数据
int flag = fisChannel.read(buffer);
if (flag == -1) {
break;
}
//已经读取了数据,把缓存区的模式切换为可读模式
buffer.flip();
fosChannel.write(buffer);
}
fisChannel.close();
fosChannel.close();
System.out.println("复制完成");
}
案例4-分散(Scatter)和聚集(Gatter)
- 分散读取(Scatter):把 Channel 通道的数据读取到多个缓存区中
- 聚集写入(Gathering):是指将多个 Buffer 中的数据聚集到 Channel
@Test
public void test() throws Exception {
//1.字节输入管道
FileInputStream is = new FileInputStream("data01.txt");
FileChannel isChannel = is.getChannel();
//2.字节输出管道
FileOutputStream os = new FileOutputStream("data02.txt");
FileChannel osChannel = os.getChannel();
//3.定义多个缓存区做数据分散
ByteBuffer buffer1 = ByteBuffer.allocate(4);
ByteBuffer buffer2 = ByteBuffer.allocate(1024);
ByteBuffer[] buffers = {buffer1, buffer2};
//4.从通道中读取数据分散到多个缓存区
isChannel.read(buffers);
//5.从每个缓存区中查询是否有数据读取到
for (ByteBuffer buffer : buffers) {
buffer.flip(); //切换到读数据模式
System.out.println(new String(buffer.array(), 0, buffer.remaining()));
}
//6.聚集写入到通道
osChannel.write(buffers);
isChannel.close();
osChannel.close();
System.out.println("文件复制");
}
运行结果截图如下
案例5-从目标通道中去复制原通道数据
@Test
public void test02() throws Exception {
//1.字节输入通道
FileInputStream is = new FileInputStream("data01.txt");
FileChannel isChannel = is.getChannel();
//2.字节输出管道
FileOutputStream os = new FileOutputStream("data03.txt");
FileChannel osChannel = os.getChannel(); //目标通道
//3.复制数据
osChannel.transferFrom(isChannel, isChannel.position(), isChannel.size());
isChannel.close();
osChannel.close();
System.out.println("复制完成");
}
运行结果截图如下
案例6-把原通道数据复制到目标通道
@Test
public void test03() throws Exception{
//1.字节输入通道
FileInputStream is = new FileInputStream("data01.txt");
FileChannel isChannel = is.getChannel();
//2.字节输出管道
FileOutputStream os = new FileOutputStream("data04.txt");
FileChannel osChannel = os.getChannel();
//3.复制数据
isChannel.transferTo(isChannel.position(), isChannel.size(), osChannel);
isChannel.close();
osChannel.close();
System.out.println("复制完成");
}
运行结果截图如下
🚀4. 选择器(Selector)
选择器(Selector)是 SelectableChannel 对象的多路复用器,它可以同时监控多个 SelectableChannel 的 IO 状况,利用选择器(Selector)可以使一个单独的线程管理多个通道(Channel).
Selector 是非阻塞 IO 的核心
- Java 的 NIO 使用非阻塞的 IO 方式,可以用一个线程处理多个客户端的连接,就会使用到选择器。
- 选择器能够检测到多个注册的通道,通道上若有事件发生便获取事件然后针对每个事件进行相应的处理,这样便可以只用一个单线程去管理多个通道,即管理多个连接和请求。
- 只有在连接的通道真正有读写事件发生时,才会进行读写,而不用为每个连接都创建一个线程,不用去维护多个线程避免了多线程之间的上下文切换带来的开销
创建 Selector:通过 Selector.open() 方法创建一个 Selector
Selector selector = Selector.open()
向选择器注册通道:SelectableChannel.register(Selector sel, int ops)
//1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2.切换非阻塞模式
ssChannel.configureBlocking(false);
//3.绑定连接
ssChannel.bind(new InetSocketAddress(9898));
//4.获取选择器
Selector selector = Selector.open();
//5.将通道注册到选择器上,并指定"监听接收事件"
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
当调用 register(Selector sel, mt ops) 向通道注册选择器时,通过第二个参数 ops 指定选择器对通道的监听事件,可以监听的事件类型有
- 读:SelectionKey.OP_READ(1)
- 写:SelectionKey.OP_WRITE(4)
- 连接:SelectionKey.OP_CONNECT(8)
- 接收:SelectionKey.OP_ACCEPT(16)
若注册时不止监听一个事件,则可以使用“位或”操作符连接
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE
🚀5. NIO 非阻塞式网络通信原理分析
Java NIO 的组件—–选择器(Selector)能够实现一个 IO 线程可以并发处理 N 个客户端连接和读写操作,从根本上解决了传统同步阻塞 IO 的一个连接一个线程的模型在性能上的缺陷问题,提高了架构的性能、弹性伸缩能力和可靠性。
服务端流程
1、当客户端连接服务端时,服务端会通过 ServerSocketChannel 得到 SocketChannel
ServerSocketChannel ssChannel = ServerSocketChannel.open();
2、切换阻塞模式
ss.Channel.configureBlocking(false);
3、绑定连接
ssChannel.bind(new InetSocketAddress(9999));
4、获取选择器
Selector selector = Selector.open();
5、将通道注册到选择器上,并且指定“监听接收事件”
ssChannel.register(seletor, SelectionKey.OP_ACCEPT);
6、轮询式的获取选择器上已经“准备就绪”的事件
//轮询式的获取选择器上已经"准备就绪"的事件
while (selector.select() > 0) {
System.out.println("轮询");
//获取当前选择器中所有注册的"选择键(已就绪的监听事件)"
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
//获取准备“就绪”的事件
SelectionKey sk = it.next();
//判断具体是什么事件准备就绪
if (sk.isAcceptable()) {
//若“接收就绪”,获取客户端连接
SocketChannel sChannel = ssChannel.accept();
//切换非阻塞模式
sChannel.configureBlocking(false);
//将该通道注册到选择器上
sChannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
//获取当前选择器上"读就绪"状态的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
//读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sChannel.read(buf)) > 0) {
buf.flip();
System.out.println(new String(buf.array(), 0, len));
System.out.println();
buf.clear();
}
}
}
//取消选择键SelectionKey
it.remove();
客户端流程
1、获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
2、切换非阻塞模式
sChannel.configureBlocking(false);
3、分配指定大小的缓存区
ByteBuffer buf = ByteBuffer.allocate(1024);
4、发送数据给服务端
Scanner scan = new Scanner(System.in);
while (scan.hasNext()) {
String str = scan.nextLine();
buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis())
+ "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
//关闭通道
sChannel.close();
🚀6. NIO 非阻塞式网络通信案例
服务端接收客户端的连接请求,并接收多个客户端发送过来的事件。
客户端代码如下
public class Client {
public static void main(String[] args) throws IOException {
//1.获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
//2.切换为非阻塞模式
sChannel.configureBlocking(false);
//3.分配指定大小缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//4.发送数据给服务端
Scanner sc = new Scanner(System.in);
while (true) {
System.out.print("请输入:");
String msg = sc.nextLine();
buffer.put(("华仔仔:"+msg).getBytes());
buffer.flip();
sChannel.write(buffer);
buffer.clear();
}
}
}
服务端代码如下
public class Server {
public static void main(String[] args) throws IOException {
System.out.println("---------服务端启动-----------");
//1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2.切换为非阻塞模式
ssChannel.configureBlocking(false);
//3.绑定连接的端口
ssChannel.bind(new InetSocketAddress(9999));
//4.获取选择器
Selector selector = Selector.open();
//5.将通道都注册到选择器上去,并且开始指定监听接收事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6.使用Selector选择器轮询已经就绪好的事件
while (selector.select() > 0) {
System.out.println("开始一轮事件处理...");
//7.获取选择器中的所有注册的通道中已经就绪好的事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//8.开始遍历这些准备好的事件
while (it.hasNext()) {
//提取当前这个事件
SelectionKey sk = it.next();
//9.判断这个事件具体是什么事件
if (sk.isAcceptable()) {
//10.直接获取当前接入的客户端通道
SocketChannel sChannel = ssChannel.accept();
//11.将客户端通道也设置为非阻塞式的
sChannel.configureBlocking(false);
//12.将客户端通道也注册到选择器Selector上
sChannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
//13.获取当前选择器上的"读就绪事件"
SocketChannel sChannel = (SocketChannel) sk.channel();
//14.开始读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sChannel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear(); //清除之前的数据
}
}
//处理完毕当前事件后,需要移除当前事件,否则会重复处理
it.remove();
}
}
}
}
运行结果如下
---------服务端启动-----------
开始一轮事件处理...
开始一轮事件处理...
开始一轮事件处理...
开始一轮事件处理...
华仔仔:你好,我是client1
开始一轮事件处理...
华仔仔:你好,我是client2
开始一轮事件处理...
华仔仔:你好,我是client3
--------client1-----------
请输入:你好,我是client1
--------client2-----------
请输入:你好,我是client2
--------client3-----------
请输入:你好,我是client3
🚀7. NIO 网络编程实现群聊系统应用
需求说明
- 编写一个 NIO 群聊系统,实现客户端与客户端的通信需求(非阻塞)
- 服务端:可以检测用户上线、离线,并实现消息转发功能
- 客户端:通过通道(Channel) 可以无阻塞发送信息给其他所有客户端用户,同时可以接受其他客户端用户通过服务端转发的消息
服务端代码实现
public class Server {
private Selector selector;
private ServerSocketChannel ssChannel;
private static final int PORT = 9999;
//初始化工作
public Server() {
try {
//1.创建选择器
selector = Selector.open();
//2.获取通道
ssChannel = ServerSocketChannel.open();
//3.切换为非阻塞模式
ssChannel.configureBlocking(false);
//4.绑定连接的端口
ssChannel.bind(new InetSocketAddress(PORT));
//5.将通道都注册到选择器上去,并且开始指定监听接收事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
//监听
public void listen() {
try {
while (selector.select() > 0) {
//获取选择器中所有注册通道的就绪事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//开始遍历这个事件
while (it.hasNext()) {
//提取这个事件
SelectionKey sk = it.next();
//判断这个事件
if (sk.isAcceptable()) {
//客户端接入请求
//获取当前客户端通道
SocketChannel schannel = ssChannel.accept();
//注册成非阻塞模式
schannel.configureBlocking(false);
//注册给选择器,监听读数据的事件
schannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
//处理这个客户端的消息接收它,然后实现转发逻辑
readClientData(sk);
}
it.remove(); //处理完毕之后,需要移除当前事件
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//接收当前客户端消息,转发给其他全部客户端通道
private void readClientData(SelectionKey sk) {
SocketChannel sChannel = null;
try {
//直接得到当前客户端通道
sChannel = (SocketChannel) sk.channel();
//创建缓存区对象,开始接收客户端通道的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = sChannel.read(buffer);
if (count > 0) {
buffer.flip();
//提取读取到的信息
String msg = new String(buffer.array(), 0, buffer.remaining());
System.out.println("接收到了客户端消息:"+msg);
//把这个消息推送给全部客户端接收
sendMsgToAllClient(msg, sChannel);
}
} catch (Exception e) {
try {
System.out.println("有人离线了:"+sChannel.getRemoteAddress());
//当前客户端离线
sk.cancel(); //取消注册
sChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
//把当前客户端的消息推送给当前全部在线注册的channel
private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException {
System.out.println("服务端开始转发这个消息,当前处理的线程"+ Thread.currentThread().getName());
for (SelectionKey key : selector.keys()) {
Channel channel = key.channel();
if (channel instanceof SocketChannel && channel != sChannel) {
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
((SocketChannel) channel).write(buffer);
}
}
}
public static void main(String[] args) {
//创建服务端对象
Server server = new Server();
//开始监听客户端的各种消息事件:连接、群聊消息、离线消息
server.listen();
}
}
客户端代码实现
public class Client {
private Selector selector;
private static int PORT = 9999;
private SocketChannel socketChannel;
//初始化客户端信息
public Client() {
try {
//创建选择器
selector = Selector.open();
//连接服务器
socketChannel= SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
//设置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("当前客户端准备完成");
} catch (Exception e) {
e.printStackTrace();
}
}
private void sendToServer(String s) {
try {
socketChannel.write(ByteBuffer.wrap(("华仔说:"+s).getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
private void readInfo() throws IOException {
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
System.out.println(new String(buffer.array()).trim());
System.out.println("-dsd---------------------");
}
it.remove();
}
}
}
public static void main(String[] args) {
Client client = new Client();
//定义一个线程,专门负责监听服务端发送过来的读消息事件
new Thread(() -> {
try {
client.readInfo();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
//发消息
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()) {
System.out.println("-----------------------");
String s = sc.nextLine();
client.sendToServer(s);
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5137.html