-
背景
-
任务异常log的获取
-
线程池线程数不知道如何设置
-
需要解决线程池的线程数设置
-
线程池名字
-
编码实现
-
ThreadFactoryImpl
-
ThreadPoolUtil
-
ThreadPoolBuilder
-
使用
-
总结
背景
相信不少工作中的小伙伴都会使用线程池,但是线程池的定义使用是个一个比较复杂的过程,因为单是线程池构造函数就有7个之多,并且每个参数的定义也不是简单的基本数据结构直接传入就好了

所以一些没有经验的小伙伴就使用jdk提供的默认的线程池建造工具类Executors

Executors
也确实提供了一些线程池的构造,方便使用,但是都是无界队列,任务过多容易出现OOM,阿里巴巴规范也是禁止使用Executors
去构造线程池
虽然有些阿里自己开源的中间件也没有去遵守这个规范,但这并不妨碍这是个好规范
任务异常log的获取
而且需要注意的是线程池执行任务任务过程出现的错误默认的UncaughtExceptionHandler
策略是打印到控制台

这只是个小问题,问题比较麻烦的就是使用线程池的submit
方法提交任务在发生异常是不会打印异常信息的,只有在获取任务结果的时候才会打印错误,有的人在不需要获取任务结果的时候也会使用submit
这种方式去提交任务,就导致异常了也不知道,这是很坑的,比如这样
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
int i = 1 / 0;
});
发生了异常我们也不知道,看看源码我们就知道了

执行任务的线程被封装为RunnableFuture
对象,我们看看RunnableFuture
对象的run方法


抛出的异常会被放到 outcome 对象中,这个对象就是 submit() 方法会返回的 FutureTask 对象执行 get() 方法得到的结果
所以在线程使用如果不需要获取任务结果就不要使用
submit
方法
线程池线程数不知道如何设置
业界现在都流行动态线程池的一个设计,我之前也写过如何实现一个动态线程池
https://weihubeats.blog.csdn.net/article/details/113751243
但是对于线程池的一个线程数设置大致都有一个计算公式
-
I/O 密集型: 最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)] -
CPU密集: 最佳线程数CPU+1
需要解决线程池的线程数设置
线程池名字
默认的线程池名字都是 pool-x-thread-x这种,在我们线上对线程池的问题定位也是非常不方便,我们需要为每个线程池使用指定合适的前缀
编码实现
基于上面的种种问题,我们来优化一个我们自己的线程池建造工具类
首先解决线程池不打印错误和线程名前缀的定义问题,我们定义一个自定义线程创建工厂
ThreadFactoryImpl
public class ThreadFactoryImpl implements ThreadFactory {
private final AtomicLong threadIndex = new AtomicLong(0);
private final String threadNamePrefix;
private final boolean daemon;
public ThreadFactoryImpl(final String threadNamePrefix) {
this(threadNamePrefix, false);
}
public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
this.threadNamePrefix = threadNamePrefix;
this.daemon = daemon;
}
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
thread.setDaemon(daemon);
return thread;
}
}
然后定义一个线程池工具类,相关的工具方法
ThreadPoolUtil
public static ThreadFactory createThreadFactory(String threadNamePrefix, boolean daemon) {
if (threadNamePrefix != null) {
return new ThreadFactoryImpl(threadNamePrefix, daemon);
}
return Executors.defaultThreadFactory();
}
接下来就是自定义线程池建造工具类的核心实现了
ThreadPoolBuilder
public class ThreadPoolBuilder {
private static final RejectedExecutionHandler defaultRejectHandler = new ThreadPoolExecutor.AbortPolicy();
/**
* cpu核数
*/
private static final int CPU = SystemUtil.getCPU();
/**
* create io ThreadPoolExecutor
*
* @return ThreadPoolExecutor
*/
public static IOThreadPoolBuilder ioThreadPoolBuilder() {
return new IOThreadPoolBuilder();
}
/**
* create cpu ThreadPoolExecutor
*
* @return
*/
public IOThreadPoolBuilder CPUPool() {
return new IOThreadPoolBuilder();
}
public static class IOThreadPoolBuilder {
private ThreadFactory threadFactory;
private RejectedExecutionHandler rejectHandler;
private int queueSize = -1;
private int maximumPoolSize = CPU;
private int keepAliveTime = 120;
private boolean daemon = false;
private String threadNamePrefix;
public int getCorePooSize(int ioTime, int cpuTime) {
return CPU + (1 + (ioTime / cpuTime));
}
public IOThreadPoolBuilder setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}
public IOThreadPoolBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}
public IOThreadPoolBuilder setRejectHandler(RejectedExecutionHandler rejectHandler) {
this.rejectHandler = rejectHandler;
return this;
}
public IOThreadPoolBuilder setQueueSize(int queueSize) {
this.queueSize = queueSize;
return this;
}
public IOThreadPoolBuilder setMaximumPoolSize(int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
return this;
}
public IOThreadPoolBuilder setKeepAliveTime(int keepAliveTime) {
this.keepAliveTime = keepAliveTime;
return this;
}
public ThreadPoolExecutor builder(int ioTime, int cpuTime) {
BlockingQueue<Runnable> queue;
if (rejectHandler == null) {
rejectHandler = defaultRejectHandler;
}
threadFactory = ThreadPoolUtil.createThreadFactory(this.threadNamePrefix, this.daemon);
queue = queueSize < 1 ? new LinkedBlockingQueue<>() : new ArrayBlockingQueue<>(queueSize);
return new ThreadPoolExecutor(getCorePooSize(ioTime, cpuTime), maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory, rejectHandler);
}
}
}
这里可以对队列数量作强制限制,如果不指定则报错也行
使用
ThreadPoolExecutor executor = ThreadPoolBuilder
.ioThreadPoolBuilder()
.setThreadNamePrefix("io-test")
.setMaximumPoolSize(20)
.builder(10, 20);
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
int i = 1 / 0;
});
总结
可以看到线程池的创建还是相对复杂一些,但是我们可以封装一些适合自己开发的线程池。比如IO类型的线程池,CPU类型的线程池,创CachedThreadPool等
原文始发于微信公众号(小奏技术):看到同事还在乱定义线程池参数,我直接封装一个线程池建造工具类
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30124.html