前言
线程池工作流程:当有任务提交到线程池后,会先判断当前是否有空闲线程,如果有则将任务交给空闲线程执行,如果没有,则会进入等待队列等待执行,如果等待队列已满。则会执行拒绝策略拒绝执行任务。
关于线程池
概念
由于线程在操作系统中是一种非常重要的资源,且创建和销毁线程又非常耗时,消耗系统性能。因此提出了线程池的概念。
简单理解,线程池是一种由多个不同线程组成的池化技术。池化技术是一种针对一些比较稀缺的资源,提供一个可复用资源的技术。比如:数据库连接池、线程连接池。
作用
线程池会默认初始化一定数量的线程,需要线程就从线程池里取出来用,用户就放回线程池。它主要作用就是能提高系统性能,避免频繁创建销毁线程。且方便统一管理,统一调度线程。
属性
Java中线程池类为:ThreadPoolExecutor,位于java.util.concurrent包下面,该类的继承关系如下:
ThreadPoolExecutor有几个重要的属性参数,一般创建ThreadPoolExecutor类(也就是创建线程池)时需要指定:
- ThreadPoolExecutor.corePoolSize:核心线程数量,核心线程不会销毁,一直存在
- ThreadPoolExecutor.maximumPoolSize:最大线程数,加核心线程数一起可创建的最大线程数量
- ThreadPoolExecutor.keepAliveTime:核心线程数之外的线程存活时间,单位(TimeUnit)由调用ThreadPoolExecutor类的构造方法时指定。一般是毫秒或秒。
- ThreadPoolExecutor.workQueue:等待队列,当线程池内没有多余线程执行任务时,任务将进入队列等待执行,队列必须是(阻塞队列)BlockingQueue接口的实现类。
- ThreadPoolExecutor.threadFactory:线程创建工厂,主要用来给线程池创建线程,默认工厂类为:DefaultThreadFactory,它是Executors类下的静态类,implements(实现)于ThreadFactory接口
- ThreadPoolExecutor.handler:拒绝策略,当线程池内线程数不足,且workQueue等待队列也饱和时,执行拒绝策略,拒绝任务。
以上是比较常见的创建线程池所需属性,还有很多比较重要的属性,了解它们更方便我们理解线程池。例如:
- ThreadPoolExecutor.workers:
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock
* Worker:线程池中工作的线程,它实现Runnable接口
* 线程池中所有工作线程的集合
*/
private final HashSet workers = new HashSet();
- ThreadPoolExecutor.mainLock:
/**
* 线程池内状态或数据有变更时,需要加锁保证线程安全
*/
private final ReentrantLock mainLock = new ReentrantLock();
更多的可以查看源码,每个属性字段上面都有说明注释。
线程池怎么用?
创建
Java中提供了一个名为:Executors的类,处于java.util.concurren包下面。它内置几种线程池的创建方式:
- Executors.newFixedThreadPool:创建一个固定线程数的线程池,核心线程数和最大线程数不定,但是核心线程数不得大于最大线程数。当线程池内线程不足时,任务会进入等待队列
- Executors.newSingleThreadExecutor:创建一个只有一个核心线程的线程池,所有的任务严格顺序执行
- Executors.newCachedThreadPool:创建一个没有核心线程的线程池,该线程池最大可创建线程数为:Integer.MAX_VALUE(2的31次方)。每当执行一个任务,先找线程池中有无空闲的线程,如果没有,则创建一个线程,执行完任务后回到线程池。默认在线程池中空闲存活60秒。
- Executors.newSingleThreadScheduledExecutor:创建一个只有一个核心线程,最大可创建线程数为:Integer.MAX_VALUE(2的31次方)。可执行定时任务的线程池
- Executors.newWorkStealingPool
其中前四种线程池都是由ThreadPoolExecutor类实现的,而最后一种Executors.newWorkStealingPool则是由jdk1.7新增的ForkJoinPool(分叉连接池)类实现。
使用
以Executors.newFixedThreadPool为例,随便建一个测试类
package com.example.study.executors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestExecutors {
//创建一个固定数线程的线程池、核心线程数和最大线程数都为9
private ExecutorService executorService = Executors.newFixedThreadPool(9);
public String invoke(){
System.out.println("我是有返回值的任务,我正在被线程【"+Thread.currentThread().getName()+"】执行中");
return Thread.currentThread().getName();
}
public void invoke2(){
System.out.println("我是无返回值任务,我正在被线程【"+Thread.currentThread().getName()+"】执行中");
}
public List<Future> hasReturn(){
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < 9; i++) {
//提交一个任务到线程池中---返回Future对象(但是入参必须得是Callable。如果入参是Runnable接口则没有返回值)
Future<?> submit = this.executorService.submit((Callable<? extends Object>) this::invoke);
futureList.add(submit);
}
return futureList;
}
public void noReturn(){
for (int i = 0; i < 9; i++) {
//提交一个任务到线程池中---无返回值
this.executorService.execute(() -> {
this.invoke2();
});
}
}
public static void main(String[] args) {
TestExecutors testExecutors = new TestExecutors();
//线程池使用方法一
List<Future> futureList = testExecutors.hasReturn();
//计数器
CountDownLatch countDownLatch = new CountDownLatch(9);
try {
for (int i = 0; i < futureList.size(); i++) {
Future future = futureList.get(i);
Object o = future.get();
System.out.println("Future执行返回:"+o);
}
}catch (Exception e){
}finally {
//计数器减一
countDownLatch.countDown();
}
//线程池使用方法二
testExecutors.noReturn();
}
}
线程池的使用方法有两个,分别是:
- 调用ExecutorService.submit()方法,该方法由AbstractExecutorService抽象类实现,有三个重载方法。
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
以上代码可以看到,submit方法底层实际也是调用了execute()方法,只不过是封装了一个RunnableFuture对象当返回值。
- 调用Executor.execute()方法,该方法由ThreadPoolExecutor实现
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
ThreadPoolExecutor.execute方法的实现逻辑是如文章 前言 说的线程池工作原理一致。
源码解析
下面分析一下ThreadPoolExecutor类的源码,来验证一下上面说的线程池工作原理:
1、从线程池执行入口开启
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//判断当前可用核心线程数是否还有(workerCountOf表示线程池中有多少个工作线程)
if (workerCountOf(c) < corePoolSize) {
//有,则将任务添加到工作线程中(addWorker)。且开始执行。(这里说明一下,线程池在提交任务后,不会以当前线程去执行任务,而是会交给一个工作线程去执行。这里的工作线程就是Worker)
if (addWorker(command, true))
return;
c = ctl.get();
}
//当前核心线程无可用,将任务添加到等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
//线程池中无可用线程、且无法加入等待队列、执行拒绝策略
reject(command);
}
看完上面的解析,得知addWorker()方法是将任务提交到工作线程去执行,说明这是一个核心点。继续看addWorker()方法逻辑
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//代码第一部分:主要是做一些状态、数据检验
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//这里采用CAS更改数据
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//代码第二部分:核心、重要
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个工作线程,并将任务提交给工作线程Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将刚刚创建带有任务的工作线程放到线程池集合(其实就是我们说的线程池)中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//任务添加成功,启动工作线程(因为Worker是工作线程,它实现了Runnable接口,所以这里调用start()方法,就是启动了线程,会去执行该线程任务的run()方法)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
上面调用了t.start()执行了Worker工作线程,我们继续跟进去看Worker线程执行了什么:
找到Worker类的run()方法:
找到一个叫runWorker()的方法:最底层的核心逻辑就在这里!
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//从Worker工作线程取出任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果工作线程中取不到任务,那么则从等待队列里取,getTask()方法实现
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务执行前逻辑,空实现,给我们继承扩展用的
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用任务(伪线程,因为任务是实现Runnbale接口的类,但是没有调用start()方法,就不会额外新建线程)的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后逻辑,空实现,给我们继承扩展用的
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
上面有个调用:task.run(); 该调用是真正执行了任务。以上分析可得知:线程池的工作原理其实是通过Worker工作线程把我们的任务包装起来执行。
下面贴一下getTask()代码,看下线程池是如何从等待队列里取任务的:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//核心点,从队列中poll或者take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
以上对于线程池源码的分析正好印证了文章开篇说的线程池工作流程和原理
总结
Q:线程池是什么?
A:线程池是一种由多个不同线程组成的池化技术。池化技术是一种针对一些比较稀缺的资源,提供一个可复用资源的技术。
Q:线程池有什么作用?
A:提高系统性能,避免频繁创建销毁线程。且方便统一管理,统一调度线程。
Q:线程池怎么使用?
A:Java有内置4种线程池,但正常情况下,我们使用线程池要根据服务器性能来合理设置参数,自定义业务线程池。因为线程是非常昂贵稀缺的资源。且使用线程池还有个点需要注意:默认的线程池使用的是无界的阻塞队列。这样当并发非常高的情况下,工作队列会无限制的堆积,造成系统OOM内存溢出。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/99027.html