【Netty】RecvByteBufAllocator源码分析

前言

前面写过《Netty 服务端启动全流程源码分析》,BossGroup 获取到客户端连接 SocketChannel 后会将其注册到 WorkerGroup,由 WorkerGroup 来驱动数据 IO 读写。WorkerGroup 的 EventLoop 监听到 Channel 有OP_READ事件时,会调用Channel.Unsafe.read()方法,Netty 会将读取到的数据包装成 ByteBuf,然后触发回调pipeline.fireChannelRead(byteBuf)将事件传播出去。

整体流程是清楚了,但是对于详细的数据接收细节没有介绍,本篇文章会做介绍。

前置知识

熟悉 Java Nio 编程的同学应该知道,要想从 SocketChannel 读取数据,需要先创建一个 ByteBuffer,然后调用SocketChannel.read(ByteBuffer)方法。但是由于读取操作并未实际发生,程序并不知道有多少数据需要接收,导致我们并不知道需要创建一个多大的 ByteBuffer,大了会造成内存的浪费,小了又需要频繁扩容。而且 ByteBuffer 本身不支持扩容操作,你需要重新申请一个更大的 ByteBuffer,然后进行内存的复制,开销就更大了。Netty 是如何解决这个问题的呢?后面会介绍。

还有一个知识点,读者需要提前了解。对于注册到Selector多路复用器上,且监听OP_READ事件的 Channel,Selector判断的其实就是 Channel 的有效可读字节数。意思就是说,对于有数据可读的 Channel,如果你数据没有读完,下次select()多路复用器依然会再返回它。所以,Netty 会进行循环读,前面说过了,你不知道对端会发送给你多少数据,默认单次最多读 16 次,超过 16 次数据还没读完,本次就不再继续处理了,因为 Netty 怕阻塞其他 IO 事件,后面会详细分析。

AbstractNioByteChannel.read()分析

之前的文章已经分析过,当 Netty 检测到 Channel 有可读事件时,会调用AbstractNioByteChannel.read()方法,下面是该方法整体的一个源码分析:

/*
客户端发送数据时触发。
见 io.netty.channel.nio.NioEventLoop.processSelectedKey
 */

