Java并发编程——ThreadLocal详解

导读:本篇文章讲解 Java并发编程——ThreadLocal详解,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

引出

错误代码:

public class ThreadLocalExample {
    //希望每一个线程获得的num都是0
    private static int num = 0;
    public static void main(String[] args) {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                num += 5;
                System.out.println(Thread.currentThread().getName() + ":num:" + num);
            });
        }

        for (int i = 0; i < 5; i++) {
            threads[i].start();
        }
    }

}

输出结果:

Thread-0:num:10
Thread-4:num:20
Thread-2:num:15
Thread-1:num:10
Thread-3:num:25

我们定义了一个共享变量num,开启多个线程访问该变量,并对变量进行修改。根据输出结果我们看到出现了线程不安全的情况。(该情况可能需要多运行几次,或者提高线程数量)

针对这种变量是同一个,但是每个线程都使用同一个初始值的情况,就需要引出ThreadLocal了。

概念

多线程访问同一个共享变量的时候容易出现并发问题,特别是多个线程对一个变量进行写入的时候,为了保证线程安全,一般使用者在访问共享变量的时候需要进行额外的同步措施才能保证线程安全性。ThreadLocal是除了加锁这种同步方式之外的一种保证一种规避多线程访问出现线程不安全的方法,当我们在创建一个变量后,如果每个线程对其进行访问的时候访问的都是线程自己的变量这样就不会存在线程不安全问题。

ThreadLocal是JDK包提供的,它提供线程本地变量,如果创建一乐ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题,如下图所示:

image-20211112170433423

基本使用

代码修改

针对上文提出的错误代码,我们使用ThreadLocal对其进行改进。

我们首先定义ThreadLocal,有如下两种方式设置初始值。然后通过set方法和get方法设置值以及获取值。

public class ThreadLocalExample {
    //static ThreadLocal<Integer> local = ThreadLocal.withInitial(() -> 0);
    static ThreadLocal<Integer> local = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };

    public static void main(String[] args) {
        Thread[] thread = new Thread[5];
        
        for (int i = 0; i < 5; i++) {
            thread[i] = new Thread(() -> {
                //得到初始值
                int num = local.get().intValue();
                //设置
                local.set(num + 5);
                System.out.println(Thread.currentThread().getName() + " " + local.get());
            });
        }
        
        for (int i = 0; i < 5; i++) {
            thread[i].start();
        }
    }
}

输出结果:

Thread-0 5
Thread-3 5
Thread-2 5
Thread-1 5
Thread-4 5

从输出结果我们看出,这次并没有产生线程安全的问题。

新案例

我们可以重新设置一个案例来复现线程间共享变量出现的问题。

SimpleDateFormat是一个线程不安全的类,我们通过以下代码来复线问题。

创建多个线程进行时间的格式化。

public class ThreadLocalDemo {
    //非线程安全的
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static Date parse(String strDate) throws ParseException {
        return sdf.parse(strDate);
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(parse("2021-11-30 20:12:20"));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            });

        }
    }
}

运行之后我们可以发现输出结果中夹杂着错误异常。

image-20211112172931413

我们可以在源码中发现问题:

image-20211112173251434

由上图可见,在DateFormat中存在一个numberFormat的共享变量。所以多个线程之间就会出现线程不安全的问题。

解决这种我们可以通过ThreadLocal的方式也可以通过加锁的方式。

加锁

我们只需要在格式化的方法上添加synchronized关键字即可。

public synchronized static Date parse(String strDate) throws ParseException {
    return sdf.parse(strDate);
}

ThreadLocal

public class ThreadLocalDemo {
    //非线程安全的
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //创建ThreadLocal
    private static ThreadLocal<DateFormat> dateFormatThreadLocal = new ThreadLocal<>();

    private static DateFormat getDateFormat() {
        DateFormat dateFormat = dateFormatThreadLocal.get(); //从当前线程的范围内获得一个DateFormat
        if (dateFormat == null) {
            dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //Thread.currentThread();当前线程
            dateFormatThreadLocal.set(dateFormat); //要在当前线程的范围内设置一个simpleDateFormat对象.
        }
        return dateFormat;
    }

    public synchronized static Date parse(String strDate) throws ParseException {
        return getDateFormat().parse(strDate);
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(parse("2021-11-30 20:12:20"));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            });

        }
    }
}

基本方法

  • set()

    当前线程范围内,设置一个值存储到ThreadLocal中,这个值仅对当前线程可见。

    相当于在当前线程范围内建立了副本。

  • get()

    当前线程范围内取出set方法设置的值.

  • remove()

    移除当前线程中存储的值

  • withInitial

    java8中的初始化方法

源码分析

首先查看赋值操作的源码

public void set(T value) { 
    Thread t = Thread.currentThread(); 
    // 如果当前线程已经初始化了map。
    // 如果没有初始化,则进行初始化。 
    ThreadLocalMap map = getMap(t); 
    if (map != null) 
        //修改value 
        map.set(this, value); 
    else 
        //初始化 
        createMap(t, value); 
}

初始化构造方法:

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY]; //默认长度为16的数组
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); //计算数组下标
    table[i] = new Entry(firstKey, firstValue); //把key/value存储到i的位置.
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

赋值:

private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);  //计算数组下标()

    //线性探索.()
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();

        // i的位置已经存在了值, 就直接替换
        if (k == key) {
            e.value = value;
            return;
        }

        //如果key==null,则进行replaceStaleEntry(替换空余的数组)
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}
  • 把当前的value保存到entry数组中

  • 清理无效的key

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // Back up to check for prior stale entry in current run.
    // We clean out whole runs at a time to avoid continual
    // incremental rehashing due to garbage collector freeing
    // up refs in bunches (i.e., whenever the collector runs).
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;

    // Find either the key or trailing null slot of run, whichever
    // occurs first
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        if (k == key) {
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // Start expunge at preceding stale entry if it exists
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    // If key not found, put new entry in stale slot
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // If there are any other stale entries in run, expunge them
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
  • 如果当前值对应的entry数组中key为null,那么该方法会向前查找到还存在key失效的entry,进行清理。

  • 通过线性探索的方式,解决hash冲突的问题。

内存泄漏

通过上面的分析,我们知道 expungeStaleEntry() 方法是帮助垃圾回收的,根据源码,我们可以发现get 和set 方法都可能触发清理方法 expungeStaleEntry() ,所以正常情况下是不会有内存溢出的 但是如果我们没有调用get 和set 的时候就会可能面临着内存溢出,养成好习惯不再使用的时候调用remove(),加快垃圾回收,避免内存溢出。

退一步说,就算我们没有调用get 和set 和remove 方法,线程结束的时候,也就没有强引用再指向ThreadLocal 中的ThreadLocalMap了,这样ThreadLocalMap 和里面的元素也会被回收掉,但是有一种危险是,如果线程是线程池的, 在线程执行完代码的时候并没有结束,只是归还给线程池,这个时候ThreadLocalMap 和里面的元素是不会回收掉的。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16824.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!