上一篇文章对 RecvByteBufAllocator 的源码进行了分析,介绍了 Netty 是如何接收对端发送过来的数据。以及 Netty 是如何通过 AdaptiveRecvByteBufAllocator 来自适应调整 ByteBuf 的动态分配,解决 Java ByteBuffer 分配过大浪费内存,分配过小又需要频繁扩容的问题。
本篇文章会分析,Netty 是如何将数据发送出去的。
前置知识
Netty 支持的数据传输类型
首先你需要知道,通过 Netty 来发送数据,它只支持两种数据类型:ByteBuf 和 FileRegion。前者可以看作是 ByteBuffer,普通的字节数据传输。而后者是文件传输,Netty 通过 FileRegion 来实现文件传输的零拷贝。
write()并不会发送数据,只是简单的将数据暂存到 ChannelOutboundBuffer。flush()才是真正的将数据通过 Socket 传输给对端。writeAndFlush()只是简单的执行以上两个方法而已。
当程序 write 了大量数据,或者虽然调用了 flush(),但是由于对端来不及接收数据,再或者由于网络原因等等情况,导致 TCP 缓冲区被写满,大量的消息积压在 ChannelOutboundBuffer,导致内存溢出。为了保护你的程序,Netty 给 Channel 设置了「高低水位线」,当积压的消息超过了高水位,Netty 会将 Channel 设为「不可写」状态并触发channelWritabilityChanged
回调,你可以通过Channel.isWritable()
判断是否要继续写数据。通过ChannelConfig.setWriteBufferHighWaterMark()
和ChannelConfig.setWriteBufferLowWaterMark()
设置 Channel 的高低水位线。
既然 write()操作是用户自己发起的,为啥还要订阅 Channel 的OP_WRITE
事件呢?因为 TCP 缓冲区可能被写满,此时你就应该订阅OP_WRITE
事件,暂时放弃写操作,等待Selector
通知你 Channel 可写时,你再继续写。
Java 原生的 SocketChannel 只支持写入 ByteBuffer,当你通过 Netty 写入 ByteBuf 时,它会将 ByteBuf 转换成 ByteBuffer 再写入,方法是ByteBuf.internalNioBuffer()
。
清楚 Java 对象在 JVM 中的内存布局
write(msg)
时,会将 msg 包装成 Entry 节点加入到链尾,其中一个属性pendingSize
记录着消息占用的内存空间,这个空间大小除了 msg 数据本身占用的空间外,还包含 Entry 对象占用的空间,因此默认会额外再加上 96。为啥是 96 后面会说明,首先你应该知道对象的对象头最大占用 16 字节,对象引用最少占用 4 字节,最多占用 8 字节,一个 long 类型占用 8 字节,int 类型占用 4 字节,boolean 类型占用 1 字节。另外 JVM 要求 Java 对象占用的空间必须是 8 字节的整数倍,因此还会有 padding 填充字节。
ChannelHandlerContext.writeAndFlush()分析
如下,分别是发送一个 ByteBuf 和 FileRegion 的简单示例:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 发送一个 hello
ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 传输一个 a.txt 文件
RandomAccessFile accessFile = new RandomAccessFile("/disk/a.txt", "r");
DefaultFileRegion region = new DefaultFileRegion(accessFile.getChannel(), 0, accessFile.length());
ctx.writeAndFlush(region);
}
我先说一下 writeAndFlush 的整体流程,实际的发送细节下一节会解释。
调用ctx.writeAndFlush()
,会从当前 Handler 往前找能处理 write 事件的 Handler,如果调用的是ctx.channel().writeAndFlush()
,则会从 Pipeline 的 TailContext 开始向前找能处理 write 事件的 Handler,事件传播的路径稍微有点区别。默认情况下,会找到 HeadContext 来处理,源码如下:
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 确保发送的消息不为空
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 往后找能处理 write事件的Channel,默认会找到HeadContext。
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
// 如果是EventLoop线程,则直接执行,否则提交一个任务串行化执行。
if (executor.inEventLoop()) {
if (flush) {
// 调用的是writeAndFlush(),所有flush为true,这里会调用HeadContext.invokeWriteAndFlush()
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
找到 HeadContext 后,调用其invokeWriteAndFlush()
方法,其实就是将 write 和 flush 放在一个方法里调用了:
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// 先通过handler调用write()
invokeWrite0(msg, promise);
// 再通过handler调用flush()
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
先看invokeWrite0()
,它会调用HeadContext.write()
,由于 write 操作需要和 JDK 的底层 API 交互,于是操作又会被转交给Channel.Unsafe
执行:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 需要和JDK底层API交互,转交给Unsafe执行。
unsafe.write(msg, promise);
}
接下来会调用AbstractChannel.AbstractUnsafe.write()
方法,它首先会对发送的数据做过滤,只支持 ByteBuf 和 FileRegion 两种类型。然后会计算发送的数据占用的内存大小,因为前面说过积压的消息一旦超过 Channel 的高水位线会将 Channel 设为「不可写」状态,防止内存溢出。这两步做完以后,会把消息添加到输出缓冲区 ChannelOutboundBuffer 中。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {// outboundBuffer会随着Channel一同被创建,一般不会为null,这里做了校验。
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
int size;
try {
// 过滤写出消息,确保是ByteBuf或FileRegion,其他对象不支持写出。
msg = filterOutboundMessage(msg);
/*
估算消息占用的内存,作用:
因为write()不会把消息写出到Socket,会暂存在内存里,直到flush()。
Netty为了防止消息堆积,会设置高低水位,消息暂存的总量达到最高水位会将Channel设置不可写状态,
以保护你的程序,避免内存溢出。
详见:io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size()
对于FileRegion,会直接返回0,因为使用了零拷贝技术,不需要把文件读取到JVM进程。
*/
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise, t);
}
return;
}
// write()只会把消息暂存在outboundBuffer,不会真正发送。
outboundBuffer.addMessage(msg, size, promise);
}
关注一下filterOutboundMessage()
,它除了过滤消息,还会试图将 HeapByteBuf 转换成 DirectByteBuf。Netty 为了提升数据发送的效率,和 Socket 直接读写的数据会使用直接内存,避免 IO 操作再发生内存拷贝。
// 过滤出站消息,只支持写出ByteBuf和FileRegion。
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
// 为了避免内存复制,Socket直接读写的数据都使用堆外内存
return newDirectBuffer(buf);
}
// 文件传输
if (msg instanceof FileRegion) {
return msg;
}
// 不支持的数据类型,抛异常
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
newDirectBuffer()
并不保证一定转换成功,如果使用的 ByteBufAllocator 是未池化的,且没有开启io.netty.threadLocalDirectBufferSize
,那么就意味着 Netty 需要申请一个没有被池化的 DirectByteBuf,这个操作是非常昂贵的,Netty 会放弃转换:
// 试图将HeapByteBuf转换成DirectByteBuf,如果转换的开销很大会放弃。
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
// 可读字节数为0,直接释放并返回共享空对象。
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
// 获取Channel绑定的ByteBufAllocator
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {// 分配器是否是池化的,且能分配直接内存?
// 创建一个指定大小的直接内存ByteBuf,将数据写入,原buf释放
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
/*
如果设置了io.netty.threadLocalDirectBufferSize,Netty会在线程的FastThreadLocal中通过Stack实现一个轻量级的
ByteBuf对象池,ByteBuf写出到Socket后,会自动释放,这里会将它再push到线程绑定的Stack中进行重用。
*/
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// 申请一个未池化的直接内存ByteBuf开销是很大的,测试过,比堆内存的申请慢10倍都不止,这里会直接放弃。
return buf;
}
如果设置了io.netty.threadLocalDirectBufferSize
,Netty 会为每个线程创建指定数量的 ByteBuf 对象缓存,这些 ByteBuf 是可以被重用的。实现逻辑是 Netty 会在 FastThreadLocal 存放一个 Stack,需要时pop()
一个出来,用完时push()
归还。
再来关注一下MessageSizeEstimator
,它负责计算待发送数据占用的内存,逻辑很简单,对于 FileRegion 会返回 0,因为 FileRegion 传输文件时使用了零拷贝技术,直接使用 mmap 内存映射,而不需要将文件加载到 JVM 进程,实现直接看io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size()
:
// 估算消息的内存占用,逻辑还是很简单的。
@Override
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
// FileRegion实现了零拷贝,并不需要将文件加载到JVM,因此占用的内存为0,不影响Channel水位线。
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
关于 ChannelOutboundBuffer 代码下节会详细分析,这里只需要知道 write()只会将数据暂存到 ChannelOutboundBuffer,而不会真正发送就行了。
消息存入 ChannelOutboundBuffer,write 操作就算完成了。紧接着会调用invokeFlush0()
,它依然会转交给 Unsafe 执行,调用AbstractChannel.AbstractUnsafe.flush()
。它会做两件事:先把 ChannelOutboundBuffer 中待发送的 Entry 标记为flushed
,然后将要发送的 Entry 数据转换成 Java 的 ByteBuffer,使用 SocketChannel 进行真正的数据发送。
@Override
public final void flush() {
assertEventLoop();
// 得到SocketChannel绑定的ChannelOutboundBuffer
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 先将unflushed节点标记为flushed
outboundBuffer.addFlush();
// 开始发送数据
flush0();
}
flush0()
会开始发送数据,它首先会检测 Channel 是否活跃,如果是非活跃状态,此次 flush()操作将失败,Entry 会被移除。如果 Channel 正常,会调用doWrite()
进行数据发送。
protected void flush0() {
if (inFlush0) {// 避免上一次flush0()还没执行完时,又触发了
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {// 非空校验
return;
}
inFlush0 = true;
// 如果连接已经失活了。
if (!isActive()) {
try {
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
/*
通道是打开的,稍后可能会被激活。
1.释放msg
2.触发失败通知
3.回收Entry
4.递减消息挂起的字节数
*/
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
/*
道都被关闭了,和上面的处理流程类似,只是不用通过触发channelWritabilityChanged()回调了。
*/
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}
try {
// 连接正常,执行真正的write()操作
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
inFlush0 = false;
}
}
doWrite()
是数据发送的核心,由子类实现,这里直接看NioSocketChannel.doWrite()
。它会获取 Java 原生的 SocketChannel,将队列中待发送的 ByteBuf 转换成 ByteBuffer,然后循环发送数据。单次循环发送的数据量受以下两个条件限制:
-
ByteBuffer 的数量限制。 -
TCP 参数设置的缓冲区的大小限制(ChannelOption.SO_SNDBUF)。
如果 ChannelOutboundBuffer 积压了大量的数据,单次可能无法发送完,因此会默认循环发 16 次。循环次数过多可能会阻塞 IO 线程,导致其他 Channel 的事件得不到处理。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
// 循环写的次数,默认16次。可能有大量消息积压在输出缓冲区,同时为了避免阻塞IO线程,做了次数限制。
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {// 没有数据要写出了
// 取消监听 OP_WRITE事件
clearOpWrite();
return;
}
// 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
/*
将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。
注意,这里只会处理ByteBuf,不会处理FileRegion。
*/
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// 其实就是 nioBuffers.length,上一步方法中会进行设置
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
// ByteBuf处理完了,但是可能还有FileRegion需要处理。
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 只有单个ByteBuf需要发送的情况
ByteBuffer buffer = nioBuffers[0];
// 尝试发送的字节数
int attemptedBytes = buffer.remaining();
// Java原生的SocketChannel.wrote(ByteBuffer)发送数据
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {// TCP缓冲区满,订阅OP_WRITE事件,等待可写时再继续处理
incompleteWrite(true);
return;
}
// 动态调整 发送缓冲区大小
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 删除已经发送的Entry节点
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// 发送缓冲区有多个ByteBuf待发送
// 尝试发送的字节总数
long attemptedBytes = in.nioBufferSize();
// 调用Java原生的SocketChannel.write()进行数据的发送,返回值是实际发送的字节数
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
// 写入字节为0,可能是TCP缓冲区满了,订阅OP_WRITE事件,等待TCP可写时再执行。
incompleteWrite(true);
return;
}
// 根据本次实际写入的字节数,动态调整发送缓冲区:ChannelOption.SO_SNDBUF
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
/*
删除已经发送的数据,是根据实际写入的字节数去删除的,而不是根据ByteBuf的数量。
从flushedEntry开始,计算每个ByteBuf的大小,按个删除。
可能存在某个ByteBuf发送部分数据的情况,会调整它的readerIndex。
*/
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
/*
只有在 nioBufferCnt处理完了,调用doWrite0(in)处理FileRegion,且没处理完毕时,才可能走到这里。
如果FileRegion没有处理完,writeSpinCount会小于0,这里会继续订阅OP_WRITE事件,等待Channel可写时继续处理。
*/
incompleteWrite(writeSpinCount < 0);
}
此外,NioSocketChannel.doWrite()
只会发送 ByteBuf,FileRegion 的发送需要调用父类的AbstractNioByteChannel.doWrite0()
处理。
/*
NioSocketChannel只负责发送ByteBuf,
FileRegion的发送这边会处理。
*/
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg == null) {
// Directly return here so incompleteWrite(...) is not called.
return 0;
}
// 数据发送
return doWriteInternal(in, in.current());
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
// 没有数据可读,直接删除节点
in.remove();
return 0;
}
// Java底层write()
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
// 已经传输的字节数 >= 字节总数,代表文件已经传输完毕,删除节点。
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
// 调用region.transferTo(javaChannel(), position)文件传输
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {// 实际发送的字节数
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {//FileRegion发送完毕,移除节点
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
/*
一般不会走到这里,可能是数据没有处理完毕,返回一个Integer.MAX_VALUE,
让writeSpinCount小于0,这样它就会订阅OP_WRITE事件,等待Channel可写时继续处理。
*/
return WRITE_STATUS_SNDBUF_FULL;
}
需要注意的是,flush()
操作可能存在两种情况:
-
数据正常发送完毕。 -
数据没有发完,就已经超过了最大循环次数,为了不阻塞 IO 线程,下次再处理。 -
TCP 缓冲区满,数据无法发送。
对于后面两种情况,都属于「不完整写入」,因此会调用incompleteWrite(setOpWrite)
稍后继续处理。针对第三种情况,Netty 需要订阅OP_WRITE
事件,等待Selector
通知 Channel 可写时继续发送数据。setOpWrite
参数代表是否要监听OP_WRITE
事件:
/**
* 不完整写入
* @param setOpWrite 是否要订阅OP_WRITE事件
*/
protected final void incompleteWrite(boolean setOpWrite) {
// setOpWrite为true,一般都是TCP缓冲区满了,此时需要订阅OP_WRITE事件,等待Channel可写时再继续处理。
if (setOpWrite) {
// 订阅OP_WRITE事件
setOpWrite();
} else {
// 取消订阅OP_WRITE事件
clearOpWrite();
// 提交一个flush任务,稍后执行,避免阻塞IO线程。
eventLoop().execute(flushTask);
}
}
至此,writeAndFlush()
的整个流程就处理完了,对于 ChannelOutboundBuffer 本节没有进行分析,看下节。
ChannelOutboundBuffer 源码分析
ChannelOutboundBuffer 是 Netty 的数据发送缓冲区,它跟随 SocketChannel 一同被创建。
先看属性:
/*
将ByteBuf包装成Entry时,额外占用的字节大小,因为除了ByteBuf本身的数据外,Entry对象也占用空间。
为啥是96?为啥还支持修改??
1.96是Netty根据64位的JVM计算的最大值。
2.如果你的程序运行在32位的JVM中,或者对象引用开启了压缩,你可以根据实际情况修改这个值。
分析为啥最多会占用96字节:
在64位的JVM中,一个Entry对象占用以下空间:
- 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer
- 6个对象引用属性,最小4*6=24字节,最大8*6=48字节
- 2个long属性,2*8=16字节
- 2个int属性,2*4=8字节
- 1个boolean属性,1字节
- padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节
合计最多为96字节.
*/
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
/*
发送数据时,需要将待发送的ByteBuf转换成ByteBuffer,考虑到write是个很频繁的操作,
为了避免频繁创建数组,这里进行了复用,每个线程会复用自己的ByteBuffer[]。
*/
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override
protected ByteBuffer[] initialValue() throws Exception {
// 默认大小为1024,后面有必要还会扩容
return new ByteBuffer[1024];
}
};
// 绑定的SocketChannel
private final Channel channel;
// 已经flush,等待发送的头节点。
private Entry flushedEntry;
// 已经write但是没flush的头节点,flush()时会通过它一直往后找
private Entry unflushedEntry;
// 链尾节点
private Entry tailEntry;
// flush的节点数量,发送数据时会从flushedEntry开始往后找flushed个节点。
private int flushed;
// 单次循环写的Nio Buffer数量
private int nioBufferCount;
// 单次循环写的Nio Buffer总大小
private long nioBufferSize;
// flush是否失败
private boolean inFail;
// 计算totalPendingSize属性的偏移量,通过CAS的方式来做修改。
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
// 输出缓冲区暂存的消息占用的总内存,通过该值判断是否达到高低水位,以修改Channel的可写状态。
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;
// unwritable属性的偏移量,通过CAS的方式来修改。
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
// Channel是否可写的状态,0可写,1不可写。输出缓冲区内存达到高低水位线时修改。
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
// Channel可写状态变更时触发的任务,消息的积压达到高低水位线时触发
private volatile Runnable fireChannelWritabilityChangedTask;
它本身是一个单向链表,由一系列 Entry 节点组成。它有三个节点指针:
-
flushedEntry:已经 flush,等待被发送的起始节点指针。 -
unflushedEntry:已经 write,等待 flush 的起始节点指针。 -
tailEntry:链尾指针。
笔者花了一个简图,来表示它是如何工作的:上节说过,执行flush(msg)
操作时,只是把数据暂存到 ChannelOutboundBuffer,核心方法是addMessage()
,它主要做了两件事:
-
将 msg 封装成 Entry 节点,加入到链尾。 -
统计输出缓冲区的消息总字节数是否达到高水位线,如果达到则将 Channel 设为「不可写」状态,且触发 ChannelWritabilityChanged
回调。
/**
* 将消息暂存到ChannelOutboundBuffer,暂存成功promise就会收到通知。
* @param msg 待发送的数据:ByteBuf/FileRegion
* @param size 数据占用的内存大小
* @param promise write成功会收到通知
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 将msg包装成一个Entry,并加入到链尾。
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
// tail不为空,则添加到它的next
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;// tailEntry指向新添加的节点
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调
incrementPendingOutboundBytes(entry.pendingSize, false);
}
先看Entry.newInstance()
,它会将 msg 封装成 Entry 节点,加入到链尾。Entry 有个属性pendingSize
用来记录消息占用的内存空间,需要注意的是,它除了 msg 本身的数据空间,还会加上 Entry 对象占用的空间,一个 Java 对象占用多少空间是在编译期就确定下来的,除了属性占用的空间外,读者还需要了解 Java 对象的内存布局。
/**
* 创建一个Entry节点,从对象池中取一个
* @param msg 消息本身
* @param size 由MessageSizeEstimator估算出消息占用的内存大小
* @param total 消息本身的大小(区别是FileRegion的处理)
* @param promise write()完成会收到通知
* @return
*/
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
/*
每次write()都需要一个Entry,考虑到write()是一个非常频繁的操作,
为了避免Entry的频繁创建和销毁,这里做了对象池的重用处理。
*/
Entry entry = RECYCLER.get();
entry.msg = msg;
/*
占用的内存为什么还要加上CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD???
除了ByteBuf占用的空间外,Entry本身也占用空间啊。
在64位的JVM中:
- 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer
- 6个对象引用属性,最小4*6=24字节,最大8*6=48字节
- 2个long属性,2*8=16字节
- 2个int属性,2*4=8字节
- 1个boolean属性,1字节
- padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节
合计最多为96字节,因此CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD的默认值即使96。
*/
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}
Entry 加入链表后,incrementPendingOutboundBytes()
会累加字节总数,判断是否超过高水位线:
/**
* 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调
* @param size 消息占用内存的字节数
* @param invokeLater 是否稍后触发再回调
*/
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 统计消息暂存的内存大小,CAS的方式进行累加
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 暂存消息达到高水位线,消息积压了
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// 修改可写状态,触发回调
setUnwritable(invokeLater);
}
}
setUnwritable()
会在数据总字节数超过高水位线时触发,它会通过自旋+CAS 的方式将unwritable
从 0 改为 1,然后触发回调:
// 将Channel设为不可写,CAS执行
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
// CAS操作成功,触发回调
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
ChannelOutboundBuffer 为 write 操作所做的工作就这么多了,下面看 flush。
执行 flush 操作时,首先会调用outboundBuffer.addFlush()
将unflushed
节点标记为flushed
,其实就是移动flushedEntry
和unflushedEntry
指针,这个过程会检查 Entry 节点是否被取消,如果取消了会跳过节点,同时会递减该 Entry 占用的内存空间。
// 只是将节点标记为flushed,并没有真正发送数据,会跳过已经被取消的节点。
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
// 将entry的promise设为 不可取消 状态
if (!entry.promise.setUncancellable()) {
// 设置失败,说明promise已经取消,需要释放消息,并递减挂起的字节数
int pending = entry.cancel();
// 递减缓冲区的消息字节总数,如果达到低水位线,则将Channel重新设为「可写」状态,并触发回调
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);// 不断往后找 待flush的节点
// 所有的节点都flush了,置为空
unflushedEntry = null;
}
}
节点状态标记完成后,会调用doWrite()
开始写数据。首先它需要 ChannelOutboundBuffer 将flushed
节点转换成 Java 原生的 ByteBuffer,方法是nioBuffers()
。因为 OS 对SocketChannel.write()
单次发送的字节数有限制,一般是Integer.MAX_VALUE
,所以单次转换需要提供两个参数:
-
maxCount:转换 ByteBuffer 的最大数量,默认是 1024。 -
maxBytes:最大字节数,默认是设置的 TCP 发送缓冲区大小(ChannelOption.SO_SNDBUF)。
/**
* 将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。
* @param maxCount 单次发送的ByteBuffer最大数量
* @param maxBytes 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。
* @return
*/
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0;
int nioBufferCount = 0;
// 由于write操作很频繁,避免ByteBuffer[]频繁创建和销毁,这里进行了复用,每个线程都有一个ByteBuffer[1024]
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
// 确保Entry节点是flushed,且msg是ByteBuf类型
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
// 确保节点没有被取消,如果取消了,则跳过它。
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
// 可读字节数就是要写出的字节数,确保大于0
if (readableBytes > 0) {
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// 发送的数据超过了maxBytes,退出循环
break;
}
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
// -1代表没有设置ByteBuf的nioBufferCount,ByteBuf中ByteBuffer的数量
entry.count = count = buf.nioBufferCount();
}
// 是否需要更多的空间
int neededSpace = min(maxCount, nioBufferCount + count);
// 如果ByteBuffer的数量超过了默认值1024,就去申请更多的空间
if (neededSpace > nioBuffers.length) {
// 成倍扩容,直到数组长度足够。并塞回到FastThreadLocal。
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// 将ByteBuf转ByteBuffer,且缓存到Entry中
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
// 设置ByteBuffer
nioBuffers[nioBufferCount++] = nioBuf;
} else {
// 一个ByteBuf包含多个ByteBuffer的情况处理,循环遍历设置
nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
}
if (nioBufferCount >= maxCount) {
// ByteBuffer的数量超过了maxCount,退出循环
break;
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
这里关注一下NIO_BUFFERS
属性,它是一个 FastThreadLocal,每个线程都有自己的 ByteBuffer[]缓存,默认长度 1024,可以被复用。这里为什么要复用呢?因为作为一个网络 IO 框架,flush 肯定是一个非常频繁的操作,为了避免每次都创建 ByteBuffer[],复用可以提升系统性能,减轻 GC 的压力。
如果一个 ByteBuf 由很多个 ByteBuffer 组成,默认的 1024 个 ByteBuffer 可能不够用,此时会调用expandNioBufferArray()
进行扩容:
// 对array进行扩容
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// 成倍扩容
newCapacity <<= 1;
if (newCapacity < 0) {// int溢出
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
// 元素迁移
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
将待发送的 ByteBuf 转换成 ByteBuffer 后,NioSocketChannel 会调用 JDK 底层的 SocketChannel.write()进行真正的数据发送。
数据发送完毕后,需要移除 ChannelOutboundBuffer 中的节点。节点的添加是从链尾开始,移除则是从链头开始的。ChannelOutboundBuffer 是根据实际发送的字节数来移除节点的,因此会存在某个 ByteBuf 只发送了部分数据的情况,如果某个 ByteBuf 数据没有发送完,那么该节点并不会被移除,只会调整它的readerIndex
索引,下次继续发送剩余数据。
/**
* 根据写入到TCP缓冲区的字节数来移除ByteBuf。
* @param writtenBytes
*/
public void removeBytes(long writtenBytes) {
for (;;) {
// 从flushedEntry开始计算
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
// 如果单个ByteBuf的数据 <= writtenBytes,则直接移除Entry节点
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
// 存在某个ByteBuf发送部分数据的情况,调整它的readerIndex,下次继续发送。
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
// 重置 NIO_BUFFERS
clearNioBuffers();
}
至此,Netty 的数据发送核心流程全部分析结束。
总结
为了避免每次 write 都将数据写入 TCP 缓冲区,Netty 的 Channel 提供了两种操作:write 和 flush,这需要依赖一个核心类 ChannelOutboundBuffer。write 只是将数据暂存到缓冲区,flush 才是发送数据。同时为了避免消息积压的太多导致 OOM,Netty 提供了高低水位线,当暂存的消息到达高水位时,Netty 会将 Channel 设为「不可写」,同时触发回调,用户可以根据该状态判断是否需要继续写入消息。
ChannelOutboundBuffer 本身是个单向链表,负责管理暂存的消息,当需要发送数据时,它还会负责将 ByteBuf 转换成 ByteBuffer,因为 JDK 底层的 SocketChannel 只支持写入 ByteBuffer。
数据发送完毕后,ChannelOutboundBuffer 还要负责根据实际发送的字节数来移除 Entry 节点,因为存在某个 ByteBuf 只发送了部分数据的情况,针对这种特殊情况,ChannelOutboundBuffer 不会将节点移除,而是调整它的readerIndex
索引,下次继续发送剩余数据。
原文始发于微信公众号(程序员小潘):【Netty】ChannelOutboundBuffer源码分析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29380.html