netty网络编程2
7.Netty 线程模型
Reactor模式
Reactor 是反应堆的意思,Reactor 模型是指通过一个或多个输入同时传递给处理器。处理器将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。Netty 整体是采用了主从Reactor模型
Reactor角色
Reactor模型中有三种角色,分别是
-
Acceptor:处理客户端新连接,并分派请求到处理器链中
-
Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。
-
Handler:事件处理,如编码、解码等
Reactor 线程模型
Reactor有三种线程模型分别是:单Reactor单线程模型、单Reactor多线程模型、主从Reactor多线程模型。而Netty正是采用最后一种。
单Reactor单线程模型
该模型下所有请求建立、IO读写、业务处理都在一个线程中完成。如果在业务中处理中出现了耗时操作,就会导致所有请求全部处理延时。因为他们是有由一个线程同步处理的。
单Reactor多线程模型
为了防止业务处理导致阻塞,在多线程模型下会用一个线程池来异步处理业务,当处理完成后在回写给客户端。
主从Reactor多线程模型
单React始终无法发挥现代服务器多核CPU的并行处理能力,所以Reactor是可以有多个的,并且有一主多从之分。一个主Reactor仅处理连接,而多个子Reactor用于处理IO读写。然后交给线程池处理业务。Tomcat就是采用该模式实现。
Netty线程模型
Netty采用了主从Reactor模型实现,服务端启动的时候,创建两个NioEventLoopGroup,主Reactor即对应Boss group 线程组,子Reactor对应Worker Group 线程组。一个用于用于接收客户端的Tcp连接,并将建立好的连接注册到Worker组,另一个用于处理I/O相关的读写操作,当IO事件触发后由对应Pipeline进行处理以及执行系统Task和定时任务Task。
Netty用于接收客户端请求的线程池职责如下。
(1)接收客户端的Tcp连接,初始化Channel参数;
(2)将链路状态变更事件通知给ChannelPiepeline。
Netty处理I/O操作的Reactor线程池职责如下
(1)异步读取通信对端的数据,发送读事件到ChannelPipeline
(2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;
(3)执行系统调用Task
(4)执行定时任务Task,例如链路空闲状态监测定时任务。
通过调整线程池的线程个数、是否共享线程池等方式,Netty的Reactor线程模型可以在单线程、多线程和主从多线程间切换,这种灵活的配置方式可以最大程度地满足不同用户的个性化定制。
为了尽可能提升性能,Neety在很多地方进行了无锁化设计,例如在I/O线程内部进行串行操作,避免多线程竞争导致的性能下降问题。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的参数,可以同时启动多个串行化线程并行运行,这种局部无锁化的串行线程设计相比一个队列多个工作线程模型性能更优。
NioEventLoop
事件循环器,这里充当了Reactor的核心。每个NioEventLoop都会包含一个Selector选择器,用于处理IO事件,和两个taskQueue,一个用于存储用户提交的任务一个处理定时任务的Task。EventLoop中有一个独有的线程池,默认是不启动的,当有任务触发时就会启动,并一直轮询下去。
以下示例即声明大小为1的线程组,当submit提交任务之后该EventLoop就会启动
NioEventLoopGroup group= new NioEventLoopGroup(1);
// 提交任务
group.submit(() -> System.out.println("submit:"+Thread.currentThread().getId()));
group.shutdownGracefully();// 优雅关闭EventLoop
除了提交任务外,其更重要的事情是处理Channel 相关IO事件。如管道注册。调用register方法最终会调用NIO当中的register方法进行注册,只不过Netty已经实现封装好了,并且处理好了同步锁的问题。
EventLoopGroup.register(Channel)
Netty 为了安全的调用IO操作,把所有对IO的直接操作都封状了一个任务,交由IO线程执行。所以我们通过Netty来调用IO是不会立即返回的
NioChannel与Channelhandler
netty将原有的Nio Channel 被封装成了NioChannel,当然最终底层还是在调用NIO Channel。原来对Channel 中读写事件处理被封装成Channelhandler进行处理,并用引入Pipeline的概念。我们先来了解一下他们的用法
注:Channel与Pipeline 的概念还有很多知识体系,后续慢慢道来。这会我们先有印象即可。
Netty 基本用法
NioChannel 用法
通过一个示例来了解NioChannel的用法,在该示例中会接收客户端连接,并把消息打印出来。
完整的实现把它拆分成以下步骤:
1.初始化管道
初始化操作与原生NIO类似,都是打开管道、注册选择器最后绑定端口。但有一点要说明
NioChannel当中所有操作都是在EventLoop中完成的,所以在绑定端口之前必须先注册。
NioEventLoopGroup boss= new NioEventLoopGroup(1);
NioServerSocketChannel channel= new NioServerSocketChannel();
boss.register(channel);
channel.bind(new InetSocketAddress(8080));// 提交任务到 EventLoop
2.初始化Pipeline
原生NIO 是直接遍历选择集然后处理读写事件,在Netty中直接处理读写是不安全的也不推荐的,而是采用ChannelHandler来间接的处理读写事件。一般情况下读写是有多个步骤的。Netty中提供了Pipeline来组织这些ChannelHandler。Pipeline是一个链表容器,可以通过addFirst、addLast 在首尾增加Handler。
方法中 msg对象是已经建立好的连接管道即(NioSocketChannel)
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// 处理新连接 ,accept与READ事件都是用该方法来处理
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
handlerAccept(work,msg);
}
});
3.注册新管道,并初始化它
获取新管道(NioSocketChannel) ,需要重新注册至EventLoop,才能接收消息。简单起见直接注册到与其父Channel相同的EventLoop中(也可以用一个专门的EventLoop组来处理子管道事件)。接下来同样要初始化子管道。重写channelRead0 方法来收取消息。
private void handlerAccept(NioEventLoopGroup group, Object msg) {
NioSocketChannel channel= (NioSocketChannel) msg;
EventLoop loop = group.next();
loop.register(channel);
channel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println(msg.toString(Charset.defaultCharset()));
}
});
}
NioServerSocketChannel 与NioSocketChannel 是父子关系 ,需要分别注册和注初始化其pipeline
ServerBootStrap用法
上述的案例当中,即需自己初始化父Channel,又需要初始化子管道,比较复杂。Netty又提供了一个ServerBootStrap 进一步封装上述的注册、绑定、初始化等操作,已简化对Netty API的调用。
通过一个Http服务实现来告诉大家Netty常规用法。其所有实现分为以下步骤
1.初始化
通过ServerBootstrap 可直接设定服务的线程组,其中boss 用于处理NioSeverSocketChannl中的Accept事件,而Work组用于处理IO读写,以及用户提交的异步任务。通过指定Channel class 来告诉ServerBootstrap 要维护一个什么样的管道。
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
bootstrap.group(boss, work)
//指定要打开的管道 自动进行进行注册==》NioServerSocketChannel ->
.channel(NioServerSocketChannel.class)
- 设置子管道的Pipeline
然后就可以初始化子管道的Pipeline了,为其绑定对应的处理Handler即可。我们目标是实现一个Http服务,对应的三个基本操作是 解码、业务处理、编码。其中编解码是Htpp协议通用处理,Netty已自带处理器,直接添加即可。业务处理需要手动编写
// 初始化子管道
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast("decode", new HttpRequestDecoder()); // 输入
ch.pipeline().addLast("servlet", new MyServlet());
ch.pipeline().addFirst("encode", new HttpResponseEncoder());// 输出流
}
});
2.业务处理
通过实现SimpleChannelInboundHandler 可直接处理读事件用来接收客户端的请求,接下来构造一个Response 并设置状态码、响应头,以及响应消息即可完成一个简单的Http服务
private class MyServlet extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
response.content().writeBytes("上传完毕".getBytes());
ChannelFuture future = ctx.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
}
}
3.绑定端口
最后一个步骤是由ServerBoottrap 来绑定端口
ChannelFuture future = bootstrap.bind(port);
future.addListener(future1 -> System.out.println("服务启动成功"));
Netty Channel 基本介绍
在java 原生NIO操作中Channel 是一个非常核心的组件,它可以用于连接传输的两端,并提供传输和读写相关操作比如:绑定端口、建立连接、读写消息、以及关闭管道等。只有是SelectableChannel才可以注册到选择器,由选择器监听读写,从而实现非阻塞的功能。
Netty 的Channel 是在原生基础之进行封装,也就是原有的功能Netty中同样可以实现。同时引入了Pipeline与异步执行机制。
1、Pipeline 会组装串联若干个 ChannelHandler(管道处理器),所有针对管道主动或被动发生的事件都会在Pipeline上进行流转处理。
2、异步机制是指在原生 NIO Channel 中异步进行IO这是一种不安全行为,它会触发阻塞甚至会导致死锁。所以一般情况下在非 IO线程中操作Channel 都会以任务的形式进行封装,并提交到IO线程执行。而这些在Netty中都已经内部封装实现,即使异步调用Netty Channel都是安全的。
JAVA 原生NIO中不同Channel实现会有不同的功能,在Netty中也是类似的它不同的子类会包含对应原生NIO中的Channel 。常见如下图:
Netty Channel 基本套路
接下来我们已NioDatagramChannel 为例子讲解Channel的基本用法 。基于以下步骤可以基于Netty Channel构建一个最基本的UDP服务。
- 构建线程组,用于IO线程
- 创建管道并注册管道
- 初始化pipeline,为其添加Channel Handler处理器
- 绑定端口
//1.构建线程组,即IO线程
NioEventLoopGroup boss = new NioEventLoopGroup(1);
//2.创建管道并注册管道
NioDatagramChannel datagram = new NioDatagramChannel();
boss.register(datagram);
//3.初始化pipeline,为其添加Channel Handler处理器
datagram.pipeline().addLast(new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
}
});
//4.绑定端口,绑定后即启动IO线程,并对外开放服务
datagram.bind(new InetSocketAddress(8080));
System.in.read();// 防止主线程结束
绑定端口是一个IO操作,所以实际执行是会被以任务的形式提交到IO线程(NioEventLoop)。但其最终还是调用的JAVA 原生NIO 的绑定方法。这部分内容就在NioDatagramChannel.doBind() 方法中体现。
Bootstrap.bind()绑定方法也是通过channel.bind
以此类推调用Channel.write()操作,同样会被异步封装后,然后调用doWrite(),最终调到java nio Channel 中的Write或send。
类似方法还有很多:
bind==>doBind ==> java.nio.channels.Channel.bind()
write==>doWrite==> ...
connect==>doConnect==>...
close==>doClose ==>...
disconnect==>doDisconnect ==>.....
read==>doReadBytes ==> .....
由此可见doXXX方法即是直接对原生 Channel的IO调用。在非IO线程调用这是一种不安全的形为,所以所有do开头的方法都是不开放的(protected)。好在Netty 在Channel 中还提供了一个Unsafe 可以直接调用这些方法。
Unsafe
Unsafe 是Channel当中一个内部类,可以用于直接操作Channel 中的IO方法。而不必经过异步和管道。所以在Unsafe中调用IO方法它会立即返回。 但正如它的名字一样,这不是一种不安全的形为,需要调用者自已确保当前对Unsafe的调用是在IO线程下,否则就会报异常。以下方法除外:
localAddress()
remoteAddress()
closeForcibly()
//这是一个异步方法不会立马返回,而是完成后通知ChannelPromise
register(EventLoop, ChannelPromise)
deregister(ChannelPromise)
voidPromise()
Unsafe 是Channel的内部类, 不同的Channel 会对应不同的Unsafe 所提供的功能也不一样。如,其结构与继承关系如下图:
另外要说明的是 在Unsafe并不只是作为中介把调用转发到Cahnnel,其还提供如下作用:
-
线程检测:当前调用是否为IO线程
-
状态检测:写入前判断是否已注册
-
写入缓存:Write时把数据写入临时缓存中,当flush时才真正提交
-
触发读取:EventLoop 会基于读取事件通知Unsafe ,在由unsafe读取后 发送到pipeline
-
所以Unsafe中最核心作用并不是给开发者调用而是其内部的组件调用。他在Channel、Eventloop、Pipeline这个三组件间启动了一个桥梁作用。
如在一个次读取场景中流程是这样的:
- EventLoop 触发读取并通知unsafe // unsafe.read()
- unsafe调用channel 读取消息 // channel.doReadMessages(ByteBuf)
- unsafe将消息传入pipeline (pipeline触发消息入站) // pipeline.fireChannelRead(msg)
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 调用channel的读取消息
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// unsafe将消息传入pipeline
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
写入过程:
- 业务开发调用channel写入消息 //channel.write(msg)
- channel将消息写入 pipeline // pipeline.write(msg)
- pipeline 中的Handler异步处理消息 //ChannelOutboundHandler.write()
- pipeline调用unsafe写入消息 //unsafe.write(msg);
- unsafe调用Channel 完成写入 // channel.doWrite(msg)
private final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (IOException e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
}
ChannelPipeline
每个管道中都会有一条唯一的Pipeline 其用于流转的方式处理Channel中发生的事件比如注册、绑定端口、读写消息等。这些事件会在pipeline流中的格个节点轮转并依次处理,而每个节点就可以处理相对应的功能,这是一种责任链式的设计模式,其目的是为让各个节点处理理聚焦的业务
pipeline结构
事件是如何在pipeline中轮转的呢?其内部采用双向链表结构,通过ChannelHandlerContext 包装唯一的Handler,并通过prev与next属性分别链接节点上下的Context,从而组成链条。
pipeline中有Head与tail 两个Context对应链条的首尾。
ChannelHandler
ChannelHandler是指pipeline当中的节点,共有三种类型:
- 入站处理器:即ChannelInboundHandler的实现,可用于处理如消息读取等入站事件
- 出站处理器:即ChannelOutboundHandler的实现,可用于处理消息写入、端口绑定入出站事件
- 出入站处理器:即ChannelDuplexHandler的实现,可以处理所有出入站事件。某些协义的编解码操作想写在一个类里面,即可使用该处理器实现。
出入站事件
Channel 中的事件可分为出站与入站两种。
入站事件:是指站内发生的事件,如已读取的消息处理、管道注册、管道激活(绑定端口或已连接)这些都是由EventLoop基于IO事件被动开始发起的。请注意所有入站事件触发必须由ChannelInBoundInvoker的子类执行。
出站事件:出站事件是指向Channel的另一端(站发)发起请求或写入消息。如:bind、connect、close、write、flush 等。其均由ChannelOutboundInvoker触发并由ChannelOutboundHandler处理。与入站事件不同其都由开发者自己发起。
事件的触发
下图可以看出 pipeline 与Context分别实现了出入站接口,说明其可触发所有出入站事件,而Channel只承继出站口,只能触发出站事件。
ChannelHandlerContext
Context主要作用如下:
-
结构上链接上下节点
-
传递出入站事件,所有的事件都可以由Context进行上下传递
-
保证处理在IO线程上,前面所说所有的IO操作都需要异步提交到IO线程处理,这个逻辑就是由Context实现的。如下面的绑定操作就是保证了IO线程执行:
链条处理流程(事件传递)
出入站事件都是Channel 或pipeline 发起,并由Context进行上下传递。如果是入站事件将会从头部向下传递到尾部并跳过 OutboundHandler,而出站与之相反,从尾部往上传递,并跳过InboundHandler处理器。
接下来通过读写两个例子处理来说明pipeline的处理过程。
读取管道消息
1.初化UDP始道
NioDatagramChannel channel = new NioDatagramChannel();
new NioEventLoopGroup().register(channel);
Thread.sleep(100);
channel.bind(new InetSocketAddress(8081));
2.为管道添加入站处理节点1
pipeline.addLast(new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String) msg;
System.out.println("入站事件1:"+msg);
message+=" 已处理";
ctx.fireChannelRead(message);
}
});
3.为管道添加出站处理节点1
//出站处理
pipeline.addLast(new ChannelOutboundHandlerAdapter(){
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("出站事件1:"+msg);
}
});
4.为管道添加入站处理节点2
pipeline.addLast(new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String) msg;
System.out.println("入站事件2:"+msg);
message+=" 已处理";
ctx.writeAndFlush(message);
}
});
5.手动触发入站事件
pipeline.fireChannelRead("hello luban");
5.执行后得到如下结果
入站事件1:hello luban
入站事件2:hello luban 已处理
出站事件1:hello luban 已处理 已处理
执行流程说明:
- 基于pipeline 触发入站处理,其首由头部开始处理,并向下传递
- 节点1接收消息 ,并改写消息后通过ctx.fireChannelRead();往下传递
- 节点2接收消息,并打印。此时节占2并没有调用ctx.fireChannelRead(); 所以处理流程不会传到tail 节点
- 处理流程。
8.TCP粘包/拆包问题
TCP是一个“流”协议,所谓流就是没有界限的一串数据,可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解业务数据的具体含义,他会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把多个小的包封装成一个大的包数据发送,这就是所谓的TCP粘包和拆包问题。
TCP粘包/拆包问题说明
我们通过图解对TCP粘包和拆包问题进行说明:
假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器一次读取到的字节数是不确定的,故可能存在上面四种情况。
(1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
(2)服务端一次接收到了两个数据,D1和D2粘和在一起,被称为TCP粘包;
(3)服务端分两次读取到了两个数据包,第一次读取到完整的D1包和D2包的部分内容D2_1,第二次读取到了D2包的剩余内容D2_2,这被称为TCP拆包;
(4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和完整的D2包,这也是TCP拆包;
如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种情况,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。
TCP粘包/拆包发生的原因
问题产生的原因有三个,分别如下:
(1)应用程序write写人的字节大小大于套接口发送缓存区的大小
(2)进行MSS大小的TCP分段
(3)以太网帧的payload大于MTU进行IP分片。
图解如下:
TCP粘包/拆包的解决策略
由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议设计来解决,根据业界的主流协议的解决方案,可以归纳如下几种解决方案:
(1)消息定长,例如每个报文的大小为固定长度的200字节,如果不够,空位补空格;
(2)在包尾增加回车换行符进行分割,例如FTP协议;
(3)将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体的长度);
(4)更复杂的应用层协议。
9.Netty自带的几种常用的编解码类
为了解决TCP粘包/拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包。(处理TCP粘包/拆包实现一般都是在解码类中实现) 由于TCP 以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4中方式。
(1)消息长度固定,累计读取到的长度总和为定长 LEN 的报文后,就会认为读取到了一个完整的消息:将计数器置位,重新开始读取下一个数据报文;
(2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛
(3)将特殊的分隔符作为消息结束的标志,回车换行符就是一种特殊的结束分隔符;
(4)通过消息头中定义长度字段来标识消息的总长度
Netty 对上面四种应用做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便。有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包问题。
FixedLengthFrameDecoder
FixedLengthFrameDecoder 也叫定长解码器,它可以自动完成对定长消息的解码。对应上面第一种方式
ch.pipeline().addLast(new LineBasedFrameDecoder(20));
参数表示消息的固定长度
LineBasedFrameDecoder
LineBasedFrameDecoder 的工作原理是依次遍历 ByteBuf 中的可读字节,判断是否存在 “\n” 或者 “\r\n” ,如果有,就以此位置为结束位置,从可读索引到结束索引位置区间的字节就组成了一行。她是以换行符为结束标志的解码器。支持携带结束符或者不携带结束符两种解码方式,同时支持配置当行最大长度。如果连续读取到的最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。对应上面第二种方式
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
参数表示单条消息的最大长度
DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder 也叫分隔解码器,它可以自动完成指定分隔符作为码流结束标识的消息解码。对应上面第三种方式
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
第一个参数1024,表示单条消息的最大长度,当达到该长度后仍然没有找到分隔符,就会抛出 TooLongFrameException 异常,防止由于异常码流缺失分隔符导致内存溢出,这是 Netty 的解码器的可靠性保护;第二个参数就是分隔符缓冲对象。
LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder 也叫自定义长度解码器,它是通过消息的长度进行区分的,只需要传入正确的参数就可以轻松搞定读半包的问题。对应上面第三种方式,也是最为通用的方式。
LengthFieldBasedFrameDecoder是自定义长度解码器,所以构造函数中6个参数,基本都围绕那个定义长度域,进行的描述。
-
maxFrameLength – 发送的数据帧最大长度
-
lengthFieldOffset – 定义长度域位于发送的字节数组中的下标。换句话说:发送的字节数组中下标为${lengthFieldOffset}的地方是长度域的开始地方
-
lengthFieldLength – 用于描述定义的长度域的长度。换句话说:发送字节数组bytes时, 字节数组bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength]域对应于的定义长度域部分
-
lengthAdjustment – 满足公式: 发送的字节数组bytes.length – lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
-
initialBytesToStrip – 接收到的发送数据包,去除前initialBytesToStrip位
-
failFast – true: 读取到长度域超过maxFrameLength,就抛出一个 TooLongFrameException。false: 只有真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出
举个栗子:
根据图可知:
第二个参数为1,数组位置从0开始,第三个参数为2,由于解码后不丢数据所以第四个参数为0,通过公式
发送数据包长度 = 长度域的值 + lengthFieldOffset + lengthFieldLength + lengthAdjustment。
可以计算出第四个参数为-3。这里第一个参数设置为了int的最大值可以根据业务自己修改。
所以代码为:
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 1, 2, -3, 0));
请记住公式: 发送数据包长度 = 长度域的值 + lengthFieldOffset + lengthFieldLength + lengthAdjustment。
自定义解码器
虽然netty为我们提供了解码器,这里提供一个自定义解码例子来理解netty内部是怎么实现自动解码的。
这里是通过继承ByteToMessageDecoder来实现解码,通过ByteToMessageDecoder我们能轻松实现自己的解码器。(上面介绍的四种也是继承ByteToMessageDecoder实现)
同样以 LengthFieldBasedFrameDecoder 自定义长度解码器中的例子我们实现自己的解码器
代码如下:
package com.gbcom.idic.rosterCollector.netty.server.decoder;
import com.gbcom.idic.rosterCollector.netty.common.constants.CubeMsgConstant;
import com.gbcom.idic.rosterCollector.netty.common.pojo.CubeMsg;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @Author: dinghao
* @Date: 2020/10/22 15:11
*/
@Slf4j
public class TransforDecoder extends ByteToMessageDecoder {
/**
* devType 1
* length 2
* devId 17
* msgType 2
* seqence 4
* data N
*/
//最小的数据长度:开头标准位1字节
private static int MIN_DATA_LEN = CubeMsgConstant.devTypeLength + CubeMsgConstant.totalLength + CubeMsgConstant.devIdLength + CubeMsgConstant.msgTypeLength + CubeMsgConstant.seqenceLength;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int readableBytes = byteBuf.readableBytes();
if (readableBytes >= MIN_DATA_LEN) {
log.debug("开始解码数据……");
//标记读操作的指针
byteBuf.markReaderIndex();
// 读取设备类型
byte devType = byteBuf.readByte();
// 读取消息长度(消息头 + 消息体)
short length = byteBuf.readShort();
if (length > readableBytes) {
log.debug("数据长度不够,数据协议len长度为:{},数据包实际可读内容为:{}正在等待处理拆包……", length, readableBytes);
byteBuf.resetReaderIndex();
/*
**结束解码,这种情况说明数据没有到齐,在父类ByteToMessageDecoder的callDecode中会对out和in进行判断
* 如果in里面还有可读内容即in.isReadable为true,cumulation中的内容会进行保留,,直到下一次数据到来,将两帧的数据合并起来,再解码。
* 以此解决拆包问题
*/
return;
}
byte[] devIdData = new byte[CubeMsgConstant.devIdLength];
byteBuf.readBytes(devIdData);
String devId = new String(devIdData, StandardCharsets.UTF_8);
short msgType = byteBuf.readShort();
int seqence = byteBuf.readInt();
byte[] data = new byte[length - MIN_DATA_LEN];
byteBuf.readBytes(data);
CubeMsg msg = new CubeMsg();
msg.setDevType(devType);
msg.setLength(length);
msg.setDevId(devId);
msg.setMsgType(msgType);
msg.setSeqence(seqence);
msg.setData(data);
list.add(msg);
if (length < readableBytes) {
//如果out有值,且in仍然可读,将继续调用decode方法再次解码in中的内容,以此解决粘包问题
log.debug("数据长度过长,数据协议len长度为:{},数据包实际可读内容为:{}正在等待处理粘包……", length, readableBytes);
}
} else {
log.debug("数据长度不符合要求,期待最小长度是:" + MIN_DATA_LEN + " 字节");
return;
}
}
}
ByteToMessageDecoder 源码分析:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
10.基于Netty实现私有协议
下面利用上面介绍netty相关的知识来实现基于Netty的私有协议。自定义协议时要保证协议设计的可靠性,安全性以及可拓展性。
可靠性设计
Netty 协议可能会运行在非常恶劣的网络环境中,网络超时、闪断、对方进程僵死或者处理缓慢等情况情况都有可能发生。为了保证在这些极端异常的场景下Netty协议仍能够正常的工作或者自动恢复,需要对它的可靠性进行统一规划和设计。
-
心跳机制
采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路,主动重连
-
重连机制
一般和心跳机制一起使用,当链路关闭时,间隔一定时间进行重连
-
重复登录保护
当客户端连接成功后,链路处于正常的状态下,不允许客户端重复登录,以防止客户端异常的状态下反复重连导致句柄资源被耗尽
-
消息缓存重发
无论客户端还是服务端,当链路终端之后,在链路恢复之前,缓存在消息队列的待发送数据不能丢失,等链路恢复后,重新发送这些消息,保证链路中断期间消息不丢失。考虑内存溢出问题,建议消息队列设置上限,当达到上限之后,应该拒绝继续向队列中添加新的消息或者清除旧消息
安全性设计
为了保证服务端安全性,服务端需要对客户端的IP地址进行合法性校验,如果在白名单内,则检验通过;否则拒绝对方连接
可拓展性设计
协议需要具备一定的拓展能力
代码实现:
11.Netty实现自己的内网穿透工具
本节通过Netty实现简单的内网穿透工具,其原理如下:
服务端部署在外网环境,服务端是开放7777端口的tcp服务
客户端配置了需要代理的本地端口80(这里可以随意更改)以及对外访问的端口10000(这里可以随意更改)和连接服务端的端口7777(客户端和服务端约定好的)
1、当服务端启动时,开放7777端口的tcp服务启动,开始接收客户端的连接
2、客户端启动时连接服务端的7777端口建立tcp连接,连接成功后发送账号密码给服务端
3、服务端验证账号密码成功后在服务端建立客户端需要对外访问端口(10000为例)的tcp服务,并返回客户端与服务连接是否成功信息,失败服务端会关闭与客户端的连接
4、客户端根据返回的信息确定是否确定关闭程序。此时外部可以通过服务端直接访问客户端对外访问的端口了
5、浏览器访问10000端口
6、浏览器与服务端的10000端口建立连接成功,此时服务端通过与客户端(通过7777端口建立的连接)连接的端口的通道向客户端发送一条携带浏览器与服务端的10000端口建立连接通道的id的请求连接消息
7、客户端收携带id的请求连接后,客户端与本地服务(80为例)建立连接,并将id与客户端和本地服务的连接放入Map中。
8、浏览器与服务端的10000端口建立连接成功后,服务端再次通过与客户端(通过7777端口建立的连接)连接的端口的通道向客户端发送一条携带浏览器与服务端的10000端口建立连接通道的id和请求数据的数据消息
9、客户端收到id以及请求数据(浏览器的请求数据),将请求数据转发到客户端和本地服务的通道中(id获取)
10、客户端读取本地服务的响应,并将响应的结果和id通过服务端通过与客户端(通过7777端口建立的连接)连接的端口的通道返回给服务端
11、服务端读取响应,根据id获取服务端和浏览器的连接通过,将响应写入浏览器
代码实现:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5310.html