本文主要解决:Spring @Async 注解多线程导致Spring的注解@Transactional失效问题!
问题:多线程为什么会导致事务注解@Transactional失效
实现AOP的方法有动态代理、编译期,类加载期织入等等,Spring实现AOP的方法则就是利用了动态代理机制,正因如此,才会导致某些情况下@Async和@Transactional不生效。
spring多线程的使用:
@Async注解使用如下
@EnableAsync //添加此注解开启异步调用(可用在配置类上,也可在启动类上标注)
public class ProviderApplication{
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
线程池配置采用自定义线程池配置类即可例如:
@Configuration
@EnableAsync
public class TaskPoolConfig {
@Autowired
private ThreadPoolProperties threadPoolProperties;
public final static String TASK_EXECUTOR="taskExecutor";
@Bean(TASK_EXECUTOR)
public Executor taskExecutor(){
//使用VisiableThreadPoolTaskExecutor 监控线程池清空
ThreadPoolTaskExecutor taskExecutor=new VisiableThreadPoolTaskExecutor();
//配置核心线程数
taskExecutor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
//配置最大线程数
taskExecutor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
//配置队列大小
taskExecutor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
//配置线程池中的线程的名称前缀
taskExecutor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
//配置非核心线程超时时间
taskExecutor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
taskExecutor.initialize();
return taskExecutor;
}
}
yml配置如下:
executor:
corePoolSize: 20
maxPoolSize: 100
queueCapacity: 20
keepAliveSeconds: 60
threadNamePrefix: XCExecutor-
配置类中采用的 ThreadPoolProperties 类,是读取spring的yml配置文件获取:
@ConfigurationProperties(value = "executor")
@Component
public class ThreadPoolProperties {
/**
* 核心线程数量
*/
private Integer corePoolSize;
/**
* 当核心线程都在跑任务,还有多余的任务会存到此处
*/
private Integer maxPoolSize;
/**
* 如果queueCapacity存满了,还有任务就会启动更多的线程,直到线程数达到maxPoolSize。如果还有任务,则根据拒绝策略进行处理
*/
private Integer queueCapacity;
/**
* 非核心线程的超时时长,超长后会被回收
*/
private Integer keepAliveSeconds;
/**
* 线程名称前缀
*/
private String threadNamePrefix;
public Integer getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}
public Integer getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(Integer maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public Integer getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(Integer queueCapacity) {
this.queueCapacity = queueCapacity;
}
public Integer getKeepAliveSeconds() {
return keepAliveSeconds;
}
public void setKeepAliveSeconds(Integer keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public String getThreadNamePrefix() {
return threadNamePrefix;
}
public void setThreadNamePrefix(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}
}
配置类中的VisiableThreadPoolTaskExecutor类是负责打印线程的线程池运行状况打印,代码如下:
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private final static Logger logger= LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor executor=getThreadPoolExecutor();
if(null==executor){
return;
}
// @TODO taskCount 任务总数 completedTaskCount 已完成数 activeCount 活跃线程数 queueSize 队列大小
logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
executor.getTaskCount(),
executor.getCompletedTaskCount(),
executor.getActiveCount(),
executor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
当任务达到量多,单线程情况处理较慢可采用多线程的方式提高效率,且任务不需要即时获取结果(调用第三方接口,等api),使用只需要将@Async标注在需要多线程执行的方法上,例如:
@Component
public class ThreadTask{
//注解标注中指向使用哪个线程池
@Async(TaskPoolConfig.TASK_EXECUTOR)
public Future<String> startTask(String id) {
//Future类是异步线程返回的执行结果,本文暂不做过多介绍
try {
//执行业务代码
} catch (Exception e) {
e.printStackTrace();
return new AsyncResult<>("错误:"+e.getMessage());
}
return null;
}
}
多线程的异步调用未成功大致分为以下三种问题:
1.没有在@SpringBootApplication启动类当中添加注解@EnableAsync注解。
2.异步方法使用注解@Async的返回值只能为void或者Future。
3.没有走Spring的代理类。因为@Transactional和@Async注解的实现都是基于Spring的AOP,而AOP的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。
接下来查看详细测试过程:
直接添加@Transactional注解
可以很明显的看见,我的代码虽然报错了,但是事务依旧未生效,接下来尝试手动提交事务
可以看见,手动提交事务就可以使spring的事务管理器生效。这是为什么呢,抱着知其然知其所以然的心态我们再往源码层面探究:
当Spring发现@Transactional或者@Async时,会自动生成一个ProxyObject,如:
此时调用Class.transactionTask会调用ProxyClass.产生事务操作。
然而当Class里的一个非事务方法调用了事务方法,ProxyClass是这样的:
到这里应该可以看明白了,如果调用了noTransactionTask方法,最终会调用到Class.transactionTask,而这个方法是不带有任何Transactional的信息的,也就是@Transactional根本没有生效哦。
简单来说就是: 同一个类内这样调用的话,只有第一次调用了动态代理生成的ProxyClass,之后一直用的是不带任何切面信息的方法本身。
TransactionDefintion类常量解析:
事务的传播级别:
- //事务传播级别1:当前如果有事务,Spring就会使用该事务;否则会开始一个新事务;(这也是默认设置和定义)
int PROPAGATION_REQUIRED = 0;
- //事务传播级别2:如果有事务,Spring就会使用该事务;否则不会开始一个新事务
int PROPAGATION_SUPPORTS = 1;
- //事务传播级别3:当前如果有事务,Spring就会使用该事务;否则会因为没有事务而抛出异常
int PROPAGATION_MANDATORY = 2;
- //事务传播级别4:总是要开启一个新事务。如果当前已经有事务,则将已有事务挂起
int PROPAGATION_REQUIRES_NEW = 3;
- //事务传播级别5:代码总是在非事务环境下执行,如果当前有事务,则将已有事务挂起,再执行代码,之后恢复事务
int PROPAGATION_NOT_SUPPORTED = 4;
- //事务传播级别6:绝对不允许代码在事务中执行。如果当前运行环境有事务存在,则直接抛出异常,结束运行
int PROPAGATION_NEVER = 5;
- //事务传播级别7:该级别支持嵌套事务执行。如果没有父事务存在,那么执行情况与PROPAGATION_REQUIRED一样;典型的应用是批量数据入库,开启父事务对一批数据入库,而对于每条入库的数据都有一个子事务对应,那么当所有的子事务成功,父事务提交,才算成功,否则,就都失败
int PROPAGATION_NESTED = 6;
事务的隔离级别:
- //事务隔离级别1:默认的隔离级别,同数据库一样的,如果不做特别设置,mysql默认的是可重复读,而oracle默认的是读提交
int ISOLATION_DEFAULT = -1; int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;
- //事务隔离级别2:读未提交,即一个事务可以读取另外一个事务中未提交的数据,即脏读数据存在,性能最好,但是没啥用。
int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
- //事务隔离级别3:读提交,即一个事务只能读取到另一个事务提交后的数据,oracle数据库默认隔离级别;存在不可重复读问题,即交叉事务出现,A事务两次读取数据可能会读到B事务提交的修改后的数据,即在同一个事务中读到了不同的数据,也叫不可重复读
int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;
- //事务隔离级别4:可重复读,即一个事务只能读取到在次事务之前提交的数据,而之后提交不能读取到,不管对方的事务是否提交都读取不到,mysql默认的隔离级别。此隔离级别有可能会遇到幻读现象,但是mysql
基于innodb引擎实现的数据库已经通过多版本控制解决了此问题,所以可以不考虑了。int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;
- //事务隔离级别5:序列化读,每次都是全表锁,性能最差,安全性最高,一般场景不适用,也没有这个必要。
在开发的过程中,用事务最多的方式是通过注解@Transaction来完成的,虽然大多数的业务场景都可以在这一个注解下完成即可。
但是为了适应某些特别的场景比方说只读方法的优化等,通过对@Transaction来添加参数来完成我们想要的事务传播特性和隔离级别,以及是否只对某些异常类做回滚,是否只读方法等。
TransactionStatus接口详解:
是否是一个新的事物 boolean isNewTransaction();
判断是否有回滚点 boolean hasSavepoint();
将一个事务标识为不可提交的。在调用完setRollbackOnly()后只能被回滚
在大多数情况下,事务管理器会检测到这一点,在它发现事务要提交时会立刻结束事务。
调用完setRollbackOnly()后,数数据库可以继续执行select,但不允许执行update语句,因为事务只可以进行读取操作,任何在这里插入代码片修改都不会被提交。
void setRollbackOnly(); boolean isRollbackOnly(); @Override void
flush(); 判断事物是否已经完成 boolean isCompleted();
1.创建回滚点 Object createSavepoint() throws TransactionException;
2.回滚到回滚点 void rollbackToSavepoint(Object savepoint) throws TransactionException;
3.释放回滚点 void releaseSavepoint(Object savepoint) throws TransactionException;
总结:
在多线程中spring的事务管理器注解@Transactional会失效,因此@Async与@Transactional不可使用在同一个方法上;
如在多线程并且多数据源的情况下使用事务,采用注入指定数据源的方式和手动提交事务及回滚事务;
多数据源情况下,线程使用的数据源来自主线程采用的数据;
如有不足或错误,请批评指正;
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5803.html