RabbitMQ之通过消息日志表、ACK机制、死信队列、定时任务实现消息的可靠投递完整使用示例

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 RabbitMQ之通过消息日志表、ACK机制、死信队列、定时任务实现消息的可靠投递完整使用示例,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

添加依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

    </dependencies>

定义基础代码

定义消息枚举

public enum MessageEnum {
    /**
     * 发送
     */
    SEND,
    /**
     * 接收
     */
    RECEIVE,
    /**
     * 死亡
     */
    DEAD;
}

定义消息对象

@Getter
@Setter
@ToString
public class MessagePo {
    /**
     * 消息id
     */
    private String id;
    /**
     * 消息服务名
     */
    private String service;
    /**
     * 消息状态
     */
    private MessageEnum type;
    /**
     * 交换机
     */
    private String exchange;
    /**
     * 路由key
     */
    private String routingKey;
    /**
     * 队列
     */
    private String queue;
    /**
     * 重试次数
     */
    private Integer sequence;
    /**
     * 消息体
     */
    private String payload;
    /**
     * 操作时间
     */
    private Date date;
}

定义持久层

@Mapper
@Repository
public interface MessageDao {

    /**
     * 保存消息对象
     *
     * @param messagePo
     */
    @Insert("replace INTO message (id ,type, service, exchange, routing_key, queue, sequence, payload, date) " +
            "VALUES(#{id}, #{type}, #{service},#{exchange},#{routingKey},#{queue},#{sequence}, #{payload},#{date})")
    void insert(MessagePo messagePo);

    /**
     * 修改消息对象
     *
     * @param messagePo
     */
    @Update("update message set type =#{type}, service =#{service}, exchange =#{exchange}, " +
            "routing_key =#{routingKey}, queue =#{queue}, sequence =#{sequence}, payload =#{payload}, date =#{date} " +
            "where id=#{id} and service = #{service}")
    void update(MessagePo messagePo);

    /**
     * 通过消息id与服务名查询消息对象
     *
     * @param id
     * @param service
     * @return
     */
    @Select("SELECT id, type, service, exchange, routing_key routingKey, queue, sequence, payload, date FROM " +
            "message " +
            "WHERE id = #{id} and service = #{service}")
    MessagePo selectByIdAndService(@Param("id") String id, @Param("service") String service);

    /**
     * 通过消息状态与服务名查询消息对象
     *
     * @param type
     * @param service
     * @return
     */
    @Select("SELECT id, type, service, exchange, routing_key routingKey, queue, sequence, payload, date FROM " +
            "message WHERE type = #{type} and service = #{service}")
    List<MessagePo> selectByTypeAndService(@Param("type") String type, @Param("service") String service);

    /**
     * 删除消息对象
     *
     * @param id
     * @param service
     */
    @Delete("DELETE FROM message WHERE id = #{id} and service = #{service}")
    void delete(@Param("id") String id, @Param("service") String service);

}

服务接口

public interface MessageService {

    /**
     * 发送消息前,消息持久化
     */
    MessagePo messageSendReady(String exchange, String routingKey, String body);

    /**
     * 消息发送成功,删除消息
     */
    void messageSendSuccess(String id);

    /**
     * 发送消息返回
     */
    MessagePo messageSendReturn(String id, String exchange, String routingKey, String body);

    /**
     * 记录消息发送次数
     */
    void messageResend(String id);

    /**
     * 保存监听到的死信消息
     */
    void messageDead(String id, String exchange, String routingKey, String queue, String body);

    /**
     * 消息重发多次后,放弃发送
     */
    void messageDead(String id);

    /**
     * 消息消费前保存
     */
    MessagePo messageReceiveReady(String id, String exchange, String routingKey, String queue, String body);

    /**
     * 发送消息被消费成功
     */
    void messageReceiveSuccess(String id);

    /**
     * 查询应发未发消息
     */
    List<MessagePo> listReadyMessages();
}

服务接口实现

@Service
@Slf4j
public class MessageServiceImpl implements MessageService {

    @Autowired
    MessageDao messageDao;

    @Value("${mq.service}")
    String serviceName;

    @Override
    public MessagePo messageSendReady(String exchange, String routingKey, String body) {
        final String messageId = UUID.randomUUID().toString();
        MessagePo messagePo = new MessagePo();
        messagePo.setId(messageId);
        messagePo.setService(serviceName);
        messagePo.setExchange(exchange);
        messagePo.setRoutingKey(routingKey);
        messagePo.setPayload(body);
        messagePo.setDate(new Date());
        messagePo.setSequence(0);
        messagePo.setType(MessageEnum.SEND);
        messageDao.insert(messagePo);
        return messagePo;
    }

