前阵子, 分析了Java Collection框架(framework)
为什么叫框架,而不是叫Java Collection Lib
.结尾时候, 提到了Java1.5以后框架里才添加的新接口Queue
.
添加
Queue
接口到集合框架里的原因. 提供一个无须参数即可获取元素的能力.J K L,公众号:K字的研究为什么Java集合框架(Java Collection Framework) 叫框架?
今天我们聊聊Queue
里比较重要的一个ArrayBlockingQueue
.
ArrayBlockingQueue
Java里的命名还是比较有规律的, 一般都是:实现方式
+接口
.看这个名字就能猜出来, 这是一个用CircularArray(循环数组)
实现的,有阻塞特性的BockingQueue
.
循环数组
内部使用的, 是一个固定尺寸
的循环数组
.也就是一个数组, 加上2个维护当前头
,尾
在什么位置的index
.如果尾已经要越界了, 就折返到开头来继续.
//实现上其实就是取余
if (++i >= modulus) i = 0;
对应到ArrayBlockingQueue
里, 这个实现是这样的:
-
Object[] items
数组 -
int takeIndex
开头的index, 因为队列是从队头取的 -
int putIndex
结尾的index, 因为是从队尾添加的.
随着一边添加一边删除的过程, 可能结构会变成这样.数组数据分散开,不连续. 这点会影响后面很多内容的写法.
BlockingQueue的功能
Queue的能力, 是添加了:
-
一套不会异常的 offer
,pool
,peek
方法 -
一套会抛异常的 add
,remove
,element
方法
在Queue
的基础上,BlockingQueue
添加另外两套内容:
-
offer
&poll
的超时版 -
put
&take
看起来很复杂, 不过说白了, 就是上图这张表. 行为x表现
的一个小矩阵.记得:
-
三种行为: 添加,删除,查看 -
4种特性: 异常, 特殊值,阻塞,超时
这就行了, 剩下都是组合出来的.
ArrayBlockingQueue 4种特性的实现方式
以添加
为例, 来看下这4种特性都是怎么实现的.
不成功抛Exception
会抛异常这组其实是最简单的, 我们可以直接调用offer
接口,然后根据返回值, 判断要不要手动抛异常就行
(也可以反过来实现, 写一个会抛异常的add
,然后用add
去实现offer
.估计会被骂死).
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
不成功返回特殊值
这个就是惯常用的,成功true
,不成功false
,或者找不到返回-1
之类的. 用特殊值表示执行结果.代码倒是很简单, 用到了一个ReentrantLock
锁.剩下就是满了就失败,不满拉倒.
public boolean offer(E e) {
//require 系列, 契约式编程....failfast
Objects.requireNonNull(e);
// 锁🔐属性转本地变量
final ReentrantLock lock = this.lock;
// 加锁, 成功就成功, 不成功则组撒下去
lock.lock();
try {
//检查满了没, 满了就失败
if (count == items.length)
return false;
else {
//入队,成功
enqueue(e);
return true;
}
} finally {
//扫尾开锁
lock.unlock();
}
}
有趣的地方在 final ReentrantLock lock = this.lock;
这个声明临时变量的玩法很有趣, 有why技术
曾经讲解过,可以去看看.
这代码是 Doug Lea 写的,小 Lea 子这人吧,经常搞一些出其不意的代码和优化。他也因为这些“莫名其妙”的代码闻名,习惯就好了。 歪歪,公众号:why技术从源码里的一个注释,我追溯到了12年前,有点意思。
enqueue
暂不着急看, 咱先广度优先(bfs)
.
带超时的版本
前面的offer
,会在满的时候会直接失败. 这个带超时的版本, 满的话会尝试阻塞一定时间,超时了再失败.
想要实现超时, 那肯定还是要借助ReentrantLock
的newCondition
来实现.一个ArrayBlockingQueue
里除了有一个lock
,还有2个Condition
:
-
notFull 不满 塞东西时候, 要不满才行 -
notEmpty 不空 取东西, 要不空才行
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
//跟前面的 lock.lock()比多了一条, 响应`interrupt`.
lock.lockInterruptibly();
try {
// 满则用条件等待.
while (count == items.length) {
if (nanos <= 0L)//超时就返回
return false;
//静待不满
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
纯阻塞版本
满的时候就阻塞, 不带什么超时功能了.
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
小结
这块的代码还没到正地方, 实现难度也不高, 非常适合拿来学习ReentrantLock
和Condition.await
的使用.
后面的删除代码, remove
,take
,poll
, 也是类似, 我们就不提了.只是把Condition
由等待notFull
,改成了等待notEmpty
.
查看部分的函数, 因为没有阻塞功能, 只有2个element
和peek
, 很简单,也不提了.
核心函数 入队和出队(enqueue & dequeue)
按说, 前面的几个函数都是在搞各种特性, 核心功能,应该是很复杂吧?
还真不是.
入队
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;//折返
count++;
notEmpty.signal();
}
出队
private E dequeue() {
final Object[] items = this.items;
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0; //折返
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
小结
这里就是个常规的循环数组操作,外加往外放两个信号(signal)
.
-
入队时候, 更新 putIndex
, 放出个不空信号
. -
出队时候, 更新 takeIndex
, 放出个不满信号
.
两边代码完美对称, 几乎没有冗余代码.因为锁和各自的独立逻辑都在外围已经做掉了,维持了核心代码的整洁.
其他方法
ArrayBlockingQueue里的方法有好多个,除了刚刚提到3x4-2=10
个方法.剩下还有几个有趣的方法. 一个一个提一下.
toString
所有Object都是有toString的.不过这种带阻塞,带锁的数据结构, 有点特殊. 引入如果内容如果会变, 这个toString就不太准.要完美处理, 只能加锁才行.
Java8 version
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k == 0)
return "[]";
final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}
这个是Java8的版本, 加锁
->转string
->放锁
,到了java17里再看,有一点小改动.流程被改为了加锁
->toArray
->放锁
->转string
.
锁的范围被缩小了, 据说可以增加性能.后面都是以Java17代码为准了,8里面的代码这段真的不如17的.
不过真正有意思的点在这行e == this ? "(this Collection)" : e
. 这个是用来对付这种神奇代码的.
ArrayBlockingQueue<ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10);
queue.add(queue);
如果有人手贱把ArrayBlockingQueue
加到自己里面的话, 不会被递归拖死…Java 里几乎所有的集合类都有这个代码.
toArray() & toArray(T[] a)
这两个代码都是toArray
, 核心逻辑都是加锁
->转数组
->放锁
. 不过两者略有差异.
无参数的普通toArray
这个就是把所有元素, 复制到一个新的Array
里.
因为循环数组
里面, 数据可能是散开不连续的.所以有可能需要复制两次, 在代码上表现出来是这样的.
final int end = takeIndex + count;
final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
//如果`end != putIndex`,说明`putIndex`已经跑到`takeIndex`前面去了. 这里的复制就要进行两次.
if (end != putIndex)
System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
-
System.arraycopy
,Arrays.copyOfRange
是这里用的复制工具. -
Arrays.copyOfRange
比较有趣, 他其实是System.arraycopy
的一个封装, 允许end
比数组的长度长.
这里是17的写法, 8的写法是用了3次System.arraycopy
.
toArray(T[] a)版本
这个实现就不解释了, 他的功能可以说下,比较复杂,或者说所有的Collection
好像都是符合这个规律的.
-
a.length, 比 Queue
内元素少, 会返回一个新数组. -
a.length >= Queue
内元素数量, 会复制到参数内, 余下部分第一个值设为null
示例代码如下:
ArrayBlockingQueue queue = new ArrayBlockingQueue<Integer>(10,true, Arrays.asList(1, 2, 3, 4));
Integer[] arr = new Integer[10];
Arrays.fill(arr,-1);
queue.toArray(arr);
System.out.println(Arrays.deepToString(arr));
这个代码的输出结果是[1, 2, 3, 4, null, -1, -1, -1, -1, -1]
. 结果有点诡异, 记得住就用这个方法, 记不住就坚持传new Integer[0]
这样的东西好了.
removeAll & retainAll & removeIf
这三个姐妹花, 在分析ArrayList
时候, 曾经介绍过. 这本质上是一个bulkRemove
方法, 只是过滤条件不太一样. removeAll & retainAll
的条件刚好相反.
锁🔐什么的就不放了, 里面这个删除,非常有意思.
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (filter.test(itemAt(items, i)))
return bulkRemoveModified(filter, i);
if (to == end) break;
}
能看出来这段代码的复杂度
吗? 两层for循环,是不是
不考虑最内层bulkRemoveModified
的话, 外面的代码其实只跑了queue
的元素数量那么多次.
对着这两张张图看,比较简单:
-
如果是图一的情况, to = (i < end) ? end : items.length
这里,to==end
. 这时候内层循环跑完, 最后一句break
掉了 -
如果是图二的情况,内层循环第一次跑清理掉 后半段
.最后end==items.length
, 外层循环会第二次进入. 这时候i=0,to=end
, 会开始清理前半段…
是的,这代码两次加一起,只跑了次的.
forEach & contains &circularClear&remove
前面的双层循环
,其实是对循环数组
做循环的非常精妙的方法,整体代码,有多处用了这个手法.
//forEach
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
action.accept(itemAt(items, i));
if (to == end) break;
}
//contains
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (o.equals(items[i]))
return true;
if (to == end) break;
}
//circularClear
for (int to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++) items[i] = null;
if (to == end) break;
}
//remove
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (to == end) break;
}
removeAt 删除某个指定位置的元素
毕竟,Queue
实现了Collection
接口, remove(Object o)
的方法是不能少的.虽然大多数时候, 都是从头take
元素, 删除某一个位置也是可以的. 这里用到了removeAt
方法.
这个分两种情况, 如果刚好, 要删除的的removeIndex
,就是takeIndex
, 那就是简单置空,移动下takeIndex
就行了. 对应着方法的前半段.
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued()
后半段, 是处理从半截拦腰的地方删一个元素的.很明显, 这里要删除元素, 然后把他后面的元素前移一下.
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0; //折返到数组开头
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
//前移后面的元素.. 这里的i自增过了,所以是后面的元素.
items[pred] = items[i];
}
clear
这个方法的主体, 会用到circularClear
, 还有一件额外的事,是如果有被阻塞在等待不满条件
的,会被唤醒.
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
drainTo
这个方法是把Queue
里的元素, 全都倒到另一个Collection
里去.
try {
while (i < n) {
@SuppressWarnings("unchecked")
E e = (E) items[take];
c.add(e);
items[take] = null;
if (++take == items.length) take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
这个方法看着贼长,也贼丑.
-
前半部分的 try
里,展示的是另一种对循环数组
进行循环的方法. -
后半部分是复位 takeIndex
并发不满信号
. 取走i
个,就发i
次, count也减少i个.
其实这两块能写在一起的, 每次循环都发信号
,处理count
和takeIndex
.不过这样有点慢, 批量化,到最后一起搞了是为了性能. 这个try
和finally
不加catch
的写法不错.
遗留问题
今天就先看到这, 剩下的迭代器部分和bulkRemoveModified
我觉得再水一篇都水不完,那玩意有点扎嘴,将来再说吧.
-
如果只是想复习下 ReentrantLock
和Condition
的用法, 10个基础方法的写法应该就够用了. -
想看如何操作 循环数组
, 重点是看明白那几个奇怪的双层循环.
对了,还有一个有趣的东西,作者在里面留了个调试信息.
invariantsSatisfied 循环不变量
在计算机科学中,循环不变式,是一组在循环体内、每次迭代均保持为真的性质(表达式),通常被用来证明程式或伪码的正确性 wiki
很少有哪个类会在里面带上循环不变量的, 但是这里带了. 可见这玩意多么难写正确,不知道到底改起来有多崩溃.
capacity > 0
&& items.getClass() == Object[].class
&& (takeIndex | putIndex | count) >= 0
&& takeIndex < capacity
&& putIndex < capacity
&& count <= capacity
&& (putIndex - takeIndex - count) % capacity == 0
&& (count == 0 || items[takeIndex] != null)
&& (count == capacity || items[putIndex] == null)
&& (count == 0 || items[dec(putIndex, capacity)] != null)
5.1了,你们放假了吗?
原文始发于微信公众号(K字的研究):ArrayBlockQueue浅析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/24768.html