阻塞队列
阻塞队列存在的意义:
当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?使用阻塞队列。很显然,使用阻塞队列既避免了线程池内部自己实现阻塞—唤醒机制的麻烦,也避免了睡眠—轮询带来的资源消耗和延迟。阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。
阻塞队列原理图:当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
BlockingQueue的核心方法:
- 抛出异常: 当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementEx·ception异常 。
- 返回特殊值: 插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null,即无法向一个 BlockingQueue 中插入 null。
- 一直阻塞: 当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
- 超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
抛出异常 与 返回特殊值 方法的实现是一样的,只不过对失败的操作的处理不一样!通过 AbstractQueue 的源码可以发现,add(e),remove(),element() 都是分别基于 offer(),poll(),peek() 实现的
BlockingQueue的实现类
1.ArrayBlockingQueue(常用)
2.LinkedBlockingQueue(常用)
3.DelayQueue
4.PriorityBlockingQueue
5.SynchronousQueue
6.LinkedTransferQueue
7.LinkedBlockingQueue
小结
1.前两个常用:数组阻塞队列和链表阻塞队列。
2.在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
阻塞队列的最常使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便;
好的博客:
阻塞队列(BlockingQueue)的实现原理_CSDN博客_阻塞队列的实现原理
双端阻塞队列(BlockingDeque)
concurrent包下还提供双端阻塞队列(BlockingDeque),和BlockingQueue是类似的,只不过BlockingDeque提供从任意一端插入或者抽取元素的队列。
Condition类解析
阻塞队列的实现原理
通过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现
private final Condition notEmpty; private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
Condition
Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒
Condition是在java1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
Condition是个接口,两个最重要的方法,一个是 await,一个是 signal 方法
await:把当前线程阻塞挂起
signal:唤醒阻塞的线程
Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用。
Conditon中的await()对应Object的wait();Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
Object对象的wait和notify
在进行线程间的通信时,可以用基于Object对象的wait和notify方法实现等待/通知机制
注意点:
1.wait()和notify()方法是属于Object类的,java中的对象都带有这两个方法
2.要使用wait和notify,必须首先对”对象”进行synchronized加锁
基于Object实现等待/通知机制的相关方法
wait的终止条件:
a .被通知唤醒了
b .线程被中止了(异常)
c .假唤醒
d .超时时间到达
notify唤醒的规则:
1.随机唤醒:
T1:o.wait(); T2:o.wait(); T3:o.wait(); o.notify();//唤醒了:T1 or T2 or T3
2.wait – notify(等待 – 通知)是没有状态保存的;换言之,先notify后wait,wait无法感知到之前曾经有过notify,所以会永远等待下去
wait和notify总结:
1. wait的时候会释放锁(只会释放wait这个锁)
2. notify唤醒是随机的 notifyAll全唤醒
3. 先notify后wait没有用;wait无法感知到之前曾经有过notify,所以会永远等待下去
wait()方法被执行后,锁被自动释放,但notify()方法被执行后,锁却不自动释放,必须执行完notify()方法所在的同步synchronized代码块后才释放锁
在AQS相关类中实现这种等待/通知机制:Condition;一个Condition就代表一个条件队列
Condition源码分析
条件变量Condition,功能和wait-notify可以说”一模一样”
1. Condition的执行方式,是当在线程Consumer中调用await方法后,线程Consumer将释放锁,并且将自己沉睡,等待唤醒,线程Producer获取到锁后,开始做事,完毕后,调用Condition的signalall方法,唤醒线程Consumer,线程Consumer恢复执行。
以上说明Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。
2. 调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,假如两个线程同时运行的话,那么 AQS 的队列可能是下面这种情况,一个线程获得锁,另一个线程进入到同步队列
阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
释放:signal()后,节点会从 condition 队列移动到 AQS等待队列,则进入正常锁获取流程
好的博客:
线程池(ThreadPool)
线程池简介
线程池原理(线程池有哪些参数,面试常问这个问题):
1. 固定线程数newFixedThreadPool(5)原理:public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
最终返回一个ThreadPoolExecutor对象,是不是很眼熟??没错就是上面架构图中线程池的核心实现类!
2. 可扩容线程数newCachedThreadPool()原理:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
同样是返回一个ThreadPoolExecutor对象,只是创建对象所传的构造方法参数不同。 而参数的数量都是一样的 5个!
线程池的七个参数:
public ThreadPoolExecutor(int corePoolSize, 1 int maximumPoolSize, 2 long keepAliveTime, 3 TimeUnit unit, 4 BlockingQueue<Runnable> workQueue, 5 ThreadFactory threadFactory, 6 RejectedExecutionHandler handler 7 ) { }
1. corePoolSize:线程池中的常驻线程数(核心)
2. maximumPoolSize:线程池中能够容纳同时执行的最大线程数,必须>= 1
3. keepAliveTime:空闲线程,销毁所需要的时间,当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁知道只剩下corePoolSize个线程位置。
4. unit:(keepAliveTime)存活的单位。
5. workQueue:任务队列,被提交但尚未执行的任务。
6. threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认即可。
7. handler:拒绝策略(等待队列满时),表示当队列满了,并且工作线程>=线程池的最大线程数(maximumPoolSize),这是如何来拒绝请求执行的runnabel的策略四种拒绝策略:
当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现 RejectedExecutionHandler 接口,并实现 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。不过 Executors 框架已经为我们实现了 4 种拒绝策略:
1. AbortPolicy:默认策略,将抛出异常
2. CallerRunsPolicy:不抛弃任务,也不抛出异常,而是将任务退回给线程调用处,由调用处来处理任务!
3. DiscardPolicy:抛弃当前任务;该策略丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略
4. DiscardOldestPolicy:抛弃队列中等待 时间最久的任务,然后把当前任务加入到队列线程工厂:
线程工厂指定创建线程的方式,需要实现 ThreadFactory 接口,并实现现 newThread(Runnable r) 方法。该参数可以不用指定,Executors 框架已经为我们实现了一个默认的线程工厂:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } }
常用功能线程池:
其实Executors已经为我们封装好了 4 种常见的功能线程池,如下:
- 定长线程池(FixedThreadPool)
- 定时线程池(ScheduledThreadPool )
- 可缓存线程池(CachedThreadPool)
- 单线程化线程池(SingleThreadExecutor)
定长线程池(FixedThreadPool)
创建方法的源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
- 特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列。
- 应用场景:控制线程最大并发数。
定时线程池(ScheduledThreadPool )
创建方法的源码:
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L; public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); }
- 特点:核心线程数量固定,非核心线程数量无限,执行完闲置 10ms 后回收,任务队列为延时阻塞队列。
- 应用场景:执行定时或周期性的任务。
使用示例:
// 1. 创建 定时线程池对象 & 设置线程池线程数量固定为5 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 2. 创建好Runnable类线程对象 & 需执行的任务 Runnable task =new Runnable(){ public void run() { System.out.println("执行任务啦"); } }; // 3. 向线程池提交任务 scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延迟1s后执行任务 scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延迟10ms后、每隔1000ms执行任务
可缓存线程池(CachedThreadPool)
创建方法的源码:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
- 特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列。
- 应用场景:执行大量、耗时少的任务。
使用示例:
// 1. 创建可缓存线程池对象 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 2. 创建好Runnable类线程对象 & 需执行的任务 Runnable task =new Runnable(){ public void run() { System.out.println("执行任务啦"); } }; // 3. 向线程池提交任务 cachedThreadPool.execute(task);
单线程化线程池(SingleThreadExecutor)
创建方法的源码:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
- 特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
- 应用场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。
使用示例:
// 1. 创建单线程化线程池 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // 2. 创建好Runnable类线程对象 & 需执行的任务 Runnable task =new Runnable(){ public void run() { System.out.println("执行任务啦"); } }; // 3. 向线程池提交任务 singleThreadExecutor.execute(task);
对比
在《阿里巴巴Java开发手册》中,明确禁止使用Executors创建线程池,并要求开发者直接使用ThreadPoolExector或ScheduledThreadPoolExecutor进行创建。这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到心中有数,以规避因使用不
当而造成资源耗尽的风险。
项目中创建多线程时,使用常见的四种线程池创建方式,单一、可变、定长都有一定问题 ,原因是FixedThreadPool和SingleThreadExecutor底层都是用LinkedBlockingQueue实现的,这个队列最大长度为Integer.MAX_VALUE,容易导致OOM,所以实际生产一般自己通过ThreadPoolExecutor的7个参数自定义线程池。
线程池底层工作流程:
注意:当调用execute()方法后,线程池才会创建线程
总结一下:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。 很显然,基于这种流程,如果队列是无界的,将永远没有机会走到2. 3,也即maxPoolSize没有使用,也一定不会走到2. 4。线程池的运作流程:
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor实现了按时间调度来执行任务,具体有两个方面:(1)延迟执行任务(2)周期执行任务这两个函数的区别如下:AtFixedRate:按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。WithFixedDelay:按固定间隔执行,与任务本身执行时间有关。 例如,任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。延迟执行和周期性执行的原理
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,这意味着其内部的数据结构和ThreadPoolExecutor是基本一样的,那它是如何实现延迟执行任务和周期性执行任务的呢?延迟执行任务依靠的是DelayQueue。在5.1 节中已经提到,DelayQueue是 BlockingQueue的一种,其实现原理是二叉堆。而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。不过这里并没有使用5.1节中的DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue,如下所示。其原理和DelayQueue一样,但针对任务的取消进行了优化,此处不再进一步展开。下面主要看一下延迟执行和周期性执行的实现过程延迟执行
传进去的是一个Runnable,外加延迟时间delay。在内部通过decorateTask(..)函数把Runnable包装成一个ScheduleFutureTask对象,而DelayedWorkerQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口从上面的代码中可以看出,schedule()函数本身很简单,就是把提交的 Runnable 任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中。任务的执行过程还是复用的ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。周期性执行
和schedule(..)函数的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkerQueue就结束了。 两个函数的区别在于一个传入的周期是一个负数,另一个传入的周期是一个正数,为什么要这样做呢?下面进入ScheduledFutureTask的内部一探究竟。
withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面。如果是atFixedRate,period>0,下一次开始执行时间等于上一次开始执行时间+period;如果是withFixedDelay,period < 0,下一次开始执行时间等于triggerTime(-p),为now+(-period),now即上一次执行的结束时间。线程池的其余要点
线程池中的各个状态分别代表什么含义?
线程池目前有5个状态:
RUNNING:接受新任务并处理排队的任务。
SHUTDOWN:不接受新任务,但处理排队的任务。
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务。
TIDYING:所有任务都已终止,workerCount 为零,线程转换到 TIDYING 状态将运行 terminated() 钩子方法。
T ERMINATED:terminated() 已完成。
在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量。如图6-3所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。
线程只能在任务到达时才启动吗?
默认情况下,即使是核心线程也只能在新任务到达时才创建和启动。但是我们可以使用 prestartCoreThread(启动一个核心线程)或 prestartAllCoreThreads(启动全部核心线程)方法来提前启动核心线程。
核心线程怎么实现一直存活?
阻塞队列方法有四种形式,它们以不同的方式处理操作,如下表。
核心线程在获取任务时,通过阻塞队列的 take() 方法实现的一直阻塞(存活)。
非核心线程如何实现在 keepAliveTime 后死亡?
原理同上,也是利用阻塞队列的方法,在获取任务时通过阻塞队列的 poll(time,unit) 方法实现的在延迟死亡。
非核心线程能成为核心线程吗?
虽然一直讲着核心线程和非核心线程,但是其实线程池内部是不区分核心线程和非核心线程的。只是根据当前线程池的工作线程数来进行调整,因此看起来像是有核心线程于非核心线程。
如何终止线程池?
线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。终止线程池主要有两种方式:
shutdown:“温柔”的关闭线程池。不接受新任务,但是在关闭前会将之前提交的任务处理完毕。
shutdownNow:“粗暴”的关闭线程池,也就是直接关闭线程池,通过 Thread#interrupt() 方法终止所有线程,不会等待之前提交的任务执行完毕。但是会返回队列中未处理的任务。
shutdown()与shutdownNow()的区别:
(1)前者不会清空任务队列,会等所有任务执行完成,后者再清空任务队列。(2)前者只会中断空闲的线程,后者会中断所有线程。正确关闭线程池的步骤
线程池的关闭需要一个过程,在调用 shutDown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用 awaitTermination 来等待线程池关闭。关闭线程池的正确步骤如下:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/110906.html