    @Override
    public void messageSendSuccess(String id) {
        messageDao.delete(id, serviceName);
    }

    @Override
    public MessagePo messageSendReturn(String id, String exchange, String routingKey, String body) {
        MessagePo messagePo = new MessagePo();
        messagePo.setId(UUID.randomUUID().toString());
        messagePo.setService(serviceName);
        messagePo.setExchange(exchange);
        messagePo.setRoutingKey(routingKey);
        messagePo.setPayload(body);
        messagePo.setDate(new Date());
        messagePo.setSequence(0);
        messagePo.setType(MessageEnum.SEND);
        messageDao.insert(messagePo);
        return messagePo;
    }

    @Override
    public void messageResend(String id) {
        MessagePo messagePo = messageDao.selectByIdAndService(id, serviceName);
        messagePo.setSequence(messagePo.getSequence() + 1);
        messageDao.update(messagePo);
    }

    @Override
    public List<MessagePo> listReadyMessages() {
        return messageDao.selectByTypeAndService(MessageEnum.SEND.name(), serviceName);
    }

    @Override
    public void messageDead(String id, String exchange, String routingKey, String queue, String body) {
        MessagePo messagePo = new MessagePo();
        messagePo.setId(id);
        messagePo.setService(serviceName);
        messagePo.setExchange(exchange);
        messagePo.setRoutingKey(routingKey);
        messagePo.setPayload(body);
        messagePo.setDate(new Date());
        messagePo.setSequence(0);
        messagePo.setType(MessageEnum.DEAD);
        messageDao.insert(messagePo);
    }

    @Override
    public void messageDead(String id) {
        MessagePo messagePo = messageDao.selectByIdAndService(id, serviceName);
        messagePo.setType(MessageEnum.DEAD);
        messageDao.update(messagePo);
    }

    @Override
    public MessagePo messageReceiveReady(String id, String exchange, String routingKey, String queue, String body) {
        MessagePo messagePo = messageDao.selectByIdAndService(id, serviceName);
        if (null == messagePo) {
            messagePo = new MessagePo();
            messagePo.setId(id);
            messagePo.setService(serviceName);
            messagePo.setExchange(exchange);
            messagePo.setRoutingKey(routingKey);
            messagePo.setQueue(queue);
            messagePo.setPayload(body);
            messagePo.setDate(new Date());
            messagePo.setSequence(0);
            messagePo.setType(MessageEnum.RECEIVE);
            messageDao.insert(messagePo);
        } else {
            messagePo.setSequence(messagePo.getSequence() + 1);
            messageDao.update(messagePo);
        }
        return messagePo;
    }

    @Override
    public void messageReceiveSuccess(String id) {
        messageDao.delete(id, serviceName);
    }
}

消息发送

@Component
@Slf4j
public class MessageSender {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    MessageService messageService;

    @Transactional(rollbackFor = Exception.class)
    public void send(String exchange, String routingKey, Object payload) {
        log.info("send():exchange:{}, routingKey:{}, payload{}", exchange, routingKey, payload);

        try {
            ObjectMapper mapper = new ObjectMapper();
            String payloadStr = mapper.writeValueAsString(payload);

            // 发送消息前,消息持久化
            MessagePo messagePO = messageService.messageSendReady(exchange, routingKey, payloadStr);

            //设置消息属性
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            Message message = new Message(payloadStr.getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(messagePO.getId());
            //发送
            rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(messagePO.getId()));

            log.info("发送消息,消息ID:{}", messagePO.getId());

        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException("发送RabbitMQ消息失败!", e);
        }
    }
}

消息监听

