线程池的介绍
1.线程池的重要性
2.什么是”池”,软件中的”池”,可以理解为计划经济
3.如果不使用线程池,每个任务都开一个线程处理,
① 一个线程
② for循环创建线程
③ 当任务数量上升到1000
这样开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并且销毁线程所带来的开销问题。
为什么要使用线程
问题一:反复创建线程开销大
问题二:过多的线程会占用太多的内存
解决以上两个问题的思路
① 用少量的线程——避免内存占用过多
② 让这部分线程都保持工作,且可以反复执行任务——避免生命周期的损耗
线程池的好处
① 加快响应速度
② 合理利用CPU和内存
③ 统一管理
线程池适合应用的场合
服务器接受到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理
创建和停止线程池
线程池构造函数的参数

● corePoolSize:核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
● maxPoolSize:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程有一个上限,这就是最大量maximumPoolSize
添加线程规则
1.如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新的任务。
2.如果线程数等于(大于)corePoolSize但少于maximumPoolSize,则将任务放入队列。
3.如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务。
4.如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务
线程池执行流程:

是否需要增加线程的判断顺序是:
① corePoolSize
② workQueue
③ maxPoolSize
例子:
线程池:核心池大小5,最大池大小为10,队列为100。
因为线程中的请求最多会创建5个,然后任务将被添加到队列中,直到到达100。当队列已满时,将创建最新的线程maxPoolSize,最多到十个线程,如果再来任务,就拒绝。
增减线程的特点
1.通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池
2.线程池希望保持较少的线程数,并且只有在负载变得很大的时才增加它。
3.通过设置maximmumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
4.是只有在队列满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
线程池应该手动创建还是自动创建
手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。
自动创建线程池(也就是直接调用JDK封装好的构造函数)可能带来那些问题
1.newFixedThreadPool
由于传出去的LinkedBlockingQueue是没有容量上限的,所以当请求越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易占用大量的内存,可能会导致OOM
源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
正常实例:
/**
* 描述: 演示newFixedThreadPool
*/
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
出现OOM实例:
① 内存设置小一点

②代码:
public class FixedThreadPoolOOM {
private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE ; i++) {
executorService.execute(new SubThread());
}
}
}
class SubThread implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.newSingleThreadExecutor
可以看出,这里和上面的newFixedThreadPool的原理基本一样,只不过把线程数直接设置成了1,所以这也会导致一样的问题,也就是当请求堆积的时候,可能会占用大量的内存。
① 单线程的线程池:它只会用工作的唯一线程来执行任务
② 它的原理和FixedThreadPool是一样的,但是此时的线程线程数量被设置为了1
源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
实例:
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000 ; i++) {
executorService.execute(new Task());
}
}
}
3.newCachedThreadPool
① 可缓存线程池
② 特点:无界线程池,具有自动回收多余线程的功能。
这里的弊端在于第二个参数maximumPoolSize被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM。
源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
实例:
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
4.newScheduledThreadPool
支持定时及周期性任务执行的线程池
源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
实例:
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
// threadPool.schedule(new Task(),5, TimeUnit.SECONDS);
threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}
}
5.以上4种线程池的构造函数的参数

