什么是MQ消息中间件及使用场景
全称MessageQueue,主要是⽤于程序和程序直接通信,异步+解耦+削峰
异步:用户注册-》发送邮件(注册活动)
解耦:订单系统-》物流系统
削峰:秒杀,日志处理
JMS与AMQP
什么是JMS: Java消息服务(Java Message Service),Java平 台中关于⾯向消息中间件的接⼝
JMS是⼀种与⼚商⽆关的 API,⽤来访问消息收发系统 消息,它类似于JDBC(Java
Database Connectivity)。 这⾥,JDBC 是可以⽤来访问许多不同关系数据库的 API
代表:kafka rocketMQ
消息类型:TextMessage/ObjectMessage/StreamMessage等
什么是AMQP: AMQP不从API层进⾏限定,直接定义⽹络交换的数据 格式,这使得实现了AMQP的
provider天然性就是跨平台 ⽐如Java语⾔产⽣的消息,可以⽤其他语⾔ ⽐如python 的进⾏消费
AQMP可以⽤http来进⾏类⽐,不关⼼实现接⼝的语⾔,只要都按照相应的数据格式去发送报⽂请
求,不同语⾔的client可以和不同语⾔的server进⾏通讯
代表:rabbitMQ
消息类型:Byte[]
RabbitMQ核心概念
Broker: RabbitMQ的服务端程序,可以认为一个mq节点就是一个broker
Producer: 创建消息,然后发布到rabbitMQ
Consumer: 消费队列里的消息
Queue:是rabbitMQ的内部对象,用于存储消息,消息只能存储在队列里面
Channel: ⼀条⽀持多路复⽤的通道,独⽴的双向数据流通 道,可以发布、订阅、接收消息。
信道是建⽴在真实的TCP连接内的虚拟连接,复⽤TCP连接的通道
Connection连接 : 是RabbitMQ的socket链接,它封装了socket协议相 关部分逻辑,⼀个连接上可
以有多个channel进⾏通信
Exchange: 生产者将消息发送到exchange,exchange将消息路由到一个或多个队列中,队列和交
换机可以是多对多的关系
Routingkey:producer将消息发送到exchange时,一般会指定一个routingkey,用来指定这个息
的路由规则
Binding:通过绑定将exchange和queue关联起来,在绑定的时候一般会指定一个绑定key
(bindingkey),这样rabbitMQ就知道如果将消息路由到对应的队列中了
Virtual host 虚拟主机:⽤于不同业务模块的逻辑隔离,⼀个Virtual Host⾥
⾯可以有若⼲个Exchange和Queue,同⼀个VirtualHost ⾥⾯不能有相同名称的Exchange或
Queue
默认是 /,可以自己创建
rabbitMQ消息模式及4种交换机说明
rabbitMQ的消息传递模式,也是AMQP类型的消息传递模式
四种交换机
direct exchange: 定向(直连)
将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
例如:一个队列绑定到交换机上要求路由键为”aabb”,则只有被标记为”aabb”的消息才被转发
到队列上。
fanout exchange: 广播,扇形
只需要简单的将队列绑定到交换机上,所有发送到交换机上的消息都会被转发到与该交
换机绑定的所有队列上,用于发布订阅,广播形式。不需处理路由键
topic exchange: 通配符,主题(大多数都使用此交换机)
主题交换机是一种发布、订阅的模式,结合了直连交换机和扇形交换机的特点,将路由键和
模式进行匹配。符号”#”代表一个或多个词,符号”*”代表一个词。例如:”abc.#”能
匹配”abc,ff,gi”,”abc.*”只能匹配”abc.xx”。
headers exchange:(基本不用,后面不做介绍)
根据发送的消息内容中的headers属性进⾏匹配, 在绑定Queue与Exchange时指定⼀组键值
对,当消息发送到RabbitMQ时会取到该消息的headers 与Exchange绑定时指定的键值对进⾏
匹配。
三种交换机的实战代码演示
springboot依赖版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.3</version>
</dependency>
direct exchange 直连交换机
发送消息代码
public class Send {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//ip地址
factory.setHost("ip");
//端口号 一般都是5672
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
//虚拟主机
factory.setVirtualHost("dev");
//jdk7语法 创建链接 自动关闭
try(Connection connection = factory.newConnection()){
//创建信道
Channel channel = connection.createChannel();
//绑定交换机 直连交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String error = "订单服务的error信息";
String info = "订单服务的info信息";
String debug = "订单服务的debug信息";
//direct 3条消息分别绑定3个路由键
channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("direct交换机发送消息成功");
}
}
}
接受消息1
public class Recv1 {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
//虚拟主机
factory.setVirtualHost("dev");
//创建链接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//根据路由键绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
//手工确认 不是多条确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭消息自动确认
channel.basicConsume(queueName, false, consumer);
}
}
接收消息2
public class Recv2 {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
//虚拟主机
factory.setVirtualHost("dev");
//创建链接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//根据路由键绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
//手工确认 不是多条确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭消息自动确认
channel.basicConsume(queueName, false, consumer);
}
}
1比2多接收两条消息
fanout exchange 扇形交换机
发送消息
public class Send {
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("dev");
//jdk7语法 创建链接 自动关闭
try(Connection connection = factory.newConnection()){
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String msg = "rabbitmq 扇形交换机发送消息";
//fanout无需处理routingkey
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("fanout交换机发送消息成功");
}
}
}
接收消息1
public class Recv1 {
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("dev");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列 fanout交换机不用处理routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
//手工确认 不是多条确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭消息自动确认
channel.basicConsume(queueName, false, consumer);
}
}
接收消息2
public class Recv2 {
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("dev");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列 fanout交换机不用处理routingkey
channel.queueBind(queueName,EXCHANGE_NAME,"");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
//手工确认 不是多条确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭消息自动确认
channel.basicConsume(queueName, false, consumer);
}
}
1和2代码相同,接收的消息也都一致
topic exchange 主题交换机
public class Send {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("dev");
//jdk7语法 创建链接 自动关闭
try(Connection connection = factory.newConnection()){
Channel channel = connection.createChannel();
//绑定交换机 topic交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String error = "订单服务的error信息";
String info = "订单服务的info信息";
String debug = "商品服务的debug信息";
//topic交换机 3条消息绑定3个路由键
channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("topic交换机发送消息成功");
}
}
}
接收消息1
public class Recv1 {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("dev");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列 TOPIC交换机 #代表一个或多个单词
channel.queueBind(queueName,EXCHANGE_NAME,"#.error");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
//手工确认 不是多条确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭消息自动确认
channel.basicConsume(queueName, false, consumer);
}
}
接收消息2
public class Recv2 {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("dev");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列 TOPIC交换机 *代表一个单词
channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
//手工确认 不是多条确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭消息自动确认
channel.basicConsume(queueName, false, consumer);
}
}
1只能接收error日志,2能接收所有日志
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/1275.html