整体介绍
/* 手写线程池以及拒绝策略,使用到的设计模式: 享元模式(池化技术) 策略模式(拒绝策略 提高程序扩展性) 线程池中 有一些线程去执行任务 worker 继承 thread实现 起初没有线程 如果任务来了直接 创建线程并且执行 worker.start() 任务数量大于一定的阈值后 就放入任务队列中 放入队列时采用lock 线程池就像调度者 不断的检查队列中是否有任务需要执行 for循环检查 这个程序一直运行 每执行一个任务就告诉 队列还可以添加一个新任务 使用队列平衡生产者和消费者 , 使用await 和 signal进行线程通信 如果任务数超过 coreSize 时,加入任务队列暂存 如果队列满了,此时又有新任务到来,那么就按照拒绝策略处理任务。 当一个worker 把一个task执行完毕后,它仍然会继续询问 taskqueue中是否还有其他任务 如果有就继续执行 如果没有 那么就把当前的这个task = null 同时移除这个worker, 不过在代码中目前不会走到这一步。 因为这个worker会在taskqueue进入condition的等待队列 await等待。 */
代码实现
package ThreadPoolExecutorDemo;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyPool {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(2,
TimeUnit.MILLISECONDS, 4, new MyRejectPolicy<Runnable>() {
@Override
public void reject(MyBlockingQueue<Runnable> queue, Runnable task) {
// 1. 死等
// queue.put(task);// 该方法会调用 await 进入到 lock 的一个waitset中
// 2) 让调用者放弃任务执行 啥都不做
// 3) 让调用者(主线程)抛出异常 后面的几个任务都没有被执行 甚至没有被创建 因为主线程down了
// throw new RuntimeException("任务执行失败 " + task);
// 4) 让调用者(主线程)自己执行任务
task.run();
}
});
for (int i = 0; i < 7; i++) {
int j = i;
myThreadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
@FunctionalInterface
interface MyRejectPolicy<T>{
// 定义 队列如何处理task 的策略。
void reject(MyBlockingQueue<T> taskQueue, T task);
}
class MyThreadPool{
private int coreSize;
private TimeUnit timeUnit;
private HashSet<Worker> workers = new HashSet<>();
MyBlockingQueue<Runnable> taskQueue;
private MyRejectPolicy<Runnable> rejectPolicy;
public MyThreadPool(int coreSize, TimeUnit timeUnit, int queueCapacity, MyRejectPolicy rejectPolicy) {
this.coreSize = coreSize;
this.timeUnit = timeUnit;
this.taskQueue = new MyBlockingQueue<Runnable>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
public void execute(Runnable task){
synchronized (workers){
// 如果一个线程提交执行 然后进入了 if 但是阻塞了
// 期间 有很多线程都在执行 那么可能 workers 就占满了
if(workers.size() < coreSize){
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else{
// taskQueue.put(task); // 不再是一直等待 队列中有位置再放入 否则一直都等待
// 传递 策略和任务 到队列中 由阻塞队列去执行
taskQueue.doTheRejectPolicy(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
while(task != null || (task = taskQueue.take()) != null){
try {
System.out.println();
task.run();
}catch(Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
}
}
}
// 注意 变量 lock的设置 不能是 static
// 一队列 二条件变量 三个函数
class MyBlockingQueue<T> {
private Deque<T> dq = new ArrayDeque<>();
private ReentrantLock lock = new ReentrantLock();
private Condition putCondition = lock.newCondition();
private Condition takeCondition = lock.newCondition();
private int capcity;
public MyBlockingQueue(int capcity) {
this.capcity = capcity;
}
public void put(T element){
// 当队列满了后 put要不然 进入等待节点链表 要不然是在await
lock.lock();
try{
while(dq.size() == capcity){
try {
putCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
dq.addLast(element);
takeCondition.signal();
}finally {
lock.unlock();
}
}
public T take(){
lock.lock();
try{
while(dq.isEmpty()){
try {
takeCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = dq.pollFirst();
putCondition.signal();
return task;
}finally {
lock.unlock();
}
}
public int getSize(){
lock.lock();
try{
return dq.size();
}finally {
lock.unlock();
}
}
public void doTheRejectPolicy(MyRejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 再次判断队列是否满
if(dq.size() == capcity) {
rejectPolicy.reject(this, task);
} else { // 有空闲 直接加入即可
dq.addLast(task);
takeCondition.signal();
}
} finally {
lock.unlock();
}
}
}
参考:
黑马 JUC
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/92803.html