概述
由于分布式系统节点众多,排查错误日志要涉及到多个节点,如果在多个节点中没有唯一的请求id来把各个节点的请求日志串联起来,那么查询起来就会耗时耗力,因此Spring Sleuth
出现了(Spring Sleuth基于Google dapper论文实现,详细了解可以查看此论文),Sleuth会在接收请求的入口通过Filter生成唯一的标识TraceId
,这个TraceId会一直跟随请求路径传递到各个节点,只要有日志输出就会把TraceId的值打印出来,如下图(正常还会生成SpanId,为了便于理解没展现)
假如线上发生问题,要排查日志,那么根据这个TraceId
,就能够快速查询到各个节点对应的请求日志,但是唯一的遗憾是异步执行会丢失TraceId,因此这里介绍异步跨线程下如何保证TraceId不丢失的问题
我们在官方文档中找到了异步传递Traceid说明,如下图
大致意思Sleuth默认支持@Async
传递TraceId,并且支持spring.sleuth.async.enabled
进行控制,同时提供了
LazyTraceExecutor
TraceableExecutorService
TraceableScheduledExecutorService
线程包装类,来支持跨线程传递TraceId,其中TraceableScheduledExecutorService
是ScheduledExecutorService类的实现,用于实现定时任务触发,个人觉得这种需求不是特别多,所以只介绍常用的一些配置,比如@Async
配置、线程池配置、EventBus
配置,具体查看后续章节
Asnc配置
默认Sleuth是支持@Async注解
异步传递TraceId的,但是如果自定义线程池,配置不对的情况可能就会导致失效,因为Spring在这快有个bug,详细了解请查看以下链接:
所以正确配置方法有如下3种
配置方法
方式1(推荐)
这里用到了Sleuth的LazyTraceExecutor包装了线程池,这样可以保证trace对象传到下一个线程中
@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {
@Autowired
private BeanFactory beanFactory;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return new LazyTraceExecutor(this.beanFactory, executor);
}
}
复制代码
方式2
Sleuth初始化时会默认查找TaskExecutor作为Async的线程池,如果查找不到会获取默认的线程池
@EnableAsync
@Configuration
public class WebConfig {
@Bean
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return executor;
}
}
复制代码
方式3
如果默认不配置任何线程池,只在工程中加了@EnableAsync
注解,那么Sleuth会使用自带的线程池SimpleAsyncTaskExecutor
,这个线程池每次调用都会创建新线程,如果调用量比较多,创建的线程也会非常多,我们知道系统资源是有限的,如果线程数过多,会导致程序内存吃紧,从而导致OOM,所以不推荐使用这种方式
测试验证
测试代码
Async配置
@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {
@Autowired
private BeanFactory beanFactory;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return new LazyTraceExecutor(this.beanFactory, executor);
}
}
复制代码
Service
@Service
@Slf4j
public class TestService {
@Async
public void printAsyncLog() {
log.info("async log.....");
}
}
复制代码
Controller
@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private TestService testService;
@RequestMapping(value = "/print/log", method = RequestMethod.GET)
public String printLog() {
log.info("sync log.....1.....");
testService.printAsyncLog();
log.info("sync log.....2.....");
return "success";
}
}
复制代码
请求测试
执行请求test/async/print/log,输出以下信息,可以看到TraceId一样,只有Spanid发生了变化,线程名称前缀AsyncExecutor
与设置前缀相同
19:44:54.818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:30 - sync log.....1.....
19:44:54.819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:32 - sync log.....2.....
19:44:54.819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO [] c.e.elkdemo.service.TestService printAsyncLog:50 - async log.....
复制代码
线程池配置
线程池执行是通过TraceableExecutorService
包装了ExecutorService
,而且在初始化的时候需要注入进去BeanFactory
对象,所以线程池作为全局变量和局部变量配置稍有不同,注意下面线程池设置只是示例代码,实际运用中可以根据需求自行修改
全局变量配置
构造函数初始化(推荐)
@Service
@Slf4j
public class TestService{
final BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
public TestService(BeanFactory beanFactory1) {
this.beanFactory = beanFactory1;
this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}
复制代码
单例初始化
@Service
@Slf4j
public class TestService {
@Autowired
private BeanFactory beanFactory;
volatile TraceableExecutorService traceableExecutorService;
public TraceableExecutorService getTraceableExecutorService() {
if (traceableExecutorService == null) {
synchronized (TraceableExecutorService.class) {
if (traceableExecutorService == null) {
traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
}
}
return traceableExecutorService;
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
TraceableExecutorService executorService = getTraceableExecutorService();
executorService.execute(() -> log.info("async thread pool log....."));
}
}
复制代码
通过InitializingBean的afterPropertiesSet进行初始化
@Service
@Slf4j
public class TestService implements InitializingBean {
@Autowired
private BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
@Override
public void afterPropertiesSet() {
traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}
复制代码
局部变量配置
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog2() {
TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
executorService.execute(() -> log.info("async thread pool log....."));
}
复制代码
测试验证
这里采用全局变量配置方式测试
测试代码
Controller
@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private TestService testService;
@RequestMapping(value = "/print/threadPool/log", method = RequestMethod.GET)
public String printThreadPoolLog() {
log.info("sync log.....1.....");
testService.printThreadPoolLog();
log.info("sync log.....2.....");
return "success";
}
}
复制代码
Service
service采用构造函数方式进行初始化
@Service
@Slf4j
public class TestService{
final BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
public TestService(BeanFactory beanFactory1) {
this.beanFactory = beanFactory1;
this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}
复制代码
请求测试
执行请求/test/async/print/threadPool/log,输出以下信息,可以看到Traceid一样,只有Spanid发生了变化
19:35:13.799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:38 - sync log.....1.....
19:35:13.801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:40 - sync log.....2.....
19:35:13.801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO [] c.e.elkdemo.service.TestService lambda$printThreadPoolLog$0:37 - async thread pool log.....
复制代码
EventBus配置
EventBus
配置与线程池配置类似,把TraceableExecutorService
注入到AsyncEventBus
中即可,因TraceableExecutorService
类引用了BeanFactory
实例,所以比原生方式复杂了一点,以下只介绍构造函数的初始化方式,其他初始化方式与线程池配置类似,所以这里就不再举例说明
构造函数进行初始化
@Component
@Slf4j
public class PushEventBus {
private EventBus eventBus;
public PushEventBus(BeanFactory beanFactory) {
Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
this.eventBus = new AsyncEventBus(traceableExecutorService);
}
public void register(Object obj) {
eventBus.register(obj);
}
public void post(Object obj) {
eventBus.post(obj);
}
}
复制代码
测试验证
测试代码
EventBus
@Component
@Slf4j
public class PushEventBus {
private EventBus eventBus;
public PushEventBus(BeanFactory beanFactory) {
Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
this.eventBus = new AsyncEventBus(traceableExecutorService);
}
public void register(Object obj) {
eventBus.register(obj);
}
public void post(Object obj) {
eventBus.post(obj);
}
}
复制代码
监听类
@Slf4j
public class EventListener {
/**
* 监听 Integer 类型的消息
*/
@Subscribe
public void listenInteger(Integer param) {
log.info("EventListener#listenInteger->{}",param);
}
/**
* 监听 String 类型的消息
*/
@Subscribe
public void listenString(String param) {
log.info("EventListener#listenString->{}",param);
}
}
复制代码
controller
@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private PushEventBus pushEventBus;
@RequestMapping(value = "/print/guava/log", method = RequestMethod.GET)
public String printGuavaLog() {
pushEventBus.register(new EventListener());
log.info("sync log.....1.....");
pushEventBus.post("11");
log.info("sync log.....2.....");
return "success";
}
}
复制代码
请求测试
执行请求/test/async/print/guava/log,输出以下信息,可以看到Traceid一样,只有Spanid发生了变化
19:27:44.234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:48 - sync log.....1.....
19:27:44.236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:50 - sync log.....2.....
19:27:44.236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO [] c.e.elkdemo.service.EventListener listenString:21 - EventListener#listenString->11
复制代码
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/94716.html