RabbitMQ入门学习以及包含源码 Mac
RabbitMQ学习包含源码
这是自己在学习当中写的一篇博客,所有的内容都是自己亲测有效的。
安装RabbitMQ
由于我用的是mac,所以在这里就用mac讲解了
- 首先安装brew brew官网
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
- 使用brew安装rabbitmq
brew update
brew install rabbitmq
- 启动rabbitmq
brew services start rabbitmq
#关闭
brew services stop rabbitmq
- 进入 http://localhost:15672/
- 账号和密码默认都是guest
RabbitMQ入门
搭建工程
使用idea搭建一个maven工程即可
添加依赖在pom.xml文件中
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
编写生产者
package com.duoduo.rabbitmq.simple;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName Producer.java
* @Author 拾光
* @Date 2020年07月07日 17:58:00
* @Description mq生产者
*/
public class Producer {
static final String QUEUE_NAME = "simple_que";
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//虚拟主机名称 默认为 /
connectionFactory.setVirtualHost("/");
//设置端口 默认为 5672
connectionFactory.setPort(5672);
//连接用户名;默认为 guest
connectionFactory.setUsername("guest");
//连接密码;默认为 guest
connectionFactory.setPassword("guest");
//创建新的连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//要发送的消息
String msg="你好啊,小宝贝";
/**
* 参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
* 参数 2:路由 key,简单模式可以传递队列名称
* 参数 3:消息其它属性
* 参数 4:消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("已发送消息:"+msg);
//关闭资源
channel.close();
connection.close();
}
}
抽取获得连接的工具类
package com.duoduo.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName ConnectionUtil.java
* @Author 拾光
* @Date 2020年07月07日 18:43:00
* @Description 连接mq工具类
*/
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//虚拟主机名称 默认为 /
connectionFactory.setVirtualHost("/");
//设置端口 默认为 5672
connectionFactory.setPort(5672);
//连接用户名;默认为 guest
connectionFactory.setUsername("guest");
//连接密码;默认为 guest
connectionFactory.setPassword("guest");
//创建连接
return connectionFactory.newConnection();
}
}
创建消费者
package com.duoduo.rabbitmq.simple;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
小结:
上述的案列中用的简单模式,再上图的模型中,有如下概念:
P:生产者,也就是要发送消息的程序
C:消费者,消息的接受者,会一直等着消息的到来
queue: 消息队列,上图中红色的部分,类似一个邮箱,可以缓存消息。
在 rabitMQ 中消费者是一定要到某个消息队列中去获取消息的
AMQP
介绍
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
rabbitmq运转流程
生产者发送消息
生产者连接到Rabbitmq Broker,建立一个连接(Connection),开启一个信道(Channel)
生产者声明一个交换机,并设置相关属性,比如交换机类型、是否持久化
生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过路由键将交换机和队列绑定起来
生产者发送消息至Rabbitmq Broker,其中包含路由键、交换器等信息
相应的交换器根据接收到的路由键查找相匹配的队列
如果找到,则将从生产者发送过来的消息存入相应的队列中
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道
关闭连接
消费者接受消息
消费者连接到Rabbitmq Broker,建立一个连接(Connection),开启一个信道(Channel)
消费者向Rabbitmq Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
等待Rabbitmq Broker回应并投递相应队列中的消息,消费者接受消息
消费者确认(ack)接收到的消息
Rabbitmq从队列中删除相应已经被确认的消息
关闭信道
关闭连接
工作模式介绍
官网介绍:RabbitMQ介绍官网
这里简单介绍下五种工作模式的主要特点:
-
简单模式:一个生产者,一个消费者
-
work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
-
订阅模式:一个生产者发送的消息会被多个消费者获取。
-
路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
-
topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
Work queues 工作队列模式
代码
生产者
package com.duoduo.rabbitmq.work;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName WorkProduce.java
* @Author 拾光
* @Date 2020年07月08日 17:03:00
* @Description 工作模式生产者
*/
public class WorkProduce {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//通过使用工具类来创建连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//要发送的消息
for (int i = 0; i < 20; i++) {
String msg="你好啊,小宝贝-----工作模式--"+i;
/**
* 参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
* 参数 2:路由 key,简单模式可以传递队列名称
* 参数 3:消息其它属性
* 参数 4:消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("已发送消息:"+msg);
}
//关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.duoduo.rabbitmq.work;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class WorkConsumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(WorkConsumer1.QUEUE_NAME,true,false,false,null);
//一次只接受并处理一条消息
channel.basicQos(1);
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
Thread.sleep(10000);
//确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(WorkConsumer1.QUEUE_NAME,false,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
消费者2
package com.duoduo.rabbitmq.work;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class WorkConsumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(WorkConsumer2.QUEUE_NAME,true,false,false,null);
//一次只接受并处理一条消息
channel.basicQos(1);
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
Thread.sleep(10000);
//确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(WorkConsumer2.QUEUE_NAME,false,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
测试
小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
在这里就不写订阅模式了,因为下面要介绍发布订阅模式
Publish/Subscribe 发布与订阅模式
模式说明
发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的 队列都将接收到消息
生产者
package com.duoduo.rabbitmq.发布订阅模式;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName WorkProduce.java
* @Author 拾光
* @Date 2020年07月08日 17:03:00
* @Description 工作模式生产者
*/
public class PublicProduce {
/**
* 交换机名称
*/
static final String FANOUT_EXCHANGE="fanout_exchange";
//队列名称
static final String FANOUT_QUEUE_1="fanout_queue_1";
//队列名称
static final String FANOUT_QUEUE_2="fanout_queue_2";
public static void main(String[] args)throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//申明交换机
/**
* 申明交换机
* 参数一: 交换机名称
* 参数二: 交换机类型 fanout ,topic, direct,headers
*/
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
//队列绑定交换机
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
for (int i = 1; i < 10; i++) {
String message="你好朋友,发布订阅模式"+i;
/**
* 参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
* 参数 2:路由 key,简单模式可以传递队列名称
* 参数 3:消息其它属性
* 参数 4:消息内容
*/
channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
System.out.println("已发送消息:"+message);
}
//关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.duoduo.rabbitmq.发布订阅模式;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class PubliceConsumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(PublicProduce.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(PublicProduce.FANOUT_QUEUE_1,true,false,false,null);
//队列绑定交换机
channel.queueBind(PublicProduce.FANOUT_QUEUE_1,PublicProduce.FANOUT_EXCHANGE,"");
//一次只接受并处理一条消息
channel.basicQos(1);
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(PublicProduce.FANOUT_QUEUE_1,true,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
消费者2
package com.duoduo.rabbitmq.发布订阅模式;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class PubliceConsumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//申明交换机
channel.exchangeDeclare(PublicProduce.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(PublicProduce.FANOUT_QUEUE_2,true,false,false,null);
//队列绑定交换机
channel.queueBind(PublicProduce.FANOUT_QUEUE_2,PublicProduce.FANOUT_EXCHANGE,"");
//一次只接受并处理一条消息
channel.basicQos(1);
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(PublicProduce.FANOUT_QUEUE_2,true,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
测试
小结:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
Routing路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的
Routingkey 与消息的 Routing key 完全一致,才会接收到消息
生产者
package com.duoduo.rabbitmq.route;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName WorkProduce.java
* @Author 拾光
* @Date 2020年07月08日 17:03:00
* @Description 路由模式
*/
public class RouteProduce {
/**
* 交换机名称
*/
static final String DIRECT_EXCHANGE="direct_exchange";
//队列名称
static final String DIRECT_QUEUE_INSERT="direct_queue_insert";
//队列名称
static final String DIRECT_QUEUE_UPDATE="direct_queue_update";
public static void main(String[] args)throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//申明交换机
/**
* 申明交换机
* 参数一: 交换机名称
* 参数二: 交换机类型 fanout ,topic, direct,headers
*/
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null);
//队列绑定交换机
channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHANGE,"insert");
channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHANGE,"update");
/**
* 参数 1:交换机名称,如果没有指定则使用默认 Default Exchage
* 参数 2:路由 key,简单模式可以传递队列名称
* 参数 3:消息其它属性
* 参数 4:消息内容
*/
String message="新增了路由模式:route key为insert";
channel.basicPublish(DIRECT_EXCHANGE,"insert",null,message.getBytes());
System.out.println("已发送消息:"+message);
message="新增了路由模式:route key为update";
channel.basicPublish(DIRECT_EXCHANGE,"update",null,message.getBytes());
System.out.println("已发送消息:"+message);
//关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.duoduo.rabbitmq.route;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.duoduo.rabbitmq.发布订阅模式.PublicProduce;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class RouteConsumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(RouteProduce.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(RouteProduce.DIRECT_QUEUE_INSERT,true,false,false,null);
//队列绑定交换机
channel.queueBind(RouteProduce.DIRECT_QUEUE_INSERT,RouteProduce.DIRECT_EXCHANGE,"insert");
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(RouteProduce.DIRECT_QUEUE_INSERT,true,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
消费者2
package com.duoduo.rabbitmq.route;
import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer.java
* @Author 拾光
* @Date 2020年07月07日 18:47:00
* @Description mq消费者
*/
public class RouteConsumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//申明交换机
channel.exchangeDeclare(RouteProduce.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候删除队列
* 参数5:队列其他参数
*/
channel.queueDeclare(RouteProduce.DIRECT_QUEUE_UPDATE,true,false,false,null);
//队列绑定交换机
channel.queueBind(RouteProduce.DIRECT_QUEUE_UPDATE,RouteProduce.DIRECT_EXCHANGE,"update");
//创建消费者
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
/**
* @param consumerTag 消息者标签
* @param envelope 消息包的内容,可以从中获取ID,消息交换机,消息路由
* @param properties 属性信息
* @param body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:"+envelope.getRoutingKey());
//交换机
System.out.println("交换机为:"+envelope.getExchange());
//消息ID
System.out.println("消息id为:"+envelope.getDeliveryTag());
//收到的消息
System.out.println("收到的消息为:"+new String(body,"utf-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
};
//监听消息
/**
* 参数一 队列名称
* 参数二 是否自动确认
* 参数三 消息接受到后回调
*/
channel.basicConsume(RouteProduce.DIRECT_QUEUE_UPDATE,true,defaultConsumer);
//关闭资源
// channel.close();
// connection.close();
}
}
测试
小结:
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合routing key的队列。
源码地址:https://github.com/chaiduolai/Learn_RabbitMQ.git
如果有不对的地方欢迎交流!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/159017.html