前面在分析 NioEventLoop 源码时提到过,Netty 默认使用io.netty.util.concurrent.DefaultThreadFactory
线程工厂来创建新线程,它会创建FastThreadLocalThread
线程来驱动 NioEventLoop 的执行,而不是 JDK 原生的 Thread,原因是FastThreadLocalThread
可以提升FastThreadLocal
的性能。
既然 JDK 已经提供了 ThreadLocal,为何 Netty 还要重复造轮子呢?原因无它,就是因为 JDK 的 ThreadLocal 效率不是很高。
JDK 将 ThreadLocal 作为 Key,值作为 Value 存放到ThreadLocalMap
中,每个 Thread 都维护了一个 Map 容器。当线程使用的 ThreadLocal 对象逐渐增多,出现哈希冲突的概率就会变大,ThreadLocalMap
处理哈希冲突的方式是「线性探测」,即根据 Key 的哈希计算 index,如果数组的该下标已经被占用,代表出现哈希冲突,它会环形的寻找下一个槽位,直到找到一个空的槽位,再将映射关系封装成 Entry 节点保存到数组。get()
操作也是一样的流程,遇到哈希冲突,就要环形寻找。
总结就是 JDK 的 ThreadLocal,一旦出现哈希冲突,读写的时间复杂度会从 O(1)变成 O(n),Netty 为了更高的性能,自己实现了一个更快的 FastThreadLocal,本篇文章就带你揭秘 FastThreadLocal 的高性能内幕。
FastThreadLocal 源码
FastThreadLocal 是 ThreadLocal 的一个变体,当它和 FastThreadLocalThread 一起使用时,能提供更好的访问性能。
需要注意,必须配合 FastThreadLocalThread 使用,否则会退化成 JDK 的 ThreadLocal,效率可能反而会有影响。
InternalThreadLocalMap
InternalThreadLocalMap 是 Netty 用来代替 JDK 中的ThreadLocal.ThreadLocalMap
类的,InternalThreadLocalMap 使用数组来代替 Hash 表,每个 FastThreadLocal 被创建时,会拥有一个全局唯一且递增的索引 index,该 index 就代表 FastThreadLocal 对应数组的下标,Value 会被直接放到该下标处,访问也是一样,根据 index 快速定位元素,非常的快速,压根就不存在哈希冲突,时间复杂度始终是 O(1),缺点就是会浪费点内存空间,不过在内存越来越廉价的今天,这是值得的。
先看几个和 FastThreadLocal 相关的属性,后面会用到:
/*
非FastThreadLocalThread线程使用FastThreadLocal,Netty会创建一个InternalThreadLocalMap,
保存到原生Thread.threadLocals里,相当于往原生ThreadLocalMap里又放了一个Map。
*/
private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
new ThreadLocal<InternalThreadLocalMap>();
// index生成器
private static final AtomicInteger nextIndex = new AtomicInteger();
// 默认的数组大小
private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
// 代替Null的一个对象,代表槽位未设值
public static final Object UNSET = new Object();
// 存放FastThreadLocal对应的Value
private Object[] indexedVariables;
再看 FastThreadLocal 的属性,静态常量 variablesToRemoveIndex 的值是 0,它会在数组的 0 号位存储一个Set<FastThreadLocal>
来保存线程使用过的 FastThreadLocal,目的是为了在removeAll()
方法中进行批量的移除。实例常量 index 代表 FastThreadLocal 的唯一索引,它是全局唯一且递增的。
/*
占据的是数组0号下标,存放的是Set<FastThreadLocal>:当前线程使用到的所有FastThreadLocal。
目的是FastThreadLocal.removeAll()时批量移除。
*/
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
// FastThreadLocal的唯一索引,全局唯一且递增,从1开始,0存放需要移除的FastThreadLocal。
private final int index;
FastThreadLocal 的构造函数非常简单,就是生成一个索引。
public FastThreadLocal() {
// index初始化,通过一个全局的AtomicInteger递增
index = InternalThreadLocalMap.nextVariableIndex();
}
set()源码
调用set()
方法保存值时,它会判断 Value 是否是 UNSET,如果是 UNSET 代表移除该实例,否则才是设置新值。首先需要获取到线程绑定的 InternalThreadLocalMap,然后根据 index 将 InternalThreadLocalMap 内的indexedVariables
数组对应的下标填充 Value。
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
// 获取当前线程绑定的InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
// 值为UNSET则代表是移除操作
remove();
}
}
InternalThreadLocalMap.get()
用来获取当前线程的 InternalThreadLocalMap 对象,如果线程是 FastThreadLocalThread,则可以快速获取,因为 FastThreadLocalThread 使用一个属性来记录它了。如果线程是普通的 Thread,则会慢速获取,利用 JDK 原生的 ThreadLocal 来保存 InternalThreadLocalMap。
// 获取当前线程的InternalThreadLocalMap
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 如果当前线程是FastThreadLocalThread,则直接取变量threadLocalMap即可。
return fastGet((FastThreadLocalThread) thread);
} else {
// 非FastThreadLocalThread线程,Netty也做了兼容,只是性能会有所影响。
return slowGet();
}
}
对于非 FastThreadLocalThread 线程,Netty 也做了兼容,退化成 JDK 的 ThreadLocal,在 ThreadLocal 中保存 InternalThreadLocalMap 对象。
/*
非FastThreadLocalThread线程也可以使用FastThreadLocal,Netty做了兼容,只是性能会有所影响。
*/
private static InternalThreadLocalMap slowGet() {
/*
非FastThreadLocalThread线程使用FastThreadLocal,Netty会创建一个InternalThreadLocalMap,
保存到原生Thread.threadLocals里,相当于往原生ThreadLocalMap里又放了一个Map。
这里就是从原生ThreadLocal中取出InternalThreadLocalMap,如果没有则塞一个进去。
*/
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
获取到了线程绑定的 InternalThreadLocalMap,接下来就是将 Value 设置到数组了,会调用setKnownNotUnset()
方法:
/*
Set值,已知不是UNSET,在set()已经判断过了
*/
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
// 将Object[]的index下标设为value
if (threadLocalMap.setIndexedVariable(index, value)) {
// 将当前FastThreadLocal添加到0号Set里,方便后面的removeAll()时使用。
addToVariablesToRemove(threadLocalMap, this);
}
}
threadLocalMap.setIndexedVariable()
会将 Value 设置到数组的指定下标,如果需要扩容则会进行扩容。
// 将value设置到数组的index下标位置
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
// 扩容并且Set
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
Value 保存到数组后,需要将 FastThreadLocal 添加到数组 0 号位的 Set 容器中,因为removeAll()
时需要批量移除所有的 FastThreadLocal。
// 将FastThreadLocal保存到数组0号位的Set容器中,等待后面的批量删除
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 取出0号固定位元素
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 没有值,则设置为Set<FastThreadLocal>
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
// 有值就强转为Set
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 将FastThreadLocal添加到Set中,removeAll()时需要用到。
variablesToRemove.add(variable);
}
set()
操作到此就结束了。
get()源码
清楚了set()
的流程,再看get()
就已经非常简单了。
要想获取当前当前线程对应 FastThreadLocal 的 Value,首先肯定还是需要获取到线程绑定的 InternalThreadLocalMap,然后根据 index 去数组中取值,如果取到了就直接返回,没取到则根据initialize()
填充初始值。
public final V get() {
/*
获取当前线程绑定的InternalThreadLocalMap
1.对于FastThreadLocalThread,它直接使用属性threadLocalMap保存。
2.对于非FastThreadLocalThread线程,会创建一个并塞到原生ThreadLocal中。
*/
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
/*
全局的FastThreadLocal对应的Value都放在InternalThreadLocalMap的Object[]里,根据index下标即可快速访问。
*/
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {// UNSET可以理解为null的替代品,代表 无值,没有设置值的一个默认对象。
return (V) v;
}
// 没有值,则初试化
return initialize(threadLocalMap);
}
threadLocalMap.indexedVariable()
非常简单,就是根据 index 定位数组元素:
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
如果元素为 UNSET,说明还没有设置过值,需要进行初始化。
/*
get()发现值没有设置,会调用该方法进行初始化
*/
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();// 默认返回null,子类重写
} catch (Exception e) {
PlatformDependent.throwException(e);
}
// 将初始化的值保存到数组中
threadLocalMap.setIndexedVariable(index, v);
// 将FastThreadLocal添加到数组0号位的Set容器中
addToVariablesToRemove(threadLocalMap, this);
return v;
}
至此,get()
流程也全部结束。
remove()源码
FastThreadLocal 使用完毕记得即使移除掉,调用remove()
方法即可。
要想移除 FastThreadLocal,首先依然是要获取到线程绑定的 InternalThreadLocalMap。
public final void remove() {
// 获取当前线程绑定的InternalThreadLocalMap,再remove
remove(InternalThreadLocalMap.getIfSet());
}
接下来需要做三件事:
-
根据 FastThreadLocal 的 index 将数组中指定位置填充 UNSET。 -
从 Set 容器中删除 FastThreadLocal 对象。 -
需要触发 onRemoval()
钩子函数。
// 从InternalThreadLocalMap中移除当前FastThreadLocal
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
// 从数组中删除对应下标的值:重置为UNSET
Object v = threadLocalMap.removeIndexedVariable(index);
// 从数组0号位的Set容器中删除
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
// 触发钩子函数
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
1.根据 FastThreadLocal 的 index 将数组中指定位置填充 UNSET。
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
// 填充UNSET,代表删除
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
2.取出数组 0 号位的 Set 容器,删除 FastThreadLocal 对象。
// 从0号固定位中取出Set<FastThreadLocal>并移除指定FastThreadLocal
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 取出数组0号位的Set容器
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
// 从容器中删除FastThreadLocal
variablesToRemove.remove(variable);
}
3.如果要移除的 Value 不是 UNSET,说明曾经设置过具体的值,再移除它时需要触发onRemoval()
钩子函数,让子类能够监听到这个移除动作。默认什么也不做,子类重写。
// value被移除时触发的回调,默认什么也不做,子类实现
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception {
}
removeAll()源码
removeAll()
是静态方法,它不针对某个 FastThreadLocal,而是将当前线程的所有 FastThreadLocal 全部移除。
首先仍然需要获取到当前线程绑定的 InternalThreadLocalMap,从数组的 0 号位取出 Set 容器,遍历 Set 容器,按个移除 FastThreadLocal。
/*
移除所有绑定到当前线程的FastThreadLocal实例。
*/
public static void removeAll() {
// 获取当前绑定的InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
// 取出0号固定位元素 就是Set<FastThreadLocal>,迭代遍历,依次remove()
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
// 遍历 按个删除
tlv.remove(threadLocalMap);
}
}
} finally {
// 最后将InternalThreadLocalMap重置
InternalThreadLocalMap.remove();
}
}
总结
Netty 的 FastThreadLocal 是 JDK ThreadLocal 的一个变体,当它和 FastThreadLocalThread 一起使用时,能提供更好的访问性能。
它优化的思路是使用数组来代替 JDK 的哈希表,避免了哈希冲突,使得读写的时间复杂度始终能保持在 O(1)。缺点就是会浪费一定的内存空间,当 FastThreadLocal 数量过大时,全局递增的索引就会很大,数组的长度也会越来越长,而且索引只会递增不会递减,这意味着数组只会扩容而不会缩容,开发者需要特别注意 FastThreadLocal 的对象数量,不要滥用,否则会因为无法申请一大块连续的内存空间引起频繁 GC,最终导致 OOM!
原文始发于微信公众号(程序员小潘):FastThreadLocal源码分析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29398.html