Spring @Async 注解多线程导致Spring的注解@Transactional失效问题

导读:本篇文章讲解 Spring @Async 注解多线程导致Spring的注解@Transactional失效问题,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

本文主要解决:Spring @Async 注解多线程导致Spring的注解@Transactional失效问题!

问题:多线程为什么会导致事务注解@Transactional失效

实现AOP的方法有动态代理、编译期,类加载期织入等等,Spring实现AOP的方法则就是利用了动态代理机制,正因如此,才会导致某些情况下@Async和@Transactional不生效。

spring多线程的使用:

@Async注解使用如下

@EnableAsync //添加此注解开启异步调用(可用在配置类上,也可在启动类上标注)
public class ProviderApplication{
    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

线程池配置采用自定义线程池配置类即可例如:

@Configuration
@EnableAsync
public class TaskPoolConfig {
    @Autowired
    private ThreadPoolProperties threadPoolProperties;
    
    public final static String TASK_EXECUTOR="taskExecutor";

    @Bean(TASK_EXECUTOR)
    public Executor taskExecutor(){
        //使用VisiableThreadPoolTaskExecutor 监控线程池清空
        ThreadPoolTaskExecutor taskExecutor=new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        taskExecutor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        //配置最大线程数
        taskExecutor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        //配置队列大小
        taskExecutor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        //配置线程池中的线程的名称前缀
        taskExecutor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
        //配置非核心线程超时时间
        taskExecutor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 执行初始化
        taskExecutor.initialize();
        return taskExecutor;
    }
}

yml配置如下:

executor:
  corePoolSize: 20
  maxPoolSize: 100
  queueCapacity: 20
  keepAliveSeconds: 60
  threadNamePrefix: XCExecutor-

配置类中采用的 ThreadPoolProperties 类,是读取spring的yml配置文件获取:

@ConfigurationProperties(value = "executor")
@Component
public class ThreadPoolProperties {

    /**
     * 核心线程数量
     */
    private Integer corePoolSize;
    /**
     * 当核心线程都在跑任务,还有多余的任务会存到此处
     */
    private Integer maxPoolSize;
    /**
     * 如果queueCapacity存满了,还有任务就会启动更多的线程,直到线程数达到maxPoolSize。如果还有任务,则根据拒绝策略进行处理
     */
    private Integer queueCapacity;
    /**
     * 非核心线程的超时时长,超长后会被回收
     */
    private Integer keepAliveSeconds;
    /**
     * 线程名称前缀
     */
    private String threadNamePrefix;

