RabbitMQ 详解
介绍
RabbitMQ是一个消息代理:它接受和转发消息。你可以把他想象成一个邮局。当你把你想要投递的邮件放在一个邮箱里时,邮差最终会将邮件投递给你的收件人。在这个类比中,RabbitMQ是邮箱、邮差、邮局。RabbitMQ和邮局的主要区别在于它不处理纸张,相反的它接受、存储和转发数据的二进制blob消息。
RabbitMQ的一下术语:
- 生产者:发送消息的程序是生产者
- 消费者:接收消息的程序
- 队列:上述类比中的邮箱,用于在RabbitMQ中存储消息,生产者将消息放入队列,消费者从队列中获取消息。队列受到主机的内存和磁盘的限制和约束
⚠️生产者、消费者和代理(RabbitMQ)可以不在同一个主机上。反之也可以在同一个主机上。
例如生产者与消费者是同一应用程序,我将其称为“自发自接”程序
hello world
下面将进行简单的收发消息
注:本文将全部基于Spring Boot
yaml配置
spring:
rabbitmq:
# addresses: amqp://admin:secret@localhost
host: localhost
port: 5672
username: admin
password: secret
# virtual-host: /uat
code
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(AmqpTemplate template) {
// 注入RabbitTemplate template,发消息
return args -> template.convertAndSend("myqueue", "hello world");
}
@Bean
public Queue myQueue() {
// 创建队列
return new Queue("myqueue");
}
// 消息接收
@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
}
}
结语
如此我们就完成了一次简单的消息收发
下图展示了消息收发
Exchange
RabbitMQ中消息传递模型的核心思想是生产者从不将消息直接发送到队列上。
生产者只能向交换机(exchange)发送消息,交换机只做一件非常简单的事情,它接受来自生产者的消息,并且将消息推入队列中。交换机应该如何处理它收到的消息,是推到特定队列?还是推到多个队列?或者丢弃这个消息?
具体的规则是由交换机的类型定义的。
类型说明
交换机有以下几种类型:
- direct,直连交换机
- 根据路由键(routing key)投递到不同的队列
- topic,主题交换机
- 根据路由键进行模式匹配后投递,#表示0个或多个词,*表示一个词
- 并且必须使用
.
对单词进行隔开 - 路由键看起来像是这样:
xxx.xx.xx
、xxx.xx.*
、xxx.xx.#
、*.xx.#
、#.xx.*
- ⚠️注意:这里的通配符仅限绑定关系上的路由键,在发送消息的时候这样写是无效的
- fanout,扇形交换机
- 它收到的所有消息广播给所有绑定此交换机的队列
- headers,头交换机
- 根据消息中的headers属性进行匹配投递
上面提到了队列与交换机的绑定,那么如何绑定呢
@Bean
public Binding getBinding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("this is routing key");
}
上面的代码片段展示如何将一个队列绑定到一个直连交换机(DirectExchange)
代码演示
直连交换机(DirectExchange)
队列、交换机、绑定关系声明
@Bean
public Queue queue() {
return new Queue("queue name");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct exchange name");
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("routing key");
}
发消息
// 正式使用应该注入RabbitTemplate bean,而不是new一个
RabbitTemplate template = new RabbitTemplate();
template.convertAndSend("exchange", "routing key", "message data");
接收消息
@RabbitListener(queues = "queue name")
public void listen(String in){
System.out.println(in);
}
可能有人会问为什么接收的时候为什么不用写路由键。是因为我们写了队列,队列绑定了路由键和交换机。
完整演示代码
@Bean
Queue testQueue1() {
return new Queue("test.queue1");
}
@Bean
DirectExchange testExchange() {
return new DirectExchange("test.direct.exchange");
}
@Bean
Binding testBinding1(@Qualifier("testQueue1") Queue testQueue, @Qualifier("testExchange") TopicExchange testExchange) {
return BindingBuilder.bind(testQueue).to(testExchange).with("routing.key.test");
}
@RabbitListener(queues = "test.queue1")
public void listen1(String in) {
System.out.println("-------------------");
System.out.println(in);
}
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("testBinding1")
Binding binding;
@GetMapping("testSend")
void testSend() {
rabbitTemplate.convertAndSend(binding.getExchange(), binding.getRoutingKey(), "hello1");
rabbitTemplate.convertAndSend(binding.getExchange(), binding.getRoutingKey(), "hello2");
}
主题交换机(TopicExchange)
队列、交换机、绑定关系声明
@Bean
public TopicExchange exchange() {
return new TopicExchange("topic exchange name");
}
@Bean
public Queue queue1() {
return new Queue("queue name1");
}
@Bean
public Queue queue2() {
return new Queue("queue name2");
}
@Bean
public Binding binding1(Queue queue1, TopicExchange exchange) {
return BindingBuilder.bind(queue1).to(exchange).with("routing key partten");
}
@Bean
public Binding binding2(Queue queue2, TopicExchange exchange) {
return BindingBuilder.bind(queue2).to(exchange).with("routing key partten");
}
我们声明了2个队列以及两个绑定关系,并且路由键都是空的
消息发送
// 正式使用应该注入RabbitTemplate bean,而不是new一个
RabbitTemplate template = new RabbitTemplate();
template.convertAndSend("exchange", "routing key", "message data");
接收消息
@RabbitListener(queues = "queue name1")
public void listen1(String in){
System.out.println(in);
}
@RabbitListener(queues = "queue name2")
public void listen1(String in){
System.out.println(in);
}
同时接收两个队列的消息,以便测试topic类型的交换机
假设我们两个此时的绑定关系绑定的路由键分别是:“routing.key.test.1”、“routing.key.test.2”
那么我们在发送消息的时候填写路由键:“routing.key.test.*”、”routing.key.test#”的时候上面声明的队列都会收到消息
完整演示代码
@Bean
Queue testQueue1() {
return new Queue("test.queue1");
}
@Bean
Queue testQueue2() {
return new Queue("test.queue2");
}
@Bean
TopicExchange testExchange() {
return new TopicExchange("test.topic.exchange");
}
@Bean
Binding testBinding1(@Qualifier("testQueue1") Queue testQueue, @Qualifier("testExchange") TopicExchange testExchange) {
return BindingBuilder.bind(testQueue).to(testExchange).with("routing.key.test.#");
}
@Bean
Binding testBinding2(@Qualifier("testQueue2") Queue testQueue, @Qualifier("testExchange") TopicExchange testExchange) {
return BindingBuilder.bind(testQueue).to(testExchange).with("routing.key.test.*");
}
@RabbitListener(queues = "test.queue1")
public void listen1(String in) {
System.out.println("--------queue1-----------");
System.out.println(in);
}
@RabbitListener(queues = "test.queue2")
public void listen2(String in) {
System.out.println("---------queue2----------");
System.out.println(in);
}
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("testExchange")
TopicExchange testExchange;
@GetMapping("testSend")
void testSend() {
rabbitTemplate.convertAndSend(testExchange.getName(), "routing.key.test.hh", "hello1");
rabbitTemplate.convertAndSend(testExchange.getName(), "routing.key.test.hhh.hhh", "hello2");
}
消息打印结果
我们看到queue1
收到了两次消息而queue2
只收到一次,这是因为绑定的路由键匹配的结果
queue1
绑定的路由键:routing.key.test.#
,表示接收routing.key.test.
开头的所有路由键消息
queue2
绑定的路由键:routing.key.test.*
,表示接收routing.key.test.xxx
xxx表示任意字符,但后续不能有.
扇形交换机(FanoutExchange)
交换机收到消息将会下发给所有绑定的队列
队列、交换机、绑定关系声明
@Bean
public FanoutExchange exchange() {
return new FanoutExchange("fanout exchange name");
}
@Bean
public Queue queue1() {
return new Queue("queue name1");
}
@Bean
public Queue queue2() {
return new Queue("queue name2");
}
@Bean
public Binding binding1(Queue queue1, FanoutExchange exchange) {
return BindingBuilder.bind(queue1).to(exchange);
}
@Bean
public Binding binding2(Queue queue2, FanoutExchange exchange) {
return BindingBuilder.bind(queue2).to(exchange);
}
消息发送
// 正式使用应该注入RabbitTemplate bean,而不是new一个
RabbitTemplate template = new RabbitTemplate();
template.convertAndSend("exchange", "message data");
接收消息
@RabbitListener(queues = "queue name1")
public void listen1(String in){
System.out.println(in);
}
@RabbitListener(queues = "queue name2")
public void listen1(String in){
System.out.println(in);
}
完整演示代码
@Bean
Queue testQueue1() {
return new Queue("test.queue1");
}
@Bean
Queue testQueue2() {
return new Queue("test.queue2");
}
@Bean
FanoutExchange testExchange() {
return new FanoutExchange("test.fanout.exchange");
}
@Bean
Binding testBinding1(@Qualifier("testQueue1") Queue testQueue, @Qualifier("testExchange") FanoutExchange testExchange) {
return BindingBuilder.bind(testQueue).to(testExchange);
}
@Bean
Binding testBinding2(@Qualifier("testQueue2") Queue testQueue, @Qualifier("testExchange") FanoutExchange testExchange) {
return BindingBuilder.bind(testQueue).to(testExchange);
}
@RabbitListener(queues = "test.queue1")
public void listen1(String in) {
System.out.println("--------queue1-----------");
System.out.println(in);
}
@RabbitListener(queues = "test.queue2")
public void listen2(String in) {
System.out.println("---------queue2----------");
System.out.println(in);
}
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("testExchange")
FanoutExchange testExchange;
@GetMapping("testSend")
void testSend() {
rabbitTemplate.convertAndSend(testExchange.getName(), null, "hello");
}
fanout类型的交换机没有路由键这一说,我们在发送的时候留空路由键就行
执行结果
头交换机(HeadersExchange)
我们需要注意匹配规则
- all:表示全匹配,headers全部匹配才下发消息
- any:表示部分匹配,headers中有匹配就下发消息
队列、交换机、绑定关系声明
@Bean
public HeadersExchange exchange() {
return new HeadersExchange("headers exchange name");
}
@Bean
public Queue queue1() {
return new Queue("queue name1");
}
@Bean
public Queue queue2() {
return new Queue("queue name2");
}
@Bean
public Binding binding1(Queue queue1, HeadersExchange exchange) {
HashMap<String, Object> headerValues = new HashMap<String, Object>() {{
put("key1", "val1");
put("key2", "val2");
}};
// 全匹配
return BindingBuilder.bind(queue1).to(exchange).whereAll(headerValues).match();
}
@Bean
public Binding binding2(Queue queue2, HeadersExchange exchange) {
HashMap<String, Object> headerValues = new HashMap<String, Object>() {{
put("key1", "val1");
put("key2", "val2");
}};
// 部分匹配
return BindingBuilder.bind(queue2).to(exchange).whereAny(headerValues).match();
}
消息发送
// 正式使用应该注入RabbitTemplate bean,而不是new一个
RabbitTemplate template = new RabbitTemplate();
template.convertAndSend("exchange", "message data");
接收消息
@RabbitListener(queues = "queue name1")
public void listen1(String in){
System.out.println(in);
}
@RabbitListener(queues = "queue name2")
public void listen1(String in){
System.out.println(in);
}
完整演示代码
@Bean
Queue testQueue1() {
return new Queue("test.queue1");
}
@Bean
Queue testQueue2() {
return new Queue("test.queue2");
}
@Bean
HeadersExchange testExchange() {
return new HeadersExchange("test.headers.exchange");
}
@Bean
Binding testBinding1(@Qualifier("testQueue1") Queue testQueue, @Qualifier("testExchange") HeadersExchange testExchange) {
HashMap<String, Object> headerValues = new HashMap<String, Object>() {{
put("key1", "val1");
put("key2", "val2");
}};
// 全匹配
return BindingBuilder.bind(testQueue).to(testExchange).whereAll(headerValues).match();
}
@Bean
Binding testBinding2(@Qualifier("testQueue2") Queue testQueue, @Qualifier("testExchange") HeadersExchange testExchange) {
HashMap<String, Object> headerValues = new HashMap<String, Object>() {{
put("key1", "val1");
put("key2", "val2");
}};
// 部分匹配
return BindingBuilder.bind(testQueue).to(testExchange).whereAny(headerValues).match();
}
@RabbitListener(queues = "test.queue1")
public void listen1(String in) {
System.out.println("--------queue1-----------");
System.out.println(in);
}
@RabbitListener(queues = "test.queue2")
public void listen2(String in) {
System.out.println("---------queue2----------");
System.out.println(in);
}
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
@Qualifier("testExchange")
HeadersExchange testExchange;
@GetMapping("testSend")
void testSend() {
rabbitTemplate.convertAndSend(testExchange.getName(), "", "hello1", message -> {
message.getMessageProperties().setHeader("key1", "val1");
message.getMessageProperties().setHeader("key2", "val2");
return message;
});
rabbitTemplate.convertAndSend(testExchange.getName(), "", "hello2", message -> {
message.getMessageProperties().setHeader("key1", "val1");
return message;
});
}
我们发送了两次消息,分别对应全匹配以及部分匹配的。可以看到全匹配的收到1次消息,部分匹配的收到两次消息
Queue
RabbitMQ中的队列是消息的有序集合。消息遵循FIFO(先进先出)的方式。通俗来讲,队列它是一个有顺序的数据结构,主要有两个操作:在尾部添加消息、从头部取出消息。
本节我们讲解有关RabbitMQ队列的信息。
队列名称
- 名称最长可以是255字节的UTF-8字符
- 创建队列的时候名称是可选的,如果为命名那么代理(RabbitMQ)会生成一个唯一的名称(不建议)
**amq.**
开头的队列名称是保留名称,如果我们尝试添加此格式的名称会返回403错误
队列属性
队列具有定义其行为方式的属性。
- 名称
- 持久(队列将在代理(RabbitMQ)重启后继续存在)
- 独占(仅由一个连接使用,当该连接关闭时,队列将删除)
- 自动删除(当最后一个使用者取消订阅时,将删除)
- 参数(可选的。由插件和代理特定功能(如TTL、队列长度限制等)使用)
可选参数
可选的队列参数,也称为“x-arguments”,因为它们是 AMQP 0-9-1 协议中的字段名称,是任意键/值的映射(字典) 声明队列时客户端可以提供的对。
可选参数由各种功能和插件使用,例如
- 队列类型(例如仲裁或经典队列)
- 消息和队列 TTL
- 队列长度限制
- 最大优先级
- 消费者的优先事项
大部分可选参数可以在队列声明后动态更改,部分参数必须在队列声明的时候设置例如:队列类型(x-queue-type)、队列最大优先级(x-max-priority),并且在之后无法更改。
各项参数的解释:
- Message TTL
x-message-ttl
- 消息存活时间(毫秒)
- Auto expire
x-expires
- 在自动删除队列之前队列可以闲置多长时间(毫秒)
- Max length
x-max-length
- 队列中消息的最大条数
- Max length bytes
x-max-length-bytes
- 队列中消息的总大小
- Overflow behaviour
x-overflow
- 设置队列的溢出行为。即队列达到最大长度时再次发消息会如何处置。
- 需要注意默认为drop-head
drop-head
删除头部消息(即删除队列中最先入队的消息)reject-publish
拒绝接收,收到的消息将被丢弃(如果绑定死信队列则发到死信队列)reject-publish-dlx
拒绝接收,收到的消息将被丢弃并且不尝试转发的死信队列
- Dead letter exchange
x-dead-letter-exchange
- 死信交换机名称,如果消息被丢弃(溢出行为设为
reject-publish
)或过期将转发到该交换机
- Dead letter routing key
x-dead-letter-routing-key
- 死信路由键,若没有设置则在转发到死信交换机的时候使用原始的路由键
- Single active consumer
x-single-active-consumer
- 确保一次只有一个消费者从队列中消费,并在消费者被取消或死亡的情况下转移到下一个消费者
- Maximum priority
x-max-priority
- 队列支持的最大优先级数。如果没有设置队列不支持消息优先级
- Lazy mode
x-queue-mode=layz
- 将队列设为惰性模式,在磁盘上保留尽可能多的消息以减少RAM使用。如果没有设置队列中的消息将保存在内存中以便更快的传递消息
- Master locator
x-queue-master-locator
- 主队列选择策略(集群相关)
min-masters
选择最小绑定主机数量的节点client-local
选择客户段声明队列连接到的节点random
随机选择一个节点
可选参数与策略定义的优先级
当我们提供x-
开头的参数和策略相同的参数时,前者优先也就是x-
开头的参数优先。
如果同时使用了管理员策略那么管理员策略优先于客户端提供的策略。
对于最大队列长度或者TTL等数值,将使用两者中的较小值。
消息排序
RabbitMQ中的队列是消息的有序集合。消息遵循FIFO(先进先出)的规则,同时不保证优先级队列和分片队列的顺序是FIFO
消费者还可能受到竞争(即一个队列多个消费者)、消费者优先级、消息重新传递的影响。
如果是多个消费者并且优先级相同的话,消息将按照FIFO的顺序进行投递消息,对于这多个消费者则是用类似轮询的方式进行投递(即:假设A和B两个消费者,先A后B再A如此循环亦或者先B后A再B的循环)
持久性
队列可以是持久的,也可以是临时的。持久队列的元数据存储在磁盘上,而临时队列的元数据尽可能的存储在内存中。
- 临时队列将在节点启动的时候删除。因此,它们无法在节点重新启动后继续存在,按照这一性质临时队列中的消息也将丢失。
- 持久队列在节点启动的时候回复,包括其中标记了持久性的消息。如果消息未标记持久那么将在恢复过程中被丢弃,即便是它们存储在持久队列中
临时队列
在某些场景下队列应该是临时的。虽然客户端可以删除它们在断开连接之前声明的队列,这并不是很方便。最重要的是客户端可能会失败,而导致某些队列长时间闲置。
有三种方法可以自动删除队列:
- 独占队列(下面展开讲)
- TTL(也在下面讲)
- 自动删除队列
自动删除队列将在最后一个使用者取消订阅时删除(手动断开连接、TCP连接丢失等)。如果从来没有任何使用者,这个时候不会自动删除,对于这种情况建议使用独占队列或者TTL
独占队列
独占队列只能使用(使用、清除、删除等)通过其声明连接。如果使用其他不同的连接会导致通道级别异常RESOURCE_LOCKED
独占队列的声明连接关闭时被删除或消失(例如,由于底层TCP连接丢失)。因此它们仅适用于特定与客户端的临时队列。
通常独占队列的名称为声明连接的服务名称。
生存时间(TTL)和长度限制
队列的长度可以受到限制。队列和消息可以具有TTL
这两个功能可用于数据过期和限制队列最多可以使用多少资源(RAM、磁盘空间)。
内存存储和持久存储
队列将消息保存在RAM和磁盘上。在某些协议中(例如AMQP 0-9-1)一定程度上由客户端控制。例如在spring-amqp
中我们通过deliveryMode
属性(默认就是持久的)来控制
MessageProperties properties = new MessageProperties();
// 对于非持久消息才设置,因为默认是持久消息
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
优先级
队列可以有0个或多个优先级。仅通过可选参数配置了最大优先级的队列进行优先级排序,生产者使用可选字段priority
来确定优先级。
如果需要使用优先级队列,建议使用1~10个队列。目前使用更多的优先级会消耗更多的字段(Erlang进程)。
常用参数解释
- durable-持久性
- 队列将会持久化到磁盘上,即时代理(RabbitMQ)重启后也不会丢失
- exclusive-独占(排他性)
- 独占队列只有创建的连接可以访问,并且在该连接关闭时删除队列
- auto-delete-自动删除
- 如果设置,则当所有使用者都关闭连接时删除队列。如果从始至终都没有使用者那么不会自动删除
我们可以看到在spring-amqp
中默认的设置。持久、非排他、非自动删除
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/195204.html