生产者消费者问题
注意点
多线程操作资源类,牢记三步走
:
- 判断
- 干活
- 唤醒
synchronized版
//A : num + 1
//B : num - 1
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {
for (int i = 0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {
for (int i = 0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(() -> {
for (int i = 0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
//等待,业务,通知
class Data{
private int number = 0;
//+1
public synchronized void increment() throws InterruptedException {
//while的作用就是防止虚假唤醒
while(number != 0){
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"< "+number);
//通知其他线程,我+1完毕了
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
while(number == 0){
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"< "+number);
//通知其他线程,我-1完毕了
this.notifyAll();
}
}
阻塞队列版
/使用阻塞队列实现生产者,消费者模型
//volatile/CAS/AutomicReference/BlockingQueue/线程交互
public class BlockingQueueCustomerProductor {
public static void main(String[] args) throws InterruptedException {
MySource mySource = new MySource(new ArrayBlockingQueue(10));
new Thread(() -> {
try {
mySource.myProd();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AAA").start();
new Thread(() -> {
try {
mySource.myConsumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BBB").start();
TimeUnit.SECONDS.sleep(5);
System.out.println("5秒已经过了,开始结束吧");
mySource.stop();
}
}
//判断,干活,通知
class MySource{
private volatile boolean flag = true; //默认开启,进行生产
private volatile boolean flag2 = true; //默认开启,进行消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MySource(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws InterruptedException {
String data = null;
boolean retValue;
while (flag){
data = atomicInteger.incrementAndGet()+"";
retValue = blockingQueue.offer(data,2, TimeUnit.SECONDS);
if (retValue){
System.out.println(Thread.currentThread().getName()+"插入成功: "+data);
}else {
System.out.println(Thread.currentThread().getName()+"插入失败: "+data);
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+ "\t大老板叫停生产: flag=false");
}
public void myConsumer() throws InterruptedException {
String result = null;
while (flag2){
result = blockingQueue.poll(2,TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase(""))
{
flag2 = false;
System.out.println(Thread.currentThread().getName()+"\t 超过两秒钟没有渠道,消费退出");
}else {
System.out.println(Thread.currentThread().getName() + "消费队列: " + result + " 成功");
}
}
}
public void stop(){
this.flag = false;
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/202548.html