前言
前面写过《Netty 服务端启动全流程源码分析》,BossGroup 获取到客户端连接 SocketChannel 后会将其注册到 WorkerGroup,由 WorkerGroup 来驱动数据 IO 读写。WorkerGroup 的 EventLoop 监听到 Channel 有OP_READ
事件时,会调用Channel.Unsafe.read()
方法,Netty 会将读取到的数据包装成 ByteBuf,然后触发回调pipeline.fireChannelRead(byteBuf)
将事件传播出去。
整体流程是清楚了,但是对于详细的数据接收细节没有介绍,本篇文章会做介绍。
前置知识
熟悉 Java Nio 编程的同学应该知道,要想从 SocketChannel 读取数据,需要先创建一个 ByteBuffer,然后调用SocketChannel.read(ByteBuffer)
方法。但是由于读取操作并未实际发生,程序并不知道有多少数据需要接收,导致我们并不知道需要创建一个多大的 ByteBuffer,大了会造成内存的浪费,小了又需要频繁扩容。而且 ByteBuffer 本身不支持扩容操作,你需要重新申请一个更大的 ByteBuffer,然后进行内存的复制,开销就更大了。Netty 是如何解决这个问题的呢?后面会介绍。
还有一个知识点,读者需要提前了解。对于注册到Selector
多路复用器上,且监听OP_READ
事件的 Channel,Selector
判断的其实就是 Channel 的有效可读字节数。意思就是说,对于有数据可读的 Channel,如果你数据没有读完,下次select()
多路复用器依然会再返回它。所以,Netty 会进行循环读,前面说过了,你不知道对端会发送给你多少数据,默认单次最多读 16 次,超过 16 次数据还没读完,本次就不再继续处理了,因为 Netty 怕阻塞其他 IO 事件,后面会详细分析。
AbstractNioByteChannel.read()分析
之前的文章已经分析过,当 Netty 检测到 Channel 有可读事件时,会调用AbstractNioByteChannel.read()
方法,下面是该方法整体的一个源码分析:
/*
客户端发送数据时触发。
见 io.netty.channel.nio.NioEventLoop.processSelectedKey
*/
@Override
public final void read() {
// 客户端Channel的配置
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获取ByteBufAllocator,默认是 PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// 重置统计信息
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
/*
当对端发送一个超大的数据包时,TCP会拆包。
OP_READ事件只会触发一次,Netty需要循环读,默认最多读16次,因此ChannelRead()可能会触发多次,拿到的是半包数据。
如果16次没把数据读完,没有关系,下次select()还会继续处理。
对于Selector的可读事件,如果你没有读完数据,它会一直返回。
*/
do {
// 分配一个ByteBuf,大小能容纳可读数据,又不过于浪费空间。
byteBuf = allocHandle.allocate(allocator);
/*
doReadBytes(byteBuf):ByteBuf内部有ByteBuffer,底层还是调用了SocketChannel.read(ByteBuffer)
allocHandle.lastBytesRead()根据读取到的实际字节数,自适应调整下次分配的缓冲区大小。
*/
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 没有读取到字节
byteBuf.release();//释放ByteBuf,空的,没有意义。
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 递增已经读取的消息数量
allocHandle.incMessagesRead(1);
readPending = false;
// 通过pipeline传播ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//判断是否需要继续读
// 读取完毕,pipeline传播ChannelReadComplete事件
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
这里我们重点关注一下recvBufAllocHandle()
方法,很简单,就是对 Channel 绑定的recvHandle
进行了判空校验,如果没绑定就创建一个。
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
// 如果Channel对应的recvHandle是空的,则创建一个新实例
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
Channel 会依赖 RecvByteBufAllocator.Handle 来创建 ByteBuf,为什么不直接创建呢?因为前面说过的,不知道真正有多少数据要接收,不知道该创建多大的 ByteBuf,大了浪费空间,小了又要频繁扩容。基于这个原因,Channel 会把创建 ByteBuf 的任务交给 RecvByteBufAllocator.Handle 处理,希望它可以基于历史数据做统计分析,分配出一个容量大到足够容纳所有的数据,又小到不会浪费太多的空间。
RecvByteBufAllocator.Handle 的细节后面分析,这里先把read()
流程分析完。
创建好一个大小合适的 ByteBuf 之后,Channel 会调用doReadBytes(byteBuf)
将数据写入到 Bytebuf:
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 设置 尝试读取的字节数,尽量把ByteBuf填满
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// 将SocketChannel读取的数据写入到byteBuf
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
byteBuf.writeBytes(ScatteringByteChannel in, int length)
其实就是将 ByteBuf 转换成 JDK 的 ByteBuffer,然后通过 JDK 原生的 SocketChannel 进行读取,它会返回实际的读取字节数。
如果实际读取的字节数小于等于 0,说明没有数据可读了,本次OP_READ
事件处理完毕,触发pipeline.fireChannelReadComplete()
回调,否则触发pipeline.fireChannelRead(byteBuf)
将读取到的 ByteBuf 传递给其他 ChannelHandler 处理。
如果对端发送的数据包很大,很可能创建的 ByteBuf 不能一次性读完所有数据,所以 Channel 这里会进行循环读,整个读取的逻辑会放在一个while
循环里,通过allocHandle.continueReading()
判断是否需要继续读取数据。因此,即使 TCP 没有发生拆包,如果创建的 ByteBuf 过小,ChannelHandler 的channelRead()
也会被触发多次,所以,切记不可错误的理解为「channelRead()是因为 TCP 拆包导致的」。
RecvByteBufAllocator 分析
RecvByteBufAllocator 是 Netty 的「数据接收缓冲区分配器」,Channel 依赖它来创建大小合适的 ByteBuf,提升性能和节省内存。
RecvByteBufAllocator 是一个很简单的接口,它的工作由内部接口 Handle 完成,所以直接看 Handle 接口就行了。
interface Handle {
/*
通过ByteBufAllocator分配一个大小合适的ByteBuf。
太大:浪费空间。
太小:频繁扩容,内存复制开销。
*/
ByteBuf allocate(ByteBufAllocator alloc);
// 猜测需要分配的字节数
int guess();
// 重置已累积的任何计数器,并建议为下一个读循环应读取多少消息字节。
void reset(ChannelConfig config);
// 增加已读的消息数量
void incMessagesRead(int numMessages);
// 设置上一次读取到的字节数,AdaptiveRecvByteBufAllocator会根据该值 自适应调整下次分配的缓冲区大小。
void lastBytesRead(int bytes);
// 获取上一次读取的字节数
int lastBytesRead();
// 设置尝试读取的字节数
void attemptedBytesRead(int bytes);
// 获取尝试读取的字节数
int attemptedBytesRead();
// 是否还能继续读取
boolean continueReading();
// 读取完成
void readComplete();
}
默认使用的 RecvByteBufAllocator 实现是 AdaptiveRecvByteBufAllocator,它可以自适应调整分配的 ByteBuf 大小,我们重点分析。
AdaptiveRecvByteBufAllocator 类图如下:
从上往下看吧,MaxMessagesRecvByteBufAllocator 很简单,在顶级接口的基础之上,限制了循环读的次数:
/*
当有可读事件时,Netty是循环读的,通过continueReading()判断是否需要继续读取。
该类主要是用来限制循环读取的次数的,默认是16.
*/
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
// 返回循环读的最大次数
int maxMessagesPerRead();
// 设置循环读的最大次数
MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}
DefaultMaxMessagesRecvByteBufAllocator 是 MaxMessagesRecvByteBufAllocator 的默认实现,先看属性:
// 最大读取消息数量
private volatile int maxMessagesPerRead;
/*
是否尊重/关心 还有更多的数据可读?
如果为true,则无条件认为还有数据可读,直到下次循环读取到0字节为止。
这可能会导致多执行一次无效读,无意义的创建一个ByteBuf。
*/
private volatile boolean respectMaybeMoreData = true;
核心逻辑都在 Handle 里,所以直接看 Handle 即可:
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
// 最大读取多少次消息,默认16次,没读完,下次select接着读。
private int maxMessagePerRead;
// 读取的总消息数
private int totalMessages;
// 读取的字节总数
private int totalBytesRead;
// 尝试读取的字节数,默认是ByteBuf的可写字节数,即尽量把ByteBuf填满。
private int attemptedBytesRead;
// 上次读取的字节数,根据它调整下次分配的缓冲区大小。
private int lastBytesRead;
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
/*
是否还有更多数据可读的默认判断:attemptedBytesRead == lastBytesRead。
即:本次读取的数据有没有填满ByteBuf,如果填满了,说明可能还有数据要读。否则就不读了,直接触发ChannelReadComplete()。
*/
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
// 根据config重置数据,每次处理新的Read事件时触发。
@Override
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
// 根据猜测的字节数,分配一个ByteBuf。
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
// 递增统计消息的读取数量,默认超过16就不读了,防止阻塞IO线程,其他事件得不到处理。
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
// 根据上次读取的字节数,累加总读取到的字节数
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
@Override
public final int lastBytesRead() {
return lastBytesRead;
}
// 是否还要继续循环读取消息
@Override
public boolean continueReading() {
/*
判断依据:
1.认为还有可读数据
2.读取的消息数没有达到上限
*/
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) // 认为还有可读数据
&&
totalMessages < maxMessagePerRead && totalBytesRead > 0;// 读取的消息数没有达到上限
}
@Override
public void readComplete() {
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
// 设置尝试读取的字节数,默认为ByteBuf的可写字节数
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
protected final int totalBytesRead() {
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}
}
MaxMessageHandle 会根据每次读取的字节数是否填满 ByteBuf 为依据,判断是否还要继续循环读。如果填满了说明 Channel 可能还有数据等待读取,反之已无数据可读,直接跳出循环即可。
说完父类,接下来看核心的 AdaptiveRecvByteBufAllocator,源码也不是很长:
/*
自适应的,接收对端数据的ByteBuf分配器,分配的ByteBuf有合适的初试容量。
避免太小导致频繁扩容,太大导致内存浪费,GC压力。
基于历史的数据采集做预测:
1.前一次接收的数据完全读满了ByteBuf,则下次会增大缓冲区。
2.连续两次接收的数据小于指定值,则会缩小下次分配的缓冲区。
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
// 默认最小值
static final int DEFAULT_MINIMUM = 64;
// 默认初试值
static final int DEFAULT_INITIAL = 2048;
// 默认最大值
static final int DEFAULT_MAXIMUM = 65536;
/*
如果需要扩容下次分配的缓冲区大小,这个是扩容的索引步长。
*/
private static final int INDEX_INCREMENT = 4;
/*
如果需要缩容下次分配的缓冲区大小,这个是缩容的索引步长。
*/
private static final int INDEX_DECREMENT = 1;
// 扩容表
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
// 512字节内,以16字节为步长,递增
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
// 512字节后,成倍扩容
for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
sizeTable.add(i);
}
// List转数组
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
/**
* @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
*/
@Deprecated
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
/*
通过给定size查找下标,二分查找法。
如果size不在SIZE_TABLE内,返回最接近它的一个稍小值的索引
*/
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
private final class HandleImpl extends MaxMessageHandle {
// 最小容量的索引
private final int minIndex;
// 最大容量的索引
private final int maxIndex;
// 默认容量的索引
private int index;
// 下次接受的缓冲区大小
private int nextReceiveBufferSize;
// 是否需要立即缩容?因为需要连续两次读取的字节数小于阈值,第一次设为true,第二次才缩容。
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
// 根据初始值得到SIZE_TABLE的下标
index = getSizeTableIndex(initial);
// 第一次分配的缓冲区大小就是initial默认值
nextReceiveBufferSize = SIZE_TABLE[index];
}
/*
Unsafe.read()循环读数据时会调用该方法,bytes是上一次实际读取到的字节数。
*/
@Override
public void lastBytesRead(int bytes) {
/*
attemptedBytesRead():尝试读取的字节数。
NioSocketChannel.doReadBytes()会将attemptedBytesRead设置为ByteBuf.writableBytes(),
即只要有数据可读,Netty会尽量将ByteBuf写满。
*/
if (bytes == attemptedBytesRead()) {
// 实际读取的字节数填满了缓冲区,则扩容。
record(bytes);
}
// 调用父类方法,将数值累加到 totalBytesRead
super.lastBytesRead(bytes);
}
/*
预测下次需要分配的缓冲区大小
*/
@Override
public int guess() {
return nextReceiveBufferSize;
}
/*
根据实际读取到的字节数,自适应调整 下次应该分配的缓冲区大小
*/
private void record(int actualReadBytes) {
// 如果连续两次,读取的字节数 小于等于 前一个容量大小
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
// 根据-1的索引步长进行缩容,需要连续两次触发才缩容,所以有一个decreaseNow
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);//确保不低于最小值
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
/*
根据+4的索引步长进行扩容,但不能超过最大值。
因此默认情况下的扩容逻辑:
2048 > 32768 > 65536 > 65536(不变...)
2k > 32K > 64K ...
*/
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
@Override
public void readComplete() {
// 数据读取完毕,根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
record(totalBytesRead());
}
}
// 最小、默认、最大容量在SIZE_TABLE中的下标
private final int minIndex;
private final int maxIndex;
private final int initial;
// 根据默认值创建一个:自适应接受缓冲区分配器:64、2048、65535
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
// 根据指定最小、默认、最大容量创建一个:自适应接受缓冲区分配器
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
checkPositive(minimum, "minimum");
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
@Override
public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
}
AdaptiveRecvByteBufAllocator 根据名字就能看出来,它是一个自适应的数据接收缓冲区分配器,这个「自适应」体现在 ByteBuf 空间的分配上。
它使用三个属性分别定义 ByteBuf 分配的默认值、最小值、最大值,使用一个 int 数组SIZE_TABLE
来定义扩/缩容的容量表,512 字节内,以 16 字节为步长扩容,512 字节后,成倍扩容。lastBytesRead()
会记录下每次循环读取的实际字节数,如果读取的字节填满了 ByteBuf,则会调用record(bytes)
进行扩容,扩容策略为:扩容索引+4,即 16 倍扩容,在不超过最大值的前提下,默认的扩容策略如下:
2048 > 32768 > 65536 > 65536(不变...)
2k > 32K > 64K ...
如果连续两次读取的字节数小于等于前一个缩容容量,则会进行缩容,缩容的策略是容量索引位-1,即 512 字节后,成倍缩容,512 字节前,缩小 16 字节。
由于需要连续两次读取的字节数少才会缩容,所以加了一个属性decreaseNow
来记录是否需要立即缩容,第一次触发它为 true,第二次才缩容。
为什么需要连续两次呢?因为一次读取的字节数少可能是因为读到了上一个数据包的末尾,数据包本身还是很大的,所以不能缩容。连续两次才能说明一个完整的数据包很小,下次分配的 ByteBuf 可以小些以节省内存。
循环结束后,会调用allocHandle.readComplete()
,它会根据此次读取的总字节数去做动态调整,为下次分配 ByteBuf 提供预测。例如,本次处理OP_READ
事件,循环读 3 次,每次读取了 100KB,那么下次就会直接分配一个 300KB 的 ByteBuf,争取一次性读完。相反,如果读取的字节数少,就缩容节省内存。
总结
Channel 在接收对端数据时,因为不知道该分配多大的 ByteBuf 来接收,所以会将 ByteBuf 的分配任务交给 RecvByteBufAllocator,期望它能分配一个容量大到可以足够容纳数据,又小到不会浪费太多内存的 ByteBuf,默认的实现是 AdaptiveRecvByteBufAllocator,它会根据前面实际读取的字节数,自适应的调整下次分配的 ByteBuf 大小。
虽然 AdaptiveRecvByteBufAllocator 会尽量去预测下次分配 ByteBuf 的大小,但是预测会有不准的时候,因此 Channel 还是会进行循环读,防止 ByteBuf 分配的过小无法容纳所有数据。但是为了避免 IO 线程阻塞,其他 Channel 的事件得不到处理,默认会限制单次最多循环读 16 次,如果发送的数据包真的非常大,16 次都没有读完,Netty 本次也会放弃处理,等待下次select()
轮询时再处理。
最后,再提醒一句,不要错误的理解为:channelRead()
的触发是因为 TCP 拆包导致的!!!
原文始发于微信公众号(程序员小潘):【Netty】RecvByteBufAllocator源码分析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29384.html