Netty是如何检测资源泄漏的?

为了减轻 GC 的压力、以及避免频繁向 OS 申请和释放内存,Netty 基于 JeMalloc 思想自己实现了一套内存管理方案。不管是堆内存还是直接内存,都可以交给 Netty 来统一管理,这带来了两个好处,一是可以减轻 GC 的压力,二是可以避免向 OS 频繁申请和释放内存,Netty 一次性申请一大块内存,然后按需分配。 同时,也带来一个坏处,就是开发者使用完毕后,必须及时释放掉资源,否则会导致内存泄漏。

综上所述,自己管理内存会带来更好的性能,但是也增大了内存泄漏的可能性。为了尽量避免内存泄漏,Netty 提供了 ResourceLeakDetector 资源泄漏探测器,它会对分配的资源进行检测,一旦发生泄漏,它会进行报告,让开发者能及时发现并进行修正。

Netty 是如何做到的呢?

拿 ByteBuf 为例,为了能检测到资源是否泄漏,Netty 会为 ByteBuf 对象创建一个弱引用 WeakReference 指向它,同时传入一个 refQueue,如果 ByteBuf 被 GC 回收了而没有调用 release 释放,则 JVM 会将 WeakReference 加入到 refQueue 中,Netty 通过 refQueue 就可以判断是否发生资源泄漏,一旦检测到泄漏就会调用reportLeak()报告泄漏情况。

笔者花了一个流程图,描述了 ResourceLeakDetector 的大致工作流程。Netty是如何检测资源泄漏的?

1. ResourceLeakDetector

还是以 ByteBuf 为例,网络 IO 的每次读写都需要 ByteBuf 支撑,为了避免频繁的创建和销毁 ByteBuf,Netty 通过Recycler来回收对象进行重用。同时为了避免频繁的申请和释放内存,Netty 通过 JeMalloc 技术来管理内存。

ByteBuf 不等于内存,ByteBuf 是 Java 对象,它工作需要内存做支撑。ByteBuf 本身通过 Recycler 来实现回收重用,内存通过 JeMalloc 来进行管理复用。


当 Channel 有数据可读时,Netty 默认会通过 PooledByteBufAllocator 创建一个 ByteBuf,并将数据写入到 ByteBuf,然后通过 Pipeline 将 ChannelRead 事件传播出去。如下,是一个简单的使用示例:

// VM Args:-Dio.netty.leakDetection.level=PARANOID 100%采样检测
public class LeakDemo {
 public static void main(String[] args) throws InterruptedException {
  ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(1024);
  buf = null;
  System.gc();
  Thread.sleep(1000);
  // 再申请一次,此时会检测到泄漏并报告
  PooledByteBufAllocator.DEFAULT.buffer(1024);
 }

 @Override
 protected void finalize() throws Throwable {
  System.out.println("finalize...");
 }
}

运行程序,控制台会报告资源泄漏的情况,输出如下:

16:02:31.409 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
io.netty.example.a.LeakDemo.main(LeakDemo.java:21)

1.1 检测等级

Netty 提供了 4 个检测等级,不同的级别采样率不同,开销也不一样,用户可以根据实际情况选择合适的级别。

检测等级 说明
DISABLED 禁用检测
SIMPLE 简单检测,少量的采样,不报告泄漏的位置
ADVANCED 高级检测,少量的采样,会报告泄漏的位置
PARANOID 偏执检测,100%采样,会报告泄漏的位置

通过设置 JVM 参数-Dio.netty.leakDetection.level=PARANOID来调整检测等级。

1.2 源码分析

分析一下源码,看看 Netty 是如何检测资源泄漏并及时报告用户的。在看 ResourceLeakDetecto 前,先了解几个比较重要的类。

1.2.1 DefaultResourceLeak

DefaultResourceLeak 是默认的资源泄漏追踪器,它继承自WeakReference,它会为追踪对象建立一个弱引用连接,当追踪对象被 GC 回收后,JVM 会将WeakReference加入到 refQueue,通过 refQueue 就能判断是否存在资源泄漏了。Netty是如何检测资源泄漏的?先看它的属性:

