文章目录
1、概念
队列
队列就可以想成是一个数组,从一头进入,一头出去,排队买饭
阻塞队列
BlockingQueue 阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
- 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
- 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞
为什么需要BlockingQueue
- 在多线程领域:所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又需要被唤醒
- 使用BlockingQueue好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了
2、BlockingQueue接口实现
BlockingQueue阻塞队列是属于一个接口,底下有七个实现类
- ArrayBlockQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列有界,但是界限非常大,相当于无界,可以当成无界
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列生产一个,消费一个,不存储元素,不消费不生产
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
3、BlockingQueue核心方法
3.1、阻塞队列api之抛异常
public class BlockingQueueExceptionDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//往队列中塞元素
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
try {
//抛出 java.lang.IllegalStateException: Queue full
System.out.println(blockingQueue.add("XXX"));
} catch (Exception e) {
System.err.println(e);
}
//检查队列是否有值,并返回队列第一个元素
System.out.println(blockingQueue.element());
//从队列中取元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
try {
//抛出 java.util.NoSuchElementException
System.out.println(blockingQueue.remove());
} catch (Exception e) {
System.err.println(e);
}
}
}
运行结果:
Connected to the target VM, address: '127.0.0.1:59946', transport: 'socket'
true
true
true
java.lang.IllegalStateException: Queue full
a
a
b
c
java.util.NoSuchElementException
Disconnected from the target VM, address: '127.0.0.1:59946', transport: 'socket'
Process finished with exit code 0
3.2、阻塞队列api之返回布尔值
public class BlockingQueueBooleanDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//往队列中塞元素
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("x"));
//检查是否有值,并返回队列第一个元素
System.out.println(blockingQueue.peek());
//从队列中取元素
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
}
运行结果:
true
true
true
false
a
a
b
c
null
Process finished with exit code 0
3.3、阻塞队列api之阻塞
public class BlockingQueueBlockedDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
new Thread(()->{
try {
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("x");//将会阻塞,直到主线程take()
System.out.println("it was blocked.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(2);
try {
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
System.out.println("Blocking...");
blockingQueue.take();//最终将会阻塞在这里
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
it was blocked.
Blocking...
Process finished with exit code 130
3.4、阻塞队列api之超时控制
public class BlockingQueueTimeoutDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println("Offer.");
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));
System.out.println("Poll.");
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
}
}
运行结果:
Offer.
true
true
true
false
Poll.
a
b
c
null
Process finished with exit code 0
4、SynchronousQueue阻塞队列
- SynchronousQueue没有容量
- 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue
- 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "\t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "\t put C ");
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take C ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
运行结果:
t1 put A
t2 take A
t1 put B
t2 take B
t1 put C
t2 take C
Process finished with exit code 0
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/148630.html