RabbitMQ常用消息模型

什么是RabbitMQ?

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

RabbitMQ的特点

1、可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2、灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3、消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

4、高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

5、多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

6、多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

7、管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

8、跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

9、插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ中的概念模型

消息模型

最普通的消息模型就是有订阅发布者、消息队列以及消费者,订阅发布者通过发送消息到消息队列当中,消费者从消息队列中去消费消息

RabbitMQ是异步通信的,原来的同步消息处理的话,耦合性太高,每次添加新功能的话就要改变原来的代码,而异步通信将业务代码订阅到对应的消息管道,然后订阅发布者发布了消息,这个业务代码就能立刻去处理请求

1、Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2、Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3、Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4、Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

5、Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6、Connection 网络连接,比如一个TCP连接。

7、Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8、Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9、Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

10、Broker 表示消息队列服务器实体。

RabbitMQ安装

我们可以通过在服务器上用docker进行安装,也可以下载安装包到windows上启动运行

RabbitMQ官网:https://www.rabbitmq.com/

这里我们用docker进行安装

# --hostname 指的是主机的名字,主要是在集群部署的时候用于区分
# -e 设置默认的登录用户名以及密码
# --name 设置容器的名字
# -p 15672 指的是RabbitMQ的管理面板
# -p 5672  指的是RabbitMQ的消息传送端口
[root@xiaozhu /]# docker run -d --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=zhu -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 --name rabbit01 rabbitmq:3-management

安装完成之后,我们就可以用服务器ip:15672查看到对应的效果了

RabbitMQ常用消息模型
image-20220313131853374

RabbitMQ创建Hello World!

我们需要创建一个订阅发布者以及消费者,我们订阅发布者将消息发送到我们指定端口的消息队列中,然后消费者则通过消息队列来获取到我们发送的消息

RabbitMQ常用消息模型
image-20220314221503533

首先我们需要引入RabbitMQ客户端的依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>

创建订阅发布者

public class Producer {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接参数,分别是主机、端口号、VHost虚拟主机、用户名、密码
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxxxx");
        factory.setPassword("xxxxx");
        //建立连接
        Connection connection = factory.newConnection();
        //创建通道Channel,有了通道就可以基于通道向队列发送消息
        Channel channel = connection.createChannel();
        //创建队列,并且指定一个队列的名称
        //参数1:队列名称,如果队列不存在自动创建
        //参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
        //参数3:exclusive 是否独占队列 true 独占队列 false 不独占
        //参数4:autoDelete 是否在消费完成后自动删除队列 true 自动删除 false 不删除
        //参数5:额外附加参数
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);
        //发送消息,并且添加队列的名称,将消息以字节的形式传输,底层是字节传输的
        //参数1:交换机名称
        //参数2:队列名称
        //参数3:传递消息额外设置,MessageProperties.PERSISTENT_TEXT_PLAIN表示消息持久化
        //参数4:消息的具体内容
        String message = "Hello world!";
        channel.basicPublish("",queueName,null,message.getBytes());
        System.out.println("消息发送成功:"+message);
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

运行成功后,我们就可以在控制台看到我们消息发送成功的代码了,接着我们到RabbitMQ的后台管理面板中查看queue队列,发现刚刚创建的队列被添加进来了,并且这个队列中准备着一条消息,当有消费者时,消费者就会主动的去消费这一条消息,我们可能点进去查看里面对应的消息

RabbitMQ常用消息模型
image-20220313135404608

创建消费者

public class Customer {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接的参数,跟订阅发布者一样
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxxxxx");
        factory.setPassword("xxxxxx");
        //建立连接
        Connection connection = factory.newConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //创建队列,并且设置队列的名称
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);
        //通过管道来消费消息
        //参数1:队列名称
        //参数2:消息自动确认,默认是true,消费者向rabbitmq确认消息消费
        //方式一:
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //处理消息
                String msg = new String(body);
                System.out.println("接收到消息了:"+msg);
            }
        });
        //方式二:
        //DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        //    String message = new String(delivery.getBody(), "UTF-8");
        //    System.out.println(" [x] Received '" + message + "'");
        //};
        //channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
        System.out.println("等待消息接收中...");
    }
}

