DDD 领域事件设计(基于Guava EventBus二次封装)

实现目的

解决 DDD中的领域事件

设计初衷

基于Guava 的EventBus二次封装,添加消息消费失败重试机制

设计流程

说明:DomainEventBus是自己对Guava EventBus的上层封装,publishDomainEvent则是对EventBus的 post的上层封装,后续会有详细说明DDD 领域事件设计(基于Guava EventBus二次封装)

核心实现

下面我会先从EventBus的源码分析开始,然后一步一步改进封装

Guava EventBus 源码总览

DDD 领域事件设计(基于Guava EventBus二次封装)这是整个Guava EventBus的核心类,代码量非常少

SubscriberRegistry 注册器

  • 核心方法: register 注册订阅方法:将订阅者注册到一个  CopyOnWriteArraySet 中
    //订阅者
    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
        Maps.newConcurrentMap();
    
    void register(Object listener) {
       
       // key class value Subscriber(方法)
        Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    
        for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
          Class<?> eventType = entry.getKey();
          //获取这个事件类型的订阅者集合
          Collection<Subscriber> eventMethodsInListener = entry.getValue();
       // 从缓存中按事件类型查找订阅者集合
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
    
          if (eventSubscribers == null) {
            // 如果没有则初始化
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
            eventSubscribers =
                MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
          }
    
          eventSubscribers.addAll(eventMethodsInListener);
        }
      }
    private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
       // HashMultimap 不熟悉可以看我之前的博客介绍
       // https://weihubeats.blog.csdn.net/article/details/117339250
        Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
        Class<?> clazz = listener.getClass();
        for (Method method : getAnnotatedMethods(clazz)) {
          Class<?>[] parameterTypes = method.getParameterTypes();
          Class<?> eventType = parameterTypes[0];
          methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
        }
        return methodsInListener;
      }
    
    // 利用反射获得某个类所有的方法,包括父类、接口等.
    // 由于反射性能问题,这里加了缓存
    private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
        try {
          return subscriberMethodsCache.getUnchecked(clazz);
        } catch (UncheckedExecutionException e) {
          throwIfUnchecked(e.getCause());
          throw e;
        }
      }
    
    // subscriberMethodsCache 在启动就已经初始化了
    private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
          CacheBuilder.newBuilder()
              .weakKeys()
              .build(
                  new CacheLoader<Class<?>, ImmutableList<Method>>() {
                    @Override
                    public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                      return getAnnotatedMethodsNotCached(concreteClass);
                    }
                  });
    
    // 核心实现
    private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
       // 获得这个类的所有父类及接口
        Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
        Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
        for (Class<?> supertype : supertypes) {
          for (Method method : supertype.getDeclaredMethods()) {
            // 判断方法 是否有 Subscribe注解并且 方法不为静态方法
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
              // TODO(cgdecker): Should check for a generic parameter type and error out
              Class<?>[] parameterTypes = method.getParameterTypes();
              checkArgument(
                  parameterTypes.length == 1,
                  "Method %s has @Subscribe annotation but has %s parameters. "
                      + "Subscriber methods must have exactly 1 parameter.",
                  method,
                  parameterTypes.length);
    
              checkArgument(
                  !parameterTypes[0].isPrimitive(),
                  "@Subscribe method %s's parameter is %s. "
                      + "Subscriber methods cannot accept primitives. "
                      + "Consider changing the parameter to %s.",
                  method,
                  parameterTypes[0].getName(),
                  Primitives.wrap(parameterTypes[0]).getSimpleName());
    
              MethodIdentifier ident = new MethodIdentifier(method);
              if (!identifiers.containsKey(ident)) {
                identifiers.put(ident, method);
              }
            }
          }
        }
        return ImmutableList.copyOf(identifiers.values());
      }

这里的改动不需要很大,自定义我们自己的订阅注解即可

自定义消息订阅注解

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface CiderSubscribe {
    
        /**
         * 方法最长执行时间,单位 秒
         */

        int maxExecuteTime() default 10;
    }

然后将 getAnnotatedMethodsNotCached 换成我们自定义的注解

if (method.isAnnotationPresent(CiderSubscribe.class) && !method.isSynthetic())这样我们自定义的CiderSubscribe就生效了,这里只是替换了注解

  • EventBus发送事件
    // 发送事件
    public void post(Object event) {
       // 获取所有订阅者
        Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
        if (eventSubscribers.hasNext()) {
          //事件分发
          dispatcher.dispatch(event, eventSubscribers);
          //如果事件没有订阅者,当做死亡事件处理
        } else if (!(event instanceof DeadEvent)) {
          // the event had no subscribers and was not itself a DeadEvent
          post(new DeadEvent(this, event));
        }
      }

