一、概述及目录
多线程对于我们后端日常开发是一种加快程序处理的常用的方式,同时也是面试过程中常见的一个话题,本文一部分属于基础扫盲内容,另一部分属于高阶的部分,同时对于一些细节点有相应的实战。
二、实现多线程的方式
2.1 继承Tread类,重写run方法。start方法
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("MyThread线程方法执行" + i);
}
}
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
}
}
2.2 实现Runnable方法,并实现run接口方法
public class MyRunnable implements Runnable{
@Override
public void run() {
for (int i=0;i<10;i++){
System.out.println("线程方法执行"+i);
}
}
public static void main(String[] args) {
//创建了一个参数对象
MyRunnable myRunnable = new MyRunnable();
//创建了一个线程对象,并把参数传递给这个线程
//在线程启动后,执行的就是参数里面的run方法
Thread thread = new Thread(myRunnable);
//run方法
thread.start();
}
}
2.3 实现Callable接口重写call方法,Feature.get()获取返回值
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
for (int i=0;i<100;i++){
System.out.println("MyCallable运行次数"+i);
}
//返回值就是表示线程运行之后的结果
return "你好";
}
public static void main(String[] args) {
//线程开启之后需要执行里面的call方法
MyCallable myCallable = new MyCallable();
//可以获取线程执行完毕之后的结果,也可以作为参数传递给Thread对象
FutureTask<String> futureTask = new FutureTask<String>(myCallable);
//创建线程对象
Thread thread = new Thread(futureTask);
//开启线程
thread.start();
try {
//获取返回结果
String s = futureTask.get();
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
三、线程的执行流程
3.1 执行流程
线程状态从大的角度来说,可分为为:初始状态、可运行状态、不可运行状态和消亡状态,具体可细分为上图所示7个状态,说明如下:
1)线程的实现有三种方式,一是继承Thread类,二是实现Runnable接口,第三种就是实现Callable接口但不管怎样,当我们new了Thread实例后,线程就进入了初始状态;
2)当该对象调用了start()方法,就进入可运行状态; 3)进入可运行状态后,当该对象被操作系统选中,获得CPU时间片就会进入运行状态;
4)进入运行状态后涉及的情况就比较多,大致有如下情形: ﹒run()方法或main()方法结束后,线程就进入终止状态;
当线程调用了自身的sleep()方法或其他线程的join()方法,就会进入阻塞状态(该状态虽停止当前线程,但并不释放所占有的资源)。当sleep()结束或join()结束后,该线程进入可运行状态,继续等待OS分配时间片;
当线程刚进入可运行状态(注意,还没运行),发现将要调用的资源被锁住(synchroniza,lock),将会立即进入锁池状态,等待获取锁标记(这时的锁池里也许已经有了其他线程在等待获取锁标记,这时它们处于队列状态,既先到先得),一旦线程获得锁标记后,就转入可运行状态,等待OS分配CPU时间片;
当线程调用wait()方法后会进入等待队列(进入这个状态会释放所占有的所有资源,与阻塞状态不同),进入这个状态后,是不能自动唤醒的,必须依靠其他线程调用notify()或notifyAll()方法才能被唤醒(由于notify()只是唤醒一个线程,但我们由于不能确定具体唤醒的是哪一个线程,也许我们需要唤醒的线程不能够被唤醒,因此在实际使用时,一般都用notifyAll()方法,唤醒有所线程),线程被唤醒后会进入锁池,等待获取锁标记。
当线程调用stop方法,即可使线程进入消亡状态,但是由于stop方法是不安全的,不鼓励使用,大家可以通过run方法里的条件变通实现线程的stop。
3.2 start方法和 run方法的区别?
- start方法来启动一个线程,此时该线程处于就绪状态,而非运行状态,这时候就可以被JVM来调度,调度过程中JVM通过调用run方法来完成实际操作.
- run方法只是一个普通的函数调用,程序中依然只有主线程这个一个线程,,也就是说strat方法能够异步的调用run方法,但是直接调用run方法确实同步的,因此无法达到多线程的目的
四、控制线程执行顺序
[1] 使用线程的join方法:join():是Theard的方法,作用是调用线程需等待该join()线程执行完成后,才能继续用下运行
[2] 使用主线程的join方法
[3] 使用线程的wait方法,notify方法
[4] 使用线程的线程池方法
[5] 使用线程的Condition(条件变量)方法
[6] 使用线程的CountDownLatch(倒计数)方法
[7] 使用线程的CyclicBarrier(回环栅栏)方法 [8] 使用线程的Semaphore(信号量)方法
五、线程中断的方式
5.1 Stop 方法
源码中已经废弃了:
@Deprecated
public final void stop() {
……
throw new UnsupportedOperationException();
}
该方式是通过抛出ThreadDeath异常来达到停止线程的目的,因此异常抛出可能发生在程序的任何一个地方;由于抛出ThreadDeath异常,会导致该线程释放所持有的所有锁,而这种释放时间是不可控制的,可能会导致线程安全问题和数据不一样的情况,如在同步代码块中执行数据更新操作时线程突然被停止。
- 释放锁定的所有监视资源。
- stop方法会导致代码逻辑不完整,他收到停止命令后,会立即停止。
5.2 interrupt方法
原理: 对于 Java 而言,最正确的停止线程的方式是使用 interrupt。但 interrupt仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。
可中断的阻塞: 针对线程处于由sleep, wait, join,LockSupport.park等方法调用产生的阻塞状态时,调用interrupt方法,会抛出异常InterruptedException,同时会清除中断标记位,自动改为false。
5.2.1 实战
注意点:interrupted方法内部return的是currentThread而不是调用方法的线程
5.3 volatile通过标识位,停止线程,有的能够停止,有的却不能够???
package com.yyp.dream.juejin;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 测试 volatile 在生产消费中无效 案例
* @Author xiaomayi
*/
public class XiaoMaYiVolatileStopThread {
// 1. 声明了一个生产者 Producer,通过 volatile 标记的初始值为 false 的布尔值 canceled 来停止线程。
// 2. 在 run() 方法中,while 的判断语句是 num 是否小于 100000 及 canceled 是否被标记。
// 3. while 循环体中判断 num 如果是 50 的倍数就放到 storage 仓库中,storage 是生产者与消费者之间进行通信的存储器,当 num 大于 100000 或被通知停止时,会跳出 while 循环并执行 finally 语句块,告诉大家“生产者结束运行”。
static class Producer implements Runnable {
public volatile boolean canceled = false;
BlockingQueue storage;
public Producer(BlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
int num = 0;
try {
while (num <= 100000 && !canceled) {
if (num % 100 == 0) {
storage.put(num);
System.out.println(num + "是100的倍数,被放到仓库中了。");
}
num++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("生产者结束运行");
}
}
}
//对于消费者 Consumer,它与生产者共用同一个仓库 storage,并且在方法内通过 needMoreNums() 方法判断是否需要继续使用更多的数字
//刚才生产者生产了一些 100 的倍数供消费者使用,消费者是否继续使用数字的判断条件是产生一个随机数并与 0.88 进行比较,大于 0.88 就不再继续使用数字.
static class Consumer {
BlockingQueue storage;
public Consumer(BlockingQueue storage) {
this.storage = storage;
}
public boolean needMoreNums() {
if (Math.random() > 0.88) {
return false;
}
return true;
}
}
// main 函数,首先创建了生产者/消费者共用的仓库 BlockingQueue storage,仓库容量是 8.
//并且建立生产者并将生产者放入线程后启动线程,启动后进行 500 毫秒的休眠.
//休眠时间保障生产者有足够的时间把仓库塞满,而仓库达到容量后就不会再继续往里塞,这时生产者会阻塞,500 毫秒后消费者也被创建出来,并判断是否需要使用更多的数字,然后每次消费后休眠 100 毫秒,这样的业务逻辑是有可能出现在实际生产中的。
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue storage = new ArrayBlockingQueue(8);
Producer producer = new Producer(storage);
Thread producerThread = new Thread(producer);
producerThread.start();
Thread.sleep(500);
Consumer consumer = new Consumer(storage);
while (consumer.needMoreNums()) {
System.out.println(consumer.storage.take() + "被消费了");
Thread.sleep(100);
}
System.out.println("消费者不需要更多数据了。");
//一旦消费不需要更多数据了,我们应该让生产者也停下来,但是实际情况却停不下来
producer.canceled = true;
System.out.println("producer.canceled :"+ producer.canceled);
}
}
运行结果如下:
可以发现已经打印出“ 消费者不需要更多数据了”,并且“producer.canceled :true” ,但是生产者却没有停止!!!
原因: 因为生产者生生产速度过快,所以大多数情况下,生产者是属于阻塞的状态,也就是会停留在 storage.put(num)这个地方。此时就需要等待消费者 storage.take(num)才会向下执行。因此中断线程在把producer.canceled 设置为true时,此时我们的目的是吧线程中断,但是最终并不是像我们所期望的那样,生产者阻塞在storage.put(num)这步 ,消费者也不能继续消费,导致程序无法执行下去,而我们要中断线程是要进入到下一次的while 循环判断,producer.canceled =true 才能跳出循环,最终执行finall中的 “生产者结束运行”,才能结束。所以长期阻塞的情况下volatile设置标记位的方法,不能中断线程的运行。
六、线程停止的几种方式
6.1 shutdown:
调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表 shutdown() 操作是没有任何效果的,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。
6.2 isShutdown:
它可以返回 true 或者 false 来判断线程池是否已经开始了关闭工作,也就是是否执行了 shutdown 或者 shutdownNow 方法。这里需要注意,如果调用 isShutdown() 方法的返回的结果为 true 并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程,也就是说,此时可能线程池中依然有线程在执行任务,队列里也可能有等待被执行的任务。
6.3 isTerminated:
这个方法可以检测线程池是否真正“终结”了,这不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了,因为我们刚才说过,调用 shutdown 方法之后,线程池会继续执行里面未完成的任务,不仅包括线程正在执行的任务,还包括正在任务队列中等待的任务。比如此时已经调用了 shutdown 方法,但是有一个线程依然在执行任务,那么此时调用 isShutdown 方法返回的是 true ,而调用 isTerminated 方法返回的便是 false ,因为线程池中还有任务正在在被执行,线程池并没有真正“终结”。直到所有任务都执行完毕了,调用 isTerminated() 方法才会返回 true,这表示线程池已关闭并且线程池内部是空的,所有剩余的任务都执行完毕了。
6.4 awaitTermination
第四个方法叫作 awaitTermination(),它本身并不是用来关闭线程池的,而是主要用来判断线程池状态的。比如我们给 awaitTermination 方法传入的参数是 10 秒,那么它就会陷入 10 秒钟的等待,直到发生以下三种情况之一:
等待期间(包括进入等待状态之前)线程池已关闭并目所有已提交的任务(包括正在执行的和队列中等待的都执行完毕,相当于线程池已经“终结”了,方法便会返回true
等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false
等待期间线程被中断,方法会抛出 Interruptedexception异常
等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true;
等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false; 等待期间线程被中断,方法会抛出 InterruptedException 异常。
6.5 shutdownNow:
shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回
七、ThreadLocal相关
7.1 作用:
ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。
7.2 应用场景
在Java的多线程编程中,为保证多个线程对共享变量的安全访问,通常会使用synchronized来保证同一时刻只有一个线程对共享变量进行操作。这种情况下可以将类变量放到ThreadLocal类型的对象中,使变量在每个线程中都有独立拷贝,不会出现一个线程读取变量时而被另一个线程修改的现象。最常见的ThreadLocal使用场景为用来解决数据库连接、Session管理等。
7.3 内存泄漏问题:
最主要的原因在于它的内部类ThreadLocalMap中的Entry的设计。Entry继承了WeakReference<ThreadLocal<>>,即Entry的key是弱引用,所以key’会在垃圾回收的时候被回收掉, 而key对应的value则不会被回收, 这样会导致一种现象:key为null,value有值。
key为空的话value是无效数据,久而久之,value累加就会导致内存泄漏。
Entry的key被设计为弱引用就是为了让程序自动的对访问不到的数据进行回收提醒,所以,在访问不到的数据被回收之前,内存泄漏确实是存在的,但是我们不用担心,就算我们不调用remove,ThreadLocalMap在内部的set,get和扩容时都会清理掉泄漏的Entry,内存泄漏完全没必要过于担心。
7.4 弱引用导致内存泄漏,那为什么key不设置为强引用?
如果key设置为强引用, 当threadLocal实例释放后, threadLocal=null, 但是threadLocal会有强引用指向threadLocalMap,threadLocalMap.Entry又强引用threadLocal, 这样会导致threadLocal不能正常被GC回收。
弱引用虽然会引起内存泄漏, 但是也有set、get、remove方法操作对null key进行擦除的补救措施, 方案上略胜一筹。
上图中,实线代表强引用,虚线代表的是弱引用,如果threadLocal外部强引用被置为null(threadLocalInstance=null)的话,threadLocal实例就没有一条引用链路可达,很显然在gc(垃圾回收)的时候势必会被回收,因此entry就存在key为null的情况,无法通过一个Key为null去访问到该entry的value。同时,就存在了这样一条引用链:threadRef->currentThread->threadLocalMap->entry->valueRef->valueMemory,导致在垃圾回收的时候进行可达性分析的时候,value可达从而不会被回收掉,但是该value永远不能被访问到,这样就存在了内存泄漏。当然,如果线程执行结束后,threadLocal,threadRef会断掉,因此threadLocal,threadLocalMap,entry都会被回收掉。可是,在实际使用中我们都是会用线程池去维护我们的线程,比如在Executors.newFixedThreadPool()时创建线程的时候,为了复用线程是不会结束的,所以threadLocal内存泄漏就值得我们关注。
7.5 InheritableThreadLocal
父子线程之间值传递:在子线程中,可以获取到父线程的InheritableThreadLocal类型变量的值,而不能获取到ThreadLocal类型变量的值
八、线程池
8.1 为什么要用线程池?
管理一组工作线程。通过线程池复用线程有以下几点优点:
- 减少资源创建 => 减少内存开销,创建线程占用内存
- 降低系统开销 => 创建线程需要时间,会延迟处理的请求
- 提高稳定稳定性 => 避免无限创建线程引起的OutOfMemoryError
8.2 创建方式:
看阿里巴巴开发手册并发编程这块有一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。
Q:为什么禁止使用Executors去创建线程池,而是推荐自己去创建ThreadPoolExecutor的原因?
A:FixedThreadPool和SingleThreadExecutor => 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常
CachedThreadPool => 允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常。
8.3 执行逻辑
判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务
若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中
若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关。
8.4 如何定义线程池参数?
CPU密集型 => 线程池的大小推荐为CPU数量 + 1,CPU数量可以根据Runtime.availableProcessors方法获取
IO密集型 => CPU数量 * CPU利用率 * (1 + 线程等待时间/线程CPU时间)
混合型 => 将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整
阻塞队列 => 推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生
拒绝策略 => 默认采用的是AbortPolicy拒绝策略,直接在程序中抛出RejectedExecutionException异常【因为是运行时异常,不强制catch】,这种处理方式不够优雅。处理拒绝策略有以下几种比较推荐:
在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略
使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低
自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可
如果任务不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒绝策略将任务丢弃也是可以的。
8.5 拒绝策略
线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
ThreadPoolExecutor.AbortPolicy
抛出java.util.concurrent.RejectedExecutionException异常。
ThreadPoolExecutor.CallerRunsPolicy
用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
ThreadPoolExecutor.DiscardOldestPolicy
丢弃任务队列中最旧任务。
ThreadPoolExecutor.DiscardPolicy
丢弃当前将要加入队列的任务。
8.6 线程池队列
- ArrayBlockingQueue:基于数组的阻塞队列实现。生产者放入数据和消费者获取数据,共用同一个锁对象。默认采用非公平锁。
- LinkedBlockingQueue:基于链表的阻塞队列。生产者端和消费者端分别采用了独立的锁来控制数据同步,并发性能较好。需要注意的是,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
- PriorityBlockingQueue: 基于优先级的阻塞无界队列(优先级的判断通过构造函数传入的Compator对象来决定,但不保证同优先级元素顺序)。不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。内部控制线程同步的锁采用的是公平锁。
- SynchronousQueue:无缓冲的等待队列。每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都要等待另一个线程的插入操作。由于队列没有容量,所以不能调用peek操作(返回队列头元素)。
- DelayQueue:支持延时获取元素的无界阻塞队列。队列中每个元素必须实现Delayed接口。插入数据的操作(生产者)永远不会被阻塞,只有获取数据的操作(消费者)才会被阻塞。
8.7 线程池怎么做到线程复用?
take & poll
我们说take()方法会将核心线程阻塞挂起,这样一来它就不会占用太多的cpu资源,直到拿到Runnable 然后返回。
如果allowCoreThreadTimeOut设置为true,那么核心线程就会去调用poll方法,因为poll可能会返回null,所以这时候核心线程满足超时条件也会被销毁
非核心线程会workQueue.
8.8 提供一个多线程处理任务方法,并且可以控制并发量:
public class TaskUtils {
private static final int corePoolSize = Runtime.getRuntime().availableProcessors();//CPU核心数
private static final int maximumPoolSize = corePoolSize * 2;
private static long keepAliveTime = 1;
private static TimeUnit keepAliveTimeUnit = TimeUnit.MINUTES;
private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1024);
private static RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.CallerRunsPolicy();
/**
* corePoolSize : 线程池核心线程数,最好默认CPU核心数
* maximumPoolSize : 线程池最大线程数,最好是核心线程数的两倍,太多会引起线程切换
* keepAliveTime : 大于核心线程数的空闲线程存活时间
* keepAliveTimeUnit : 空闲线程存活时间的单位(秒、分钟、小时等等)
* workQueue : 线程池有界队列,新任务没有可用线程处理时会把任务放到该队列中,等待被处理
* rejectedHandler : 拒绝处理策略,默认直接丢弃并抛出异常-AbortPolicy,调用者线程直接处理-CallerRunsPolicy
*/
private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
keepAliveTimeUnit,
workQueue,
rejectedHandler
);
public static <T> void startWithMultiThread(List<T> list, int nThread, Consumer<T> func, String methodName) {
if (CollectionUtils.isEmpty(list)) {
return;
}
if (nThread <= 0) {
return;
}
if (func == null) {
return;
}
if (CollectionUtils.isEmpty(list)) {
return;
}
HashMap<String, String> map = new HashMap();
map.put("jobName", methodName);
Semaphore semaphore = new Semaphore(nThread);
// ExecutorService executorService = Executors.newFixedThreadPool(nThread);
for (T obj : list) {
try {
semaphore.acquire();
poolExecutor.execute(() -> {
try {
func.accept(obj);
} catch (Exception ex) {
// logger.Error("执行出错", ex);
} finally {
semaphore.release();
}
});
} catch (InterruptedException e) {
// logger.Error("startWithMultiThread_调度出错", e);
}
}
try {
poolExecutor.shutdown();
poolExecutor.awaitTermination(1, TimeUnit.SECONDS);
} catch (Exception ex) {
// logger.Error("线程池关闭出错", ex);
}
}
}
九、总结
使用多线程要看我们的实际情况,怎么在机器状态稳定的情况下使用多线程这是一个推敲实践的过程,有时候机器配置一样,但是有的业务能在保证机器各项指标正常的情况是开的线程多一点,但是有的就不行,这个还是和业务复杂度相关的。同事博主还在工作中遇到过使用多线程池的场景,但是这个想法也是在一个开源组件源码中得到的启发【rocketmq】,主要场景就是公共资源怎么不通渠道公平处理,所以每个渠道单独开一个线程池使用公共资源。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/132153.html