@Override
public final void read() {
    // 客户端Channel的配置
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    // 获取ByteBufAllocator,默认是 PooledByteBufAllocator
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    // 重置统计信息
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        /*
        当对端发送一个超大的数据包时,TCP会拆包。
        OP_READ事件只会触发一次,Netty需要循环读,默认最多读16次,因此ChannelRead()可能会触发多次,拿到的是半包数据。
        如果16次没把数据读完,没有关系,下次select()还会继续处理。
        对于Selector的可读事件,如果你没有读完数据,它会一直返回。
         */

        do {
            // 分配一个ByteBuf,大小能容纳可读数据,又不过于浪费空间。
            byteBuf = allocHandle.allocate(allocator);
            /*
            doReadBytes(byteBuf):ByteBuf内部有ByteBuffer,底层还是调用了SocketChannel.read(ByteBuffer)
            allocHandle.lastBytesRead()根据读取到的实际字节数,自适应调整下次分配的缓冲区大小。
             */

            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // 没有读取到字节
                byteBuf.release();//释放ByteBuf,空的,没有意义。
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            // 递增已经读取的消息数量
            allocHandle.incMessagesRead(1);
            readPending = false;
            // 通过pipeline传播ChannelRead事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());//判断是否需要继续读

        // 读取完毕,pipeline传播ChannelReadComplete事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

这里我们重点关注一下recvBufAllocHandle()方法,很简单,就是对 Channel 绑定的recvHandle进行了判空校验,如果没绑定就创建一个。

@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    if (recvHandle == null) {
        // 如果Channel对应的recvHandle是空的,则创建一个新实例
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

Channel 会依赖 RecvByteBufAllocator.Handle 来创建 ByteBuf,为什么不直接创建呢?因为前面说过的,不知道真正有多少数据要接收,不知道该创建多大的 ByteBuf,大了浪费空间,小了又要频繁扩容。基于这个原因,Channel 会把创建 ByteBuf 的任务交给 RecvByteBufAllocator.Handle 处理,希望它可以基于历史数据做统计分析,分配出一个容量大到足够容纳所有的数据,又小到不会浪费太多的空间。

RecvByteBufAllocator.Handle 的细节后面分析,这里先把read()流程分析完。

创建好一个大小合适的 ByteBuf 之后,Channel 会调用doReadBytes(byteBuf)将数据写入到 Bytebuf:

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    // 设置 尝试读取的字节数,尽量把ByteBuf填满
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    // 将SocketChannel读取的数据写入到byteBuf
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

byteBuf.writeBytes(ScatteringByteChannel in, int length)其实就是将 ByteBuf 转换成 JDK 的 ByteBuffer,然后通过 JDK 原生的 SocketChannel 进行读取,它会返回实际的读取字节数。

如果实际读取的字节数小于等于 0,说明没有数据可读了,本次OP_READ事件处理完毕,触发pipeline.fireChannelReadComplete()回调,否则触发pipeline.fireChannelRead(byteBuf)将读取到的 ByteBuf 传递给其他 ChannelHandler 处理。

如果对端发送的数据包很大,很可能创建的 ByteBuf 不能一次性读完所有数据,所以 Channel 这里会进行循环读,整个读取的逻辑会放在一个while循环里,通过allocHandle.continueReading()判断是否需要继续读取数据。因此,即使 TCP 没有发生拆包,如果创建的 ByteBuf 过小,ChannelHandler 的channelRead()也会被触发多次,所以,切记不可错误的理解为「channelRead()是因为 TCP 拆包导致的」

RecvByteBufAllocator 分析

RecvByteBufAllocator 是 Netty 的「数据接收缓冲区分配器」,Channel 依赖它来创建大小合适的 ByteBuf,提升性能和节省内存。

RecvByteBufAllocator 是一个很简单的接口,它的工作由内部接口 Handle 完成,所以直接看 Handle 接口就行了。

interface Handle {
    /*
    通过ByteBufAllocator分配一个大小合适的ByteBuf。
    太大:浪费空间。
    太小:频繁扩容,内存复制开销。
     */

    ByteBuf allocate(ByteBufAllocator alloc);

    // 猜测需要分配的字节数
    int guess();

    // 重置已累积的任何计数器,并建议为下一个读循环应读取多少消息字节。
    void reset(ChannelConfig config);

    // 增加已读的消息数量
    void incMessagesRead(int numMessages);

    // 设置上一次读取到的字节数,AdaptiveRecvByteBufAllocator会根据该值 自适应调整下次分配的缓冲区大小。
    void lastBytesRead(int bytes);

    // 获取上一次读取的字节数
    int lastBytesRead();

    // 设置尝试读取的字节数
    void attemptedBytesRead(int bytes);

    // 获取尝试读取的字节数
    int attemptedBytesRead();

    // 是否还能继续读取
    boolean continueReading();

    // 读取完成
    void readComplete();
}

默认使用的 RecvByteBufAllocator 实现是 AdaptiveRecvByteBufAllocator,它可以自适应调整分配的 ByteBuf 大小,我们重点分析。

AdaptiveRecvByteBufAllocator 类图如下:【Netty】RecvByteBufAllocator源码分析

从上往下看吧,MaxMessagesRecvByteBufAllocator 很简单,在顶级接口的基础之上,限制了循环读的次数:

/*
当有可读事件时,Netty是循环读的,通过continueReading()判断是否需要继续读取。
该类主要是用来限制循环读取的次数的,默认是16.
 */

public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {

    // 返回循环读的最大次数
    int maxMessagesPerRead();

    // 设置循环读的最大次数
    MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}

DefaultMaxMessagesRecvByteBufAllocator 是 MaxMessagesRecvByteBufAllocator 的默认实现,先看属性:

// 最大读取消息数量
private volatile int maxMessagesPerRead;
/*
是否尊重/关心 还有更多的数据可读?
如果为true,则无条件认为还有数据可读,直到下次循环读取到0字节为止。
这可能会导致多执行一次无效读,无意义的创建一个ByteBuf。
 */

private volatile boolean respectMaybeMoreData = true;

核心逻辑都在 Handle 里,所以直接看 Handle 即可:

public abstract class MaxMessageHandle implements ExtendedHandle {
    private ChannelConfig config;
    // 最大读取多少次消息,默认16次,没读完,下次select接着读。
    private int maxMessagePerRead;
    // 读取的总消息数
    private int totalMessages;
    // 读取的字节总数
    private int totalBytesRead;
    // 尝试读取的字节数,默认是ByteBuf的可写字节数,即尽量把ByteBuf填满。
    private int attemptedBytesRead;
    // 上次读取的字节数,根据它调整下次分配的缓冲区大小。
    private int lastBytesRead;
    private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
    /*
    是否还有更多数据可读的默认判断:attemptedBytesRead == lastBytesRead。
    即:本次读取的数据有没有填满ByteBuf,如果填满了,说明可能还有数据要读。否则就不读了,直接触发ChannelReadComplete()。
     */

    private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return attemptedBytesRead == lastBytesRead;
        }
    };

    // 根据config重置数据,每次处理新的Read事件时触发。
    @Override
    public void reset(ChannelConfig config) {
        this.config = config;
        maxMessagePerRead = maxMessagesPerRead();
        totalMessages = totalBytesRead = 0;
    }

    // 根据猜测的字节数,分配一个ByteBuf。
    @Override
    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(guess());
    }

    // 递增统计消息的读取数量,默认超过16就不读了,防止阻塞IO线程,其他事件得不到处理。
    @Override
    public final void incMessagesRead(int amt) {
        totalMessages += amt;
    }

    // 根据上次读取的字节数,累加总读取到的字节数
    @Override
    public void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        if (bytes > 0) {
            totalBytesRead += bytes;
        }
    }

    @Override
    public final int lastBytesRead() {
        return lastBytesRead;
    }

    // 是否还要继续循环读取消息
    @Override
    public boolean continueReading() {
        /*
        判断依据:
            1.认为还有可读数据
            2.读取的消息数没有达到上限
         */

        return continueReading(defaultMaybeMoreSupplier);
    }

    @Override
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return config.isAutoRead() &&
               (!respectMaybeMoreData || maybeMoreDataSupplier.get()) // 认为还有可读数据
                &&
               totalMessages < maxMessagePerRead && totalBytesRead > 0;// 读取的消息数没有达到上限
    }

    @Override
    public void readComplete() {
    }

    @Override
    public int attemptedBytesRead() {
        return attemptedBytesRead;
    }

    // 设置尝试读取的字节数,默认为ByteBuf的可写字节数
    @Override
    public void attemptedBytesRead(int bytes) {
        attemptedBytesRead = bytes;
    }

    protected final int totalBytesRead() {
        return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
    }
}