/*
追踪记录的头节点,单向链表。
访问对象时,会记录访问的堆栈信息
 */

@SuppressWarnings("unused")
private volatile TraceRecord head;
@SuppressWarnings("unused")
private volatile int droppedRecords;

// 活跃的资源集合
private final Set<DefaultResourceLeak<?>> allLeaks;

// 追踪对象的一致性哈希码,确保关闭对象和追踪对象一致
private final int trackedHash;

再看构造函数:

/**
 * @param referent 引用本身,ByteBuf
 * @param refQueue 弱引用队列
 * @param allLeaks 活跃的资源集合
 */

DefaultResourceLeak(
        Object referent,
        ReferenceQueue<Object> refQueue,
        Set<DefaultResourceLeak<?>> allLeaks) {
    super(referent, refQueue);

    assert referent != null;

    // 计算追踪对象的一致性哈希,close时判断追踪对象和关闭对象是同一个
    trackedHash = System.identityHashCode(referent);
    // 将当前DefaultResourceLeak加入到活跃资源集合中
    allLeaks.add(this);
    // 记录追踪的堆栈信息,TraceRecord.BOTTOM代表链尾
    headUpdater.set(thisnew TraceRecord(TraceRecord.BOTTOM));
    this.allLeaks = allLeaks;
}

DefaultResourceLeak 的创建过程还是比较简单的,重要的是 TraceRecord 的创建,它才是记录追踪堆栈的功能类。

1.2.2 TraceRecord

TraceRecord 记录着追踪对象访问的堆栈轨迹,它继承自Throwable,这样它就可以通过调用Throwable.getStackTrace()获取堆栈跟踪的元素数组了。

TraceRecord 类本身不复杂,重要的是它的toString()方法,它会把追踪对象的访问堆栈信息给构建出来。

属性如下:

// 额外的提示信息
private final String hintString;
// 下一个节点
private final TraceRecord next;
private final int pos;

构造函数:

/**
 * @param next 下一个节点
 * @param hint 额外的提示信息
 */

TraceRecord(TraceRecord next, Object hint) {
    // This needs to be generated even if toString() is never called as it may change later on.
    hintString = hint instanceof ResourceLeakHint ? ((ResourceLeakHint) hint).toHintString() : hint.toString();
    this.next = next;
    this.pos = next.pos + 1;
}

toString()方法,用来构建追踪对象的堆栈信息:

// 构建跟踪的堆栈信息
@Override
public String toString() {
    StringBuilder buf = new StringBuilder(2048);
    if (hintString != null) {
        buf.append("tHint: ").append(hintString).append(NEWLINE);
    }

    // 获取堆栈信息
    StackTraceElement[] array = getStackTrace();
    // 跳过前三个元素,前三个堆栈信息是ResourceLeakDetector相关,报告出来无意义
    out: for (int i = 3; i < array.length; i++) {
        StackTraceElement element = array[i];
        String[] exclusions = excludedMethods.get();
        for (int k = 0; k < exclusions.length; k += 2) {
            if (exclusions[k].equals(element.getClassName())
                && exclusions[k + 1].equals(element.getMethodName())) { // lgtm[java/index-out-of-bounds]
                continue out;
            }
        }
        buf.append('t');
        buf.append(element.toString());
        buf.append(NEWLINE);
    }
    return buf.toString();
}

1.2.3 DefaultResourceLeak

回到 DefaultResourceLeak,以PooledByteBufAllocator.newDirectBuffer()申请池化的直接内存为例,它创建完 ByteBuf 后不会立即返回,它需要在 ByteBuf 发生泄漏时感知到,因此需要对 ByteBuf 做一个包装。

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // 申请一个池化的,基于直接内存的ByteBuf,这里的细节先不管
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    final ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
            UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    // 尝试感知Buf的资源泄漏
    return toLeakAwareBuffer(buf);
}

toLeakAwareBuffer()会判断是简单检测还是高级检测,返回不同的包装类,这个包装类后面会说。