阻塞队列分析
① FixedThreadPool和SingleThreadExecutor的Queue是LinkedBlockingQueue
假设FixedThreadPool固定有10个线程,由于线程数量不能往上膨胀,不得不用一个能够存储很多的或无限多的队列帮助我们去存储任务,新来的任务数量没办法估计,只能在自身上把阻塞队列的容量设置为无限,这就是选择LinedBlockingQueue的原因。
② CachedThreadPool使用的是Queue是SynchronousQueue
SynchronousQueue內部不存储任务,在CachedThreadPool这种线程池情况下不需要队列存储任务,当有任务过来直接就交给新的线程去执行,新的线程数量是不受限制的,效率比较高,不需要在队列中进行中转。
③ ScheduledThreadPool来说,它使用的是延迟队列DelayedWorkQueue
延迟队列的能力就是把里面的任务根据时间先后做延迟,符合它的使用场景。
WorkStealingPool线程池
WorkStealingPool是JDK1.8加入的
① 这个线程池和之前的的都有很大不同
② 子任务
③ 窃取
正确创建线程池的方法
根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,我们想给线程取什么名字等等
线程池里的数量设定为多少较合适
① CPU密集型(加密、计算hash等):最佳核心线程数为CPU核心数的1-2倍。
② 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于核心线程数很多倍,保证线程空闲可以衔接上。
③ 参考Brain Goetz推荐的计算方法:
线程数=CPU核心数*(1+平均等待时间/平均工作时间)
停止线程池的正确方法
1.shutdown
初始化整个关闭过程,线程,池在执行过程中有线程中有正在执行的任务,还包括队列中有等待被执行的任务,运行该方法后,线程池知道我们想让它停止,优雅起见,会把正在执行的任务以及队列中等待执行的任务都执行完毕后,它再关闭,当执行该方法后再有新的任务提交,会抛出拒绝的异常
2.isShutdown
可以返回一个true或false,告诉我们这个线程池是不是已经停止了
3.isTerminated
线程池是不是完全停止,包括正在执行的线程以及队列里面任务都清空了
4.awaitTermination
检测等待一段时间内线程是不是完全停止的方法
5.shutdownNow
尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。

实例:
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
List<Runnable> runnableList = executorService.shutdownNow();
// boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
// System.out.println(b);
// System.out.println(executorService.isShutdown());
// executorService.shutdown();
// System.out.println(executorService.isShutdown());
// System.out.println(executorService.isTerminated());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+"被中断了");
}
}
}
任务太多,怎么拒绝
1.拒绝时机
① 当Excutor关闭时,提交新任务会被拒绝。
② 当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时
2.四种拒绝策略
① AbortPolicy:直接抛出RejectedExecutionException异常信息,线程池默认的拒绝策略
② DiscardPolicy:默默的丢弃任务,不会得到通知,不知道这个任务,不会得到处理
③ DiscardOldestPolicy:丢弃最老的存储时间最久的任务丢掉,以便腾出空间存放你刚刚提交最新的任务
④ CallerRunsPolicy:谁提交的任务谁去执行,即体现调用线程处理(优点一:让主线程去运行避免业务损失;优点二:让提交的速度降低,主线程提交任务满了后发现是这种拒绝策略,本身作为提交者运行,运行该任务花一定的时间,只有执行该任务完以后才能提交下一个任务,这段时间线程池也会执行完毕一些任务,为后面的任务腾出一定的空闲,给线程池一个缓冲的时间)
可以根据业务需求选择拒绝策略
钩子方法,给线程池加料
1.每个任务执行前后
2.日志、统计
3.代码演示:
/**
* 演示每个任务执行前后放钩子方法
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private static final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(
10, 20, 10l, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池暂定了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程被恢复了");
}
}
实现原理、源码分析
线程池组成部分
① 线程池管理器
② 工作线程
③ 任务队列
④ 任务接口(Task)
Executor家族
线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等这么多和线程池相关的类,大家都是什么关系

工具类Executors

线程池实现任务复用的原理(相同的线程执行不同的任务)
源码:
execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
如果工作线程小于核心线程进入addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程复用主要在Worker类中RunWorker的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//直接拿到任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();//执行完一个任务再去阻塞队列取一个新的任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执行Runnable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程池的状态
1.RUNNING:接受新任务并处理排队任务
2.SHUTDOWN:不接受新任务,但处理排队任务
3.STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务
4.TIDYING:中文整洁的意思,理解中文容易理解这个状态了:所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并将运行terminate()钩子方法
5.TERMINATED:terminate()运行完成

使用线程池的注意点
1.避免任务的堆积(导致内存不够)
2.避免线程数过度增加
3.排查线程泄漏(排查线程数量是否超过正常数量,线程已经执行完毕,却不能被回收,可能逻辑有问题导致任务结束不了)
原文始发于微信公众号(itmkyuan):关于线程池构造函数参数、阻塞队列、拒绝策略、线程设置的入门学习
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/39217.html