什么是生产者消费者问题 ?
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。
注:该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。
同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。
通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
问题分析
需要注意以下几点:
- 在缓冲区为空时,消费者不能再进行消费;
- 在缓冲区为满时,生产者不能再进行生产;
- 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步;
- 注意生产和消费的互斥条件。
方式一:synchronized、wait和notify
定义 Data 资源类,类中定义资源仓库的大小,当前资源个数。资源类的incrment()和decrement()方法是synchronized 的。生产者/消费者线程共享一个资源Data。
public class NotifyAndWaitTest1 {
// 模拟 一个资源的获取和释放
// 步骤
// 生产者:
// 1 判断资源是否充裕;
// 1 如果资源充裕,就没必要再生产了,等待消费者消费完资源为止
// 2 如果资源不足,就必须立即生产资源
// 1 资源生产完之后,必须通知消费者
// 消费者:
// 1 判断资源是否充裕;
// 1 如果资源不足,就不能再消费了,等待生产者生产出资源为止
// 2 如果资源充足
// 1 直接消费,之后,再通知生产者
//
// 注意:
// Object#notifyAll 方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。
// 此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。
// 即,最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。
//
// 问题:
// 1 唤醒线程的问题:
// 1 有可能会出现极端情况,
// 1 每次唤醒的都是生产者线程,消费者线程一直处于就绪状态,如果生产者不判断生产的必要性,那么,资源就会越积越多,超过仓库的容量。
// 2 也可能,每次唤醒的都是消费者线程,生产者生产完第一个资源,就一直处于就绪状态,如果消费者不判断是否可以消费,那么,就会出现 资源负数
//
// 解决:
// 每次被唤醒后,都判断是否应该被唤醒,否则,就再次进入阻塞状态
// 1 方案一:(情况发生后,补救)
// 1 生产者:每次被唤醒后,都判断(生产的必要性) 再生产资源是否会超过仓库的容量
// 2 消费者:每次被唤醒后,都判断(是否可以消费)消费之后是否会出现 资源负数,即,我要的资源,是否都充足
//
// 2 方案二:(既然是Object#notifyAll 引起的问题,就不让这种情况发生)
// 1 设置两把锁,消费者锁和生产者锁
// 2 所有生产者共享生产者锁,所有消费者共享消费者锁
public static void main(String[] args) throws Exception {
Data data = new Data();
// A 线程,生产资源 10 个
new Thread(()->{
for (int i = 0; i < 555; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
// B 线程,消费资源 10 个
new Thread(()->{
for (int i = 0; i < 555; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
// C 线程,生产资源 10 个
new Thread(()->{
for (int i = 0; i < 666; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
// D 线程,消费资源 10 个
new Thread(()->{
for (int i = 0; i < 666; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
static class Data{
// 当前资源个数
private int data = 0;
// 资源仓库的最大容量为 3
private final int MAX_SIZE = 3;
// +1
public synchronized void increment() throws InterruptedException {
// 资源充裕,等待消费者消费
// if (data >= 1){
// // 无限阻塞,直到被唤醒
// this.wait();
// }
while (data + 1 > MAX_SIZE){
// 没必要生产,无限阻塞,直到被唤醒
this.wait();
}
// 生产
data++;
System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
// 随机唤醒一个线程
// 这里其实应该唤醒一个消费者
// 但是,由于唤醒是随机的,所以,可能唤醒生产者
// 所以,在唤醒之后,生产者要判断是否有必要生产
// if 应该换成 while
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
// 资源不足,等待生产者生产
// if (data <= 0){
// this.wait();
// }
while (data - 1 < 0){
// 资源不足,无限等待,直到被唤醒
this.wait();
}
// 消费
data--;
System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
// 随机唤醒一个线程
// 这里其实应该唤醒一个生产者
// 但是,由于唤醒是随机的,所以,可能唤醒消费者
// 所以,在唤醒之后,消费者要判断是否可以消费
// if 应该换成 while
this.notifyAll();
}
}
}
方式二:lock和condition的await、signalAll
public class NotifyAndWaitTest2 {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "D").start();
}
static class Data{
// 公平锁,类似于队列,默认 =false 是非公平锁
private final ReentrantLock lock = new ReentrantLock(true);
// 资源监视器
private Condition condition = lock.newCondition();
// 当前资源数
private int count = 0;
// 仓库资源的最大数量
private final int MAX_COUNT = 3;
// ++; 生产资源
public void increment(){
lock.lock();
try {
while (count + 1 > MAX_COUNT){
// 无限等待,直到 被唤醒
condition.await();
}
count++;
System.out.println("[" + Thread.currentThread().getName() +"] +1 " + count);
// 唤醒所有
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// --;消费资源
public void decrement(){
lock.lock();
try {
while (count - 1 < 0){
// 无限等待,直到 被唤醒
condition.await();
}
count--;
System.out.println("[" + Thread.currentThread().getName() +"] -1 " + count);
// 唤醒所有
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
方式三:BlockingQueue
定义Data资源类,资源类持有一个BlockingQueue。生产者/消费者线程共享一个资源类Data的成员变量,调用Queue的put()和take() 实现生产和消费。
BlockingQueue#put
: 添加一个元素,如果队列满了,则阻塞BlockingQueue#take
: 移除并返回队列头部的元素,如果队列为空则阻塞
public class BlockingQueueConsumerProducer {
public static void main(String[] args) {
// 资源
Data data = new Data();
// 生产者
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"A").start();
// 消费者
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"D").start();
}
static class Data{
// 阻塞队列
private final BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(3);
public void increment(){
try {
// 1 代表资源
// 添加一个元素,如果队列满了,则阻塞
resourceQueue.put(1);
System.out.println(Thread.currentThread().getName() + " "
+ resourceQueue.size() +
" +");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void decrement(){
try {
// 移除并返回队列头部的元素,如果队列为空则阻塞
resourceQueue.take();
System.out.println(Thread.currentThread().getName() + " " +
+ resourceQueue.size() + " -");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/69723.html