如果重复的消息同时进来的话,我们就要进行消息幂等性的校验,使其只消费一条消息,使得消息不会被重复消费,并且手动确认消息(ACK)保证队列中的消息被成功消费
解决方案:
对于mysql在高并发情况下存在的性能瓶颈,这里使用redis缓存,把消费过的消息id存入redis中,每次消费消息前,检查redis中是否存在该消息id,如果存在则证明被消费过不再消费,不存在则直接返回。
1.开启手动确认
rabbitmq:
username: guest
password: guest
host: 8.140.157.58
port: 5672
#开启手动确认
listener:
simple:
acknowledge-mode: manual
2.消费者模块引入redis并配置
redis:
host: 8.140.157.58
port: 6379
database: 0
password: yyx10033016
timeout: 10000ms
lettuce:
pool:
min-idle: 5
max-idle: 200
max-active: 1024
max-wait: 10000ms
3.修改消费者模块
@Component
public class MailReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
@Resource
private JavaMailSender javaMailSender;
@Resource
private MailProperties mailProperties;
@Resource
private TemplateEngine templateEngine;
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = MailConstants.QUEUE)
public void handler(Message message, Channel channel){
Employee employee = (Employee) message.getPayload();
MessageHeaders headers = message.getHeaders();
//消息序号
long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
String msgId = (String) headers.get("spring_returned_message_correlation");
HashOperations hashOperations = redisTemplate.opsForHash();
MimeMessage mimeMessage = javaMailSender.createMimeMessage();
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
try {
if (hashOperations.entries("mail_log").containsKey(msgId)){
LOGGER.error("消息已将被消费=============>",msgId);
/*
手动确认消息
tag:消息序号
multiple:false 是否确认多条
*/
channel.basicAck(tag,false);
return;
}
//发件人
mimeMessageHelper.setFrom(mailProperties.getUsername());
//收件人
mimeMessageHelper.setTo(employee.getEmail());
//主题
mimeMessageHelper.setSubject("入职欢迎邮件");
//发送日期
mimeMessageHelper.setSentDate(new Date());
//邮件发送内容
Context context = new Context();
context.setVariable("name",employee.getName());
context.setVariable("posName",employee.getPositionName());
context.setVariable("joblevelName",employee.getJoblevelName());
context.setVariable("departmentName",employee.getDepartmentName());
String mail = templateEngine.process("mail", context);
mimeMessageHelper.setText(mail,true);
//发送邮件
javaMailSender.send(mimeMessage);
LOGGER.info("mail发送成功!");
//将消息id存入redis中
hashOperations.put("mail_log",msgId,"ok");
//手动确认消息
channel.basicAck(tag,false);
} catch (Exception e) {
try {
//true 是否退回队列
channel.basicNack(tag,false,true);
} catch (IOException ioException) {
LOGGER.error("邮件发送失败======>",e.getMessage());
}
LOGGER.error("邮件发送失败======>",e.getMessage());
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/4966.html