【RabbitMQ笔记04】消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)

有目标就不怕路远。年轻人.无论你现在身在何方.重要的是你将要向何处去。只有明确的目标才能助你成功。没有目标的航船.任何方向的风对他来说都是逆风。因此,再遥远的旅程,只要有目标.就不怕路远。没有目标,哪来的劲头?一车尔尼雷夫斯基

导读:本篇文章讲解 【RabbitMQ笔记04】消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

这篇文章,主要介绍消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)。

目录

一、发布订阅模式

1.1、Exchange交换机

(1)什么是Exchange交换机呢???

(2)Exchange如何知道消息应该分发到哪个Queue里面?

(3)Exchange分类

1.2、发布订阅模式介绍

1.3、案例代码

(1)引入依赖

(2)编写生产者

(3)编写消费者


一、发布订阅模式

1.1、Exchange交换机

前面介绍的两个模式下,生产者都是直接和Queue消息队列交互的,这种方式存在一个问题,那就是如果要向不同的Queue消息队列里面发送数据,还得重新写一个生产者出来,为了解决这个问题,RabbitMQ引入了Exchange交换机。

(1)什么是Exchange交换机呢???

Exchange交换机,它主要作用就是接收生产者发送的消息,并且根据消息的RouteKey将消息映射到不同的Queue消息队列里面,通过这种方式,就可以实现不同的消息分发到不同的Queue消息队列里面,并且这种模式下,生产者是不会直接和Queue消息队列交互的,生产者也不需要知道消息应该放入哪个队列,它只要告诉Exchange即可,剩下的事情都是Exchange来处理。

【RabbitMQ笔记04】消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)

(2)Exchange如何知道消息应该分发到哪个Queue里面?

为了能够让Exchange知道某一条消息应该分发到哪个消息队列里面,RabbitMQ需要给Exchange和Queue之间添加映射关系,这个关系叫做:Binding(绑定)。虽然绑定在一起了,但是还是区分不了哪个消息应该分发到哪个Queue队列里面,所以还需要一个唯一标识,这个标识就是RouteKey路由键,也可以认为是Queue的唯一标识。采用上面这种模式,大致的逻辑图就如下所示啦:

【RabbitMQ笔记04】消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)

(3)Exchange分类

RabbitMQ中有四种常见的Exchange交换机类型,分别是:

  • direct:直接模式。
  • topic:主题模式,后面介绍的Topic模式就是利用这个Exchange类型。
  • fanout:广播模式,这是发布订阅模式要使用的Exchange类型。
  • headers:不常用。

1.2、发布订阅模式介绍

RabbitMQ中,发布订阅模式就是指定Exchange交换机的类型是【fanout】,然后RabbitMQ就会将消息分发到所有和这个Exchange交换机绑定的Queue消息队列里面,此时所有的消费者都可以接收到这一条消息。

注意:对于发布订阅模式来说,消息的路由键RouteKey是没有作用的,可以不写。

1.3、案例代码

(1)引入依赖

<!-- 引入 RabbitMQ 依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>

(2)编写生产者

  • 因为这里采用的Exchange交换机来分发消息,所以生产者中只需要将消息发送到Exchange交换机里面即可,而不需要指定具体的Queue队列。
package com.rabbitmq.demo.pub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:23
 * @Copyright (C) ZhuYouBin
 * @Description: 消息生产者
 */
public class Producer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_demo_2023";
            channel.exchangeDeclare(exchangeName, "fanout");
            // 6、发送消息
            String message = "这是发布订阅模式,发送的消息数据";
            channel.basicPublish(exchangeName, "", null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel) {
                try {
                    channel.close();
                } catch (Exception e) {}
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception e) {}
            }
        }
    }
}

(3)编写消费者

  • 消费者中,需要创建Queue队列,并且将这个Queue绑定到对应的Exchange上面(不同的消费者可以创建不同的队列,队列不同照样可以接收到订阅的消息)。
package com.rabbitmq.demo.pub;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:30
 * @Copyright (C) ZhuYouBin
 * @Description: 消息消费者
 */
public class Consumer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_demo_2023";
            channel.exchangeDeclare(exchangeName, "fanout");
            // 6、指定需要操作的消息队列,如果队列不存在,则会创建
            String queueName = "queue_demo_2023";
            channel.queueDeclare(queueName, false, false, false, null);
            // 7、绑定 Exchange 和 Queue
            channel.queueBind(queueName, exchangeName, "");
            // 8、消费消息
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    // 接收消息
                    System.out.println("这是接收的消息:" + new String(delivery.getBody()));
                }
            };
            channel.basicConsume(queueName, true, callback, i->{});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行消费者、生产者,查看控制台,消费者日志输出内容,可以发现两个绑定不同Queue队列的消费者,接收到了相同的消息,这就是发布订阅模式。

到此,RabbitMQ消息队列中的发布订阅模式就介绍完啦。

综上,这篇文章结束了,主要介绍消息队列RabbitMQ七种模式之发布订阅模式(Publish/Subscribe)。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/134511.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!