guava中的异常处理非常简单,try catch住,不影响后面订阅者的执行,然后打印log

    public void handleException(Throwable exception, SubscriberExceptionContext context) {
          Logger logger = logger(context);
          if (logger.isLoggable(Level.SEVERE)) {
            logger.log(Level.SEVERE, message(context), exception);
          }
        }

为了后续的重试机制这里我们需要修改异常处理,将执行失败的方法放入redis供后续重试

SubscriberExceptionHandler改动

    public void handleException(Throwable exception, SubscriberExceptionContext context) {
            if (DataUtils.isNotEmpty(exception)) {
                log.error("消息消费异常 ", exception);
            }
            RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
            String eventId = context.getEventId();
            RMap<String, String> stringStringRMap = redissonClient.getMap(redisEventId(eventId));
    
            /**
             * 执行失败后放入redis 等待 重试
             */

            stringStringRMap.put(md5Method(context.getSubscriberMethod()), "1");
    
            stringStringRMap.expireAsync(7, TimeUnit.DAYS);
    
            log.info("放入redis 等待重试 {} msgId {}", context.getSubscriberMethod().getName(), eventId);
        }

同时添加一个过滤执行成功的方法,避免重复消费

    public Iterator<Subscriber> filterSuccessMethod(String eventId, Iterator<Subscriber> subscriberIterator) {
            RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
    
            RMap<String, String> stringStringRMap = redissonClient.getMap(redisEventId(eventId));
            // 失败的方法集合
            Set<String> failMethodSet = stringStringRMap.keySet();
            List<Subscriber> emptyList = Lists.newArrayList();
    
            while (subscriberIterator.hasNext()) {
                Subscriber subscriber = subscriberIterator.next();
                String methodMd5 = md5Method(subscriber.getMethod());
                if (failMethodSet.contains(methodMd5)) {
                    emptyList.add(subscriber);
                }
            }
            log.info("执行重试的列表size {} eventId {}",              emptyList.stream().map(Subscriber::getMethod).map(Method::getName).collect(Collectors.joining(","))  , eventId);
            return emptyList.iterator();
        }

上面我们只是为重试机制做了准备

  1. 将异常方法添加到redis
  2. 添加了过滤执行成功的Subscriber

接下来的核心是修改 EventBus post方法

post方法添加了返回值,所有方法全部执行成功才返回true,同时只执行失败的Subscriber

    /**
         * @param event 事件
         * @param eventId 事件id
         * @param reconsumeTimes 重试次数
         * @return boolean
         */

    public boolean post(Object event, String eventId, int reconsumeTimes) {
        // 获取所有订阅者
            Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
            //需要删除掉已经成功的 方法
            if (reconsumeTimes > 0) {
                //获取执行失败的方法
                eventSubscribers = exceptionHandler.filterSuccessMethod(eventId, eventSubscribers);
    
            }
            if (eventSubscribers.hasNext()) {
               // 重新分发
                return dispatcher.dispatch(event, eventId, eventSubscribers);
            }
    
            return true;
        }

这里虽然添加了过滤执行成功消息,但是仅靠EventBus的 post 方法是没法重试的,所以这里我们就借用mq的重试机制

封装自己的 DomainEventBus

核心方法

     public static void register(Object obj) {
            eventBus.register(obj);
        }
    
        /**
         * startDeliverTime 开始投递的时间
         */

        public void publishDomainEvent(AbstractDomainEvent obj, LocalDateTime startDeliverTime) {
            if (DataUtils.isNotEmpty(producer)) {
                Map<String, Object> eventMap = new HashMap<>();
                eventMap.put("className", obj.getClass().getName());
                eventMap.put("data", obj);
                eventMap.put("version"1);
                String jsonString = toJSONString(eventMap);
                Message message = new Message(mqConfig.getDomainEventTopic(), obj.getTag(), jsonString.getBytes(StandardCharsets.UTF_8));
    
                if (DataUtils.isNotEmpty(startDeliverTime)) {
                    message.setStartDeliverTime(DateUtils.toUnix(startDeliverTime));
                }
                SendResult sendResult = producer.send(message);
                if (log.isInfoEnabled()) {
                    log.info("发送领域事件 MQ {} {} {}", jsonString, sendResult.getMessageId(), DataUtils.getDefaultValue(DateUtils.toString(startDeliverTime), ""));
                }
    
            } else {
                if (DataUtils.isNotEmpty(startDeliverTime)) {
                    log.warn("本地消息不支持延迟投递 {}", startDeliverTime);
                }
                eventBus.post(obj);
    
                if (log.isInfoEnabled()) {
                    log.info("发送领域事件 eventbus {} ", toJSONString(obj));
                }
            }
        }
    
        public void publishDomainEvent(AbstractDomainEvent obj) {
            publishDomainEvent(obj, null);
        }
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
            String msgID = message.getMsgID();
            log.info("收到消息 id {} body {} tag {}", msgID, msgBody, message.getTag());
    
            JSONObject jsonObject = parseObject(msgBody);
            String data = jsonObject.getString("data");
            String className = jsonObject.getString("className");
            try {
                Class clazz = ClassUtils.getClass(className);
                Object parseObject = parseObject(data, clazz);
    
                boolean executeResult = eventBus.post(parseObject, msgID, message.getReconsumeTimes());
    
                if (!executeResult) {
                    log.error("执行不成功,等待重试 {} 重试次数 {} ", msgID, message.getReconsumeTimes());
                    return Action.ReconsumeLater;
                }
            } catch (Exception e) {
                log.error("e", e);
                log.error("执行不成功,等待重试 {}", msgID);
                return Action.ReconsumeLater;
    
            }
    
            return Action.CommitMessage;
        }

