FastThreadLocal源码分析

前面在分析 NioEventLoop 源码时提到过,Netty 默认使用io.netty.util.concurrent.DefaultThreadFactory线程工厂来创建新线程,它会创建FastThreadLocalThread线程来驱动 NioEventLoop 的执行,而不是 JDK 原生的 Thread,原因是FastThreadLocalThread可以提升FastThreadLocal的性能。

既然 JDK 已经提供了 ThreadLocal,为何 Netty 还要重复造轮子呢?原因无它,就是因为 JDK 的 ThreadLocal 效率不是很高。

JDK 将 ThreadLocal 作为 Key,值作为 Value 存放到ThreadLocalMap中,每个 Thread 都维护了一个 Map 容器。当线程使用的 ThreadLocal 对象逐渐增多,出现哈希冲突的概率就会变大,ThreadLocalMap处理哈希冲突的方式是「线性探测」,即根据 Key 的哈希计算 index,如果数组的该下标已经被占用,代表出现哈希冲突,它会环形的寻找下一个槽位,直到找到一个空的槽位,再将映射关系封装成 Entry 节点保存到数组。get()操作也是一样的流程,遇到哈希冲突,就要环形寻找。

总结就是 JDK 的 ThreadLocal,一旦出现哈希冲突,读写的时间复杂度会从 O(1)变成 O(n),Netty 为了更高的性能,自己实现了一个更快的 FastThreadLocal,本篇文章就带你揭秘 FastThreadLocal 的高性能内幕。

FastThreadLocal 源码

FastThreadLocal 是 ThreadLocal 的一个变体,当它和 FastThreadLocalThread 一起使用时,能提供更好的访问性能。FastThreadLocal源码分析

需要注意,必须配合 FastThreadLocalThread 使用,否则会退化成 JDK 的 ThreadLocal,效率可能反而会有影响。


InternalThreadLocalMap

InternalThreadLocalMap 是 Netty 用来代替 JDK 中的ThreadLocal.ThreadLocalMap类的,InternalThreadLocalMap 使用数组来代替 Hash 表,每个 FastThreadLocal 被创建时,会拥有一个全局唯一且递增的索引 index,该 index 就代表 FastThreadLocal 对应数组的下标,Value 会被直接放到该下标处,访问也是一样,根据 index 快速定位元素,非常的快速,压根就不存在哈希冲突,时间复杂度始终是 O(1),缺点就是会浪费点内存空间,不过在内存越来越廉价的今天,这是值得的。

先看几个和 FastThreadLocal 相关的属性,后面会用到:

/*
    非FastThreadLocalThread线程使用FastThreadLocal,Netty会创建一个InternalThreadLocalMap,
    保存到原生Thread.threadLocals里,相当于往原生ThreadLocalMap里又放了一个Map。
     */

    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();

 // index生成器
    private static final AtomicInteger nextIndex = new AtomicInteger();

 // 默认的数组大小
    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;

 // 代替Null的一个对象,代表槽位未设值
    public static final Object UNSET = new Object();

    // 存放FastThreadLocal对应的Value
    private Object[] indexedVariables;

再看 FastThreadLocal 的属性,静态常量 variablesToRemoveIndex 的值是 0,它会在数组的 0 号位存储一个Set<FastThreadLocal>来保存线程使用过的 FastThreadLocal,目的是为了在removeAll()方法中进行批量的移除。实例常量 index 代表 FastThreadLocal 的唯一索引,它是全局唯一且递增的。

/*
占据的是数组0号下标,存放的是Set<FastThreadLocal>:当前线程使用到的所有FastThreadLocal。
目的是FastThreadLocal.removeAll()时批量移除。
*/

private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

// FastThreadLocal的唯一索引,全局唯一且递增,从1开始,0存放需要移除的FastThreadLocal。
private final int index;

FastThreadLocal 的构造函数非常简单,就是生成一个索引。

public FastThreadLocal() {
    // index初始化,通过一个全局的AtomicInteger递增
    index = InternalThreadLocalMap.nextVariableIndex();
}


set()源码

