你还不知道ThreadLocal线程本地存储吗

ThreadLocal线程本地存储

当访问共享数据时,通常需要使用同步来控制并发程序的访问。那么有没有别的方法来解决呢?当然有,那就是使得共享数据不共享了,ThreadLocal就实现了该操作。该类提供了线程局部变量,为每一个线程创建一个单独的变量副本,使得每个线程都可以独立的改变自己所拥有的变量副本,而不会影响其他线程所对应的副本,消除了竞争条件。

采用的以空间换时间的做法,在每个Thread里维护一个以开地址法实现的ThreadLocal.ThreadLocalMap,把数据隔离
线程本地存储根除了对变量的共享,每当线程访问threadLocals变量时,访问的都是各自线程自己的threadLocals变量。

public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> myThreadLocal = new ThreadLocal<Integer>() {
        // 初始值默认为null 设置初始值为0
        protected Integer initialValue() {
            return 0;
        }
    };

    public static void increment() {
        myThreadLocal.set(myThreadLocal.get() + 1);
    }

    public static int get(){
        return myThreadLocal.get();
    }

    public static void main(String[] args) {
        // 线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i=0;i<3;i++){
            executorService.execute(new MyThread());
        }
        executorService.shutdown();
    }
}

public class MyThread implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            ThreadLocalVariableHolder.increment();
            System.out.println(Thread.currentThread().getName() + ":" + ThreadLocalVariableHolder.get());
            Thread.yield();
        }
    }
}

输出结果

pool-1-thread-1:1
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:2
pool-1-thread-2:2
pool-1-thread-3:2
pool-1-thread-1:3
pool-1-thread-2:3
pool-1-thread-3:3

可以看到ThreadLocal把不同线程的数据进行了隔离,互不影响

来看一下ThreadLocal是如何实现的吧

Thread类

// 线程的本地变量存储在线程的threadLocals变量中,并不是存储在ThreadLocal实例中
// 每一个线程都有一个自己的ThreadLocalMap,ThreadLocalMap类似于Map,key为ThreadLocal对象,value为存储的值,所以一个ThreadLocalMap可以存储多个ThreadLocal

ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

ThreadLocal源码逻辑

  • ThreadLocal对象通过Thread.currentThread()获取当前Thread对象
  • 当前Thread获取对象内部持有的ThreadLocalMap容器
  • 从ThreadLocalMap容器中用ThreadLocal对象作为key,操作当前Thread中的变量副本

提供了四个方法:

  • get()  返回此线程局部变量的当前线程副本中的值

    public T get() {
       // 获取当前线程
        Thread t = Thread.currentThread();
       // 获取当前线程实例的threadLocals
        ThreadLocalMap map = getMap(t);
       // 不为空
        if (map != null) {
           // 根据当前的ThreadLocal对象引用来取值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
       // 为空则设置key为当前的ThreadLocal对象,value为initialValue设置的初始值
        return setInitialValue();
    }
  • initialValue()  返回此线程局部变量当前线程的初始值,当线程第一次调用get()或set()方法时调用,并且只调用一次

  • remove()  移除此线程局部变量当前线程的值

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}
  • set(T value)  将此线程局部变量的当前线程副本中的值设置为指定值

    public void set(T value) {
      // 获取当前线程
      Thread t = Thread.currentThread();
      // 获取该线程实例对象的threadLocals变量   getMap方法 return t.threadLocals;
      ThreadLocalMap map = getMap(t);
      // 不为空,key为当前的ThreadLocal对象引用,value为所存储的值
      if (map != null)
        map.set(this, value);
      else
        // 为空,则为threadLocals实例化对象
        createMap(t, value);
    }

    ThreadLocalMap getMap(Thread t) {
      return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) {
      t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
  • 还有一个静态内部类 ThreadLocalMap 提供了一种用键值对方式存储每一个线程的变量副本的方法,key为当前的ThreadLocal对象,value为对象线程的变量副本

public class MyThread implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            ThreadLocalVariableHolder.increment();
            System.out.println(Thread.currentThread().getName() + ":" + ThreadLocalVariableHolder.get());
            Thread.yield();
        }
    }
}

