一、导入springboot-amqp的起步依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、修改配置文件
spring:
rabbitmq:
host: 192.168.4.150
port: 5672
username: test
password: test
publisher-confirms: true
virtual-host: /
三、绑定交换机和队列
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 限流
* @return
*/
@Bean(name = "mqlistenerContainer")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(75);
return factory;
}
/**
* 动态生成队列
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
/**
* 声明队列
* @return
*/
@Bean
public Queue test1Queue() {
//durable:队列是否持久化,默认是false。为true时队列开启持久化功能
//exclusive:队列是否设置为排他队列,默认是false。为true时设置为排他队列,只对首次声明它的连接可见,
// 其他连接无法声明相同名称的其他队列,并且在连接断开时自动删除,即使持久化也会被删除
//autoDelete:队列是否自动删除,默认false。为true时,当没有消费者使用此队列,该队列会自动删除
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("test1_Queue",true,false,false);
}
/**
* 声明过期队列
* @return
*/
@Bean
public Queue test2Queue() {
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 5000); // 队列中的每一个消息未被消费则5秒后过期,被自动删除并移到死信队列
return new Queue("test2_Queue",true,false,false,map);
}
/**
* 声明发布订阅模式交换机
* @return
*/
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_Exchange", true, false);
}
/**
* 声明路由模式交换机
* @return
*/
@Bean
DirectExchange directExchange(){
return new DirectExchange("direct_Exchange", true, false);
}
/**
* 声明主题模式交换机
*/
@Bean
TopicExchange topicExchange(){
return new TopicExchange("topic_Exchange", true, false);
}
/**
* 交换机与队列绑定
* @return
*/
@Bean
Binding truckHistoryBinding(){
return BindingBuilder.bind(test1Queue()).to(directExchange()).with(test1Queue().getName());
}
}
或者通过如下方式动态生成队列和交换机
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* 动态生成
*/
@GetMapping("/test3Queue")
public void test3Queue() {
//生成卡车mq队列
Queue test3Queue = new Queue("test3_Queue", true, false, false);
rabbitAdmin.declareQueue(test3Queue);
//定义卡车交换机
DirectExchange direct1Exchange = new DirectExchange("direct1_Exchange", true, false);
rabbitAdmin.declareExchange(direct1Exchange);
//绑定交换机和队列,并设置Routing key
Binding binding = BindingBuilder.bind(test3Queue).to(direct1Exchange).with(test3Queue.getName());
rabbitAdmin.declareBinding(binding);
}
/**
* 删除mq队列
*
* @return
*/
@PostMapping("/deleteMq")
public String deleteMq(String mq) {
rabbitAdmin.deleteQueue(mq);
return "ok";
}
注意:完成上述操作后在控制台是看不到队列和交换机的,只要把消费者代码写出来便可看到了
四、生产者代码
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public void sendMessage() {
rabbitTemplate.convertAndSend("direct_Exchange","test1_Queue","haha");
}
五、消费者代码
package com.example.demo.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
@RabbitListener(queues = "test1_Queue")
public class Test1QueueReceiver {
@RabbitHandler
public void process(String meassage){
System.err.println(meassage);
}
}
注意:如果消费者接受的消息是实体类对象,需要将类序列化
package com.example.demo.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class Message implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
}
@GetMapping("/sendMessage")
public void sendMessage() {
Message message = new Message();
message.setMessage("lala");
rabbitTemplate.convertAndSend("direct_Exchange","test1_Queue",message);
}
package com.example.demo.mq;
import com.example.demo.entity.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "test1_Queue")
public class Test1QueueReceiver {
@RabbitHandler
public void process(Message meassage){
System.err.println(meassage.getMessage());
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153428.html