// 尝试感知Buf的资源泄漏
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    // 获取检测等级
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:// 简单检测,返回SimpleLeakAwareByteBuf包装类
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:// 高级检测,返回AdvancedLeakAwareByteBuf包装类
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                // 将ByteBuf包装成AdvancedLeakAwareByteBuf,
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default:// 禁用检测,不包装直接返回
            break;
    }
    return buf;
}

AbstractByteBuf.leakDetector.track(buf)方法比较核心,它会返回一个 buf 的泄漏追踪器,当 buf 被正常释放时,包装类会自动关闭追踪器,反之资源泄漏时,追踪器可以感知到,并发出报告。

public final ResourceLeakTracker<T> track(T obj) {
    return track0(obj);
}

转交给track0()处理了,它主要做了两件事:创建追踪器、报告泄漏情况。

// 创建一个obj的泄漏追踪器
@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
    // 获取检测等级
    Level level = ResourceLeakDetector.level;
    if (level == Level.DISABLED) {
        // 禁用检测
        return null;
    }

    // 小于PARANOID等级,需要判断是否采样
    if (level.ordinal() < Level.PARANOID.ordinal()) {
        if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
            reportLeak();
            return new DefaultResourceLeak(obj, refQueue, allLeaks);
        }
        return null;
    }
    // 报告泄漏
    reportLeak();
    // 等级为PARANOID,100%检测
    return new DefaultResourceLeak(obj, refQueue, allLeaks);
}

这里用到了几个属性,说明下:

/*
ByteBuf被检测后,会创建一个弱引用指向它,GC时如果ByteBuf没有强引用被回收,
则JVM会将WeakReference放入到refQueue中,通过refQueue就可以判断是否发生内存泄漏。
 */

private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();

// 已经报告的泄漏对象集合
private final Set<String> reportedLeaks =
    Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());


// 采样的间隔,默认128
private final int samplingInterval;

// 活跃的资源集合
private final Set<DefaultResourceLeak<?>> allLeaks =
    Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());

说完了 DefaultResourceLeak 的创建,再看看它是如何报告泄漏的。

1.2.3.1 reportLeak()

reportLeak()的功能是报告资源的泄漏情况,前面说过当追踪对象被 GC 回收掉后,JVM 会将 WeakReference 加入到 refQueue 中,因此这里会遍历 refQueue,取出泄漏对象后,调用它的toString()来获取堆栈信息,reportUntracedLeak()就很简单了,只是通过 logger 进行输出。

// 报告泄漏情况
private void reportLeak() {
    if (!needReport()) {
        // 不需要报告,从refQueue中取出引用被clear掉
        clearRefQueue();
        return;
    }

    // 遍历refQueue,报告泄漏的情况
    for (;;) {
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
        if (ref == null) {
            // 没有泄漏的对象了,退出循环
            break;
        }

        // 先将自己清理掉
        if (!ref.dispose()) {
            continue;
        }

        // toString()就是泄漏的具体信息
        String records = ref.toString();
        // 添加到已报告的对象集合中
        if (reportedLeaks.add(records)) {
            // 调用logger.error()报告资源泄漏的情况
            if (records.isEmpty()) {
                reportUntracedLeak(resourceType);
            } else {
                reportTracedLeak(resourceType, records);
            }
        }
    }
}

DefaultResourceLeak.toString()用来构建堆栈信息,让用户感知到资源在哪里发生了泄漏。也比较简单,就是对 TraceRecord 做遍历,拼接方法调用的堆栈信息。