这里的核心逻辑,如果有配置mq product,就使用mq发送消息,没有就用内存eventBus发送消息。然后再基于mq的 consume 重试机制

事件转发 Dispatcher

    private final ThreadLocal<Queue<Event>> queue =
            new ThreadLocal<Queue<Event>>() {
              @Override
              protected Queue<Event> initialValue() {
                return Queues.newArrayDeque();
              }
            };
    
    
    private final ThreadLocal<Boolean> dispatching =
            new ThreadLocal<Boolean>() {
              @Override
              protected Boolean initialValue() {
                return false;
              }
            };
    
    
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
          checkNotNull(event);
          checkNotNull(subscribers);
        // 获取线程私有的队列
          Queue<Event> queueForThread = queue.get();
         // 往队列写入需要被转发的 Event(事件本身+监听者们)
          queueForThread.offer(new Event(event, subscribers));
    
          if (!dispatching.get()) {
            dispatching.set(true);
            try {
              Event nextEvent;
              while ((nextEvent = queueForThread.poll()) != null) {
                // 使用迭代器 遍历执行队列中的事件
                while (nextEvent.subscribers.hasNext()) {
                  // 监听者通过反射执行方法: Subscribers.java:line 70
                  nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
                }
              }
            } finally {
              dispatching.remove();
              queue.remove();
            }
          }
        }

PerThreadQueuedDispatcher 转发器,具备以下两个特点:

  • 线程安全的。EventBus 是一个总线,意味着它大概率是会被不同的线程投递事件的。PerThreadQueuedDispatcher 通过 ThreadLocal 将不同线程的数据隔离开,保证线程安全。
  • 这个转发器是基于广度优先转发的。想象一下,假如监听的事件处理中继续往总线中post事件,那就面对着深度优先和广度优先两种选择,这个实现是广度优先的,另外一个Dispatcher是深度优先的,等会解释

广度优先,意味着在转发过程中,新入的事件会被写到这个队列尾部,而不会立刻执行

这里分发事件我们也自己重写,因为没有重试机制

    boolean dispatch(Object event, String eventId, Iterator<Subscriber> subscribers) {
                checkNotNull(event);
                checkNotNull(subscribers);
                Queue<Event> queueForThread = queue.get();
                queueForThread.offer(new Event(event, subscribers));
                // 所有都执行成功,才是成功
                boolean executeResult = true;
                if (!dispatching.get()) {
                    dispatching.set(true);
                    try {
                        Event nextEvent;
                        while ((nextEvent = queueForThread.poll()) != null) {
                            while (nextEvent.subscribers.hasNext()) {
    
                                executeResult = nextEvent.subscribers.next().dispatchEvent(nextEvent.event, eventId)
                                        && executeResult;
                            }
                        }
                    } finally {
                        dispatching.remove();
                        queue.remove();
                    }
                }
                return executeResult;
            }

总结

总体改动

  1. 修改封装了了自己的订阅注解CiderSubscribe
  2. 修改了默认的 SubscriberExceptionHandler.handleException异常处理,添加了异常消息放入redis中
  3. 封装了自己的DomainEventBus,利用mq添加事件重试机制

比较SpringCloud Stream

缺点:

  1. 扩展性没有SpringCloud Stream方便
  2. 功能没有SpringCloud Stream多,例如SpringCloud Stream支持多mq的发送和接受(比如RocketMq发送消息,Kafka接受消息),支持消息发送前中后的拦截,支持消息消费前中后的拦截,支持消息的合并,拆分。支持多mq的整合
  3. 如果没有配置mq,没有重试机制

优点:

  1. 基于Guava二次封装源码更可控
  2. 源码简单易懂,对内透明度更高
  3. 功能满足自身义务
  4. 使用更简单轻量

源码下载

后续有需要会上传源代码


原文始发于微信公众号(小奏技术):DDD 领域事件设计(基于Guava EventBus二次封装)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30486.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!