目录
一、什么是netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
二、BIO &NIO &AIO的概念
BIO
所谓IO即input和output的缩写,是对数据的流入和流出的一种抽象。我们平时说的IO大多指的是BIO。BIO(BlockingIO)是一个同步阻塞的IO,是面向流传输的。阻塞的IO当没有获取到数据时(没有读写操作)会一直处于等待阻塞状态,占用系统资源。虽然可以使用多线程解决,但是频繁(来一个请求创建一个线程)的创建线程对我们的CPU就很不友好,于是NIO就应运而出。
NIO
NIO是同步非阻塞IO(NonBlocking IO),是面向缓存区,非阻塞的IO并不会等待直到获取到数据之后返回,而是不管有没有数据都会立即响应结果。
NIO的三个组件
Channel(通道):通常NIO的操作都是由通道开始的,channel类似于流,每个channel对应一个buffer缓冲区,buffer底层是个数组。Channel会注册到Selector(选择器),由Selector根据channel读写事件的发生将其交给空闲的线程处理。
Buffer(缓冲区):buffer就是一个数组,本质上相当于一个内存区,读取数据时并不会一个字节一个字节的读取,而是将数据先写入到buffer(缓存区),再统一的写到硬盘上。
Selector(选择器):选择器也可以叫做多路复用器(选择那个线程执行),可以对应一个或多个线程。NIO 的Buffer和Channel都是既可以读也可以写的。
AIO
AIO是一个异步非阻塞IO,由操作系统完成后回调通知服务端程序启用线程去处理,一般使用于连接数较多且连接时间长的应用。
三者区别
老张爱喝茶,废话不说,煮开水。 出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
老张把水壶放到火上,立等水开。(同步阻塞BIO) 老张觉得自己有点傻
老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞NIO) 老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。
老张把响水壶放到火上,立等水开。(异步阻塞) 老张觉得这样傻等意义不大
老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞AIO) 老张觉得自己聪明了。
三、为什么使用Netty
- Jdk的Nio API复杂,而且还有空轮训的bug。
- Netty封装了NIO的API,API简单易用,开发门槛低。
- 内置很多编解码功能,支持多种主流协议。
- 性能高,社区活跃。
四、代码
BIO实现服务器&客户端
package com.xiaojie.bio;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description: bio服务器模拟
* @author: xiaojie
* @date: 2021.07.29
*/
public class BioServer {
static int count = 0;
public static void main(String[] args) throws Exception {
//创建一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//创建serversocket
ServerSocket serverSocket = new ServerSocket();
//绑定端口号
serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 9999));
System.out.println("等待发送消息。。。。。进入阻塞状态。。。。。");
while (true) {
final Socket accept = serverSocket.accept();
final PrintWriter printWriter = new PrintWriter(accept.getOutputStream());
executorService.submit(() -> {
count++;
System.out.println("count..............." + count);
//获取OutputStream
try {
byte[] bytes = new byte[1024];
if (accept.getInputStream().read(bytes) > 0) {
System.out.println("接收到客户端的消息是" + new String(bytes).trim());
}
//服务端响应
String responMsg = "我是服务端响应信息:hello client";
printWriter.write(responMsg);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
/* try {
serverSocket.close();
accept.close();
printWriter.close();
} catch (IOException e) {
e.printStackTrace();
}*/
}
});
}
}
}
package com.xiaojie.bio;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* @Description:客户端
* @author: xiaojie
* @date: 2021.07.29
*/
public class BioClient {
public static void main(String[] args) throws Exception {
//创建socket
Socket socket = new Socket();
//连接
socket.connect( new InetSocketAddress(InetAddress.getLocalHost(),9999));
//创建PrintWriter
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
//读回数据
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//发送的请求数据
String requeMsg="我是客户端发送请求:hello server";
//客户端写出数据
printWriter.write(requeMsg);
printWriter.flush();
//读回数据
// String readLine = bufferedReader.readLine();
byte[] bytes = new byte[1024];
int read = socket.getInputStream().read(bytes);
if (read>0){
System.out.println("服务端返回消息是:"+new String(bytes).trim());
}
while(true){
}
//关闭连接
// socket.close();
// printWriter.close();
// bufferedReader.close();
}
}
NIO实现服务器&客户端
- nio执行过程
- 创建一个ServerSocketChannel和Selector,将serverSocketChannel注册到Selector上
- selector通过select()方法监听channel事件,当客户端连接时selector监听到连接事件,获取到ServerSocketChannel注册时绑定的selectionKey
- selectionKey通过channel()方法可以获取绑定的ServerSocketChannel
- ServerSocketChannel通过accept()方法得到SocketChannel
- 将SocketChannel注册到Selector上,关心read事件
- 注册后返回一个SelectionKey,会和该SocketChannel关联
- selector继续通过select()方法监听事件,当客户端发送数据给服务端,selector监听到read事件,获取到SocketChannel注册时绑定的selectionKey
- selectionKey通过channel()方法可以获取绑定的socketChannel
- 将socketChannel里的数据读取出来
- 用socketChannel将服务端数据写回客户端
package com.xiaojie.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/*
nio执行过程
1、创建一个ServerSocketChannel和Selector,将serverSocketChannel注册到Selector上
2、selector通过select()方法监听channel事件,当客户端连接时selector监听到连接事件,获取到ServerSocketChannel注册时绑定的selectionKey
3、selectionKey通过channel()方法可以获取绑定的ServerSocketChannel
4、ServerSocketChannel通过accept()方法得到SocketChannel
5、将SocketChannel注册到Selector上,关心read事件
6、注册后返回一个SelectionKey,会和该SocketChannel关联
7、selector继续通过select()方法监听事件,当客户端发送数据给服务端,selector监听到read事件,获取到SocketChannel注册时绑定的selectionKey
8、selectionKey通过channel()方法可以获取绑定的socketChannel
9、将socketChannel里的数据读取出来
10、用socketChannel将服务端数据写回客户端
*/
/**
* @Description:
* @author: xiaojie
* @date: 2021.07.29
*/
public class NioServer {
/*
* 初始化
* @param port
* @todo
* @author xiaojie
* @date 2021/7/29 14:17
* @return void
*/
private Selector selector;
public void initServer(int port) throws IOException {
//创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置通道为非阻塞
serverSocketChannel.configureBlocking(false);
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(port));
//获取通道管理器
this.selector = Selector.open();
/*将通道管理器和通道绑定,并为通道注册SelectionKey.OP_ACCEPT事件,
*当该事件到达时,selector.select()会返回,如果该事件没有到达selector.select()会一直阻塞
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/*
* 监听
* @todo
* @author xiaojie
* @date 2021/7/29 14:26
* @return void
*/
public void listen() throws IOException {
while (true) {
int select = selector.select();
if (select == 0) {
continue;
}
//获取选中的迭代器
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//删除防止重复处理
iterator.remove();
if (key.isAcceptable()) {
//客户端请求连接事件
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//获取客户端连接的通道
SocketChannel channel = serverSocketChannel.accept();
//设置非阻塞
channel.configureBlocking(false);
//在与客户端连接成功之后,给通道设置权限
channel.register(this.selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
read(key);
}
}
}
}
/*
* 读取数据 并返回结果
* @todo
* @author xiaojie
* @date 2021/7/29 14:41
* @return void
*/
public void read(SelectionKey key) throws IOException {
//服务器可读取消息,得到事件发生的通道
SocketChannel socketChannel = (SocketChannel) key.channel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
socketChannel.read(byteBuffer);
byte[] array = byteBuffer.array();
String msg = new String(array);
System.out.println("服务端获取数据:" + msg.trim());
String res = "我是服务端响应信息:hello client";
ByteBuffer buffer = ByteBuffer.wrap(res.getBytes("utf-8"));
//将消息发给客户端
socketChannel.write(buffer);
}
public static void main(String[] args) throws IOException {
NioTcpServer server = new NioServer();
server.initServer(9999);
server.listen();
}
}
package com.xiaojie.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NioClient {
Selector selector;
public static void main(String[] args) throws IOException {
NioClient nioClient = new NioClient();
nioClient.initClient("127.0.0.1",9999);
nioClient.connection();
}
public void initClient(String ip,int port) throws IOException {
// 获取一个socket 通道
SocketChannel socketChannel = SocketChannel.open();
// 设置通道为非阻塞
socketChannel.configureBlocking(false);
// 获取一个通道管理器
this.selector = Selector.open();
// 客户端连接服务器,其实方法执行并没实现连接,需要在listen()方法中
// 调用channel.finishConnection才能完成连接
socketChannel.connect(new InetSocketAddress(ip,port));
// 将管道管理器和通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件
socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
}
public void connection() throws IOException{
// 轮询访问selector
while(true){
// 选择一组可以进行I/O操作的事件,放在selector中,客户端该方法不会阻塞
// 这里和服务端的方法不一样,查看api注释可以知道,服务端当至少一个通道被选中时
// selector的wakeup方法被调用,方法返回,而对于客户端来说,通道是一直被选中的
this.selector.select();
Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
while (it.hasNext()){
SelectionKey key = it.next();
it.remove();
// 连接事件发生
if(key.isConnectable()){
SocketChannel socketChannel =(SocketChannel) key.channel();
// 如果正在连接则完成连接
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
// 设置成非阻塞
socketChannel.configureBlocking(false);
// 向服务器发送信息
ByteBuffer byteBuffer = ByteBuffer.wrap("我是客户端发送请求:hello server".getBytes());
socketChannel.write(byteBuffer);
// 连接成功之后注册读取服务器信息事件
socketChannel.register(this.selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
read(key);
}
}
}
}
public void read(SelectionKey key) throws IOException{
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = channel.read(byteBuffer);
if(len!=-1){
System.out.println("接收到服务端信息:"+new String(byteBuffer.array(),0,len));
}
}
}
Netty实现服务器&客户端
1、创建ServerBootStrap实例。
2、设置并绑定Reactor线程池:EventLoopGroup,EventLoop就是处理所有注册到本线程的Selector上面的Channel。
3、设置并绑定服务端的channel。
4、创建处理网络事件的ChannelPipeline和handler,网络时间以流的形式在其中流转,handler完成多数的功能定制:比如编解码 SSl安全认证。
5、绑定并启动监听端口。
6、当轮训到准备就绪的channel后,由Reactor线程:NioEventLoop执行pipline中的方法,最终调度并执行channelHandler。
package com.xiaojie.netty.server;
import com.sun.webkit.EventLoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/*
*
1、创建ServerBootStrap实例
2、设置并绑定Reactor线程池:EventLoopGroup,EventLoop就是处理所有注册到本线程的Selector上面的Channel
3、设置并绑定服务端的channel
4、5、创建处理网络事件的ChannelPipeline和handler,网络时间以流的形式在其中流转,handler完成多数的功能定制:比如编解码 SSl安全认证
6、绑定并启动监听端口
7、当轮训到准备就绪的channel后,由Reactor线程:NioEventLoop执行pipline中的方法,最终调度并执行channelHandler
*/
/**
* @Description:
* @author: xiaojie
* @date: 2021.07.28
*/
public class Nettyserver {
public static void main(String[] args) {
//创建主线程池,接受请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//创建work线程池,处理请求
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
//创建serverbootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class).group(bossGroup, workGroup)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new ServerHandler());
}
});
ChannelFuture future =serverBootstrap.bind(8090).sync();
System.out.println("starting...................");
//wait until server is closed
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//关闭线程池
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
package com.xiaojie.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @Description:
* @author: xiaojie
* @date: 2021.07.28
*/
public class ServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
// 接受我们的数据
ByteBuf byteBuf = (ByteBuf) o;
String request = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("获取到客户端的请求:" + request);
// 响应内容:
ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端响应信息:hello client", CharsetUtil.UTF_8));
}
}
package com.xiaojie.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @Description: netty客户端
* @author: xiaojie
* @date: 2021.07.29
*/
public class NettyClient {
public static void main(String[] args) {
//创建线程池
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
//创建bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(eventLoopGroup).remoteAddress("127.0.0.1",8090)
.handler(new ChannelInitializer<SocketChannel>() {
//处理消息
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( new ClientHandler());
}
});
//开始连接
try {
ChannelFuture channelFuture = bootstrap.connect().sync();
ChannelFuture sync = channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//关闭
eventLoopGroup.shutdownGracefully();
}
}
}
package com.xiaojie.netty.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @Description: 客户端处理handler
* @author: xiaojie
* @date: 2021.07.29
*/
public class ClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf=(ByteBuf) msg;
String respon = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("获取到服务器端响应数据是:"+respon);
}
/*
*
* @param ctx
* @在建立连接并准备传输数据时调用
* @author xiaojie
* @date 2021/7/30 10:00
* @return void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i=0;i<10;i++){
ctx.writeAndFlush(Unpooled.copiedBuffer("我是客户端发送请求:hello server", CharsetUtil.UTF_8));
}
}
}
四、粘包和拆包
在netty服务器的代码中运行后结果如下图,这种现象我们叫粘包/拆包。产生的原因是应用层面使用了Netty,但是对于操作系统来说,只认TCP协议,尽管我们的应用层是按照 ByteBuf 为 单位来发送数据,server按照Bytebuf读取,但是到了底层操作系统仍然是按照字节流发送数据,因此,数据到了服务端,也是按照字节流的方式读入,然后到了 Netty 应用层面,重新拼装成 ByteBuf,而这里的 ByteBuf 与客户端按顺序发送的 ByteBuf 可能是不对等的。因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。拆包和粘包是相对的,一端粘了包,另外一端就需要将粘过的包拆开。说直白点就是ByteBuf(缓存区的锅),当我们发送的数据小于ByteBuf(缓存区)的大小时,就会发生粘包(发送的数据多条合并成一条),当发送的数据大于ByteBuf(缓存区)的大小时,就会发生拆包(一次发送的数据没有完全包含我们要发送的数据)。
解决粘包拆包Netty自带四中拆包器
- LineBasedFrameDecoder: 行拆包器,在发送数据时以换行符(\n)作为分隔,接收端通过LineBasedFrameDecoder 将粘包的byteBuf进行拆包。
- FixedLengthFrameDecoder:固定长度的拆包器,顾名思义就是针对每个数据包长度固定。
- DelimiterBasedFrameDecoder:分隔符拆包器,可以自定义分隔符。
- LengthFieldBasedFrameDecoder:基于长度域的拆包器。
代码如下:
参考:
https://baike.baidu.com/item/Netty/10061624?fr=aladdin
https://blog.csdn.net/Zev_java/article/details/113932466
https://blog.csdn.net/u012250875/article/details/78341874
https://www.cnblogs.com/shineman-zhang/articles/13884407.html
https://netty.io/
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18531.html