生产者和消费者的三种实现方式(Java)

导读:本篇文章讲解 生产者和消费者的三种实现方式(Java),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

什么是生产者消费者问题 ?

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

注:该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。

同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。

通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

问题分析

需要注意以下几点:

  1. 在缓冲区为空时,消费者不能再进行消费;
  2. 在缓冲区为满时,生产者不能再进行生产;
  3. 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步;
  4. 注意生产和消费的互斥条件。

方式一:synchronized、wait和notify

定义 Data 资源类,类中定义资源仓库的大小,当前资源个数。资源类的incrment()和decrement()方法是synchronized 的。生产者/消费者线程共享一个资源Data。

public class NotifyAndWaitTest1 {

    // 模拟 一个资源的获取和释放

    // 步骤
    // 生产者:
    //      1 判断资源是否充裕;
    //          1 如果资源充裕,就没必要再生产了,等待消费者消费完资源为止
    //      2 如果资源不足,就必须立即生产资源
    //          1 资源生产完之后,必须通知消费者
    // 消费者:
    //      1 判断资源是否充裕;
    //          1 如果资源不足,就不能再消费了,等待生产者生产出资源为止
    //      2 如果资源充足
    //          1 直接消费,之后,再通知生产者
    //
    // 注意:
    // Object#notifyAll 方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。
    // 此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。
    // 即,最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。
    //
    // 问题:
    // 1 唤醒线程的问题:
    //      1 有可能会出现极端情况,
    //          1 每次唤醒的都是生产者线程,消费者线程一直处于就绪状态,如果生产者不判断生产的必要性,那么,资源就会越积越多,超过仓库的容量。
    //          2 也可能,每次唤醒的都是消费者线程,生产者生产完第一个资源,就一直处于就绪状态,如果消费者不判断是否可以消费,那么,就会出现 资源负数
    //
    // 解决:
    // 每次被唤醒后,都判断是否应该被唤醒,否则,就再次进入阻塞状态
    // 1 方案一:(情况发生后,补救)
    //      1 生产者:每次被唤醒后,都判断(生产的必要性) 再生产资源是否会超过仓库的容量
    //      2 消费者:每次被唤醒后,都判断(是否可以消费)消费之后是否会出现 资源负数,即,我要的资源,是否都充足
    //
    // 2 方案二:(既然是Object#notifyAll 引起的问题,就不让这种情况发生)
    //      1 设置两把锁,消费者锁和生产者锁
    //      2 所有生产者共享生产者锁,所有消费者共享消费者锁

    public static void main(String[] args) throws Exception {

        Data data = new Data();

        // A 线程,生产资源 10 个
        new Thread(()->{
            for (int i = 0; i < 555; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        // B 线程,消费资源 10 个
        new Thread(()->{
            for (int i = 0; i < 555; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();


        // C 线程,生产资源 10 个
        new Thread(()->{
            for (int i = 0; i < 666; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        // D 线程,消费资源 10 个
        new Thread(()->{
            for (int i = 0; i < 666; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();

    }

    static class Data{
        // 当前资源个数
        private int data = 0;
        // 资源仓库的最大容量为 3
        private final int MAX_SIZE = 3;

        // +1
        public synchronized void increment() throws InterruptedException {
            // 资源充裕,等待消费者消费
            // if (data >= 1){
            //     // 无限阻塞,直到被唤醒
            //     this.wait();
            // }
            while (data + 1 > MAX_SIZE){
                // 没必要生产,无限阻塞,直到被唤醒
                this.wait();
            }

            // 生产
            data++;
            System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
            // 随机唤醒一个线程
            // 这里其实应该唤醒一个消费者
            // 但是,由于唤醒是随机的,所以,可能唤醒生产者
            // 所以,在唤醒之后,生产者要判断是否有必要生产
            // if 应该换成 while
            this.notifyAll();
        }

        // -1
        public synchronized void decrement() throws InterruptedException {
            // 资源不足,等待生产者生产
            // if (data <= 0){
            //     this.wait();
            // }
            while (data - 1 < 0){
                // 资源不足,无限等待,直到被唤醒
                this.wait();
            }

            // 消费
            data--;
            System.out.println("[" + Thread.currentThread().getName() + "], data=" + data);
            // 随机唤醒一个线程
            // 这里其实应该唤醒一个生产者
            // 但是,由于唤醒是随机的,所以,可能唤醒消费者
            // 所以,在唤醒之后,消费者要判断是否可以消费
            // if 应该换成 while
            this.notifyAll();
        }
    }
}

方式二:lock和condition的await、signalAll

public class NotifyAndWaitTest2 {

    public static void main(String[] args) {

        Data data = new Data();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        }, "D").start();
    }

    static class Data{
        // 公平锁,类似于队列,默认 =false 是非公平锁
        private final ReentrantLock lock = new ReentrantLock(true);
        // 资源监视器
        private Condition condition = lock.newCondition();
        // 当前资源数
        private int count = 0;
        // 仓库资源的最大数量
        private final int MAX_COUNT = 3;

        // ++; 生产资源
        public void increment(){
            lock.lock();
            try {
                while (count + 1 > MAX_COUNT){
                    // 无限等待,直到 被唤醒
                    condition.await();
                }
                count++;
                System.out.println("[" + Thread.currentThread().getName() +"] +1 " + count);
                // 唤醒所有
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        // --;消费资源
        public void decrement(){
            lock.lock();
            try {
                while (count - 1 < 0){
                    // 无限等待,直到 被唤醒
                    condition.await();
                }
                count--;
                System.out.println("[" + Thread.currentThread().getName() +"] -1 " + count);
                // 唤醒所有
                condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

    }
}

方式三:BlockingQueue

定义Data资源类,资源类持有一个BlockingQueue。生产者/消费者线程共享一个资源类Data的成员变量,调用Queue的put()和take() 实现生产和消费。

  • BlockingQueue#put: 添加一个元素,如果队列满了,则阻塞
  • BlockingQueue#take: 移除并返回队列头部的元素,如果队列为空则阻塞
public class BlockingQueueConsumerProducer {

    public static void main(String[] args) {
        // 资源
        Data data = new Data();
        // 生产者
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        },"A").start();
        // 消费者
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        },"B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        },"C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        },"D").start();
    }

    static class Data{

        // 阻塞队列
        private final BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(3);

        public void increment(){
            try {
                // 1 代表资源
                // 添加一个元素,如果队列满了,则阻塞
                resourceQueue.put(1);
                System.out.println(Thread.currentThread().getName() + " "
                        + resourceQueue.size() +
                        " +");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void decrement(){
            try {
                // 移除并返回队列头部的元素,如果队列为空则阻塞
                resourceQueue.take();
                System.out.println(Thread.currentThread().getName() + " " +
                        + resourceQueue.size() + " -");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/69723.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!