今天在写项目的时候遇到新需求,一个mq的功能要使用我们公司的服务器的mq,一个mq的功能要使用部署的那边的服务器的mq,话不多说直接上代码。
配置文件application.yml:
spring:
rabbitmq:
yjdpeservice:
host: xxx.xxx.xxx.xxx
port: 5672
username: admin
password: admin
yjservice:
host: xxx.xxx.xxx.xxx
port: 5672
username: admin
password: admin
配置类:
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitPlusConfig {
@Bean(name="mergeConnectionFactory")
@Primary
public ConnectionFactory MergeConnectionFactory(
@Value("${spring.rabbitmq.yjdpeservice.host}") String host,
@Value("${spring.rabbitmq.yjdpeservice.port}") int port,
@Value("${spring.rabbitmq.yjdpeservice.username}") String username,
@Value("${spring.rabbitmq.yjdpeservice.password}") String password
){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name="LocalConnectionFactory")
public ConnectionFactory LocalConnectionFactory(
@Value("${spring.rabbitmq.yjservice.host}") String host,
@Value("${spring.rabbitmq.yjservice.port}") int port,
@Value("${spring.rabbitmq.yjservice.username}") String username,
@Value("${spring.rabbitmq.yjservice.password}") String password
){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name="mergeRabbitTemplate")
@Primary
public RabbitTemplate mergeRabbitTemplate(
@Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
){
RabbitTemplate yjdpRabbitTemplate = new RabbitTemplate(connectionFactory);
return yjdpRabbitTemplate;
}
@Bean(name="LocalRabbitTemplate")
public RabbitTemplate LocalRabbitTemplate(
@Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
){
RabbitTemplate yjRabbitTemplate = new RabbitTemplate(connectionFactory);
return yjRabbitTemplate;
}
@Bean(name="mergeFactory")
public SimpleRabbitListenerContainerFactory mergeFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name="LocalFactory")
public SimpleRabbitListenerContainerFactory LocalFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
发送消息类:
import io.renren.common.utils.R;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
@Api(tags = "测试双mq发送")
@RestController
@RequestMapping("rbt/mq")
public class RbtMqController {
@Autowired
@Qualifier(value = "mergeRabbitTemplate")
private RabbitTemplate mergerabbitTemplate;
@Autowired
@Qualifier(value = "LocalRabbitTemplate")
private RabbitTemplate LocalrabbitTemplate;
@ApiOperation("测试发送mq")
@PostMapping("/PostMq/{mqone}/{mqtwo}")
public Object PostMq(@RequestParam("token") String token, @PathVariable String mqone, @PathVariable String mqtwo){
mergerabbitTemplate.convertAndSend("CeshiQueue", (Object) mqone, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
long l = 40000;
//设置定时发布的时间发送到延时队列 到时间后转交给死信队列
message.getMessageProperties().setExpiration(String.valueOf(l));
return message;
}
});
String msgTwo = "success";
LocalrabbitTemplate.convertAndSend("CeshiQueue", (Object) mqtwo, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
long l = 40000;
//设置定时发布的时间发送到延时队列 到时间后转交给死信队列
message.getMessageProperties().setExpiration(String.valueOf(l));
return message;
}
});
return R.ok();
}
}
消费者端:
消费LocalFactory对应的mq中的my-dlx-queue-Ceshi
@Component
@RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "LocalFactory")
@Log4j2
public class locallistener {
@RabbitHandler
public void RegularlyAddAsCheckIn(String msg) throws Exception {
log.info(new Date() + "::LocalFactory收到信息::" + msg);
}
}
消费mergeFactoryFactory对应的mq中的my-dlx-queue-Ceshi
@Component
@RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "mergeFactory")
@Log4j2
public class mergerlistener {
@RabbitHandler
public void RegularlyAddAsCheckIn(String msg) throws Exception {
log.info(new Date() + "::mergeFactory收到信息::" + msg);
}
}
完事,实测有效。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/106022.html