运行成功后我们就能看到控制台输出的对应的消息,并且我们在RabbitMQ的后台管理面板中能查看到对应的连接以及管道,以及队列中的ready准备消息被消费了

  • 控制台输出
RabbitMQ常用消息模型
image-20220313141110897
  • 消费者建立的连接
RabbitMQ常用消息模型
image-20220313141146047
  • 队列消息被消费
RabbitMQ常用消息模型
image-20220313141226174

思考问题:

1、为什么控制台先输出的时等待消息接收中,而不是接收到消息了

这个就是RabbitMQ的回调机制,因为先执行了消费消息,但是需要一点事件通过管道去获取消息,因为后面的代码不会等待前面的代码执行完再执行,所以等待消息接收中就先输出了

2、明明订阅发布者中已经创建了消息队列,为什么还要在消费者中再创建一遍

因为我们的订阅发布者和消费者的执行顺序是不确定的,所以如果我们消费者先执行的话,他获取不到管道并且获取不到队列中的消息,所以为了避免这种情况的发生,所以在消费者中就又创建了一遍,并且队列不会因为订阅发布者和消费者都创建了就会产生两个队列,只会产生一个队列

注:我们每次都要给连接工程设置参数十分的麻烦,并且每次设置的参数也是固定值,所以我们可以封装成一个工具类,然后通过调用工具类中的方法返回一个连接,然后我们用那个连接去创建一个管道即可,这样就不用每次都去设置参数了。还可以添加一个关闭管道和连接的方法,减少了很多的重复代码

工具类

public class RabbitUtils {

    private static ConnectionFactory factory;

    static {
        //单例模式的懒汉式,在引用对象的时候就创建连接工程
        factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxxxxx");
        factory.setPassword("xxxxxx");
    }

    public Connection getConnection(){
        Connection connection = null;
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }

