什么是RabbitMQ?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
RabbitMQ的特点
1、可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2、灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3、消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
4、高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5、多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6、多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7、管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8、跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9、插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ中的概念模型
消息模型
最普通的消息模型就是有订阅发布者、消息队列以及消费者,订阅发布者通过发送消息到消息队列当中,消费者从消息队列中去消费消息
RabbitMQ是异步通信的,原来的同步消息处理的话,耦合性太高,每次添加新功能的话就要改变原来的代码,而异步通信将业务代码订阅到对应的消息管道,然后订阅发布者发布了消息,这个业务代码就能立刻去处理请求
1、Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2、Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
3、Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
4、Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
5、Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
6、Connection 网络连接,比如一个TCP连接。
7、Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
8、Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9、Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
10、Broker 表示消息队列服务器实体。
RabbitMQ安装
我们可以通过在服务器上用docker进行安装,也可以下载安装包到windows上启动运行
RabbitMQ官网:https://www.rabbitmq.com/
这里我们用docker进行安装
# --hostname 指的是主机的名字,主要是在集群部署的时候用于区分
# -e 设置默认的登录用户名以及密码
# --name 设置容器的名字
# -p 15672 指的是RabbitMQ的管理面板
# -p 5672 指的是RabbitMQ的消息传送端口
[root@xiaozhu /]# docker run -d --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=zhu -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 --name rabbit01 rabbitmq:3-management
安装完成之后,我们就可以用服务器ip:15672
查看到对应的效果了

RabbitMQ创建Hello World!
我们需要创建一个订阅发布者以及消费者,我们订阅发布者将消息发送到我们指定端口的消息队列中,然后消费者则通过消息队列来获取到我们发送的消息

首先我们需要引入RabbitMQ客户端的依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version>
</dependency>
创建订阅发布者
public class Producer {
public static void main(String[] args) throws Exception {
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数,分别是主机、端口号、VHost虚拟主机、用户名、密码
factory.setHost("xxx.xxx.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xxxxx");
factory.setPassword("xxxxx");
//建立连接
Connection connection = factory.newConnection();
//创建通道Channel,有了通道就可以基于通道向队列发送消息
Channel channel = connection.createChannel();
//创建队列,并且指定一个队列的名称
//参数1:队列名称,如果队列不存在自动创建
//参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
//参数3:exclusive 是否独占队列 true 独占队列 false 不独占
//参数4:autoDelete 是否在消费完成后自动删除队列 true 自动删除 false 不删除
//参数5:额外附加参数
String queueName = "simple.queue";
channel.queueDeclare(queueName,false,false,false,null);
//发送消息,并且添加队列的名称,将消息以字节的形式传输,底层是字节传输的
//参数1:交换机名称
//参数2:队列名称
//参数3:传递消息额外设置,MessageProperties.PERSISTENT_TEXT_PLAIN表示消息持久化
//参数4:消息的具体内容
String message = "Hello world!";
channel.basicPublish("",queueName,null,message.getBytes());
System.out.println("消息发送成功:"+message);
//关闭通道和连接
channel.close();
connection.close();
}
}
运行成功后,我们就可以在控制台看到我们消息发送成功的代码了,接着我们到RabbitMQ的后台管理面板中查看queue队列,发现刚刚创建的队列被添加进来了,并且这个队列中准备着一条消息,当有消费者时,消费者就会主动的去消费这一条消息,我们可能点进去查看里面对应的消息

