一、概念
线程池的创建有两种:ThreadPoolExecutor 和 Executors。
线程池不是一个提高系统的并发能力的策略,是一个更好的管理线程的方案。
线程池主要解决两个问题:
一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。
二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,可以使用更方便的Executors的工厂方法,
比如newCachedThreadPool (缓冲线程池,线程池线程个数最多可达Integer.MAX_ VALUE,线程自动回收)、newFixedThreadPool (固定大小的线程池)、newSingleThreadExecutor (单线程化的线程池)等来创建线程池,当然还可以自定义。
二、ThreadPoolExecutor类构造器
public static void main(String[] args) throws InterruptedException {
//创建一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
30,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(40));
for (int i = 0; i < 1000; i++) {
//调用任务
executor.execute(new Text());
//由于线程进入队列速度太快,暂停10毫秒
Thread.sleep(10);
}
//关闭线程池,但队列中的任务会继续完成
executor.shutdown();
}
//模拟任务
static class Text implements Runnable {
@Override
public void run() {
System.out.println("--------------------" + Thread.currentThread().getName() + ":执行了任务--------------------");
}
}
2.1、线程池参数详情
2.2、线程池状态
//假设Integer类型是32位的二进制表示。
//高3位代表线程池的状态,低29位代表的是线程池的数量
//默认是RUNNING状态,线程池的数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数位数,表示的Integer中除去最高的3位之后剩下的位数表示
线程池的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程的最大数量
//这里举例是32为机器,表示为00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池的状态
// runState is stored in the high-order bits
//11100000000000000000000000000000
//接受新任务并且处理阻塞队列里面任务
private static final int RUNNING = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒绝新任务但是处理阻塞队列的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任务并且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
//所有任务都执行完(包括阻塞队列中的任务)后当线程池活动线程数为0,
将要调用terminated方法。
private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
//终止状态,terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
通过上面内容可以看到ctl其实存放的是线程池的状态和线程数量的变量,默认是RUNNING,也就是11100000000000000000000000000000,然后32位-3位=29位,也就是低29位代表的是线程的数量,高3位代表线程的状态,可以清晰看到下面的线程池的状态都是通过低位来进行向左位移的操作的,除了上面的变量,还提供了操作线程池状态的方法:
// 操作ctl变量,主要是进行分解或组合线程数量和线程池状态。
// 获取高3位,获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位,获取线程池中线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 组合ctl变量,rs=runStatue代表的是线程池的状态,wc=workCount代表的是线程池线程的数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
//指定的线程池状态c小于状态s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//指定的线程池状态c至少是状态s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判断线程池是否运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* CAS增加线程池线程数量.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* CAS减少线程池线程数量
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 将线程池的线程数量进行较少操作,如果竞争失败直到竞争成功为止。
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2.3、线程池的执行流程
- 主线程进行线程池的调用,线程池执行execute方法。
- 线程池通过addWorker进行创建线程,并将线程放入到线程池中,这里我们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,因为超过corePoolSize并且超时的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断。
- 当线程池的数量达到corePoolSize时,线程池首先会将任务添加到队列中。
- 当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了corePoolSize,但是没有达到maximumPoolSize最大值。
- 当线程池的线程数量达到了maximumPoolSize,则会执行相应拒绝策略。
2.4、队列介绍
2.5、案例分析
代码和DefaultThreadFactory一样,只是在new Thread新建线程的动作的时候输出了线程池的名称,方便查看线程创建的时机。
示例:
public class CustomThreadFactory implements ThreadFactory{
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + threadNumber.getAndIncrement();
Thread t = new Thread(group, r,
name,
0);
System.out.println("线程池创建,线程名称为:" + name);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
定义了一个线程池,线程池初始化的corePoolSize为5,也就是线程池中线程的数量为5,最大线程maximumThreadPoolSize为10,空余的线程存活的时间是60s,使用ArrayBlockingQueue来作为阻塞队列,自定义了ThreadFactory线程池工厂,这里只是针对线程创建的时候输出线程池的名称。
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5), new CustomThreadFactory());
for (int i = 0; i < 15; i++) {
int index = i;
executorService.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("由线程:" + Thread.currentThread().getName() + "执行任务完成"+index);
});
}
}
}
2.6、源码分析 execute方法
public void execute(Runnable command) {
// 判断提交的任务是不是为空,如果为空则抛出NullPointException异常
if (command == null)
throw new NullPointerException();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的数量小于corePoolSize,则进行添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
//添加线程修改线程数量并且将command作为第一个任务进行处理
if (addWorker(command, true))
return;
// 获取最新的状态
c = ctl.get();
}
// 如果线程池的状态是RUNNING,将命令添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
//二次检查线程池状态和线程数量
int recheck = ctl.get();
//线程不是RUNNING状态,从队列中移除当前任务,并且执行拒绝策略。
//这里说明一点,只有RUNNING状态的线程池才会接受新的任务,其余状态全部拒绝。
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池的线程数量为空时,代表线程池是空的,添加一个新的线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
//可能线程池的线程数量大于maximumPoolSize则采取拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
通过分析execute方法总结以下几点:
- 当线程池中线程的数量小于corePoolSize时,直接添加线程到线程池并且将当前任务做为第一个任务执行。
- 如果线程池的状态的是RUNNING,则可以接受任务,将任务放入到阻塞队列中,内部进行二次检查,有可能在运行下面内容时线程池状态已经发生了变化,在这个时候如果线程池状态变成不是RUNNING,则将当前任务从队列中移除,并且进行拒绝策略。
- 如果阻塞队列已经满了或者SynchronousQueue这种特殊队列无空间的时候,直接添加新的线程执行任务,当线程池的线程数量大于maximumPoolSize时相应拒绝策略。
- 入队操作用的是offer方法,该方法不会阻塞队列,如果队列已经满时或超时导致入队失败,返回false,如果入队成功返回true。
2.7、拒绝策略
JDK内置的拒绝策略如下:
- AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy策略:只要线程池没有关闭线程池状态是RUNNING状态,该策略直接调用线程中运行当前被丢弃的任务。
- DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务。
- DiscardPolicy策略:该策略默默丢弃无法处理的任务,不予任何处理。
三、Executors框架类
我们说常用的三种Executors创建线程的方式。
3.1 newCachedThreadPool
newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过需要的线程数量,可灵活回收空闲线程,若无可回收,则新建线程。
示例:
public class ThreadPoolTest2 {
/**
* newCachedThreadPool创建一个可缓存线程池,
* 如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
*/
public static void test_2() {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 1; i++) {
final int index = i;
/*try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + new Date().getSeconds());
}
});
}
}
public static void main(String[] args) {
test_2();
}
}
3.2 newSingleThreadExecutor
newSingleThreadExecutor 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,可以控制线程的执行顺序。
示例:
public class ThreadPoolTest3 {
/**
* newSingleThreadExecutor 创建一个单线程化的线程池,
* 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
*/
public static void test_4() {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 1; i < 11; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
//会按顺序打印
System.out.println(Thread.currentThread().getName()+":"+index);
}
});
}
}
public static void main(String[] args) {
test_4();
}
}
3.3 newFixedThreadPool
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,当创建的线程池数量为1的时候。也类似于单线程化的线程池,当为1的时候,也可控制线程的执行顺序.。
示例:
public class ThreadPoolTest4 {
/**
* newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
*/
public static void test_3() {
//当参数为1的时候,可以控制线程的执行顺序,类似join的作用
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
test_3();
}
}
2.8、线程池参数设置原则
1、如何来设置
* 需要根据几个值来决定
– tasks :每秒的任务数,假设为500~1000
– taskcost:每个任务花费时间,假设为0.1s
– responsetime:系统允许容忍的最大响应时间,假设为1s
* 做几个计算
- corePoolSize = 每秒需要多少个线程处理?
- threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
- 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
- queueCapacity = (coreSizePool/taskcost)*responsetime
- 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
- 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
- 计算可得 maxPoolSize = (1000-80)/10 = 92
- (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
-
rejectedExecutionHandler(策略):根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
-
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足,一般默认是60秒
2、 以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件和优化代码,降低taskcost来处理。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/77201.html