Java并发编程之阻塞队列

有时候,不是因为你没有能力,也不是因为你缺少勇气,只是因为你付出的努力还太少,所以,成功便不会走向你。而你所需要做的,就是坚定你的梦想,你的目标,你的未来,然后以不达目的誓不罢休的那股劲,去付出你的努力,成功就会慢慢向你靠近。

导读:本篇文章讲解 Java并发编程之阻塞队列,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

1、概念

队列

队列就可以想成是一个数组,从一头进入,一头出去,排队买饭

阻塞队列

BlockingQueue 阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

在这里插入图片描述

  • 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞

为什么需要BlockingQueue

  • 在多线程领域:所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又需要被唤醒
  • 使用BlockingQueue好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了

2、BlockingQueue接口实现

BlockingQueue阻塞队列是属于一个接口,底下有七个实现类

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列有界,但是界限非常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

3、BlockingQueue核心方法

在这里插入图片描述

在这里插入图片描述

3.1、阻塞队列api之抛异常

public class BlockingQueueExceptionDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        //往队列中塞元素
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));

        try {
            //抛出 java.lang.IllegalStateException: Queue full
            System.out.println(blockingQueue.add("XXX"));
        } catch (Exception e) {
            System.err.println(e);
        }

        //检查队列是否有值,并返回队列第一个元素
        System.out.println(blockingQueue.element());

        //从队列中取元素
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());

        try {
            //抛出 java.util.NoSuchElementException
            System.out.println(blockingQueue.remove());
        } catch (Exception e) {
            System.err.println(e);
        }
    }
}

运行结果:

Connected to the target VM, address: '127.0.0.1:59946', transport: 'socket'
true
true
true
java.lang.IllegalStateException: Queue full
a
a
b
c
java.util.NoSuchElementException
Disconnected from the target VM, address: '127.0.0.1:59946', transport: 'socket'

Process finished with exit code 0

3.2、阻塞队列api之返回布尔值

public class BlockingQueueBooleanDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
		//往队列中塞元素
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("x"));

        //检查是否有值,并返回队列第一个元素
        System.out.println(blockingQueue.peek());
        
		//从队列中取元素
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }
}

运行结果:

true
true
true
false
a
a
b
c
null

Process finished with exit code 0

3.3、阻塞队列api之阻塞

public class BlockingQueueBlockedDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        new Thread(()->{
            try {
                blockingQueue.put("a");
                blockingQueue.put("b");
                blockingQueue.put("c");
                blockingQueue.put("x");//将会阻塞,直到主线程take()
                System.out.println("it was blocked.");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        TimeUnit.SECONDS.sleep(2);

        try {

            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();

            System.out.println("Blocking...");
            blockingQueue.take();//最终将会阻塞在这里

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

it was blocked.
Blocking...

Process finished with exit code 130

3.4、阻塞队列api之超时控制

public class BlockingQueueTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println("Offer.");
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));

        System.out.println("Poll.");
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
    }
}

运行结果:

Offer.
true
true
true
false
Poll.
a
b
c
null

Process finished with exit code 0

4、SynchronousQueue阻塞队列

  • SynchronousQueue没有容量
  • 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue
  • 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put A ");
                blockingQueue.put("A");

                System.out.println(Thread.currentThread().getName() + "\t put B ");
                blockingQueue.put("B");

                System.out.println(Thread.currentThread().getName() + "\t put C ");
                blockingQueue.put("C");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            try {

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take A ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take B ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take C ");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }
}

运行结果:

t1	 put A 
t2	 take A 
t1	 put B 
t2	 take B 
t1	 put C 
t2	 take C 

Process finished with exit code 0

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/148630.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!