创建消费者
public class Customer {
public static void main(String[] args) throws Exception {
//创建连接工程
ConnectionFactory factory = new ConnectionFactory();
//设置连接的参数,跟订阅发布者一样
factory.setHost("xxx.xxx.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xxxxxx");
factory.setPassword("xxxxxx");
//建立连接
Connection connection = factory.newConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//创建队列,并且设置队列的名称
String queueName = "simple.queue";
channel.queueDeclare(queueName,false,false,false,null);
//通过管道来消费消息
//参数1:队列名称
//参数2:消息自动确认,默认是true,消费者向rabbitmq确认消息消费
//方式一:
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//处理消息
String msg = new String(body);
System.out.println("接收到消息了:"+msg);
}
});
//方式二:
//DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// String message = new String(delivery.getBody(), "UTF-8");
// System.out.println(" [x] Received '" + message + "'");
//};
//channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
System.out.println("等待消息接收中...");
}
}
运行成功后我们就能看到控制台输出的对应的消息,并且我们在RabbitMQ的后台管理面板中能查看到对应的连接以及管道,以及队列中的ready准备消息被消费了
-
控制台输出

-
消费者建立的连接

-
队列消息被消费

思考问题:
1、为什么控制台先输出的时等待消息接收中,而不是接收到消息了
这个就是RabbitMQ的回调机制,因为先执行了消费消息,但是需要一点事件通过管道去获取消息,因为后面的代码不会等待前面的代码执行完再执行,所以等待消息接收中就先输出了
2、明明订阅发布者中已经创建了消息队列,为什么还要在消费者中再创建一遍
因为我们的订阅发布者和消费者的执行顺序是不确定的,所以如果我们消费者先执行的话,他获取不到管道并且获取不到队列中的消息,所以为了避免这种情况的发生,所以在消费者中就又创建了一遍,并且队列不会因为订阅发布者和消费者都创建了就会产生两个队列,只会产生一个队列
注:我们每次都要给连接工程设置参数十分的麻烦,并且每次设置的参数也是固定值,所以我们可以封装成一个工具类,然后通过调用工具类中的方法返回一个连接,然后我们用那个连接去创建一个管道即可,这样就不用每次都去设置参数了。还可以添加一个关闭管道和连接的方法,减少了很多的重复代码
工具类
public class RabbitUtils {
private static ConnectionFactory factory;
static {
//单例模式的懒汉式,在引用对象的时候就创建连接工程
factory = new ConnectionFactory();
factory.setHost("xxx.xxx.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xxxxxx");
factory.setPassword("xxxxxx");
}
public Connection getConnection(){
Connection connection = null;
try {
connection = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
public void close(Connection connection, Channel channel){
try {
if(connection!=null)connection.close();
if(channel!=null)channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
注:在前两种模型当中,存在着默认的路由,所以订阅发布者发送消息的方法中的routingKey就是队列名称,会找到这个队列名称去发送相应的消息,然后由消费者消费
Work Queue
多工作管道,我们在实践的过程中,通常会有一个能者多劳的思想,就比如说,一个订阅发布者每次发布50条消息,然后有两个消费者去消费他,并且我们想让这两个消费者能够按照自己的能力去消费消息。
但是在RabbitMQ中,消费者获取消息是轮询的,就是会按照顺序给消费者分发消息,假设现在有50条消息,需要五个消费者去消费掉,每个消费者就是各自消费十条消息,并且是按顺序消费的,如果设置了睡眠时间,消费的条数依旧是固定不变,就是消费的时间可能会长一点,所以我们可以设置他的prefetchCount的大小

生产者
public class Producer {
public static void main(String[] args) throws IOException {
//通过工具类来获取连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通过管道声明队列
channel.queueDeclare("simple.queue",false,false,false,null);
//给管道中发送消息
for (int i=1;i<=10;i++){
channel.basicPublish("","simple.queue",null,("这是一条消息"+i).getBytes());
}
//通过工具类关闭连接
RabbitUtils.close(connection,channel);
}
}
消费者1(我们让消费者一延迟两秒中输出消息)
public class Customer {
public static void main(String[] args) throws IOException {
//通过工具类获取连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通过管道声明消息队列
channel.queueDeclare("simple.queue",false,false,false,null);
//通过消息队列来获取消息
channel.basicConsume("simple.queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
消费者2
public class Customer2 {
public static void main(String[] args) throws IOException {
//通过工具类获取连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通过管道声明消息队列
channel.queueDeclare("simple.queue",false,false,false,null);
//通过消息队列来获取消息
channel.basicConsume("simple.queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
}
});
}
}
结果:
消费者一虽然说每隔两秒中输出对应获得的消息,但也不影响他的总消息量,总消息量跟消费者二一样,而消费者二在1秒钟就输出完了所有的消息,我们并且能看到后台管理面板中消息是一瞬间被拿完的,说明消费者一和消费者二同时拿到了十条消息,由于消费者一延迟了两秒输出,所以这是消费延迟,这个就是RabbitMQ的默认机制,循环的获取消息,按照顺序的给消费者分发消息,这样的坏处就是不能能者多劳,如果有一个消费者消费的很慢,并且消息队列中的消息在不断的增加,那很有可能会导致消息阻塞,所以我们可以设置每次只能消费一个消息,这样子在消息队列中的剩余的消息就会被处理快的消费者给消费掉
消息自动确认机制
我们消费者在拿到消息的时候,将没有完全处理完的消息也同时确认了,他不关心你下面的业务是否已经处理完毕,他只负责拿到对应的消息就告诉队列确认完毕了,所以这是十分的不安全的,我们可以设置为手动确认,这样子在业务处理完成之后,会告诉消息已经完成,并且继续消费下一条消息
消费者一
public class Customer {
public static void main(String[] args) throws IOException {
//通过工具类获取连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通过管道声明消息队列
channel.queueDeclare("simple.queue",false,false,false,null);
//通过消息队列来获取消息
channel.basicConsume("simple.queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
消费者二
public class Customer2 {
public static void main(String[] args) throws IOException {
//通过工具类获取连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//设置每次只能接收一条消息
channel.basicQos(1);
//通过管道声明消息队列
channel.queueDeclare("simple.queue",false,false,false,null);
//通过消息队列来获取消息
channel.basicConsume("simple.queue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
如果不设置手动确认消息的话,后台管理页面中的队列使用会显示未完成消息的条数
Fanout
Fanout就是所说的广播,它只是将收到的所有消息广播到它知道的所有队列,就像微信的公众号一样,你订阅了那一个公众号,然后当那个公众号发了消息之后,所有订阅了他的用户都能够收到消息了

订阅发布者
public class Producer {
public static void main(String[] args) throws IOException {
//在发布订阅者中添加一台交换机,让所有绑定了这台交换机的所有消息队列都能够收到消息
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//创建交换机,后面的类型名称不能随意取
channel.exchangeDeclare("logs","fanout");
//发送消息,现在不需要管消息队列了,只需要往交换机中发送消息,由交换机给消息队列分发消息
channel.basicPublish("logs","",null,"this is exchange".getBytes());
//关闭连接和通道
RabbitUtils.close(connection,channel);
}
}
消费者一
public class Customer1 {
public static void main(String[] args) throws IOException {
//消费者需要生成一个临时的队列,用完即删,因为如果每次都创建一个持久的消息队列十分的消耗内存,因为可能队列的数据需要持久化
//绑定对应的交换机就可以接收到对应的消息
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建一个临时队列用于存储交换机发送的消息
String queue = channel.queueDeclare().getQueue();
//绑定对应的交换机和队列,最后一个参数是routingKey(路由key)
channel.queueBind(queue,"logs","");
//接收消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息1:"+new String(body));
}
});
}
}
消费者二
public class Customer2 {
public static void main(String[] args) throws IOException {
//消费者需要生成一个临时的队列,用完即删,因为如果每次都创建一个持久的消息队列十分的消耗内存,因为可能队列的数据需要持久化
//绑定对应的交换机就可以接收到对应的消息
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建一个临时队列用于存储交换机发送的消息
String queue = channel.queueDeclare().getQueue();
//绑定对应的交换机和队列,最后一个参数是routingKey(路由key)
channel.queueBind(queue,"logs","");
//接收消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息2:"+new String(body));
}
});
}
}
消费者三
public class Customer3 {
public static void main(String[] args) throws IOException {
//消费者需要生成一个临时的队列,用完即删,因为如果每次都创建一个持久的消息队列十分的消耗内存,因为可能队列的数据需要持久化
//绑定对应的交换机就可以接收到对应的消息
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建一个临时队列用于存储交换机发送的消息
String queue = channel.queueDeclare().getQueue();
//绑定对应的交换机和队列,最后一个参数是routingKey(路由key)
channel.queueBind(queue,"logs","");
//接收消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息3:"+new String(body));
}
});
}
}
Direct
Direct就是所说的路由模式,原本的广播模式只要我们给交换机发送了消息,消息就会发送给所有的消息队列,然后消费者就从消息队列中去消费消息
但是在路由模式之后,我们可以进行选择性的发送消息到消息队列,我们可以给消息队列执行对应的路由,然后我们可以通过路由来将不同的消息发送给不同的消息队列,最后由不同的消费者去消费它

订阅发布者
public class Producer {
public static void main(String[] args) throws IOException {
//创建连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare("myDirect","direct");
//发送消息
channel.basicPublish("myDirect","second",null,"this is direct".getBytes());
//关闭连接
RabbitUtils.close(connection,channel);
}
}
消费者一,消费者一绑定了两个路由,分别是first和second,所以如果交换机发送的路由地址为first或者second其中一个都能够接收到对应的消息
public class Customer1 {
public static void main(String[] args) throws IOException {
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("myDirect","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将交换机和队列进行绑定
channel.queueBind(queue,"myDirect","first");
channel.queueBind(queue,"myDirect","second");
//获得消息并输出
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到的消息:"+new String(body));
}
});
}
}
消费者二,消费者二只绑定了second的路由,所以只能接收到发送路由为second的交换机的信息
public class Customer2 {
public static void main(String[] args) throws IOException {
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("myDirect","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将交换机和队列进行绑定
channel.queueBind(queue,"myDirect","second");
//获得消息并输出
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到的消息:"+new String(body));
}
});
}
}
Topic
前面的Direct相当于是路由交换机,根据对应的路由地址来向对应的消息队列发送消息,然后消息队列下的消费者进行消费。而Topic就相当于是动态路由,原本我们给消息队列和交换机绑定的时候,如果需要绑定多个路由路径的话,需要不断的复制原来的代码,现在我们可以在路由之间加上*和#来表示,星号表示单个字符串,井号表示可以没有或者由多个字符串