    public void close(Connection connection, Channel channel){
        try {
            if(connection!=null)connection.close();
            if(channel!=null)channel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

注:在前两种模型当中,存在着默认的路由,所以订阅发布者发送消息的方法中的routingKey就是队列名称,会找到这个队列名称去发送相应的消息,然后由消费者消费

Work Queue

多工作管道,我们在实践的过程中,通常会有一个能者多劳的思想,就比如说,一个订阅发布者每次发布50条消息,然后有两个消费者去消费他,并且我们想让这两个消费者能够按照自己的能力去消费消息。

但是在RabbitMQ中,消费者获取消息是轮询的,就是会按照顺序给消费者分发消息,假设现在有50条消息,需要五个消费者去消费掉,每个消费者就是各自消费十条消息,并且是按顺序消费的,如果设置了睡眠时间,消费的条数依旧是固定不变,就是消费的时间可能会长一点,所以我们可以设置他的prefetchCount的大小

RabbitMQ常用消息模型
image-20220314221531002

生产者

public class Producer {
    public static void main(String[] args) throws IOException {
        //通过工具类来获取连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通过管道声明队列
        channel.queueDeclare("simple.queue",false,false,false,null);
        //给管道中发送消息
        for (int i=1;i<=10;i++){
            channel.basicPublish("","simple.queue",null,("这是一条消息"+i).getBytes());
        }
        //通过工具类关闭连接
        RabbitUtils.close(connection,channel);
    }
}

消费者1(我们让消费者一延迟两秒中输出消息)

public class Customer {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通过管道声明消息队列
        channel.queueDeclare("simple.queue",false,false,false,null);
        //通过消息队列来获取消息
        channel.basicConsume("simple.queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

消费者2

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通过管道声明消息队列
        channel.queueDeclare("simple.queue",false,false,false,null);
        //通过消息队列来获取消息
        channel.basicConsume("simple.queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
            }
        });
    }
}

结果:

消费者一虽然说每隔两秒中输出对应获得的消息,但也不影响他的总消息量,总消息量跟消费者二一样,而消费者二在1秒钟就输出完了所有的消息,我们并且能看到后台管理面板中消息是一瞬间被拿完的,说明消费者一和消费者二同时拿到了十条消息,由于消费者一延迟了两秒输出,所以这是消费延迟,这个就是RabbitMQ的默认机制,循环的获取消息,按照顺序的给消费者分发消息,这样的坏处就是不能能者多劳,如果有一个消费者消费的很慢,并且消息队列中的消息在不断的增加,那很有可能会导致消息阻塞,所以我们可以设置每次只能消费一个消息,这样子在消息队列中的剩余的消息就会被处理快的消费者给消费掉

消息自动确认机制

我们消费者在拿到消息的时候,将没有完全处理完的消息也同时确认了,他不关心你下面的业务是否已经处理完毕,他只负责拿到对应的消息就告诉队列确认完毕了,所以这是十分的不安全的,我们可以设置为手动确认,这样子在业务处理完成之后,会告诉消息已经完成,并且继续消费下一条消息

消费者一

public class Customer {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通过管道声明消息队列
        channel.queueDeclare("simple.queue",false,false,false,null);
        //通过消息队列来获取消息
        channel.basicConsume("simple.queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

消费者二

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //设置每次只能接收一条消息
        channel.basicQos(1);
        //通过管道声明消息队列
        channel.queueDeclare("simple.queue",false,false,false,null);
        //通过消息队列来获取消息
        channel.basicConsume("simple.queue",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

如果不设置手动确认消息的话,后台管理页面中的队列使用会显示未完成消息的条数

Fanout

Fanout就是所说的广播,它只是将收到的所有消息广播到它知道的所有队列,就像微信的公众号一样,你订阅了那一个公众号,然后当那个公众号发了消息之后,所有订阅了他的用户都能够收到消息了

RabbitMQ常用消息模型
image-20220314221552497

订阅发布者

public class Producer {
    public static void main(String[] args) throws IOException {
        //在发布订阅者中添加一台交换机,让所有绑定了这台交换机的所有消息队列都能够收到消息
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //创建交换机,后面的类型名称不能随意取
        channel.exchangeDeclare("logs","fanout");
        //发送消息,现在不需要管消息队列了,只需要往交换机中发送消息,由交换机给消息队列分发消息
        channel.basicPublish("logs","",null,"this is exchange".getBytes());
        //关闭连接和通道
        RabbitUtils.close(connection,channel);
    }
}

消费者一

public class Customer1 {
    public static void main(String[] args) throws IOException {
        //消费者需要生成一个临时的队列,用完即删,因为如果每次都创建一个持久的消息队列十分的消耗内存,因为可能队列的数据需要持久化
        //绑定对应的交换机就可以接收到对应的消息
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //创建一个临时队列用于存储交换机发送的消息
        String queue = channel.queueDeclare().getQueue();
        //绑定对应的交换机和队列,最后一个参数是routingKey(路由key)
        channel.queueBind(queue,"logs","");
        //接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息1:"+new String(body));
            }
        });
    }
}

消费者二

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //消费者需要生成一个临时的队列,用完即删,因为如果每次都创建一个持久的消息队列十分的消耗内存,因为可能队列的数据需要持久化
        //绑定对应的交换机就可以接收到对应的消息
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //创建一个临时队列用于存储交换机发送的消息
        String queue = channel.queueDeclare().getQueue();
        //绑定对应的交换机和队列,最后一个参数是routingKey(路由key)
        channel.queueBind(queue,"logs","");
        //接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息2:"+new String(body));
            }
        });
    }
}

消费者三

