手写线程池实践 自定义BlockingQueue和拒绝策略

导读:本篇文章讲解 手写线程池实践 自定义BlockingQueue和拒绝策略,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

整体介绍

/*
手写线程池以及拒绝策略,使用到的设计模式:
享元模式(池化技术) 策略模式(拒绝策略 提高程序扩展性)

线程池中 有一些线程去执行任务                                 worker 继承 thread实现
起初没有线程 如果任务来了直接 创建线程并且执行                  worker.start()
任务数量大于一定的阈值后 就放入任务队列中                       放入队列时采用lock
线程池就像调度者  不断的检查队列中是否有任务需要执行            for循环检查  这个程序一直运行
每执行一个任务就告诉 队列还可以添加一个新任务                    使用队列平衡生产者和消费者 ,            使用await 和 signal进行线程通信

如果任务数超过 coreSize 时,加入任务队列暂存
如果队列满了,此时又有新任务到来,那么就按照拒绝策略处理任务。

当一个worker 把一个task执行完毕后,它仍然会继续询问 taskqueue中是否还有其他任务 如果有就继续执行
如果没有  那么就把当前的这个task = null 同时移除这个worker, 不过在代码中目前不会走到这一步。
因为这个worker会在taskqueue进入condition的等待队列 await等待。
 */

手写线程池实践 自定义BlockingQueue和拒绝策略

代码实现

package ThreadPoolExecutorDemo;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyPool {
    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(2,
                TimeUnit.MILLISECONDS, 4, new MyRejectPolicy<Runnable>() {
            @Override
            public void reject(MyBlockingQueue<Runnable> queue, Runnable task) {
                // 1. 死等
//            queue.put(task);// 该方法会调用 await 进入到 lock 的一个waitset中
                // 2) 让调用者放弃任务执行  啥都不做
                // 3) 让调用者(主线程)抛出异常 后面的几个任务都没有被执行 甚至没有被创建  因为主线程down了
//            throw new RuntimeException("任务执行失败 " + task);
                // 4) 让调用者(主线程)自己执行任务
                task.run();
            }
        });
        for (int i = 0; i < 7; i++) {
            int j = i;
            myThreadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

@FunctionalInterface
interface MyRejectPolicy<T>{

    // 定义 队列如何处理task 的策略。
    void reject(MyBlockingQueue<T> taskQueue, T task);
}


class MyThreadPool{
    private int coreSize;
    private TimeUnit timeUnit;
    private HashSet<Worker> workers = new HashSet<>();
    MyBlockingQueue<Runnable> taskQueue;
    private MyRejectPolicy<Runnable> rejectPolicy;


    public MyThreadPool(int coreSize, TimeUnit timeUnit, int queueCapacity, MyRejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.timeUnit = timeUnit;
        this.taskQueue = new MyBlockingQueue<Runnable>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    public void execute(Runnable task){
        synchronized (workers){
            // 如果一个线程提交执行 然后进入了 if  但是阻塞了
            // 期间 有很多线程都在执行 那么可能 workers 就占满了
            if(workers.size() < coreSize){
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            }else{
//                taskQueue.put(task); // 不再是一直等待 队列中有位置再放入 否则一直都等待
                // 传递 策略和任务 到队列中  由阻塞队列去执行
                taskQueue.doTheRejectPolicy(rejectPolicy,task);
            }
        }
    }

    class Worker extends Thread{
        private Runnable task;
        Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task != null || (task = taskQueue.take()) != null){
                try {
                    System.out.println();
                    task.run();
                }catch(Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
        }
    }


}


// 注意 变量 lock的设置 不能是 static
// 一队列 二条件变量  三个函数
class MyBlockingQueue<T> {

    private Deque<T> dq = new ArrayDeque<>();
    private ReentrantLock lock = new ReentrantLock();
    private Condition putCondition = lock.newCondition();
    private Condition takeCondition = lock.newCondition();
    private int capcity;
    public MyBlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    public void put(T element){
        // 当队列满了后 put要不然 进入等待节点链表 要不然是在await
        lock.lock();
        try{
            while(dq.size() == capcity){
                try {
                    putCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            dq.addLast(element);
            takeCondition.signal();

        }finally {
            lock.unlock();
        }
    }

    public T take(){
        lock.lock();
        try{
            while(dq.isEmpty()){
                try {
                    takeCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = dq.pollFirst();
            putCondition.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    public int getSize(){
        lock.lock();
        try{
            return dq.size();
        }finally {
            lock.unlock();
        }
    }

    public void doTheRejectPolicy(MyRejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 再次判断队列是否满
            if(dq.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else {  // 有空闲 直接加入即可
                dq.addLast(task);
                takeCondition.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

参考:

黑马 JUC

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/92803.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!