1. 问题&分析
小艾被拉去复盘了,这次又是宝宝心里苦!
1.1. 案例
刚刚躺下,一阵急促的报警电话袭来,将小艾从睡梦中吵醒,她立即打开监控查看详细的错误日志,看到日志后她悬着的心终于落下,感到非常庆幸。
事件是这样的,前两天接到产品的一个需求,在用户支付成功后,向用户发送短信通知,通知内容为:“亲爱的xxx,你已成功购买 yyyy,我们会尽快为您安排发货”。聪明的小艾想到哪怕是短信通知发送失败也不应该影响正常的业务逻辑,所以,她特意使用 try-catch 来避免异常扩大,具体代码如下:
@PostMapping("paySuccess")
@Transactional
public RestResult<Boolean> paySuccess(@RequestParam Long orderId, @RequestParam String token){
// 验证 token,保障有效性
checkToke(token);
// 加载订单信息
Order order = findById(orderId);
if (order == null){
return RestResult.success(false);
}
// 支付成功,更新订单状态
order.paySuccess();
// 将变更更新到数据库
updateOrder(order);
// 由于对这块逻辑不太放心,所以增加了 try-catch 逻辑
// 避免对主流程的影响
try {
// 发送短信通知
sendSMSNotice(order);
}catch (Exception e){
log.error("Failed to Send SMS Notice for Order {}", order.getId(), e);
}
return RestResult.success(true);
}
真是有惊无险,小艾长叹一口气,正准备放松的时候,又收到一批报警:
-
原本很快的正常请求也出现大规模超时
-
系统 CPU 过高,已经超过了警戒线
-
数据库的链接出现暴增,MySQL CPU 报警
顿时乱成一团,看来事情比想象中要严重太多。
1.2. 问题分析
从整个事上看,小艾已经做了部分防御工作,通过 try-catch 对高风险代码进行特殊处理,哪怕出问题也不会影响主流程。但,这次出现的是网络问题,在读取信息的时候出现超时,工作线程只有等待网络超时后才能继续执行,导致越来越多的线程发生阻塞,最终将系统全部资源耗光,从而影响业务。
具体原因如下图所示:
当系统线程逐步被耗光时,会触发:
-
正常逻辑受到影响:没有足够的线程资源可用,正常流程只能在工作队列中进行等待或直接被抛弃,导致处理时间过长,接口出现大量的超时
-
创建大量线程导致系统CPU过高:当核心线程不够时,请求被放入工作队列中,当工作队列满时,系统会创建更多线程,这些新增线程大大增加 CPU 负担,导致使用率飙升
-
出现大量长事务占用宝贵的数据库链接资源:paySuccess 方法上使用 @Transactional 进行标注,意味着整个方法都受到数据库事务保护,也就是说在进入方法时申请数据库链接,在方法退出时释放数据库链接。方法无法快速返回,导致数据库链接无法被有效复用,系统只能申请更多链接以应对更多的请求;
2. 解决方案
究其原因,这个事件的本质就是:系统间的耦合太重:
-
资源耦合太重。这是事故的根因,触达通知 和 业务逻辑 共用一组线程资源,当 触达通知 出现 IO 超时,导致大量线程处于阻塞状态,无法处理正确的请求,最终影响业务逻辑。
-
逻辑耦合太重。发送触达通知 在支付成功的主流程中,出问题会直接影响主流程(这次特意加了 try-catch)。如果再有类似的需求,整个 paySuccess 方法将变的非常复杂,不利于系统维护
2.1. 自定义线程池实现资源隔离
既然是资源耦合导致的问题,那最简的方式便是进行资源隔离,可以创建一个独立的线程池用于处理 触达通知。具体如下图所示:
调整非常简单,包括:
-
自定义线程。为了控制资源同时避免造成阻塞,需要对 线程数、队列长度 和 拒绝策略 进行定制
-
将发送任务提交到线程池异步处理。在执行完标准的 paySuccess 后,将 发送逻辑 封装成 Runnable 并提交到线程池
核心代码如下:
// 定义独立的线程
private ExecutorService executorService;
@PostConstruct
public void init() {
// 核心线程数 为 CPU 数量
int coreThreads = Runtime.getRuntime().availableProcessors();
// 【控制资源】最大线程数 为 核心线程数 的 5 倍
int maxThreads = coreThreads * 5;
// 【控制资源】队列长度为 1024
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1024);
// 【避免阻塞】线程 和 队列 资源耗尽后,直接抛弃最老的任务,避免造成阻塞
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
// 自定义线程工厂
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("SMS-Send-Thread-%d")
.daemon(true)
.uncaughtExceptionHandler((t, e) -> log.error("Failed to run task", e))
.build();
// 自定义线程池
this.executorService = new ThreadPoolExecutor(coreThreads, maxThreads,
5, TimeUnit.MINUTES,
queue,
threadFactory,
rejectedExecutionHandler
);
}
@PostMapping("paySuccessUseThreadPool")
@Transactional
public RestResult<Boolean> paySuccess(@RequestParam Long orderId, @RequestParam String token){
// 验证 token,保障有效性
checkToke(token);
// 加载订单信息
Order order = findById(orderId);
if (order == null){
return RestResult.success(false);
}
// 支付成功,更新订单状态
order.paySuccess();
// 将变更更新到数据库
updateOrder(order);
// 【核心改动点】将任务提交到独立的线程池,避免受影响
// 避免对主流程的影响
this.executorService.execute(() -> sendSMSNotice(order));
return RestResult.success(true);
}
这样就实现了百分百的资源隔离,再也不用担心 发送短信通知 影响业务逻辑。
2.2. 使用 Spring Event 实现解耦
自定义线程池能实现非常好的资源隔离,但对于逻辑隔离帮助不大,也就是说每次增加 “支付成功后xxx” 这样的需求都需要在 paySuccess 方法中增加代码,非常不便于维护,万一新增代码中有bug,主流程仍旧会受到影响。此时,我们可以引入 Spring Event 实现两者的彻底解耦。
如下图所示:
在引入 Spring Event 后,paySuccess 只需对外发布领域事件,而下游只需通过 @EventListener 注解便能完成对事件的监听。这样,再有类似的需求,便不用在 paySuccess 方法添加代码,只需增加一个新类即可。
核心代码如下:
@Autowired
private ApplicationEventPublisher eventPublisher;
@PostMapping("paySuccessUseEvent")
@Transactional
public RestResult<Boolean> paySuccess(@RequestParam Long orderId, @RequestParam String token){
// 验证 token,保障有效性
checkToke(token);
// 加载订单信息
Order order = findById(orderId);
if (order == null){
return RestResult.success(false);
}
// 支付成功,更新订单状态
order.paySuccess();
// 将变更更新到数据库
updateOrder(order);
// 【核心改动点】
// 发布自己的领域事件,其他下游逻辑直接监听事件即可
// 增加下游处理逻辑时,无需对 此方法 进行修改
eventPublisher.publishEvent(new OrderPaidEvent(order));
return RestResult.success(true);
}
// 发送短信通知任务处理器
@Component
@Slf4j
public class SMSSendNoticeListener {
/**
* 1. 对 OrderPaidEvent 事件进行处理 <br />
* 2. 使用 sendSMSTaskExecutorService 线程池进行异步处理
* @param event
*/
@EventListener
@Async("sendSMSTaskExecutorService")
public void handle(OrderPaidEvent event){
sendSMSNotice(event.getOrder());
}
/**
* 自定义线程池
*/
@Bean
public Executor sendSMSTaskExecutorService(){
// 核心线程数 为 CPU 数量
int coreThreads = Runtime.getRuntime().availableProcessors();
// 【控制资源】最大线程数 为 核心线程数 的 5 倍
int maxThreads = coreThreads * 5;
// 【控制资源】队列长度为 1024
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1024);
// 【避免阻塞】线程 和 队列 资源耗尽后,直接抛弃最老的任务,避免造成阻塞
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
// 自定义线程工厂
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("SMS-Send-Thread-%d")
.daemon(true)
.uncaughtExceptionHandler((t, e) -> log.error("Failed to run task", e))
.build();
// 自定义线程池
return new ThreadPoolExecutor(coreThreads, maxThreads,
5, TimeUnit.MINUTES,
queue,
threadFactory,
rejectedExecutionHandler
);
}
}
该方案具有以下特点:
-
基于 Spring Event 的发布订阅机制实现业务逻辑的解耦。消息发送者(paySuccess 方法) 和 消息处理者( SendSMSNotice 以及其他需求)实现了解耦,两者可以独自发展互不影响;
-
基于 线程池 实现资源隔离。@Async 是 Spring 提供的异步方法调用,当一个方法被@Async注解标记后,每当该方法被调用时,它都会在一个新的线程中运行,不会阻塞主线程
但,仍旧有一个缺陷:这些逻辑仍旧在同一个服务器上运行,没有做到彻底的资源隔离。
如果后台的异步任务耗费太多的资源,与前台任务出现资源争夺时,将严重影响系统的响应时间。所以,在部署时,强烈建议将前台和后台任务部署到不同的集群上,从而从根源上杜绝由于资源争夺而产生的相互影响。
2.3. 使用 MQ 实现解耦
为了彻底的对资源进行隔离,需要引入 MQ 中间件,将发送方、消费方 隔离到不同的集群。
如下图所示:
发送方核心代码如下:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostMapping("paySuccessUseMQ")
@Transactional
public RestResult<Boolean> paySuccess(@RequestParam Long orderId, @RequestParam String token){
// 验证 token,保障有效性
checkToke(token);
// 加载订单信息
Order order = findById(orderId);
if (order == null){
return RestResult.success(false);
}
// 支付成功,更新订单状态
order.paySuccess();
// 将变更更新到数据库
updateOrder(order);
// 【核心改动点】
// 向 MQ 中发送领域事件,其他下游逻辑直接消费事件即可
// 增加下游处理逻辑时,无需对 此方法 进行修改
rocketMQTemplate.convertAndSend("order_event", new OrderPaidEvent(order));
return RestResult.success(true);
}
消费方核心代码如下:
@Component
@Slf4j
@RocketMQMessageListener(topic = "order_event", consumerGroup = "send-sms-notice-group")
public class SMSSendNoticeConsumer implements RocketMQListener<OrderPaidEvent> {
@Override
public void onMessage(OrderPaidEvent event) {
sendSMSNotice(event.getOrder());
}
}
至此,终于实现了彻底的资源隔离。
3. 示例&源码
代码仓库:https://gitee.com/litao851025/learnFromBug
代码地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/event
原文始发于微信公众号(geekhalo):【故障现场】下游服务出问题却险些要了我的命
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/235007.html