【并发专题】手写LinkedBlockingQueue

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。【并发专题】手写LinkedBlockingQueue,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

分析

LinkedBlockingQueue有如下特点:

  1. 近乎无界队列,但可以是有界队列
  2. 实现了BlockingQueue接口
  3. 需要实现take方法和put方法,实现阻塞效果
  4. 数据结构是单链表,有head跟last指针来进行入队出队操作
  5. 有两把锁,读写分离
  6. 所以也有两个条件队列,分别用来提醒消费者继续消费,或者生产者继续生产
    在这里插入图片描述
    手写源码过程中,还是如往常一样,需要注意的是,按照Api要求来实现自己的代码。

源码

下面只是简单的实现了take()put()方法,其他就略过了

package org.tuling.juc.queue;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 手写LinkedBlockingQueue
 *
 * @tag 【LinkedBlockingQueue】 【手写】
 * @Date 2023/8/1 19:13
 * @slogan 编码即学习
 **/
public class MyLinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingDeque<E> {
    /**
     * 容量
     */
    private final int capacity;

    /**
     * 当前队列任务数量
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表头节点
     */
    transient Node<E> head;

    /**
     * 链表尾节点
     */
    transient Node<E> last;

    /**
     * 拿锁(读锁)
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * 队列不空,条件
     * 通知消费者可以去消费任务了。当队列为空时,消费者会阻塞在这个条件上,等待唤醒
     */
    private Condition notEmpty = takeLock.newCondition();

    /**
     * 添加锁(写锁)
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * 队列不满,条件
     * 通知生产者可以去生产任务了。当队列满时,生产者会阻塞在这个条件上,等待唤醒
     */
    private Condition notFull = putLock.newCondition();

    /**
     * 默认的构造函数
     * 会创建一个capacity为Integer.MAX_VALUE的阻塞队列
     */
    public MyLinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * 创建一个指定容量capacity的的阻塞队列
     *
     * @param capacity 容量大小
     */
    public MyLinkedBlockingQueue(int capacity) {
        this.capacity = capacity;
        head = new Node(null);
        last = head;
    }

    /**
     * 添加元素方法(可阻塞等待)
     *
     * <p>
     * 以下是BlockingQueue接口要求:
     * 将指定的元素插入此队列,并在必要时等待可用空间。
     * 参数: E -要添加的元素
     * 抛出:
     * InterruptedException -如果在等待时中断
     * ClassCastException——如果指定元素的类阻止它被添加到这个队列
     * NullPointerException -如果指定的元素为空
     * IllegalArgumentException -如果指定元素的某些属性阻止它被添加到这个队列
     * </p>
     */
    @Override
        public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }

        // 添加元素
        Node<E> node = new Node<>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        int oldCount = - 1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == this.capacity) {

                // 队列满了
                notFull.await();
            }

            // 入队
            enqueue(node);
            oldCount = count.getAndIncrement();

            // 判断是否需要唤醒生产者(可能这边生产,另一头在消费了)
            if (oldCount + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }


        // 通知消费者消费
        // 为什么判断之前是空的就通知NotEmpty?
        // 很简单啊,因为只有空的时候才可能发生消费者阻塞在NotEmpty条件上
        if (oldCount == 0) {

            // 通知消费者
            this.signalNotEmpty();
        }
    }

    /**
     * 通知消费者,队列不为空了
     * 这里需要上锁,为什么?因为如果不上锁,可能会有其他消费线程并发进来,发现队列是空的,然后进入了沉睡(与其沉睡上下文切换,还不如让它竞争锁)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 入队(尾插法)
     *
     * @param node 需要入队的节点
     */
    private void enqueue(Node<E> node) {
        last.next = node;
        last = last.next;
    }

    /**
     * 出队(可阻塞等待)
     *
     * <p>
     * 以下是BlockingQueue接口要求:
     * 检索并删除此队列的头部,必要时等待,直到元素可用为止。
     * 返回:
     * 这个队列的头
     * 抛出:
     * InterruptedException -如果在等待时中断
     * </p>
     */
    @Override
     public E take() throws InterruptedException {
        E result;
        final ReentrantLock takeLock = this.takeLock;
        final AtomicInteger count = this.count;
        int oldCount = -1;
        takeLock.lock();
        try {

            while (count.get() == 0) {

                // 队列为空
                notEmpty.await();
            }

            // 出队
            result = dequeue();
            oldCount = count.getAndDecrement();

            // 判断是否需要唤醒消费者(可能这边消费,另一头在生产了)
            if (oldCount > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }

        // 通知生产者生产
        // 为什么判断之前是满的就通知NotFull?
        // 很简单啊,因为只有满的时候才可能发生生产者阻塞在NotFull条件上
        if (oldCount == capacity) {
            this.signalNotFull();
        }
        return result;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            this.notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * 出队
     */
    private E dequeue() {

        Node<E> head = this.head;
        Node<E> first = head.next;

        // 这样设置,方便将以前的头节点给GC了
        head.next = null;

        this.head = first;
        E item = first.item;
        first.item = null;
        return item;
    }

    @Override
    public Iterator<E> iterator() {
        return null;
    }

    @Override
    public Iterator<E> descendingIterator() {
        return null;
    }

    @Override
    public void push(E e) {

    }

    @Override
    public E pop() {
        return null;
    }

    @Override
    public void addFirst(E e) {

    }

    @Override
    public void addLast(E e) {

    }

    @Override
    public boolean offerFirst(E e) {
        return false;
    }

    @Override
    public boolean offerLast(E e) {
        return false;
    }

    @Override
    public E removeFirst() {
        return null;
    }

    @Override
    public E removeLast() {
        return null;
    }

    @Override
    public E pollFirst() {
        return null;
    }

    @Override
    public E pollLast() {
        return null;
    }

    @Override
    public E getFirst() {
        return null;
    }

    @Override
    public E getLast() {
        return null;
    }

    @Override
    public E peekFirst() {
        return null;
    }

    @Override
    public E peekLast() {
        return null;
    }

    @Override
    public void putFirst(E e) throws InterruptedException {

    }

    @Override
    public void putLast(E e) throws InterruptedException {
    }

    @Override
    public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public E takeFirst() throws InterruptedException {
        return null;
    }

    @Override
    public E takeLast() throws InterruptedException {
        return null;
    }

    @Override
    public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public E pollLast(long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public boolean removeFirstOccurrence(Object o) {
        return false;
    }

    @Override
    public boolean removeLastOccurrence(Object o) {
        return false;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return null;
    }

    @Override
    public int remainingCapacity() {
        return 0;
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        return 0;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return 0;
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public boolean offer(E e) {
        return false;
    }

    @Override
    public E poll() {
        return null;
    }

    @Override
    public E peek() {
        return null;
    }

    /**
     * 内部单链表
     * 维护阻塞队列
     */
    public static final class Node<E> {

        /**
         * 当前节点
         */
        E item;

        /**
         * 下一个节点
         */
        Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

使用示例

public class MyLinkedBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {

        // 指定队列的大小创建有界队列
//        BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(10);
        BlockingQueue<Integer> boundedQueue = new MyLinkedBlockingQueue<>(10);

        // 向队列中添加元素
        boundedQueue.put(3);

        // 从队列中取出元素
        Integer take = boundedQueue.take();
        System.out.println(take);
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/180524.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!