public class Customer3 {
    public static void main(String[] args) throws IOException {
        //消费者需要生成一个临时的队列,用完即删,因为如果每次都创建一个持久的消息队列十分的消耗内存,因为可能队列的数据需要持久化
        //绑定对应的交换机就可以接收到对应的消息
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //创建一个临时队列用于存储交换机发送的消息
        String queue = channel.queueDeclare().getQueue();
        //绑定对应的交换机和队列,最后一个参数是routingKey(路由key)
        channel.queueBind(queue,"logs","");
        //接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消息3:"+new String(body));
            }
        });
    }
}

Direct

Direct就是所说的路由模式,原本的广播模式只要我们给交换机发送了消息,消息就会发送给所有的消息队列,然后消费者就从消息队列中去消费消息

但是在路由模式之后,我们可以进行选择性的发送消息到消息队列,我们可以给消息队列执行对应的路由,然后我们可以通过路由来将不同的消息发送给不同的消息队列,最后由不同的消费者去消费它

RabbitMQ常用消息模型
image-20220314221618255

订阅发布者

public class Producer {
    public static void main(String[] args) throws IOException {
        //创建连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("myDirect","direct");
        //发送消息
        channel.basicPublish("myDirect","second",null,"this is direct".getBytes());
        //关闭连接
        RabbitUtils.close(connection,channel);
    }
}

消费者一,消费者一绑定了两个路由,分别是first和second,所以如果交换机发送的路由地址为first或者second其中一个都能够接收到对应的消息

public class Customer1 {
    public static void main(String[] args) throws IOException {
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("myDirect","direct");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将交换机和队列进行绑定
        channel.queueBind(queue,"myDirect","first");
        channel.queueBind(queue,"myDirect","second");
        //获得消息并输出
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到的消息:"+new String(body));
            }
        });
    }
}

消费者二,消费者二只绑定了second的路由,所以只能接收到发送路由为second的交换机的信息

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("myDirect","direct");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将交换机和队列进行绑定
        channel.queueBind(queue,"myDirect","second");
        //获得消息并输出
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到的消息:"+new String(body));
            }
        });
    }
}

Topic

前面的Direct相当于是路由交换机,根据对应的路由地址来向对应的消息队列发送消息,然后消息队列下的消费者进行消费。而Topic就相当于是动态路由,原本我们给消息队列和交换机绑定的时候,如果需要绑定多个路由路径的话,需要不断的复制原来的代码,现在我们可以在路由之间加上*和#来表示,星号表示单个字符串,井号表示可以没有或者由多个字符串

RabbitMQ常用消息模型
image-20220314221635266

订阅发布者

public class Producer {
    public static void main(String[] args) throws IOException {
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //创建交换机,交换机的模式为topic
        channel.exchangeDeclare("topics","topic");
        //发送消息
        String routerKey = "user.save.go";
        channel.basicPublish("topics",routerKey,null,("this is topic "+routerKey).getBytes());
        //关闭连接
        RabbitUtils.close(connection,channel);
    }
}

消费者一,消费者一用user.*表示,表示由两个字符串组成,第一个字符串必须是user,第二个字符串不固定

public class Customer {
    public static void main(String[] args) throws IOException {
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("topics","topic");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将消息队列和交换机进行绑定,并用通配符配置路由
        channel.queueBind(queue,"topics","user.*");
        //获取消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}

消费者二,消费者二用user.#表示,表示由1到n个字符串组成,后面可以跟字符串也可能不跟字符串,可以跟很多个字符串

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //建立连接
        Connection connection = RabbitUtils.getConnection();
        //创建管道Channel
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("topics","topic");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将消息队列和交换机进行绑定,并用通配符配置路由
        channel.queueBind(queue,"topics","user.#");
        //获取消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}

本片文章就是介绍了一下RabbitMQ以及特点,快速使用几种常用的消息模型,来感受一个消息队列的用法,之后也会继续分享RabbitMQ相关的知识!


原文始发于微信公众号(爱吃薯片):RabbitMQ常用消息模型

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/26020.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!