1.什么是MQ
MQ(Message Queue):消息队列,是一种”先进先出“的数据结构。典型的模型就是我们所说的生产者、消费者模型。生产者不断地向消息队列中生产消息,消费者不断地从消息队列中获取消息,同时消息的生产和消费都是异步的,可以实现系统间的解耦
2.什么是RabbitMQ
RabbitMQ是使用Erlang语言开发的基于高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的开源消息队列。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、数据可靠性、数据安全性
3.安装RabbitMQ
使用cat /etc/os-release
查看系统版本号,我这里使用的是Ubuntu 20.04
,对应的分支是focal
cat /etc/os-release
NAME="Ubuntu"
VERSION="20.04.2 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.2 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal
3.1 安装erlang和RabbitMQ
使用以下脚本快速安装:
#!/usr/bin/sh
sudo apt-get install curl gnupg apt-transport-https -y
## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Cloudsmith: modern Erlang repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Cloudsmith: RabbitMQ repository
curl -1sLf https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg > /dev/null
## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu focal main
deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.E495BB49CC4BBE5B.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu focal main
## Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu focal main
deb-src [signed-by=/usr/share/keyrings/io.cloudsmith.rabbitmq.9F4587F226208342.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu focal main
EOF
## Update package indices
sudo apt-get update -y
## Install Erlang packages
sudo apt-get install -y erlang-base
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key
erlang-runtime-tools erlang-snmp erlang-ssl
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing
3.2 启动RabbitMQ
# 启动rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq运行状态
systemctl status rabbitmq-server
3.3 加载web管理界面插件
# 加载RabbitMQ的插件,这样我们可以使用web界面来管理RabbitMQ,默认使用guest用户登录,且必须使用localhost:15672来访问管理界面
sudo rabbitmq-plugins enable rabbitmq_management
# username:guest
# password:guest
3.4 RabbitMQ配置文件
RabbitMQ给我们提供了一个配置文件模版,我们可以参照这个来配置。
模版文件地址:https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example
在/etc/rabbitmq目录下创建rabbitmq.conf
# 文件名rabbitmq.conf
# 当该值为true时,我们只能通过localhost:15672来访问管理界面
# 当该值为false时,我们可以通过ip:15672来访问管理界面
loopback_users.guest = false
3.5 相关命令
# 查看相关命令的使用
sudo rabbitmqctl help
4. Java整合RabbitMQ
4.1 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
4.2 第一种模型(直连)

直连模式下,只有一个生产者和消费者,如果消费者处理消息的速度慢,但是生产者在源源不断的生产消息,就会导致消息的挤压
4.2.1 创建生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 如果不设置,默认使用guest用户
factory.setUsername("stone");
factory.setPassword("123456");
factory.setHost("192.168.0.19");
factory.setVirtualHost("test");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
// 声明与队列相关的参数,boolean durable 如果设置为true的话就是将队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
// String exchange, String routingKey, BasicProperties props, byte[] body
// 发布消息
// MessageProperties.PERSISTENT_TEXT_PLAIN 将消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
4.2.2 创建消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
// 这里我们并没有使用try-with-resource语句自动关闭channel和connection,这样可以使程序一直保持运行接收消息
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 如果不设置,默认使用guest用户
factory.setUsername("stone");
factory.setPassword("123456");
factory.setHost("192.168.0.19");
factory.setVirtualHost("test");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
4.3第二种模型(Work Queue)

在工作队列模式中,默认使用的是轮询调度
(Round-robin dispatching),RabbitMQ将会依次将消息发送给每个消费者
,每个消费者将获得相同数量的消息
也可以手动设置为公平调度(Fair dispatch),即处理消息快的消费者会获得更多数量的消息来处理,处理消息慢的消费者获得的消息数量相对较少。
// 告诉RabbitMQ一次只给消费者一条消息,在该消费者处理完上一条消息之前,不再给该消费者发送消息
channel.basicQos(1);
4.3.1 生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 如果不设置,默认使用guest用户
factory.setUsername("stone");
factory.setPassword("123456");
factory.setHost("192.168.0.104");
factory.setVirtualHost("test");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 队列持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
for (int i = 0; i < 100; i++) {
String message = "task_message_.";
message = message + i;
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
4.3.2 消费者1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Worker1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 如果不设置,默认使用guest用户
factory.setUsername("stone");
factory.setPassword("123456");
factory.setHost("192.168.0.104");
factory.setVirtualHost("test");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 指示一次性只接收一条消息
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(3);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
4.3.3 消费者2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 如果不设置,默认使用guest用户
factory.setUsername("stone");
factory.setPassword("123456");
factory.setHost("192.168.0.104");
factory.setVirtualHost("test");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 指示一次性只接收一条消息
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 将消息确认改为收到确认,当消费者处理消息宕机时,可以保证消息不丢失
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
4.4 第三种模型(Publish/Subscribe)

在Publish/Subscribe模型中,生产者将消息发布到Exchange中,Exchange将消息推送到队列中,消费者再去队列中获得消息进行消费,这种模式是将一条消息发送给多个消费者。
RabbitMQ中消息传递模型的核心思想是,生产者从不直接向队列发送任何消息。生产者将消息发送到Exchange,Exchange将消息推送到队列中。
Exchange的类型:direct,topic,headers 和fanout。
**在这种模式下,我们主要介绍fanout
类型的Exchange。**使用fanout
类型,不需要设置routingKey
,Exchange会将消息广播到与之绑定的所有的队列中。
-
direct -
direct类型的Exchange -
queue和Exchange绑定,并设置一个routingKey -
和routingKey完全匹配的消息将被路由到queue
-
topic -
* 指代一个字符 -
# 指代一个或多个字符 -
topic类型的交换机 -
queue和Exchange绑定,并设置一个规则的routingKey -
匹配routingKey规则的消息,将路由到指定的queue
-
路由规则 -
headers -
fanout -
fanout类型的Exchange -
queue和Exchange绑定,不设置routingKey -
将收到的消息广播到与之绑定的所有队列
4.4.1 生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Provider {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.106");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 10; i++) {
String message = "测试fanout模型_" + i;
// 因为是广播模型,所以不需要指定routingKey,消息将会推送至所有的queue
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
4.4.2 消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Consumer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.106");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 获得临时队列
String queueName = channel.queueDeclare().getQueue();
// 将交换机和queue绑定起来,无需routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
4.5 第四种模型(Routing)

与Subscribe/Publish模式不同之处在于,Routing模式的Exchange类型是direct
,并且queue和Exchange绑定的时候,设置了routingKey,只有routingKey
完全匹配的消息才会路由到queue中。
当消息的routingKey
为error
时,消息将被路由到q1
,当消息的routingKey
为warning/info
时,消息将被路由到q2
。
4.5.1 生产者
public class Provider {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.106");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "info";
for (int i = 0; i < 10; i++) {
String message = "direct_routing_message_" + i;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
}
4.5.2 消费者
public class Consumer {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.0.106");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 获得临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "info");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
该模式下需要routingKey完全匹配的消息才能路由到queue中,在消息类型多的情况下不利于扩展,于是Topics模式诞生了
4.6 第五种模型(Topics)

该模式与Routing模式的区别在于,Routing模式下,routingKey是指定的,Topics模式下,routingKey必须是一个单词列表
,用.
分割,最多255个字节,例如:"my.routingkey"
,我们可以使用通配符来指定路由规则。
当然如果我们在topics模式中,不使用特殊字符*和#
,其效果和Routing模式是一样的。
*:指代一个字符
#:指代0个或多个字符
4.6.1 生产者
public class Provider {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "message.a";
String message ="topic_message" ;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
4.6.2 消费者
public class Consumer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 绑定队列,并设置路由规则,消费者能消费到routingKey例如:message.a,message.b的消息
channel.queueBind(queueName, EXCHANGE_NAME, "message.*");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
最后,欢迎关注微信公众号一起交流
原文始发于微信公众号(阿喵程序园):RabbitMQ入门之常见模式
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/42866.html