Spring Boot之集成RabbitMQ、ActiveMQ
RabbitMQ
1.添加依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2.添加application.yml配置
# 配置RabbitMQ的基本信息
spring:
rabbitmq:
host: ip
port: 5672
username: test
password: test
virtual-host: /test
3.定义交换机、队列及绑定关系
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "spring_boot_topic_exchange";
public static final String QUEUE_NAME = "spring_boot_queue";
/**
* 定义交换机
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* @return
*/
@Bean("exchange1")
public Exchange createExchange(){
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 定义队列
* @return
*/
@Bean("queue1")
public Queue createQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 定义队列与交换机的绑定关系
* @param queue 队列名称
* @param exchange 交换机名称
* @return
*/
@Bean
public Binding bindQueueExchange(@Qualifier("queue1") Queue queue, @Qualifier("exchange1") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("old.#").noargs();
}
}
4.消息监听处理类
@Component
public class RabbimtMQListener {
/**
* 监听某个队列消息
* @param message
*/
@RabbitListener(queues = "spring_boot_queue")
public void ListenerQueue(Message message){
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("spring_boot_customer:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.执行测试
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"old.orange.rabbit","spring_boot_rabbit...");
}
}
先执行发送消息test,换机和队列才能先被声明和绑定,然后启动消费者监听,再次执行发送消息test。
登录管理控制台,查看交换机、队列、绑定关系。
ActiveMQ
1.引入ActiveMQ依赖
<!--ActiveMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.生产者
队列配置
@Configuration
public class QueueConfig {
@Value("${queue}")
private String queueName;
@Bean("myQueue")
public Queue queue() {
return new ActiveMQQueue(queueName);
}
}
生产者使用队列
@Component
@EnableScheduling
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue myQueue;
@Scheduled(fixedDelay = 3000)
public void send() {
jmsMessagingTemplate.convertAndSend(myQueue, "Hello World");
}
}
3.消费者
@Component
public class Consumer {
@JmsListener(destination = "${queue}")
public void receive(String msg) {
System.out.println(msg);
}
}
4.管理控制台参数说明
Number Of Pending Messages 等待消费的消息:是当前未出队列的数量。可以理解为总接收数-总出队列数
Number Of Consumers 消费者:是消费者端的消费者数量
Messages Enqueued 进入队列的消息:进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 :可以理解为是消费这消费掉的数量
5.消息类型
发送简单文本消息
创建消息生产者
@RestController
public class QueueController {
//JmsMessagingTemplate:用来发送消息
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send")
public void send(String msg){
jmsMessagingTemplate.convertAndSend("queue", msg);
}
}
创建消息消费者
@Component
public class Consumer {
@JmsListener(destination="queue")
public void readMessage(String msg){
System.out.println("读到消息:"+msg);
}
}
执行测试
http://localhost:8081/send?msg=hello activemq
Spring Boot内置了ActiveMQ的服务,所以不用单独启动也可以执行应用程序。
使用外部服务
在src/main/resources下的application.properties增加配置, 指定ActiveMQ的地址
spring.activemq.broker-url=tcp://ip:61616
spring.activemq.user=admin
spring.activemq.password=admin
运行后,会在activeMQ管理界面看到发送的queue
发送Map信息
创建消息生产者
@RestController
public class QueueController {
//JmsMessagingTemplate:用来发送消息
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/sendmap")
public void sendMapMessage() {
Map map = new HashMap<>();
map.put("name", "大白");
map.put("age", "11");
map.put("address", "中国上海");
jmsMessagingTemplate.convertAndSend("queue_map", map);
}
}
创建消息消费者
@Component
public class Consumer {
@JmsListener(destination = "queue_map")
public void readMapMessage(Map<String, String> dataMap) {
System.out.println("读取Map内容:" + dataMap);
}
}
执行测试
http://localhost:8081/sendmap
发送Object信息
创建消息生产者
@RestController
public class MessageSender {
@Autowired
/**
* JmsMessagingTemplate:用来发送消息
*/
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/sendobject")
public void sendObjectMessage() {
User user = new User("大白", 22, "中国上海");
jmsMessagingTemplate.convertAndSend("queue_object", user);
}
}
创建消息消费者
@Component
public class MessageListener {
@JmsListener(destination = "queue_object")
public void readObjectMessage(Map<String, String> dataObject) {
System.out.println("读取Object内容:" + dataObject);
}
}
配置可信任的解析包
在src/main/resources下的application.properties 添加配置
#设置受信任的包,允许所有包下的类能被传输解析
spring.activemq.packages.trust-all=true
执行测试
http://localhost:8081/sendobject
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137005.html