@Override
public String toString() {
    TraceRecord oldHead = headUpdater.getAndSet(thisnull);
    if (oldHead == null) {
        // Already closed
        return EMPTY_STRING;
    }

    final int dropped = droppedRecordsUpdater.get(this);
    int duped = 0;

    int present = oldHead.pos + 1;
    StringBuilder buf = new StringBuilder(present * 2048).append(NEWLINE);
    buf.append("Recent access records: ").append(NEWLINE);

    int i = 1;
    Set<String> seen = new HashSet<String>(present);
    // 遍历TraceRecord,拼装堆栈信息
    for (; oldHead != TraceRecord.BOTTOM; oldHead = oldHead.next) {
        String s = oldHead.toString();
        if (seen.add(s)) {
            if (oldHead.next == TraceRecord.BOTTOM) {
                buf.append("Created at:").append(NEWLINE).append(s);
            } else {
                buf.append('#').append(i++).append(':').append(NEWLINE).append(s);
            }
        } else {
            duped++;
        }
    }

    if (duped > 0) {
        buf.append(": ")
            .append(duped)
            .append(" leak records were discarded because they were duplicates")
            .append(NEWLINE);
    }

    if (dropped > 0) {
        buf.append(": ")
            .append(dropped)
            .append(" leak records were discarded because the leak record count is targeted to ")
            .append(TARGET_RECORDS)
            .append(". Use system property ")
            .append(PROP_TARGET_RECORDS)
            .append(" to increase the limit.")
            .append(NEWLINE);
    }

    buf.setLength(buf.length() - NEWLINE.length());
    return buf.toString();
}

至此,创建资源泄漏追踪器和泄漏报告 的流程就全部结束了。

2. 简单检测和高级检测

创建完ResourceLeakTracker资源泄漏追踪器后,Netty 还需要将 ByteBuf 进行包装,这里用到了「装饰者模式」,装饰类的功能有两个:

  1. 追踪对象被 release 后,关闭追踪器。
  2. 追踪对象被访问后,记录堆栈。


对于第二个功能点,只有高级检测才需要,因此 Netty 提供了两个包装类:SimpleLeakAwareByteBuf 和 AdvancedLeakAwareByteBuf,简单检测和高级检测。它俩的区别就是简单检测不会记录追踪对象访问的堆栈信息,只会单纯的报告发生了泄漏,这样的好处是开销较小,坏处是无法确定泄漏的位置。 

Netty是如何检测资源泄漏的?

装饰类需要依赖一个原生的 ByteBuf,所有的操作都委托给 ByteBuf 去执行,它会在需要增强的方法前后插入一些扩展功能。

篇幅原因,代码就不全贴了,只贴增强后的release()方法吧。

// 对象释放的增强
@Override
public boolean release() {
    if (super.release()) {// 对象成功释放
        closeLeak();// 关闭追踪器
        return true;
    }
    return false;
}

closeLeak()会调用DefaultResourceLeak.close()关闭追踪:

// 关闭追踪
@Override
public boolean close() {
    // 从活跃资源集合中移除自己
    if (allLeaks.remove(this)) {
        // 清除弱引用
        clear();
        // 清空TraceRecord
        headUpdater.set(thisnull);
        return true;
    }
    return false;
}

对于 AdvancedLeakAwareByteBuf,它还需要记录访问的堆栈信息,大量的方法调用都需要记录堆栈,这里拿touch()方法为例:

/**
@param hint 追踪的额外信息
@return
*/

@Override
public ByteBuf touch(Object hint) {
    leak.record(hint);
    return this;
}

leak.record()会调用DefaultResourceLeak.record0()方法记录堆栈信息,创建一个 TraceRecord 加入到追踪记录的链表中,代码就不贴了。

3. 总结

Netty 根据 WeakReference 弱引用来判断对象是否发生内存泄漏,通过创建一个追踪对象的装饰类来进行增强,当追踪对象被 release 后,自动关闭追踪器,否则在发生泄漏时进行报告。

如果开启了资源泄漏检测,Netty 会为追踪对象创建一个泄漏追踪器ResourceLeakTrackerResourceLeakTracker包含一个单向链表,链表由一系列 TraceRecord 组成,它代表的是对象访问的堆栈记录,如果发生了资源泄漏,Netty 会根据这个链表构建资源泄漏的位置信息并写入日志

Netty 提供了两种检测机制,分别是简单的和高级的,对于高级检测,Netty 还会记录追踪对象的访问堆栈信息,在报告时可以快速定位到资源泄漏的具体位置,缺点是这会带来较大的额外开销,不建议在线上使用。


原文始发于微信公众号(程序员小潘):Netty是如何检测资源泄漏的?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29412.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!