MaxMessageHandle 会根据每次读取的字节数是否填满 ByteBuf 为依据,判断是否还要继续循环读。如果填满了说明 Channel 可能还有数据等待读取,反之已无数据可读,直接跳出循环即可。

说完父类,接下来看核心的 AdaptiveRecvByteBufAllocator,源码也不是很长:

/*
    自适应的,接收对端数据的ByteBuf分配器,分配的ByteBuf有合适的初试容量。
    避免太小导致频繁扩容,太大导致内存浪费,GC压力。

    基于历史的数据采集做预测:
        1.前一次接收的数据完全读满了ByteBuf,则下次会增大缓冲区。
        2.连续两次接收的数据小于指定值,则会缩小下次分配的缓冲区。
 */

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    // 默认最小值
    static final int DEFAULT_MINIMUM = 64;
    // 默认初试值
    static final int DEFAULT_INITIAL = 2048;
    // 默认最大值
    static final int DEFAULT_MAXIMUM = 65536;

    /*
    如果需要扩容下次分配的缓冲区大小,这个是扩容的索引步长。
     */

    private static final int INDEX_INCREMENT = 4;
    /*
    如果需要缩容下次分配的缓冲区大小,这个是缩容的索引步长。
     */

    private static final int INDEX_DECREMENT = 1;

    // 扩容表
    private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        // 512字节内,以16字节为步长,递增
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        // 512字节后,成倍扩容
        for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
            sizeTable.add(i);
        }

        // List转数组
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

    /**
     * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
     */

    @Deprecated
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

    /*
    通过给定size查找下标,二分查找法。
    如果size不在SIZE_TABLE内,返回最接近它的一个稍小值的索引
     */

    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    private final class HandleImpl extends MaxMessageHandle {
        // 最小容量的索引
        private final int minIndex;
        // 最大容量的索引
        private final int maxIndex;
        // 默认容量的索引
        private int index;

        // 下次接受的缓冲区大小
        private int nextReceiveBufferSize;
        // 是否需要立即缩容?因为需要连续两次读取的字节数小于阈值,第一次设为true,第二次才缩容。
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            // 根据初始值得到SIZE_TABLE的下标
            index = getSizeTableIndex(initial);
            // 第一次分配的缓冲区大小就是initial默认值
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        /*
        Unsafe.read()循环读数据时会调用该方法,bytes是上一次实际读取到的字节数。
         */

        @Override
        public void lastBytesRead(int bytes) {
            /*
            attemptedBytesRead():尝试读取的字节数。
            NioSocketChannel.doReadBytes()会将attemptedBytesRead设置为ByteBuf.writableBytes(),
            即只要有数据可读,Netty会尽量将ByteBuf写满。
             */

            if (bytes == attemptedBytesRead()) {
                // 实际读取的字节数填满了缓冲区,则扩容。
                record(bytes);
            }
            // 调用父类方法,将数值累加到 totalBytesRead
            super.lastBytesRead(bytes);
        }

        /*
        预测下次需要分配的缓冲区大小
         */

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }

        /*
         根据实际读取到的字节数,自适应调整 下次应该分配的缓冲区大小
         */

        private void record(int actualReadBytes) {
            // 如果连续两次,读取的字节数 小于等于 前一个容量大小
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                // 根据-1的索引步长进行缩容,需要连续两次触发才缩容,所以有一个decreaseNow
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);//确保不低于最小值
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                /*
                根据+4的索引步长进行扩容,但不能超过最大值。
                因此默认情况下的扩容逻辑:
                2048 > 32768 > 65536 > 65536(不变...)
                2k   > 32K   > 64K ...
                 */

                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            // 数据读取完毕,根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
            record(totalBytesRead());
        }
    }

    // 最小、默认、最大容量在SIZE_TABLE中的下标
    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    // 根据默认值创建一个:自适应接受缓冲区分配器:64、2048、65535
    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    // 根据指定最小、默认、最大容量创建一个:自适应接受缓冲区分配器
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }

    @SuppressWarnings("deprecation")
    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }
}

