实现目的
解决 DDD中的领域事件
设计初衷
基于Guava 的EventBus二次封装,添加消息消费失败重试机制
设计流程
说明:DomainEventBus是自己对Guava EventBus的上层封装,publishDomainEvent则是对EventBus的 post的上层封装,后续会有详细说明
核心实现
下面我会先从EventBus的源码分析开始,然后一步一步改进封装
Guava EventBus 源码总览
SubscriberRegistry 注册器
-
核心方法: register 注册订阅方法:将订阅者注册到一个 CopyOnWriteArraySet 中
//订阅者
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
void register(Object listener) {
// key class value Subscriber(方法)
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
//获取这个事件类型的订阅者集合
Collection<Subscriber> eventMethodsInListener = entry.getValue();
// 从缓存中按事件类型查找订阅者集合
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
// 如果没有则初始化
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
// HashMultimap 不熟悉可以看我之前的博客介绍
// https://weihubeats.blog.csdn.net/article/details/117339250
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
// 利用反射获得某个类所有的方法,包括父类、接口等.
// 由于反射性能问题,这里加了缓存
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
try {
return subscriberMethodsCache.getUnchecked(clazz);
} catch (UncheckedExecutionException e) {
throwIfUnchecked(e.getCause());
throw e;
}
}
// subscriberMethodsCache 在启动就已经初始化了
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder()
.weakKeys()
.build(
new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
// 核心实现
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
// 获得这个类的所有父类及接口
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
// 判断方法 是否有 Subscribe注解并且 方法不为静态方法
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
checkArgument(
parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters. "
+ "Subscriber methods must have exactly 1 parameter.",
method,
parameterTypes.length);
checkArgument(
!parameterTypes[0].isPrimitive(),
"@Subscribe method %s's parameter is %s. "
+ "Subscriber methods cannot accept primitives. "
+ "Consider changing the parameter to %s.",
method,
parameterTypes[0].getName(),
Primitives.wrap(parameterTypes[0]).getSimpleName());
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
这里的改动不需要很大,自定义我们自己的订阅注解即可
自定义消息订阅注解
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CiderSubscribe {
/**
* 方法最长执行时间,单位 秒
*/
int maxExecuteTime() default 10;
}
然后将 getAnnotatedMethodsNotCached 换成我们自定义的注解
if (method.isAnnotationPresent(CiderSubscribe.class) && !method.isSynthetic())
这样我们自定义的CiderSubscribe就生效了,这里只是替换了注解
-
EventBus发送事件
// 发送事件
public void post(Object event) {
// 获取所有订阅者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//事件分发
dispatcher.dispatch(event, eventSubscribers);
//如果事件没有订阅者,当做死亡事件处理
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
guava中的异常处理非常简单,try catch住,不影响后面订阅者的执行,然后打印log
public void handleException(Throwable exception, SubscriberExceptionContext context) {
Logger logger = logger(context);
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, message(context), exception);
}
}
为了后续的重试机制这里我们需要修改异常处理,将执行失败的方法放入redis供后续重试
SubscriberExceptionHandler改动
public void handleException(Throwable exception, SubscriberExceptionContext context) {
if (DataUtils.isNotEmpty(exception)) {
log.error("消息消费异常 ", exception);
}
RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
String eventId = context.getEventId();
RMap<String, String> stringStringRMap = redissonClient.getMap(redisEventId(eventId));
/**
* 执行失败后放入redis 等待 重试
*/
stringStringRMap.put(md5Method(context.getSubscriberMethod()), "1");
stringStringRMap.expireAsync(7, TimeUnit.DAYS);
log.info("放入redis 等待重试 {} msgId {}", context.getSubscriberMethod().getName(), eventId);
}
同时添加一个过滤执行成功的方法,避免重复消费
public Iterator<Subscriber> filterSuccessMethod(String eventId, Iterator<Subscriber> subscriberIterator) {
RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
RMap<String, String> stringStringRMap = redissonClient.getMap(redisEventId(eventId));
// 失败的方法集合
Set<String> failMethodSet = stringStringRMap.keySet();
List<Subscriber> emptyList = Lists.newArrayList();
while (subscriberIterator.hasNext()) {
Subscriber subscriber = subscriberIterator.next();
String methodMd5 = md5Method(subscriber.getMethod());
if (failMethodSet.contains(methodMd5)) {
emptyList.add(subscriber);
}
}
log.info("执行重试的列表size {} eventId {}", emptyList.stream().map(Subscriber::getMethod).map(Method::getName).collect(Collectors.joining(",")) , eventId);
return emptyList.iterator();
}
上面我们只是为重试机制做了准备
-
将异常方法添加到redis -
添加了过滤执行成功的Subscriber
接下来的核心是修改 EventBus post方法
post方法添加了返回值,所有方法全部执行成功才返回true,同时只执行失败的Subscriber
/**
* @param event 事件
* @param eventId 事件id
* @param reconsumeTimes 重试次数
* @return boolean
*/
public boolean post(Object event, String eventId, int reconsumeTimes) {
// 获取所有订阅者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
//需要删除掉已经成功的 方法
if (reconsumeTimes > 0) {
//获取执行失败的方法
eventSubscribers = exceptionHandler.filterSuccessMethod(eventId, eventSubscribers);
}
if (eventSubscribers.hasNext()) {
// 重新分发
return dispatcher.dispatch(event, eventId, eventSubscribers);
}
return true;
}
这里虽然添加了过滤执行成功消息,但是仅靠EventBus的 post 方法是没法重试的,所以这里我们就借用mq的重试机制
封装自己的 DomainEventBus
核心方法
public static void register(Object obj) {
eventBus.register(obj);
}
/**
* startDeliverTime 开始投递的时间
*/
public void publishDomainEvent(AbstractDomainEvent obj, LocalDateTime startDeliverTime) {
if (DataUtils.isNotEmpty(producer)) {
Map<String, Object> eventMap = new HashMap<>();
eventMap.put("className", obj.getClass().getName());
eventMap.put("data", obj);
eventMap.put("version", 1);
String jsonString = toJSONString(eventMap);
Message message = new Message(mqConfig.getDomainEventTopic(), obj.getTag(), jsonString.getBytes(StandardCharsets.UTF_8));
if (DataUtils.isNotEmpty(startDeliverTime)) {
message.setStartDeliverTime(DateUtils.toUnix(startDeliverTime));
}
SendResult sendResult = producer.send(message);
if (log.isInfoEnabled()) {
log.info("发送领域事件 MQ {} {} {}", jsonString, sendResult.getMessageId(), DataUtils.getDefaultValue(DateUtils.toString(startDeliverTime), ""));
}
} else {
if (DataUtils.isNotEmpty(startDeliverTime)) {
log.warn("本地消息不支持延迟投递 {}", startDeliverTime);
}
eventBus.post(obj);
if (log.isInfoEnabled()) {
log.info("发送领域事件 eventbus {} ", toJSONString(obj));
}
}
}
public void publishDomainEvent(AbstractDomainEvent obj) {
publishDomainEvent(obj, null);
}
@Override
public Action consume(Message message, ConsumeContext context) {
String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
String msgID = message.getMsgID();
log.info("收到消息 id {} body {} tag {}", msgID, msgBody, message.getTag());
JSONObject jsonObject = parseObject(msgBody);
String data = jsonObject.getString("data");
String className = jsonObject.getString("className");
try {
Class clazz = ClassUtils.getClass(className);
Object parseObject = parseObject(data, clazz);
boolean executeResult = eventBus.post(parseObject, msgID, message.getReconsumeTimes());
if (!executeResult) {
log.error("执行不成功,等待重试 {} 重试次数 {} ", msgID, message.getReconsumeTimes());
return Action.ReconsumeLater;
}
} catch (Exception e) {
log.error("e", e);
log.error("执行不成功,等待重试 {}", msgID);
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
这里的核心逻辑,如果有配置mq product,就使用mq发送消息,没有就用内存eventBus发送消息。然后再基于mq的 consume 重试机制
事件转发 Dispatcher
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 获取线程私有的队列
Queue<Event> queueForThread = queue.get();
// 往队列写入需要被转发的 Event(事件本身+监听者们)
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
// 使用迭代器 遍历执行队列中的事件
while (nextEvent.subscribers.hasNext()) {
// 监听者通过反射执行方法: Subscribers.java:line 70
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
PerThreadQueuedDispatcher 转发器,具备以下两个特点:
-
线程安全的。EventBus 是一个总线,意味着它大概率是会被不同的线程投递事件的。PerThreadQueuedDispatcher 通过 ThreadLocal 将不同线程的数据隔离开,保证线程安全。 -
这个转发器是基于广度优先转发的。想象一下,假如监听的事件处理中继续往总线中post事件,那就面对着深度优先和广度优先两种选择,这个实现是广度优先的,另外一个Dispatcher是深度优先的,等会解释
广度优先,意味着在转发过程中,新入的事件会被写到这个队列尾部,而不会立刻执行
这里分发事件我们也自己重写,因为没有重试机制
boolean dispatch(Object event, String eventId, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
// 所有都执行成功,才是成功
boolean executeResult = true;
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
executeResult = nextEvent.subscribers.next().dispatchEvent(nextEvent.event, eventId)
&& executeResult;
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
return executeResult;
}
总结
总体改动
-
修改封装了了自己的订阅注解CiderSubscribe -
修改了默认的 SubscriberExceptionHandler.handleException异常处理,添加了异常消息放入redis中 -
封装了自己的DomainEventBus,利用mq添加事件重试机制
比较SpringCloud Stream
缺点:
-
扩展性没有SpringCloud Stream方便 -
功能没有SpringCloud Stream多,例如SpringCloud Stream支持多mq的发送和接受(比如RocketMq发送消息,Kafka接受消息),支持消息发送前中后的拦截,支持消息消费前中后的拦截,支持消息的合并,拆分。支持多mq的整合 -
如果没有配置mq,没有重试机制
优点:
-
基于Guava二次封装源码更可控 -
源码简单易懂,对内透明度更高 -
功能满足自身义务 -
使用更简单轻量
源码下载
后续有需要会上传源代码
原文始发于微信公众号(小奏技术):DDD 领域事件设计(基于Guava EventBus二次封装)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30486.html