调用set()方法保存值时,它会判断 Value 是否是 UNSET,如果是 UNSET 代表移除该实例,否则才是设置新值。首先需要获取到线程绑定的 InternalThreadLocalMap,然后根据 index 将 InternalThreadLocalMap 内的indexedVariables数组对应的下标填充 Value。

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        // 获取当前线程绑定的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        setKnownNotUnset(threadLocalMap, value);
    } else {
        // 值为UNSET则代表是移除操作
        remove();
    }
}

InternalThreadLocalMap.get()用来获取当前线程的 InternalThreadLocalMap 对象,如果线程是 FastThreadLocalThread,则可以快速获取,因为 FastThreadLocalThread 使用一个属性来记录它了。如果线程是普通的 Thread,则会慢速获取,利用 JDK 原生的 ThreadLocal 来保存 InternalThreadLocalMap。

// 获取当前线程的InternalThreadLocalMap
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        // 如果当前线程是FastThreadLocalThread,则直接取变量threadLocalMap即可。
        return fastGet((FastThreadLocalThread) thread);
    } else {
        // 非FastThreadLocalThread线程,Netty也做了兼容,只是性能会有所影响。
        return slowGet();
    }
}

对于非 FastThreadLocalThread 线程,Netty 也做了兼容,退化成 JDK 的 ThreadLocal,在 ThreadLocal 中保存 InternalThreadLocalMap 对象。

/*
非FastThreadLocalThread线程也可以使用FastThreadLocal,Netty做了兼容,只是性能会有所影响。
 */

private static InternalThreadLocalMap slowGet() {
    /*
    非FastThreadLocalThread线程使用FastThreadLocal,Netty会创建一个InternalThreadLocalMap,
    保存到原生Thread.threadLocals里,相当于往原生ThreadLocalMap里又放了一个Map。
    这里就是从原生ThreadLocal中取出InternalThreadLocalMap,如果没有则塞一个进去。
     */

    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

获取到了线程绑定的 InternalThreadLocalMap,接下来就是将 Value 设置到数组了,会调用setKnownNotUnset()方法:

/*
Set值,已知不是UNSET,在set()已经判断过了
 */

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    // 将Object[]的index下标设为value
    if (threadLocalMap.setIndexedVariable(index, value)) {
        // 将当前FastThreadLocal添加到0号Set里,方便后面的removeAll()时使用。
        addToVariablesToRemove(threadLocalMap, this);
    }
}

threadLocalMap.setIndexedVariable()会将 Value 设置到数组的指定下标,如果需要扩容则会进行扩容。

// 将value设置到数组的index下标位置
public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        // 扩容并且Set
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

Value 保存到数组后,需要将 FastThreadLocal 添加到数组 0 号位的 Set 容器中,因为removeAll()时需要批量移除所有的 FastThreadLocal。

// 将FastThreadLocal保存到数组0号位的Set容器中,等待后面的批量删除
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    // 取出0号固定位元素
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        // 没有值,则设置为Set<FastThreadLocal>
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        // 有值就强转为Set
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }

    // 将FastThreadLocal添加到Set中,removeAll()时需要用到。
    variablesToRemove.add(variable);
}

set()操作到此就结束了。

get()源码

清楚了set()的流程,再看get()就已经非常简单了。

要想获取当前当前线程对应 FastThreadLocal 的 Value,首先肯定还是需要获取到线程绑定的 InternalThreadLocalMap,然后根据 index 去数组中取值,如果取到了就直接返回,没取到则根据initialize()填充初始值。

public final V get() {
    /*
    获取当前线程绑定的InternalThreadLocalMap
        1.对于FastThreadLocalThread,它直接使用属性threadLocalMap保存。
        2.对于非FastThreadLocalThread线程,会创建一个并塞到原生ThreadLocal中。
     */

    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    /*
    全局的FastThreadLocal对应的Value都放在InternalThreadLocalMap的Object[]里,根据index下标即可快速访问。
     */

    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {// UNSET可以理解为null的替代品,代表 无值,没有设置值的一个默认对象。
        return (V) v;
    }
    // 没有值,则初试化
    return initialize(threadLocalMap);
}

