-
背景
-
设计思路
-
具体实现
-
项目代码结构
-
模块说明
-
定义一个幂等注解
-
定义一个AOP切面
-
定义抽象唯一id接口
-
定义一个自动配置类
-
使用
-
项目源码地址
-
不足
背景
目前项目中RocketMQ应用场景还是比较多的,但是线上经常因为一些异常原因导致生产者重新投递消息导致消费者消费重新消费,为此每次需要手动处理消费幂等,觉得比较繁琐,所以就想了写一个通用的自动幂等框架
设计思路
开始是想着借用Redis或Mysql作去重幂等校验,大致思路如下
大致流程如下,其中加锁部分在redis就直接使用redis的分布式锁即可,在Mysql中加锁可以基于for update来
实现
最开始的实现是想基于RocketMQ的MessageListenerConcurrently
接口去重写消费的实现,大致代码如下
public abstract class RocketMQListener implements MessageListenerConcurrently {
private final IdempotentConfig config;
private final Idempotent idempotent;
public RocketMQListener(IdempotentConfig config, Idempotent idempotent) {
// this.strategy = strategy;
this.config = config;
this.idempotent = idempotent;
log.info("Redis 幂等组件加载成功");
}
public RocketMQListener(Idempotent idempotent) {
this.config = new IdempotentConfig();
this.idempotent = idempotent;
log.info("Redis 幂等组件加载成功");
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
boolean hasConsumeFail = false;
int ackIndexIfFail = -1;
for (int i = 0; i < msgs.size(); i++) {
MessageExt msg = msgs.get(i);
try {
hasConsumeFail = !consumeByIdempotent(msg);
} catch (Exception e) {
log.error("消费异常", e);
hasConsumeFail = true;
}
//如果前面出现消费失败的话,后面也不用消费了,因为都会重发
if (hasConsumeFail) {
break;
} else { //到现在都消费成功
ackIndexIfFail = i;
}
}
//全都消费成功
if (!hasConsumeFail) {
log.info("consume [{}] msg(s) all successfully", msgs.size());
} else {//存在失败的
//标记成功位,后面的会重发以重新消费,在这个位置之前的不会重发。 详情见源码:ConsumeMessageConcurrentlyService#processConsumeResult
context.setAckIndex(ackIndexIfFail);
log.warn("consume [{}] msg(s) fails, ackIndex = [{}] ", msgs.size(), context.getAckIndex());
}
//无论如何最后都返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 业务处理
*
* @param messageExt
* @return
*/
protected abstract boolean consume(final MessageExt messageExt);
/**
* 默认使用 uniqKey作去重标识
*
* @param messageExt
* @return
*/
protected String uniqID(final MessageExt messageExt) {
String uniqID = MessageClientIDSetter.getUniqID(messageExt);
return uniqID == null ? messageExt.getMsgId() : uniqID;
}
/**
* 幂等消费
*
* @param messageExt
* @return
*/
private boolean consumeByIdempotent(final MessageExt messageExt) {
UnMessage unMessage = new UnMessage();
if (Objects.equals(config.getStrategyEnum(), StrategyEnum.UnIQ_ID)) {
unMessage.setMsgUniqKey(MessageClientIDSetter.getUniqID(messageExt));
} else {
unMessage.setMsgUniqKey(messageExt.getKeys());
}
unMessage.setMsgUniqKey(messageExt.getKeys());
log.info("uniqID {}", messageExt.getKeys());
boolean exitKey = idempotent.exitKey(unMessage);
// 重复消费
if (exitKey) {
log.info("重复消费 exitKey {}", exitKey);
return true;
}
boolean lock = idempotent.lock(unMessage, config);
// 正在消费 重新投递
if (!lock) {
log.info("正在消费 key {}", exitKey);
return false;
}
// 其他情况消费完成
boolean consume;
try {
consume = consume(messageExt);
if (consume) {
try {
idempotent.saveKey(unMessage);
} catch (Exception e) {
config.getMonitor().monitor(unMessage);
throw e;
}
}
return consume;
} finally {
try {
idempotent.unLock(unMessage, config);
} catch (Exception ex) {
log.error("释放锁异常 {}", unMessage);
}
}
}
}
然后业务使用只需要继承RocketMQListener
重写抽象方法consume()
即可
但是在实现完后面临一个问题就是我们项目使用的aliyun的RocketMQ,阿里云的ROcketMQ client 对MessageListenerConcurrently
有了自己的封装,我这里不好扩展,也不好兼容其他MQ,这种实现方式太麻烦太复杂,想着怎么简单怎么实用怎来,于是就打算基于AOP切面来实现。会更为通用
具体实现
项目代码结构
模块说明
-
wh-core 核心实现 -
wh-mq-rocketmq rocketmq幂等核心实现 -
wh-mq-aliyun-rocketmq 阿里云client幂等核心实现 -
wh-mq-Idempotent-samples 使用例子
定义一个幂等注解
-
Idempotent.java
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
}
定义一个AOP切面
@Aspect
@Slf4j
@Component
public class MqIdempotentAop {
private final RedissonClient redissonClient;
private final IdempotentConfig idempotentConfig;
private final MessageConverter messageConverter;
public MqIdempotentAop(RedissonClient redissonClient, IdempotentConfig idempotentConfig, MessageConverter messageConverter) {
if (Objects.isNull(redissonClient)) {
throw new NullPointerException("redissonClient template is null");
}
this.redissonClient = redissonClient;
this.idempotentConfig = idempotentConfig;
this.messageConverter = messageConverter;
}
@Pointcut("@annotation(com.mq.idempotent.core.annotation.Idempotent)")
public void ciderDistributedLockAspect() {
}
@Around(value = "ciderDistributedLockAspect()")
public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
//切点所在的类
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method method = methodSignature.getMethod();
//方法参数
Object[] args = pjp.getArgs();
Idempotent annotation = method.getAnnotation(Idempotent.class);
String msgID = messageConverter.getUniqueKey(Arrays.stream(args).findFirst().orElseThrow(() -> new Exception("参数异常")));
String key = idempotentConfig.getRedisKey() + msgID;
log.info("唯一key {}", key);
if (exitKey(key)) {
log.info("重复消费");
throw new Exception("重复消费");
}
if (!lock(key)) {
log.info("有消息正在消费");
throw new Exception("有消息正在消费");
}
try {
Object proceed = pjp.proceed();
RBucket<String> bucket = redissonClient.getBucket(key);
bucket.set(idempotentConfig.getRedisValue());
return proceed;
} finally {
RLock stockLock = redissonClient.getLock(key);
stockLock.unlock();
}
}
public boolean lock(String lockName) {
RLock stockLock = redissonClient.getLock(lockName);
try {
return stockLock.tryLock(idempotentConfig.getTryLockTime(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean exitKey(String key) {
RBucket<String> stockLock = redissonClient.getBucket(key);
return stockLock.isExists();
}
}
定义抽象唯一id接口
这里抽象了一个获取唯一id的接口,用于适配各种mq
/**
* @author : wh
* @date : 2021/11/15 13:51
* @description: 消息转换器
*/
public interface MessageConverter<T> {
String getUniqueKey(T t);
}
由具体的MQ 实现,比如RocketMQ Client
@Component
public class RocketMQMessageConverter implements MessageConverter<MessageExt> {
@Override
public String getUniqueKey(MessageExt messageExt) {
return Objects.nonNull(messageExt.getKeys()) ? messageExt.getKeys() :messageExt.getMsgId();
}
}
定义一个自动配置类
定义一个自动配置类,对sdk的一些核心配置
@Configuration
public class IdempotentAutoConfiguration {
@Value("${idempotent.redis.key:mq::unique::}")
private String redisKey;
@Value("${idempotent.redis.value:s}")
private String redisValue;
@Value("${idempotent.redis.value:1}")
private Long tryLockTime;
@ConditionalOnMissingBean(IdempotentConfig.class)
@Bean
public IdempotentConfig idempotentConfig() {
IdempotentConfig idempotentConfig = new IdempotentConfig();
idempotentConfig.setRedisKey(redisKey);
idempotentConfig.setRedisValue(redisValue);
idempotentConfig.setTryLockTime(tryLockTime);
return idempotentConfig;
}
}
可以看到整体核心逻辑就写完了。由于我们需要给三方项目作为sdk引入,所以我们需要在META-INF
添加 spring.factories
文件,内容如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.mq.idempotent.core.aop.MqIdempotentAop,
com.mq.idempotent.core.config.IdempotentAutoConfiguration
核心代码写完我们就可以发布到中央仓库了
使用
使用起来就非常简单了
-
引入依赖
<dependency>
<groupId>io.github.weihubeats</groupId>
<artifactId>wh-rocketmq</artifactId>
<version>1.0.4</version>
</dependency>
-
例子
原先的配置方式不变,无侵入,仅需在需要幂等处理的方法上面加上注解@Idempotent
即可,默认使用业务key去重,如果没有设置则使用MsgID
@Bean(initMethod = "start", destroyMethod = "shutdown")
public Consumer consumer() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.AccessKey, "aliMQAccessKey");
properties.put(PropertyKeyConst.SecretKey, "aliMQSecretKey");
properties.put(PropertyKeyConst.NAMESRV_ADDR, orderNameSerAddr);
properties.put(PropertyKeyConst.MaxReconsumeTimes, 20);
properties.put(PropertyKeyConst.GROUP_ID, "orderServiceGid");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("orderTopic", "test || test_event", (message, context) ->
handleClient(message)
? Action.CommitMessage
: Action.ReconsumeLater);
return consumer;
}
@Idempotent
public boolean handleClient(Message message) {
String tag = message.getTag();
switch (tag) {
case "test" :
return ((AliyunMQConfig) AopContext.currentProxy()).test(message);
}
return true;
}
private boolean test(Message message) {
return false;
}
项目源码地址
https://github.com/weihubeats/wh-mq-Idempotent
觉得不错可以帮忙点个star,后续继续优化。
不足
-
. 没有面向接口编程,扩展性较差,因为为了简单,就直接写死了加锁解锁的一些逻辑,实际应该顶一个接口供自己和外部扩展,来实现自动切换是基于Redis或Mysql去重保证幂等,或由用户自己实现 -
目前支持的MQ Client有限,仅支持 Aliyun oss Client
和RocketMQ Client
。后续需要定义各个Message的一个转换器去转换一个自己对所有MQ统一抽象的Message
,目前的转换器实现还是比较简单的,支持所有的主流MQ,这部分抽象可以参考spring message 模块对所有消息的一个统一封装
原文始发于微信公众号(小奏技术):基于Spring Boot我开源了一个简单的MQ幂等框架(支持主流的MQ客户端)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30207.html