public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> myThreadLocal = new ThreadLocal<Integer>() {
        // 初始值默认为null 设置初始值为0
        protected Integer initialValue() {
            return 0;
        }
    };

    public static void increment() {
        myThreadLocal.set(myThreadLocal.get() + 1);
    }

    public static int get(){
        return myThreadLocal.get();
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i=0;i<5;i++){
            executorService.execute(new MyThread());
        }
        executorService.shutdown();
    }
}

执行结果

pool-1-thread-1:1
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-3:2
pool-1-thread-2:2
pool-1-thread-1:2
pool-1-thread-2:3
pool-1-thread-3:3
pool-1-thread-4:1
pool-1-thread-1:3
pool-1-thread-4:2
pool-1-thread-4:3
pool-1-thread-5:1
pool-1-thread-5:2
pool-1-thread-5:3

存在内存泄露问题,每次使用完ThreadLocal,都调用它的remove()方法,清除数据

ThreadLocalMap源码逻辑

作为数据真正存储的位置,ThreadLocalMap对于ThreadLocal还是相当重要的

// 初始容量,该容量必须是2的次幂
private static final int INITIAL_CAPACITY = 16;
// 存储数据的Entry数组
private Entry[] table;
// entry的数量
private int size = 0;
// 阈值
private int threshold; 

// Entry中key为ThreadLocal,value为值
// key是一个弱引用
// 每个线程在往ThreadLocal中放值的时候,都是放入线程本身的ThreadLocalMap中,key是ThreadLocal,从而实现了线程隔离
// 只要发生了垃圾回收,且该对象没有强引用存在的话,弱引用就会被回收
static class Entry extends WeakReference<ThreadLocal<?>> {
  /** The value associated with this ThreadLocal. */
  Object value;

  Entry(ThreadLocal<?> k, Object v) {
    super(k);
    value = v;
  }
}
构造函数
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
  // 初始化
    table = new Entry[INITIAL_CAPACITY];
  // 使用key的哈希值与上15 计算数组下标
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
  // 初始化该节点
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
  // 设置阈值
    setThreshold(INITIAL_CAPACITY);
}
重要方法
set方法

存储数据

private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
  // 计算数组下标
    int i = key.threadLocalHashCode & (len-1);
// 如果出现hash冲突,会采用线性探测,如果当前位置有值,则会获取下一个索引来进行存储,该结构是一个环形
  // 直到找到空闲位置为止
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) { // 槽位不为空,进行线性探测
        ThreadLocal<?> k = e.get();
   // key值与当前key值相等,直接进行覆盖
        if (k == key) {
            e.value = value;
            return;
        }
   // Entry不为null,但是key为null,当前位置的key已经为null(失效了),则进行替换过期数据
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
      // 该循环完成,说明既没有找到原本存储key的位置,也没有找到失效的位置
    }
// 执行到这里说明查找到了entry为null的位置
    tab[i] = new Entry(key, value);
    int sz = ++size;
  // 没有清除掉失效的槽位,并且当前数量已经达到了阈值,则进行扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

// 获取下一个数组下标位置
 private static int nextIndex(int i, int len) {
   return ((i + 1 < len) ? i + 1 : 0);
 }

// 替换掉失效的值  staleSlot为失效的槽位
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot)
 