@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {

    @Autowired
    MessageService messageService;

    @Value("#{new Integer('${mq.reconsumeTimes}')}")
    int reconsumeTimes;

    public abstract void receiveMessage(Message message);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();

        MessagePo messagePO =messageService.messageReceiveReady(
                message.getMessageProperties().getMessageId(),
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                message.getMessageProperties().getConsumerQueue(),
                new String(message.getBody())
        );

        log.info("收到消息,消息ID:{} 消费次数:{}", messageProperties.getMessageId(), messagePO.getSequence());

        try {
            receiveMessage(message);
            // 确认签收消息
            channel.basicAck(deliveryTag, false);
			// 删除消息
            messageService.messageReceiveSuccess(messagePO.getId());
        } catch (Exception e) {
            log.error("RabbitMQ 消息消费失败," + e.getMessage(), e);
            if (messagePO.getSequence() >= reconsumeTimes) {
                // 入死信队列
                channel.basicReject(deliveryTag, false);
            } else {
                // 重回到队列,重新消费, 按照2的指数级递增
                Thread.sleep((long) (Math.pow(2, messagePO.getSequence()) * 1000));
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
}

死信队列监听

@Component
@ConditionalOnProperty("mq.dlxEnabled")
@Slf4j
public class DlxListener implements ChannelAwareMessageListener {

    @Autowired
    MessageService messageService;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String messageBody = new String(message.getBody());
        log.error("message:{} | tag:{}", messageBody, message.getMessageProperties().getDeliveryTag());

        //保存监听到的死信消息
        messageService.messageDead(
                message.getMessageProperties().getMessageId(),
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                message.getMessageProperties().getConsumerQueue(),
                messageBody);
                
		// 确认签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

配置RabbitMQ

@Configuration
@Slf4j
public class RabbitMqConfig {

    @Autowired
    MessageService messageService;

    @Bean
    public Exchange exchange1() {
        return new DirectExchange("exchange.test");
    }

    @Bean
    public Queue queue1() {
        return new Queue("queue.test");
    }

    @Bean
    public Binding binding1() {
        return new Binding(
                "queue.test",
                Binding.DestinationType.QUEUE,
                "exchange.test",
                "key.test",
                null);
    }

    @Bean
    public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                    log.info("correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
                    if (ack && null != correlationData) {
                        String messageId = correlationData.getId();
                        log.info("消息已正确投递到交换机, messageId:{}", messageId);
                        messageService.messageSendSuccess(messageId);
                    } else {
                        log.error("消息投递至交换机失败,correlationData:{},cause:{}", correlationData, cause);
                    }
                }
        );

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("消息无法路由,message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
                    message, replyCode, replyText, exchange, routingKey);
           
            messageService.messageSendReturn(
                    message.getMessageProperties().getMessageId(),
                    exchange,
                    routingKey,
                    new String(message.getBody())
            );
        });
        return rabbitTemplate;
    }
}

配置死信交换机及队列

@Configuration
@ConditionalOnProperty("mq.dlxEnabled")
public class DlxConfig {
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange("exchange.dlx");
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("queue.dlx",true,false,false);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#");
    }

    @Bean
    public SimpleMessageListenerContainer deadLetterListenerContainer(ConnectionFactory connectionFactory, DlxListener dlxListener) {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(dlxQueue());
        container.setExposeListenerChannel(true);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(dlxListener);
        /** 设置消费者能处理消息的最大个数 */
        container.setPrefetchCount(100);
        return container;
    }

}

定时任务

@EnableScheduling
@Configuration
@Component
@Slf4j
public class ResendTask {

    @Autowired
    MessageService messageService;
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("#{new Integer('${mq.resendTimes}')}")
    int resendTimes;

    @Scheduled(fixedDelayString = "${mq.resendFreq}")
    public void resendMessage() {
        // 重发消息
        List<MessagePo> messagePo = messageService.listReadyMessages();
        for (MessagePo messagePO : messagePo) {
            log.info("重新发送消息,messagePO:{}", messagePO);
            if (messagePO.getSequence() > resendTimes) {
                log.error("messagePO:{},已达到重试发送次数,放弃发送", messagePO);
                messageService.messageDead(messagePO.getId());
                continue;
            }
            //设置消息属性
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            Message message = new Message(messagePO.getPayload().getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(messagePO.getId());
            
            // 发送消息
            rabbitTemplate.convertAndSend(messagePO.getExchange(), messagePO.getRoutingKey(),
                    message, new CorrelationData(messagePO.getId()));
           // 更新发送次数         
            messageService.messageResend(messagePO.getId());
        }
    }
}

配置application.yml

server.port=8888
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&useSSL=false\
  &serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

mq.service=testService
mq.host=IP
mq.port=5672
mq.username=guest
mq.password=guest
mq.vhost=/
mq.dlxEnabled=true
mq.resendTimes=5
mq.reconsumeTimes=5
mq.resendFreq=5000


spring.rabbitmq.addresses=IP
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.port=5672

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 要创建的最少消费者数量
spring.rabbitmq.listener.simple.concurrency=3
# 最大消费者数
spring.rabbitmq.listener.simple.max-concurrency=6

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137002.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!