SpringBoot项目中连接两个RabbitMq

导读:本篇文章讲解 SpringBoot项目中连接两个RabbitMq,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

今天在写项目的时候遇到新需求,一个mq的功能要使用我们公司的服务器的mq,一个mq的功能要使用部署的那边的服务器的mq,话不多说直接上代码。

配置文件application.yml:

  spring: 
      rabbitmq:
        yjdpeservice:
          host: xxx.xxx.xxx.xxx
          port: 5672
          username: admin
          password: admin
        yjservice:
          host: xxx.xxx.xxx.xxx
          port: 5672
          username: admin
          password: admin

配置类:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RabbitPlusConfig {

    @Bean(name="mergeConnectionFactory")
    @Primary
    public ConnectionFactory MergeConnectionFactory(
            @Value("${spring.rabbitmq.yjdpeservice.host}") String host,
            @Value("${spring.rabbitmq.yjdpeservice.port}") int port,
            @Value("${spring.rabbitmq.yjdpeservice.username}") String username,
            @Value("${spring.rabbitmq.yjdpeservice.password}") String password
    ){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean(name="LocalConnectionFactory")
    public ConnectionFactory LocalConnectionFactory(
            @Value("${spring.rabbitmq.yjservice.host}") String host,
            @Value("${spring.rabbitmq.yjservice.port}") int port,
            @Value("${spring.rabbitmq.yjservice.username}") String username,
            @Value("${spring.rabbitmq.yjservice.password}") String password
    ){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean(name="mergeRabbitTemplate")
    @Primary
    public RabbitTemplate mergeRabbitTemplate(
            @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
    ){
        RabbitTemplate yjdpRabbitTemplate = new RabbitTemplate(connectionFactory);
        return yjdpRabbitTemplate;
    }

    @Bean(name="LocalRabbitTemplate")
    public RabbitTemplate LocalRabbitTemplate(
            @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
    ){
        RabbitTemplate yjRabbitTemplate = new RabbitTemplate(connectionFactory);
        return yjRabbitTemplate;
    }

    @Bean(name="mergeFactory")
    public SimpleRabbitListenerContainerFactory mergeFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name="LocalFactory")
    public SimpleRabbitListenerContainerFactory LocalFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }


}

发送消息类:

import io.renren.common.utils.R;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;

@Api(tags = "测试双mq发送")
@RestController
@RequestMapping("rbt/mq")
public class RbtMqController {


    @Autowired
    @Qualifier(value = "mergeRabbitTemplate")
    private RabbitTemplate mergerabbitTemplate;

    @Autowired
    @Qualifier(value = "LocalRabbitTemplate")
    private RabbitTemplate LocalrabbitTemplate;


    @ApiOperation("测试发送mq")
    @PostMapping("/PostMq/{mqone}/{mqtwo}")
    public Object PostMq(@RequestParam("token") String token, @PathVariable String mqone, @PathVariable String mqtwo){

        mergerabbitTemplate.convertAndSend("CeshiQueue", (Object) mqone, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                long l = 40000;
                //设置定时发布的时间发送到延时队列 到时间后转交给死信队列
                message.getMessageProperties().setExpiration(String.valueOf(l));
                return message;

            }
        });

        String msgTwo = "success";
        LocalrabbitTemplate.convertAndSend("CeshiQueue", (Object) mqtwo, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                long l = 40000;
                //设置定时发布的时间发送到延时队列 到时间后转交给死信队列
                message.getMessageProperties().setExpiration(String.valueOf(l));
                return message;

            }
        });
        return R.ok();
    }

}

消费者端:

消费LocalFactory对应的mq中的my-dlx-queue-Ceshi

@Component
@RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "LocalFactory")
@Log4j2
public class locallistener {

    @RabbitHandler
    public void RegularlyAddAsCheckIn(String msg) throws Exception {
        log.info(new Date() + "::LocalFactory收到信息::" + msg);
    }
}

消费mergeFactoryFactory对应的mq中的my-dlx-queue-Ceshi

@Component
@RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "mergeFactory")
@Log4j2
public class mergerlistener {

    @RabbitHandler
    public void RegularlyAddAsCheckIn(String msg) throws Exception {
        log.info(new Date() + "::mergeFactory收到信息::" + msg);
    }

}

完事,实测有效。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/106022.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!