{
  Entry[] tab = table;
  int len = tab.length;
  Entry e;

  // Back up to check for prior stale entry in current run.
  // We clean out whole runs at a time to avoid continual
  // incremental rehashing due to garbage collector freeing
  // up refs in bunches (i.e., whenever the collector runs).
  int slotToExpunge = staleSlot;
  // 向前扫描,直到找到无效的槽位
  for (int i = prevIndex(staleSlot, len);
       (e = tab[i]) != null;
       i = prevIndex(i, len))
    // 找到无效的槽位,slotToExpunge复为当前位置
    if (e.get() == null)
      slotToExpunge = i;

  // Find either the key or trailing null slot of run, whichever
  // occurs first
  // 向后扫描,找到失效的槽位
  for (int i = nextIndex(staleSlot, len);
       (e = tab[i]) != null;
       i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();

    // If we find key, then we need to swap it
    // with the stale entry to maintain hash table order.
    // The newly stale slot, or any other stale slot
    // encountered above it, can then be sent to expungeStaleEntry
    // to remove or rehash all of the other entries in run.
    // 找到key,替换掉无效的槽位中的值
    if (k == key) {
      e.value = value;

      tab[i] = tab[staleSlot];
      tab[staleSlot] = e;

      // Start expunge at preceding stale entry if it exists
      // 向前扫描时没有找到无效的槽位,将slotToExpunge设为当前位置
      if (slotToExpunge == staleSlot)
        slotToExpunge = i;
      // 从slotToExpunge开始清理槽位
      cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
      return;
    }

    // If we didn't find stale entry on backward scan, the
    // first stale entry seen while scanning for key is the
    // first still present in the run.
    // 如果当前的槽位已经无效,并且向前扫描时没有找到无效的槽位,将slotToExpunge设为当前位置
    if (k == null && slotToExpunge == staleSlot)
      slotToExpunge = i;
  }

  // 没有找到对应的key新增加一个entry
  // If key not found, put new entry in stale slot
  tab[staleSlot].value = null;
  tab[staleSlot] = new Entry(key, value);

  // If there are any other stale entries in run, expunge them
  // 扫描过程中存在无效的槽位,进行清理
  if (slotToExpunge != staleSlot)
    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

// 清理无效的槽位
private boolean cleanSomeSlots(int i, int n) {
  boolean removed = false;
  Entry[] tab = table;
  int len = tab.length;
  do {
    i = nextIndex(i, len);
    Entry e = tab[i];
    // key已失效
    if (e != null && e.get() == null) {
      n = len;
      removed = true;
      i = expungeStaleEntry(i);
    }
  } while ( (n >>>= 1) != 0);
  return removed;
}

private int expungeStaleEntry(int staleSlot) {
  Entry[] tab = table;
  int len = tab.length;

  // expunge entry at staleSlot
  // 去掉对value的引用
  tab[staleSlot].value = null;
  tab[staleSlot] = null;
  size--;

  // Rehash until we encounter null
  Entry e;
  int i;
  for (i = nextIndex(staleSlot, len);
       (e = tab[i]) != null;
       i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();
    // key为null,去掉对value的引用
    if (k == null) {
      e.value = null;
      tab[i] = null;
      size--;
    } else {
      int h = k.threadLocalHashCode & (len - 1);
      if (h != i) {
        tab[i] = null;

        // Unlike Knuth 6.4 Algorithm R, we must scan until
        // null because multiple entries could have been stale.
        while (tab[h] != null)
          h = nextIndex(h, len);
        tab[h] = e;
      }
    }
  }
  return i;
}

// 进行扩容操作
private void rehash() {
  expungeStaleEntries();

  // Use lower threshold for doubling to avoid hysteresis
  if (size >= threshold - threshold / 4)
    resize();
}
// 扩容扩大2倍
private void resize() {
  Entry[] oldTab = table;
  int oldLen = oldTab.length;
  int newLen = oldLen * 2;
  Entry[] newTab = new Entry[newLen];
  int count = 0;

  for (int j = 0; j < oldLen; ++j) {
    Entry e = oldTab[j];
    if (e != null) {
      ThreadLocal<?> k = e.get();
      if (k == null) {
        e.value = null// Help the GC
      } else {
        int h = k.threadLocalHashCode & (newLen - 1);
        while (newTab[h] != null)
          h = nextIndex(h, newLen);
        newTab[h] = e;
        count++;
      }
    }
  }

  setThreshold(newLen);
  size = count;
  table = newTab;
}
getEntry方法

获取Entry值

private Entry getEntry(ThreadLocal<?> key) {
  // 计算数组下标
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
      // 没有命中
      // 在存放数据的时候采用的是开方定址法,所以可能存在当前key的散列值和元素所在索引并不完全对应的情况
        return getEntryAfterMiss(key, i, e);
}

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
  Entry[] tab = table;
  int len = tab.length;

  while (e != null) {
    ThreadLocal<?> k = e.get();
    if (k == key)
      return e;
    if (k == null// key等于null,说明该key已经失效了,进行回收,可以有效的避免内存泄漏
      expungeStaleEntry(i);
    else
      i = nextIndex(i, len);
    e = tab[i];
  }
  return null;
}
remove方法
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
  // 计算数组下标
    int i = key.threadLocalHashCode & (len-1);
  
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
      // 找到对应的key
        if (e.get() == key) {
          // 清除对ThreadLocal的弱引用
            e.clear();
          // 清理key为null的元素
            expungeStaleEntry(i);
            return;
        }
    }
}

