线程通信
使用 volatile 关键字
基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。
public class VolatileShareDemo {
// 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知
static volatile boolean notice = false;
public static void main(String[] args) {
List<String> list = new ArrayList<>();
// 实现线程A
Thread threadA = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
list.add("abc");
System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (list.size() == 5)
notice = true;
}
});
// 实现线程B
Thread threadB = new Thread(() -> {
while (true) {
if (notice) {
System.out.println("线程B收到通知,开始执行自己的业务...");
break;
}
}
});
// 需要先启动线程B
threadB.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 再启动线程A
threadA.start();
}
}
wait/notify
众所周知,Object类提供了线程间通信的方法:wait()、notify()、notifyaAl(),它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。
注意:
wait和 notify必须配合synchronized使用,wait方法释放锁,notify方法不释放锁
wait和notify调用的顺序一定要注意先后,如果先调用了notify,然后才调用wait方法的话,则调用了wait方法被阻塞的线程则不会被唤醒,会一直处于阻塞状态。
我们可以通过生产者/消费者问题来学习如何使用。
public class Producer implements Runnable {
private Queue<String> bags;
private int maxSize;
public Producer(Queue<String> bags, int maxSize) {
this.bags = bags;
this.maxSize = maxSize;
}
@Override
public void run() {
int i = 0;
while (true) {
i++;
synchronized (bags) { //抢占锁
if (bags.size() == maxSize) {
System.out.println("bags 满了");
try {
//park(); ->JVM ->Native
bags.wait(); //满了,阻塞当前线程并且释放Producer抢到的锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产:bag" + i);
bags.add("bag" + i); //生产bag
bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了
} //同步代码快执行结束
}
}
}
public class Consumer implements Runnable {
private Queue<String> bags;
private int maxSize;
public Consumer(Queue<String> bags, int maxSize) {
this.bags = bags;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (bags) {
if (bags.isEmpty()) {
System.out.println("bags为空");
try {
bags.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String bag = bags.remove();
System.out.println("消费者消费:" + bag);
bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。
} //同步代码块执行结束, monitorexit指令执行完成
}
}
}
public class ProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
Queue<String> strings = new LinkedList<>();
Producer producer = new Producer(strings, 3);
Consumer consumer = new Consumer(strings, 3);
new Thread(producer).start();
//生产者在消费者后
Thread.sleep(100);
new Thread(consumer).start();
}
}
运行效果:
生产者生产:bag1
生产者生产:bag2
生产者生产:bag3
bags 满了
消费者消费:bag1
消费者消费:bag2
消费者消费:bag3
bags为空
生产者生产:bag4
从运行效果我们可以看出,消费者发出notify()唤醒通知之后,依然是走完了自己线程的业务之后,生产者才开始执行,这也正好说明了,notify()方法不释放锁,而wait()方法释放锁。
Condition
Condition接口提供了与Object阻塞(wait()
)与唤醒(notify()
或notifyAll()
)相似的功能,只不过Condition
接口提供了更为丰富的功能,如:限定等待时长等。Condition
需要与Lock
结合使用,需要通过锁对象获取Condition。
基本使用
从以下代码可以清楚了解condition如何使用,以及他的执行顺序。
public class ConditionDemoWait implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
lock.lock();
try {
condition.await(); //让当前线程阻塞,Object.wait();
System.out.println("end - ConditionDemoWait");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionDemoNotify implements Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoNotify");
lock.lock(); //synchronized(lock)
try {
condition.signal(); //让当前线程唤醒 Object.notify(); //因为任何对象都会有monitor
System.out.println("end - ConditionDemoNotify");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionExample {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemoWait cd = new ConditionDemoWait(lock, condition);
ConditionDemoNotify cdn = new ConditionDemoNotify(lock, condition);
new Thread(cd).start();
new Thread(cdn).start();
}
}
运行效果:
begin - ConditionDemoWait
begin - ConditionDemoNotify
end - ConditionDemoNotify
end - ConditionDemoWait
源码分析
上述示例中的Condition
对象是调用了Lock#newCondition()
方法,源码如下:
public class ReentrantLock implements Lock, java.io.Serializable {
...
public Condition newCondition() {
return sync.newCondition();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final ConditionObject newCondition() {
return new ConditionObject();
}
...
}
...
}
上述的ConditionObject
定义在AQS中,如下:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public class ConditionObject implements Condition, java.io.Serializable {
...
}
...
}
首先来分析下Condition#await()
方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
根据AQS队列的特性,若有多个线程执行lock#lock()
方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:
假设当前是线程A获取到锁,其他线程执行lock#lock()
方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()
方法,则会将当前线程添加至Condition
队列中,如下:
然后在调用fullyRelease()
方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:
在调用isOnSyncQueue()
方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()
进行阻塞。
假设当前线程A是生产者线程,调用await()
方法后,会释放锁,并且将当前线程加入到Condition
队列中。此时,消费者能获取到锁资源,然后继续执行。
假设线程B是消费者线程,当添加一个元素后会调用condition#signal()
方法,定义如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
执行signal()
方法,会将Condition
队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:
至此,完成了Condition
队列转换为同步队列的过程。后续流程基本就是重复以上操作。
实际应用
虽然我们在日常开发中可能很少用到Condition,但是他还是具有实际用处的。
- 实现阻塞队列
- 在线程池中会用到阻塞队列
- 生产者消费者
- 流量缓冲等
阻塞队列
队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。
那么阻塞队列,实际上是在队列的基础上增加了两个操作。
-
支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。
-
支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。
阻塞队列中的方法
-
添加元素
针对队列满了之后的不同的处理策略
- add -> 如果队列满了,抛出异常
- offer -> true/false , 添加成功返回true,否则返回false
- put -> 如果队列满了,则一直阻塞
- offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。
-
移除元素
-
element-> 队列为空,抛异常
-
peek -> true/false , 移除成功返回true,否则返回false
-
take -> 一直阻塞
-
poll(timeout) -> 如果超时了,还没有元素,则返回null
-
JUC中的阻塞队列
-
ArrayBlockingQueue 基于数组结构
-
LinkedBlockingQueue 基于链表结构
-
PriorityBlcokingQueue 基于优先级队列
-
DelayQueue 允许延时执行的队列
-
SynchronousQueue 没有任何存储结构的的队列
延时队列的实现
//首先需要实现Delayed接口
public class DelayQueueExampleTask implements Delayed {
private String orderId;
private long start = System.currentTimeMillis();
private long time; //
public DelayQueueExampleTask(String orderId, long time) {
this.orderId = orderId;
this.time = time;
}
//计算延迟时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
//比较
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayQueueExampleTask{" +
"orderId='" + orderId + '\'' +
", start=" + start +
", time=" + time +
'}';
}
}
public class DelayQueueMain {
private static DelayQueue<DelayQueueExampleTask> delayQueue = new DelayQueue();
public static void main(String[] args) {
delayQueue.offer(new DelayQueueExampleTask("1001", 1000));
delayQueue.offer(new DelayQueueExampleTask("1002", 5000));
delayQueue.offer(new DelayQueueExampleTask("1003", 3000));
delayQueue.offer(new DelayQueueExampleTask("1004", 6000));
delayQueue.offer(new DelayQueueExampleTask("1005", 2000));
delayQueue.offer(new DelayQueueExampleTask("1006", 8000));
delayQueue.offer(new DelayQueueExampleTask("1007", 3000));
while (true) {
try {
DelayQueueExampleTask task = delayQueue.take();
System.out.println(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行效果:
DelayQueueExampleTask{orderId='1001', start=1636615343142, time=1000}
DelayQueueExampleTask{orderId='1005', start=1636615343142, time=2000}
DelayQueueExampleTask{orderId='1007', start=1636615343142, time=3000}
DelayQueueExampleTask{orderId='1003', start=1636615343142, time=3000}
DelayQueueExampleTask{orderId='1002', start=1636615343142, time=5000}
DelayQueueExampleTask{orderId='1004', start=1636615343142, time=6000}
DelayQueueExampleTask{orderId='1006', start=1636615343142, time=8000}
通过condition 实现阻塞队列
我们可以通过以下代码初步实现阻塞队列的效果。
public class ConditionBlockedQueueExample {
//表示阻塞队列中的容器(正常应该是一个queue)
private List<String> items;
//元素个数(表示已经添加的元素个数)
private volatile int size;
//数组的容量
private volatile int count;
private Lock lock = new ReentrantLock();
//让take方法阻塞 ->wait/notify
private final Condition notEmpty = lock.newCondition();
//放add方法阻塞
private final Condition notFull = lock.newCondition();
public ConditionBlockedQueueExample(int count) {
this.count = count;
items = new ArrayList<>(count); //写死了
}
//添加一个元素,并且阻塞添加
public void put(String item) throws InterruptedException {
lock.lock();
try {
if (size >= count) {
System.out.println("队列满了,需要先等一会");
notFull.await();
}
++size; //增加元素个数
items.add(item);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public String take() throws InterruptedException {
lock.lock();
try {
if (size == 0) {
System.out.println("阻塞队列空了,先等一会");
notEmpty.await();
}
--size;
String item = items.remove(0);
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionBlockedQueueExample cbqe = new ConditionBlockedQueueExample(10);
//生产者线程
Thread t1 = new Thread(() -> {
Random random = new Random();
for (int i = 0; i < 1000; i++) {
String item = "item-" + i;
try {
cbqe.put(item); //如果队列满了,put会阻塞
System.out.println("生产一个元素:" + item);
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread.sleep(100);
Thread t2 = new Thread(() -> {
Random random = new Random();
for (; ; ) {
try {
String item = cbqe.take();
System.out.println("消费者线程消费一个元素:" + item);
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t2.start();
}
}
运行效果:
.....
生产一个元素:item-5
消费者线程消费一个元素:item-5
生产一个元素:item-6
消费者线程消费一个元素:item-6
生产一个元素:item-7
消费者线程消费一个元素:item-7
生产一个元素:item-8
消费者线程消费一个元素:item-8
生产一个元素:item-9
消费者线程消费一个元素:item-9
生产一个元素:item-10
消费者线程消费一个元素:item-10
阻塞队列空了,先等一会
生产一个元素:item-11
消费者线程消费一个元素:item-11
阻塞队列空了,先等一会
生产一个元素:item-12
消费者线程消费一个元素:item-12
阻塞队列空了,先等一会
生产一个元素:item-13
消费者线程消费一个元素:item-13
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16827.html