RabbitMQ之通过消息日志表、ACK机制、死信队列、定时任务实现消息的可靠投递完整使用示例
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
定义基础代码
定义消息枚举
public enum MessageEnum {
/**
* 发送
*/
SEND,
/**
* 接收
*/
RECEIVE,
/**
* 死亡
*/
DEAD;
}
定义消息对象
@Getter
@Setter
@ToString
public class MessagePo {
/**
* 消息id
*/
private String id;
/**
* 消息服务名
*/
private String service;
/**
* 消息状态
*/
private MessageEnum type;
/**
* 交换机
*/
private String exchange;
/**
* 路由key
*/
private String routingKey;
/**
* 队列
*/
private String queue;
/**
* 重试次数
*/
private Integer sequence;
/**
* 消息体
*/
private String payload;
/**
* 操作时间
*/
private Date date;
}
定义持久层
@Mapper
@Repository
public interface MessageDao {
/**
* 保存消息对象
*
* @param messagePo
*/
@Insert("replace INTO message (id ,type, service, exchange, routing_key, queue, sequence, payload, date) " +
"VALUES(#{id}, #{type}, #{service},#{exchange},#{routingKey},#{queue},#{sequence}, #{payload},#{date})")
void insert(MessagePo messagePo);
/**
* 修改消息对象
*
* @param messagePo
*/
@Update("update message set type =#{type}, service =#{service}, exchange =#{exchange}, " +
"routing_key =#{routingKey}, queue =#{queue}, sequence =#{sequence}, payload =#{payload}, date =#{date} " +
"where id=#{id} and service = #{service}")
void update(MessagePo messagePo);
/**
* 通过消息id与服务名查询消息对象
*
* @param id
* @param service
* @return
*/
@Select("SELECT id, type, service, exchange, routing_key routingKey, queue, sequence, payload, date FROM " +
"message " +
"WHERE id = #{id} and service = #{service}")
MessagePo selectByIdAndService(@Param("id") String id, @Param("service") String service);
/**
* 通过消息状态与服务名查询消息对象
*
* @param type
* @param service
* @return
*/
@Select("SELECT id, type, service, exchange, routing_key routingKey, queue, sequence, payload, date FROM " +
"message WHERE type = #{type} and service = #{service}")
List<MessagePo> selectByTypeAndService(@Param("type") String type, @Param("service") String service);
/**
* 删除消息对象
*
* @param id
* @param service
*/
@Delete("DELETE FROM message WHERE id = #{id} and service = #{service}")
void delete(@Param("id") String id, @Param("service") String service);
}
服务接口
public interface MessageService {
/**
* 发送消息前,消息持久化
*/
MessagePo messageSendReady(String exchange, String routingKey, String body);
/**
* 消息发送成功,删除消息
*/
void messageSendSuccess(String id);
/**
* 发送消息返回
*/
MessagePo messageSendReturn(String id, String exchange, String routingKey, String body);
/**
* 记录消息发送次数
*/
void messageResend(String id);
/**
* 保存监听到的死信消息
*/
void messageDead(String id, String exchange, String routingKey, String queue, String body);
/**
* 消息重发多次后,放弃发送
*/
void messageDead(String id);
/**
* 消息消费前保存
*/
MessagePo messageReceiveReady(String id, String exchange, String routingKey, String queue, String body);
/**
* 发送消息被消费成功
*/
void messageReceiveSuccess(String id);
/**
* 查询应发未发消息
*/
List<MessagePo> listReadyMessages();
}
服务接口实现
@Service
@Slf4j
public class MessageServiceImpl implements MessageService {
@Autowired
MessageDao messageDao;
@Value("${mq.service}")
String serviceName;
@Override
public MessagePo messageSendReady(String exchange, String routingKey, String body) {
final String messageId = UUID.randomUUID().toString();
MessagePo messagePo = new MessagePo();
messagePo.setId(messageId);
messagePo.setService(serviceName);
messagePo.setExchange(exchange);
messagePo.setRoutingKey(routingKey);
messagePo.setPayload(body);
messagePo.setDate(new Date());
messagePo.setSequence(0);
messagePo.setType(MessageEnum.SEND);
messageDao.insert(messagePo);
return messagePo;
}
@Override
public void messageSendSuccess(String id) {
messageDao.delete(id, serviceName);
}
@Override
public MessagePo messageSendReturn(String id, String exchange, String routingKey, String body) {
MessagePo messagePo = new MessagePo();
messagePo.setId(UUID.randomUUID().toString());
messagePo.setService(serviceName);
messagePo.setExchange(exchange);
messagePo.setRoutingKey(routingKey);
messagePo.setPayload(body);
messagePo.setDate(new Date());
messagePo.setSequence(0);
messagePo.setType(MessageEnum.SEND);
messageDao.insert(messagePo);
return messagePo;
}
@Override
public void messageResend(String id) {
MessagePo messagePo = messageDao.selectByIdAndService(id, serviceName);
messagePo.setSequence(messagePo.getSequence() + 1);
messageDao.update(messagePo);
}
@Override
public List<MessagePo> listReadyMessages() {
return messageDao.selectByTypeAndService(MessageEnum.SEND.name(), serviceName);
}
@Override
public void messageDead(String id, String exchange, String routingKey, String queue, String body) {
MessagePo messagePo = new MessagePo();
messagePo.setId(id);
messagePo.setService(serviceName);
messagePo.setExchange(exchange);
messagePo.setRoutingKey(routingKey);
messagePo.setPayload(body);
messagePo.setDate(new Date());
messagePo.setSequence(0);
messagePo.setType(MessageEnum.DEAD);
messageDao.insert(messagePo);
}
@Override
public void messageDead(String id) {
MessagePo messagePo = messageDao.selectByIdAndService(id, serviceName);
messagePo.setType(MessageEnum.DEAD);
messageDao.update(messagePo);
}
@Override
public MessagePo messageReceiveReady(String id, String exchange, String routingKey, String queue, String body) {
MessagePo messagePo = messageDao.selectByIdAndService(id, serviceName);
if (null == messagePo) {
messagePo = new MessagePo();
messagePo.setId(id);
messagePo.setService(serviceName);
messagePo.setExchange(exchange);
messagePo.setRoutingKey(routingKey);
messagePo.setQueue(queue);
messagePo.setPayload(body);
messagePo.setDate(new Date());
messagePo.setSequence(0);
messagePo.setType(MessageEnum.RECEIVE);
messageDao.insert(messagePo);
} else {
messagePo.setSequence(messagePo.getSequence() + 1);
messageDao.update(messagePo);
}
return messagePo;
}
@Override
public void messageReceiveSuccess(String id) {
messageDao.delete(id, serviceName);
}
}
消息发送
@Component
@Slf4j
public class MessageSender {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
MessageService messageService;
@Transactional(rollbackFor = Exception.class)
public void send(String exchange, String routingKey, Object payload) {
log.info("send():exchange:{}, routingKey:{}, payload{}", exchange, routingKey, payload);
try {
ObjectMapper mapper = new ObjectMapper();
String payloadStr = mapper.writeValueAsString(payload);
// 发送消息前,消息持久化
MessagePo messagePO = messageService.messageSendReady(exchange, routingKey, payloadStr);
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(payloadStr.getBytes(), messageProperties);
message.getMessageProperties().setMessageId(messagePO.getId());
//发送
rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(messagePO.getId()));
log.info("发送消息,消息ID:{}", messagePO.getId());
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException("发送RabbitMQ消息失败!", e);
}
}
}
消息监听
@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {
@Autowired
MessageService messageService;
@Value("#{new Integer('${mq.reconsumeTimes}')}")
int reconsumeTimes;
public abstract void receiveMessage(Message message);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
MessagePo messagePO =messageService.messageReceiveReady(
message.getMessageProperties().getMessageId(),
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
new String(message.getBody())
);
log.info("收到消息,消息ID:{} 消费次数:{}", messageProperties.getMessageId(), messagePO.getSequence());
try {
receiveMessage(message);
// 确认签收消息
channel.basicAck(deliveryTag, false);
// 删除消息
messageService.messageReceiveSuccess(messagePO.getId());
} catch (Exception e) {
log.error("RabbitMQ 消息消费失败," + e.getMessage(), e);
if (messagePO.getSequence() >= reconsumeTimes) {
// 入死信队列
channel.basicReject(deliveryTag, false);
} else {
// 重回到队列,重新消费, 按照2的指数级递增
Thread.sleep((long) (Math.pow(2, messagePO.getSequence()) * 1000));
channel.basicNack(deliveryTag, false, true);
}
}
}
}
死信队列监听
@Component
@ConditionalOnProperty("mq.dlxEnabled")
@Slf4j
public class DlxListener implements ChannelAwareMessageListener {
@Autowired
MessageService messageService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String messageBody = new String(message.getBody());
log.error("message:{} | tag:{}", messageBody, message.getMessageProperties().getDeliveryTag());
//保存监听到的死信消息
messageService.messageDead(
message.getMessageProperties().getMessageId(),
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
messageBody);
// 确认签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
配置RabbitMQ
@Configuration
@Slf4j
public class RabbitMqConfig {
@Autowired
MessageService messageService;
@Bean
public Exchange exchange1() {
return new DirectExchange("exchange.test");
}
@Bean
public Queue queue1() {
return new Queue("queue.test");
}
@Bean
public Binding binding1() {
return new Binding(
"queue.test",
Binding.DestinationType.QUEUE,
"exchange.test",
"key.test",
null);
}
@Bean
public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
if (ack && null != correlationData) {
String messageId = correlationData.getId();
log.info("消息已正确投递到交换机, messageId:{}", messageId);
messageService.messageSendSuccess(messageId);
} else {
log.error("消息投递至交换机失败,correlationData:{},cause:{}", correlationData, cause);
}
}
);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("消息无法路由,message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
messageService.messageSendReturn(
message.getMessageProperties().getMessageId(),
exchange,
routingKey,
new String(message.getBody())
);
});
return rabbitTemplate;
}
}
配置死信交换机及队列
@Configuration
@ConditionalOnProperty("mq.dlxEnabled")
public class DlxConfig {
@Bean
public TopicExchange dlxExchange() {
return new TopicExchange("exchange.dlx");
}
@Bean
public Queue dlxQueue() {
return new Queue("queue.dlx",true,false,false);
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#");
}
@Bean
public SimpleMessageListenerContainer deadLetterListenerContainer(ConnectionFactory connectionFactory, DlxListener dlxListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(dlxQueue());
container.setExposeListenerChannel(true);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(dlxListener);
/** 设置消费者能处理消息的最大个数 */
container.setPrefetchCount(100);
return container;
}
}
定时任务
@EnableScheduling
@Configuration
@Component
@Slf4j
public class ResendTask {
@Autowired
MessageService messageService;
@Autowired
RabbitTemplate rabbitTemplate;
@Value("#{new Integer('${mq.resendTimes}')}")
int resendTimes;
@Scheduled(fixedDelayString = "${mq.resendFreq}")
public void resendMessage() {
// 重发消息
List<MessagePo> messagePo = messageService.listReadyMessages();
for (MessagePo messagePO : messagePo) {
log.info("重新发送消息,messagePO:{}", messagePO);
if (messagePO.getSequence() > resendTimes) {
log.error("messagePO:{},已达到重试发送次数,放弃发送", messagePO);
messageService.messageDead(messagePO.getId());
continue;
}
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(messagePO.getPayload().getBytes(), messageProperties);
message.getMessageProperties().setMessageId(messagePO.getId());
// 发送消息
rabbitTemplate.convertAndSend(messagePO.getExchange(), messagePO.getRoutingKey(),
message, new CorrelationData(messagePO.getId()));
// 更新发送次数
messageService.messageResend(messagePO.getId());
}
}
}
配置application.yml
server.port=8888
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&useSSL=false\
&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
mq.service=testService
mq.host=IP
mq.port=5672
mq.username=guest
mq.password=guest
mq.vhost=/
mq.dlxEnabled=true
mq.resendTimes=5
mq.reconsumeTimes=5
mq.resendFreq=5000
spring.rabbitmq.addresses=IP
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.port=5672
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 要创建的最少消费者数量
spring.rabbitmq.listener.simple.concurrency=3
# 最大消费者数
spring.rabbitmq.listener.simple.max-concurrency=6
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137002.html