    public Integer getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(Integer corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public Integer getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(Integer maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public Integer getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(Integer queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public Integer getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(Integer keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public String getThreadNamePrefix() {
        return threadNamePrefix;
    }

    public void setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }
}

配置类中的VisiableThreadPoolTaskExecutor类是负责打印线程的线程池运行状况打印,代码如下:

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private final static Logger logger= LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor executor=getThreadPoolExecutor();

        if(null==executor){
            return;
        }
        // @TODO taskCount 任务总数 completedTaskCount 已完成数 activeCount 活跃线程数 queueSize 队列大小
        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                executor.getTaskCount(),
                executor.getCompletedTaskCount(),
                executor.getActiveCount(),
                executor.getQueue().size());

    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

当任务达到量多,单线程情况处理较慢可采用多线程的方式提高效率,且任务不需要即时获取结果(调用第三方接口,等api),使用只需要将@Async标注在需要多线程执行的方法上,例如:

@Component
public class ThreadTask{
	//注解标注中指向使用哪个线程池
	@Async(TaskPoolConfig.TASK_EXECUTOR)
	public Future<String> startTask(String id) {
		//Future类是异步线程返回的执行结果,本文暂不做过多介绍
        try {
            //执行业务代码
        } catch (Exception e) {
            e.printStackTrace();
            return new AsyncResult<>("错误:"+e.getMessage());
        }
        return null;
    }
}

多线程的异步调用未成功大致分为以下三种问题:

1.没有在@SpringBootApplication启动类当中添加注解@EnableAsync注解。
2.异步方法使用注解@Async的返回值只能为void或者Future。
3.没有走Spring的代理类。因为@Transactional和@Async注解的实现都是基于Spring的AOP,而AOP的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。

接下来查看详细测试过程:

直接添加@Transactional注解

在这里插入图片描述

在这里插入图片描述

可以很明显的看见,我的代码虽然报错了,但是事务依旧未生效,接下来尝试手动提交事务

在这里插入图片描述
在这里插入图片描述

可以看见,手动提交事务就可以使spring的事务管理器生效。这是为什么呢,抱着知其然知其所以然的心态我们再往源码层面探究:
当Spring发现@Transactional或者@Async时,会自动生成一个ProxyObject,如:

在这里插入图片描述

此时调用Class.transactionTask会调用ProxyClass.产生事务操作。
然而当Class里的一个非事务方法调用了事务方法,ProxyClass是这样的:

在这里插入图片描述

到这里应该可以看明白了,如果调用了noTransactionTask方法,最终会调用到Class.transactionTask,而这个方法是不带有任何Transactional的信息的,也就是@Transactional根本没有生效哦。
简单来说就是: 同一个类内这样调用的话,只有第一次调用了动态代理生成的ProxyClass,之后一直用的是不带任何切面信息的方法本身。

TransactionDefintion类常量解析:
事务的传播级别:

  1. //事务传播级别1:当前如果有事务,Spring就会使用该事务;否则会开始一个新事务;(这也是默认设置和定义)
    int PROPAGATION_REQUIRED = 0;
    
  2. //事务传播级别2:如果有事务,Spring就会使用该事务;否则不会开始一个新事务
    int PROPAGATION_SUPPORTS = 1;
    
  3. //事务传播级别3:当前如果有事务,Spring就会使用该事务;否则会因为没有事务而抛出异常
    int PROPAGATION_MANDATORY = 2;    
    
  4. //事务传播级别4:总是要开启一个新事务。如果当前已经有事务,则将已有事务挂起
    int PROPAGATION_REQUIRES_NEW = 3;
    
  5. //事务传播级别5:代码总是在非事务环境下执行,如果当前有事务,则将已有事务挂起,再执行代码,之后恢复事务
    int PROPAGATION_NOT_SUPPORTED = 4;
    
  6. //事务传播级别6:绝对不允许代码在事务中执行。如果当前运行环境有事务存在,则直接抛出异常,结束运行
    int PROPAGATION_NEVER = 5;
    
  7. //事务传播级别7:该级别支持嵌套事务执行。如果没有父事务存在,那么执行情况与PROPAGATION_REQUIRED一样;典型的应用是批量数据入库,开启父事务对一批数据入库,而对于每条入库的数据都有一个子事务对应,那么当所有的子事务成功,父事务提交,才算成功,否则,就都失败
    int PROPAGATION_NESTED = 6;
    

事务的隔离级别:

  1. //事务隔离级别1:默认的隔离级别,同数据库一样的,如果不做特别设置,mysql默认的是可重复读,而oracle默认的是读提交
    int ISOLATION_DEFAULT = -1;
    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;   
    
  2. //事务隔离级别2:读未提交,即一个事务可以读取另外一个事务中未提交的数据,即脏读数据存在,性能最好,但是没啥用。
    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
    
  3. //事务隔离级别3:读提交,即一个事务只能读取到另一个事务提交后的数据,oracle数据库默认隔离级别;存在不可重复读问题,即交叉事务出现,A事务两次读取数据可能会读到B事务提交的修改后的数据,即在同一个事务中读到了不同的数据,也叫不可重复读
    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;   
    
  4. //事务隔离级别4:可重复读,即一个事务只能读取到在次事务之前提交的数据,而之后提交不能读取到,不管对方的事务是否提交都读取不到,mysql默认的隔离级别。此隔离级别有可能会遇到幻读现象,但是mysql
    基于innodb引擎实现的数据库已经通过多版本控制解决了此问题,所以可以不考虑了。
    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE; 
    
  5. //事务隔离级别5:序列化读,每次都是全表锁,性能最差,安全性最高,一般场景不适用,也没有这个必要。
    在开发的过程中,用事务最多的方式是通过注解@Transaction来完成的,虽然大多数的业务场景都可以在这一个注解下完成即可。
    但是为了适应某些特别的场景比方说只读方法的优化等,通过对@Transaction来添加参数来完成我们想要的事务传播特性和隔离级别,以及是否只对某些异常类做回滚,是否只读方法等。

TransactionStatus接口详解:

是否是一个新的事物 boolean isNewTransaction();
判断是否有回滚点 boolean hasSavepoint();
将一个事务标识为不可提交的。在调用完setRollbackOnly()后只能被回滚
在大多数情况下,事务管理器会检测到这一点,在它发现事务要提交时会立刻结束事务。
调用完setRollbackOnly()后,数数据库可以继续执行select,但不允许执行update语句,因为事务只可以进行读取操作,任何在这里插入代码片修改都不会被提交。
void setRollbackOnly(); boolean isRollbackOnly(); @Override void
flush(); 判断事物是否已经完成 boolean isCompleted();
1.创建回滚点 Object createSavepoint() throws TransactionException;
2.回滚到回滚点 void rollbackToSavepoint(Object savepoint) throws TransactionException;
3.释放回滚点 void releaseSavepoint(Object savepoint) throws TransactionException;

总结:

在多线程中spring的事务管理器注解@Transactional会失效,因此@Async与@Transactional不可使用在同一个方法上;
如在多线程并且多数据源的情况下使用事务,采用注入指定数据源的方式和手动提交事务及回滚事务;
多数据源情况下,线程使用的数据源来自主线程采用的数据;

如有不足或错误,请批评指正;

本文参考:https://www.jianshu.com/p/9a0de6577ed7

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5803.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!