AdaptiveRecvByteBufAllocator 根据名字就能看出来,它是一个自适应的数据接收缓冲区分配器,这个「自适应」体现在 ByteBuf 空间的分配上。

它使用三个属性分别定义 ByteBuf 分配的默认值、最小值、最大值,使用一个 int 数组SIZE_TABLE来定义扩/缩容的容量表,512 字节内,以 16 字节为步长扩容,512 字节后,成倍扩容。lastBytesRead()会记录下每次循环读取的实际字节数,如果读取的字节填满了 ByteBuf,则会调用record(bytes)进行扩容,扩容策略为:扩容索引+4,即 16 倍扩容,在不超过最大值的前提下,默认的扩容策略如下:

2048 > 32768 > 65536 > 65536(不变...)
2k   > 32K   > 64K ...

如果连续两次读取的字节数小于等于前一个缩容容量,则会进行缩容,缩容的策略是容量索引位-1,即 512 字节后,成倍缩容,512 字节前,缩小 16 字节。

由于需要连续两次读取的字节数少才会缩容,所以加了一个属性decreaseNow来记录是否需要立即缩容,第一次触发它为 true,第二次才缩容。

为什么需要连续两次呢?因为一次读取的字节数少可能是因为读到了上一个数据包的末尾,数据包本身还是很大的,所以不能缩容。连续两次才能说明一个完整的数据包很小,下次分配的 ByteBuf 可以小些以节省内存。

循环结束后,会调用allocHandle.readComplete(),它会根据此次读取的总字节数去做动态调整,为下次分配 ByteBuf 提供预测。例如,本次处理OP_READ事件,循环读 3 次,每次读取了 100KB,那么下次就会直接分配一个 300KB 的 ByteBuf,争取一次性读完。相反,如果读取的字节数少,就缩容节省内存。

总结

Channel 在接收对端数据时,因为不知道该分配多大的 ByteBuf 来接收,所以会将 ByteBuf 的分配任务交给 RecvByteBufAllocator,期望它能分配一个容量大到可以足够容纳数据,又小到不会浪费太多内存的 ByteBuf,默认的实现是 AdaptiveRecvByteBufAllocator,它会根据前面实际读取的字节数,自适应的调整下次分配的 ByteBuf 大小。

虽然 AdaptiveRecvByteBufAllocator 会尽量去预测下次分配 ByteBuf 的大小,但是预测会有不准的时候,因此 Channel 还是会进行循环读,防止 ByteBuf 分配的过小无法容纳所有数据。但是为了避免 IO 线程阻塞,其他 Channel 的事件得不到处理,默认会限制单次最多循环读 16 次,如果发送的数据包真的非常大,16 次都没有读完,Netty 本次也会放弃处理,等待下次select()轮询时再处理。

最后,再提醒一句,不要错误的理解为:channelRead()的触发是因为 TCP 拆包导致的!!!


原文始发于微信公众号(程序员小潘):【Netty】RecvByteBufAllocator源码分析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29384.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!