订阅发布者
public class Producer {
public static void main(String[] args) throws IOException {
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//创建交换机,交换机的模式为topic
channel.exchangeDeclare("topics","topic");
//发送消息
String routerKey = "user.save.go";
channel.basicPublish("topics",routerKey,null,("this is topic "+routerKey).getBytes());
//关闭连接
RabbitUtils.close(connection,channel);
}
}
消费者一,消费者一用user.*表示,表示由两个字符串组成,第一个字符串必须是user,第二个字符串不固定
public class Customer {
public static void main(String[] args) throws IOException {
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将消息队列和交换机进行绑定,并用通配符配置路由
channel.queueBind(queue,"topics","user.*");
//获取消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
消费者二,消费者二用user.#表示,表示由1到n个字符串组成,后面可以跟字符串也可能不跟字符串,可以跟很多个字符串
public class Customer2 {
public static void main(String[] args) throws IOException {
//建立连接
Connection connection = RabbitUtils.getConnection();
//创建管道Channel
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将消息队列和交换机进行绑定,并用通配符配置路由
channel.queueBind(queue,"topics","user.#");
//获取消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
本片文章就是介绍了一下RabbitMQ以及特点,快速使用几种常用的消息模型,来感受一个消息队列的用法,之后也会继续分享RabbitMQ相关的知识!
原文始发于微信公众号(爱吃薯片):RabbitMQ常用消息模型
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/26020.html