threadLocalMap.indexedVariable()非常简单,就是根据 index 定位数组元素:

public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}

如果元素为 UNSET,说明还没有设置过值,需要进行初始化。

/*
get()发现值没有设置,会调用该方法进行初始化
 */

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();// 默认返回null,子类重写
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    // 将初始化的值保存到数组中
    threadLocalMap.setIndexedVariable(index, v);
    // 将FastThreadLocal添加到数组0号位的Set容器中
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

至此,get()流程也全部结束。

remove()源码

FastThreadLocal 使用完毕记得即使移除掉,调用remove()方法即可。

要想移除 FastThreadLocal,首先依然是要获取到线程绑定的 InternalThreadLocalMap。

public final void remove() {
    // 获取当前线程绑定的InternalThreadLocalMap,再remove
    remove(InternalThreadLocalMap.getIfSet());
}

接下来需要做三件事:

  1. 根据 FastThreadLocal 的 index 将数组中指定位置填充 UNSET。
  2. 从 Set 容器中删除 FastThreadLocal 对象。
  3. 需要触发onRemoval()钩子函数。
// 从InternalThreadLocalMap中移除当前FastThreadLocal
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    // 从数组中删除对应下标的值:重置为UNSET
    Object v = threadLocalMap.removeIndexedVariable(index);
    // 从数组0号位的Set容器中删除
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            // 触发钩子函数
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

1.根据 FastThreadLocal 的 index 将数组中指定位置填充 UNSET。

public Object removeIndexedVariable(int index) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object v = lookup[index];
        // 填充UNSET,代表删除
        lookup[index] = UNSET;
        return v;
    } else {
        return UNSET;
    }
}

2.取出数组 0 号位的 Set 容器,删除 FastThreadLocal 对象。

// 从0号固定位中取出Set<FastThreadLocal>并移除指定FastThreadLocal
private static void removeFromVariablesToRemove(
    InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable)
 
{
    // 取出数组0号位的Set容器
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);

    if (v == InternalThreadLocalMap.UNSET || v == null) {
        return;
    }

    @SuppressWarnings("unchecked")
    Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
    // 从容器中删除FastThreadLocal
    variablesToRemove.remove(variable);
}

3.如果要移除的 Value 不是 UNSET,说明曾经设置过具体的值,再移除它时需要触发onRemoval()钩子函数,让子类能够监听到这个移除动作。默认什么也不做,子类重写。

// value被移除时触发的回调,默认什么也不做,子类实现
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception {

}

removeAll()源码

removeAll()是静态方法,它不针对某个 FastThreadLocal,而是将当前线程的所有 FastThreadLocal 全部移除。

首先仍然需要获取到当前线程绑定的 InternalThreadLocalMap,从数组的 0 号位取出 Set 容器,遍历 Set 容器,按个移除 FastThreadLocal。

/*
移除所有绑定到当前线程的FastThreadLocal实例。
 */

public static void removeAll() {
    // 获取当前绑定的InternalThreadLocalMap
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        // 取出0号固定位元素 就是Set<FastThreadLocal>,迭代遍历,依次remove()
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[0]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                // 遍历 按个删除
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        // 最后将InternalThreadLocalMap重置
        InternalThreadLocalMap.remove();
    }
}

总结

Netty 的 FastThreadLocal 是 JDK ThreadLocal 的一个变体,当它和 FastThreadLocalThread 一起使用时,能提供更好的访问性能。

优化的思路是使用数组来代替 JDK 的哈希表,避免了哈希冲突,使得读写的时间复杂度始终能保持在 O(1)。缺点就是会浪费一定的内存空间,当 FastThreadLocal 数量过大时,全局递增的索引就会很大,数组的长度也会越来越长,而且索引只会递增不会递减,这意味着数组只会扩容而不会缩容,开发者需要特别注意 FastThreadLocal 的对象数量,不要滥用,否则会因为无法申请一大块连续的内存空间引起频繁 GC,最终导致 OOM!


原文始发于微信公众号(程序员小潘):FastThreadLocal源码分析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29398.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!