InheritableThreadLocal类解决ThreadLocal继承性问题

InheritableThreadLocal是ThreadLocal的子类,该类提供了一个特性,可以让子线程访问在父线程中设置的本地变量

public class InheritableThreadLocal<Textends ThreadLocal<T{
    
    protected T childValue(T parentValue) {
        return parentValue;
    }

    // 获取ThreadLocalMap时获取的是该线程中的inheritableThreadLocals变量
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    // 创建ThreadLocalMap时,使用的是inheritableThreadLocals变量而不是threadLocals变量
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

如果一个线程是从其他某个线程中创建的,这个类将提供继承的值,如果一个线程A在线程局部变量中已有值,那么当线程A创建其他某个线程B时,线程B的线程局部变量将跟线程A是一样的,可以重写childValue()方法,该方法用来初始化子线程在线程局部变量中的值,使用父线程在线程局部变量中的值作为传入参数

// Thread实例化时的初始化过程
private void init(ThreadGroup g, Runnable target, String name,
                  long stackSize, AccessControlContext acc,
                  boolean inheritThreadLocals)
 
{
    // 省略无关代码
  
   // 当前线程
    Thread parent = currentThread();
    
    // inheritThreadLocals为true 且 父线程的inheritableThreadLocals不为null
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
       // 则设置子线程的inheritableThreadLocals,值为父线程的inheritableThreadLocals的值复制而来
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    
}

javaweb中就是使用这种方式来使得子线程可以获取请求信息的

RequestContextHolder.setRequestAttributes(requestAttributes,true);

内存泄漏问题

// ThreadLocal中的Entry
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}
你还不知道ThreadLocal线程本地存储吗
ThreadLocal内存泄漏

在ThreadLocal.ThreadLocalMap中的key为ThreadLocal对象实例,这个Map中的key是一个弱引用,当把ThreadLocal对象实例置为null后,没有任何强引用指向ThreadLocal对象实例,所以ThreadLocal对象实例会被gc回收,但是Map中的value却不会被回收(此时entry是一个key为null,但是value不为null),因为存在一条从当前thread连接过来的强引用,只有当前thread结束之后,当前thread的强引用才会断开,此时Map中的value才会被gc回收。

但是在使用线程池的时候,由于线程是重复利用的,不会被回收,所以就可能出现内存泄漏

当然JDK的设计中也有考虑到这点,所以在get()、set()、remove()中会扫描key为null的Entry,将对应的value也设置为null,这样value就会被回收

所以当使用完之后,需要调用ThreadLocal对象的remove()方法来删除掉

https://zhhll.icu/2020/多线程/基础/9.ThreadLocal/


原文始发于微信公众号(bug生产基地):你还不知道ThreadLocal线程本地存储吗

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/237888.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!