一、Spring 事务执行流程
我们在使用Spring事务机制时,需要在配置类创建一个事务管理器的实例,用于处理事务提交、回滚、关闭等操作
定义一个DataSourceTransactionManager实例的事务管理器
@Bean
public PlatformTransactionManager transactionManager() {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource());
return transactionManager;
}
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false");
dataSource.setUsername("root");
dataSource.setPassword("ROOT");
return dataSource;
}
在详细介绍Spring事务执行的源码之前,先大概介绍一个事务执行的大概流程,其中重点介绍一下Spring事务的传播机制
1.1 基本流程
在上面生成Advise实例中,生成的TransactionInterceptor对象其实就是一个methodInterceptor对象,在执行事务方法时,会调用TransactionInterceptor对象的invoke()方法,基本执行流程为:
- 利用所配置的PlatformTransactionManager事务管理器新建一个数据库连接
- 修改数据库连接的autocommit为false,这样才能由Spring来控制提交或回滚逻辑
- 执行MethodInvocation.proceed()方法,简单理解就是去执行事务方法,其中就会执行SQL
- 如果没有异常,则提交
- 如果抛出异常,就回滚
1.2 Spring事务传播机制
然而在真正的开发工作中,肯定还会出现方法之间的调用,以a()方法和b()方法为例,a()方法中调用b()方法,那么a()方法中的事务是否也可以用在b()方法中呢?
针对这个问题,Spring提供了事务传播机制,用来解决方法调用时,事务的作用域问题
下面以一个简单的例子介绍一个Spring事务传播机制的实现过程,假设a()方法和b()方法都使用了@Transactional注解,a()方法在一个事务中执行,调用b()方法时,需要重新开一个事务执行:
- 代理对象执行a()方法之前,需要先利用事务管理器新建一个数据库连接conn
- 将数据库连接conn的autocommit改为false
- 把数据库连接conn设置到ThreadLocal中,(这样可以保证同一个线程中,用的都是用一个数据库连接)
- 执行a()方法中的SQL
- 执行a()方法过程中,调用b()方法(注意用代理对象调用,否则事务不生效)
- 代理对象执行b()方法前,首先判断当前线程已经存在一个数据库连接conn,表示当前线程已经拥有了一个Spring事务,则将该事务挂起
- 挂起就是把ThreadLoacl中的数据库连接conn从ThreadLocal中移除,并放入到一个挂起资源对象
- 挂起完成后,再利用事务管理器新建一个数据库连接reconn
- 将数据库连接reconn的autocommit设置为false
- 把数据库连接reconn设置到ThreadLocal中
- 执行b()方法中的SQL
- b()方法执行完成之后,则从ThreadLocal中拿到数据库连接reconn进行提交
- 提交完成之后恢复所挂起的数据库连接conn,这里的恢复就是把在挂起资源对象中所保存的数据库连接conn再次设置到ThreadLocal中
- a()方法正常执行完,则从ThreadLocal中拿到数据库连接conn进行提交
注:在执行某方方法时,判断当前是否存在一个事务,就是判断当前线程的ThreadLocal中是否存在一个数据库连接对象,如果存在则表示以及存在一个事务了
如果以非事务运行时,表示在执行方法时,Spring事务管理器不会去建立数据库连接,执行SQL时,由Mybatis或JdbcTemplate建立数据库连接来执行SQL
Spring提供了7种传播机制:
- REQUIRED:没有事务则新建,有则直接在该事务上执行
- SUPPORTS:如果有事务,直接在该事务上运行,如果没有事务,就以非事务方式运行
- MANDATORY:如果没有事务,直接抛异常;如果有事务,直接在当前事务运行
- REQUIRES_NEW:创建一个新的事务,并挂起当前的事务
- NOT_SUPPORTED:以非事务方式运行,如果存在事务,则把当前事务挂起
- NEVER:以非事务方式运行,如果存在事务,则抛出异常
- NESTED:同MySQL的savepoint,如果存在事务,则在事务内部运行(可以单独回滚),如果没有事务则新建事务
二、开启Spring事务
在前面已经说过,在执行事务方法时,会调用TransactionInterceptor的invoke()方法,这里使用匿名内部类创建了一个CoroutinesInvocationCallback对象,而该对象是在开始事务之后,才会去调用里面的proceedWithInvocation()方法,而在invokeWithinTransaction()方法里面,会去创建事务等
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
2.1 获取事务管理器
在《Spring 事务源码分析(上)》的生成Advise实例时,设置了TransactionAttributeSource实例,这个实例更像是一个工具类,在各种解析的时候都需要用到它
在invokeWithinTransaction()方法内部,首先就是获取TransactionAttributeSource实例,然后调用它的getTransactionAttribute()方法,将method方法上@Transactional注解的配置生成一个TransactionAttribute对象,这个对象包含了事务的所有配置
接着就是调用determineTransactionManager()去获取一个事务管理器实例
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
// TransactionAttribute就是@Transactional中的配置
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
……
}
在文章的开始位置,定义了一个DataSourceTransactionManager事务管理器对象,在@Transactional注解的value()和transactionManager()配置可以指定事务管理器的名称,在获取事务管理器时,首先根据配置来获取;如果没有配置,则直接调用getBean()方法,通过指定TransactionManager类型,获取事务管理器的实例,最后进行缓存
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
……
// @Transactional注解指定的事务管理器名称
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
TransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
// 直接从容器获取TransactionManager 事务管理器实例
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
2.2 创建事务
获取事务管理器之后,就可以创建事务了,但在创建事务之前,需要去生成一个JoinPoint,通常把方法名作为连接点
在这里创建事务时,返回可能是现有的事务,也可能是一个新的事务,这与方法上定义的事务传播机制有关
所以这里把TransactionInfo理解成是一个逻辑事务,事务创建完成之后,就会去回调proceedWithInvocation()方法,这个方法可能是去执行下一个MethodInterceptor,也可能就是直接调用事务方法
执行过程中过程中出现异常,会调用completeTransactionAfterThrowing()方法回滚,最后从ThreadLocal中清除当前事务,最后就是调用commitTransactionAfterReturning()方法去提交事务
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// joinpoint的唯一标识
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 如果有必要就创建事务,这里就涉及到事务传播机制的实现了
// TransactionInfo表示一个逻辑事务,比如两个逻辑事务属于同一个物理事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行下一个Interceptor或被代理对象中的方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
……
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
下面重点介绍createTransactionIfNecessary()方法是如何创建事务的
2.2.1 创建TransactionStatus
如果有@Transactional注解,并且也有事务管理器,就为当前事务方法创建一个TransactionStatus对象,TransactionStatus中的属性记录了物理事务是不是最新的,如果不是最新的,说明当前方法使用的是同一个事务
然后调用prepareTransactionInfo生成也给TransactionInfo对象,并把TransactionStatus对象设置进去,最后把TransactionInfo对象放入到当前线程ThreadLocal对象transactionInfoHolder中
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 每个逻辑事务都会创建一个TransactionStatus,但是TransactionStatus中有一个属性代表当前逻辑事务底层的物理事务是不是新的
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
2.2.2 先尝试获取事务(可能为空)
通过getTransaction()方法来获取事务,首先会尝试去当前线程的ThreadLocal中去拿,前面的事务管理器配置的是DataSourceTransactionManager,所以下面看DataSourceTransactionManager中doGetTransaction()方法的实现
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
……
}
获取把事务管理器设置的dataSource对象作为Key,调用getResource()来获取数据库连接,然后将得到的数据库连接设置到新生成的DataSourceTransactionObject对象中,通过设置这个连接的状态为false,表示不是最新的物理连接
这时获取的数据库连接可能为空
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
getResource()方法会去调用doGetResource()方法,在该方法中,resources对象就是一个ThreadLocal,存放的是当前线程的数据库连接,所以在doGetResource()方法中,直接根据key就可以获取数据库连接
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static Object doGetResource(Object actualKey) {
// resources是一个ThreadLocal包装的Map,用来缓存资源的,比如缓存当前线程中由某个DataSource所创建的数据库连接
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
// 获取DataSource对象所对应的数据库连接对象
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
2.2.3 当前没有事务,则新建事务
上面通过doGetTransaction()方法去获取当前线程的事务,返回的一个DataSourceTransactionObject对象,这个对象里面记录数据库连接
接下来,就是要判断是否有数据库连接
Object transaction = doGetTransaction();
// transaction.getConnectionHolder().isTransactionActive()
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
isExistingTransaction()方法判断是否有数据库连接也很简单,就是判断前面返回的DataSourceTransactionObject中,它的connectionHolder属性是否为null即可
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
如果当前线程并没有数据库连接,则去创建一个数据库连接。但在开启一个事务之前,要检查一些事务的配置,比如超时时间不能小于-1,如果当前方法定义的事务传播机制为MANDATORY,直接抛异常
在没有事务时,只有REQUIRED、REQUIRES_NEW和NESTED这三种事务传播机制才会去创建新的事务
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 没有事务需要挂起,不过TransactionSynchronization有可能需要挂起
// suspendedResources表示当前线程被挂起的资源持有对象(数据库连接、TransactionSynchronization)
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启事务后,transaction中就会有数据库连接了,并且isTransactionActive为true
// 并返回TransactionStatus对象,该对象保存了很多信息,包括被挂起的资源
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
但在创建事务之前,还有一个重要的动作,就是挂起当前事务,即把当前事务信息封装成一个SuspendedResourcesHolder
下面看下suspend()方法在做什么,首先会去判断当前线程是否有TransactionSynchronization,如果有,则需要把TransactionSynchronization相关的配置全部拿出来,然后将事务的活动状态设置为false,表示当前事务不可用,然后生成一个SuspendedResourcesHolder对象暂存当前事务的信息
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// synchronizations是一个ThreadLocal<Set<TransactionSynchronization>>
// 我们可以在任何地方通过TransactionSynchronizationManager给当前线程添加TransactionSynchronization,
// 如果当前线程存在TransactionSynchronization
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 调用TransactionSynchronization的suspend方法,并清空和返回当前线程中所有的TransactionSynchronization对象
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂起事务,把transaction中的Connection清空,并把resources中的key-value进行移除,并返回数据库连接Connection对象
suspendedResources = doSuspend(transaction);
}
// 获取并清空当前线程中关于TransactionSynchronizationManager的设置
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 将当前线程中的数据库连接对象、TransactionSynchronization对象、TransactionSynchronizationManager中的设置构造成一个对象
// 表示被挂起的资源持有对象,持有了当前线程中的事务对象、TransactionSynchronization对象
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
如果当前事务不为空,会调用doSuspend()方法把当前事务挂起,也就是从ThreadLocal中把数据库连接移除
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// resources是存放数据库连接的ThreadLocal
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
// Transparently suppress a ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
return value;
}
TransactionSynchronization是一个什么东西,我们可以通过TransactionSynchronizationManager在任何事务方法中来添加一些TransactionSynchronization,TransactionSynchronization中的这些方法,会在事务的不同阶段执行
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public int getOrder() {
return TransactionSynchronization.super.getOrder();
}
// 事务挂起时执行
@Override
public void suspend() {
TransactionSynchronization.super.suspend();
}
@Override //事务恢复时执行
public void resume() {
TransactionSynchronization.super.resume();
}
@Override
public void flush() {
TransactionSynchronization.super.flush();
}
@Override // 提交前执行
public void beforeCommit(boolean readOnly) {
TransactionSynchronization.super.beforeCommit(readOnly);
}
@Override // 回滚前执行
public void beforeCompletion() {
TransactionSynchronization.super.beforeCompletion();
}
@Override
public void afterCommit() {
TransactionSynchronization.super.afterCommit();
}
@Override
public void afterCompletion(int status) {
TransactionSynchronization.super.afterCompletion(status);
}
});
接下来,才是真正地去开启一个事务,调用doBegin()获得一个新的连接,该方法的第一个参数是DataSourceTransactionObject对象,就是前面保存数据库连接的对象
开启一个事务之后,通过prepareSynchronization()方法来初始化TransactionSynchronization
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 开启的这个事务的状态信息:
// 事务的定义、用来保存数据库连接的对象、是否是新事务,是否是新的TransactionSynchronization
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开始事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
在doBegin()方法中,会通过DataSource获取一个数据库连接,然后设置DataSourceTransactionObject对象的一些参数,包括数据库连接,是否是新的连接,隔离等级,只读等设置
同时要把数据库连接的AutoCommit设置为false,由Spring来控制事务的提交于回滚
通过prepareTransactionalConnection()方法来设置当前事务为只读事务
最后调用bindResource()方法,将数据库连接添加到当前线程ThreadLoacl的resource中,至此,便开始了一个新的数据库事务
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
// 如果当前线程中所使用的DataSource还没有创建过数据库连接,就获取一个新的数据库连接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 设置autocommit为false
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
如果当前事务配置为只读事务,则执行SQL方法,将事务设置为只读
protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
throws SQLException {
if (isEnforceReadOnly() && definition.isReadOnly()) {
try (Statement stmt = con.createStatement()) {
stmt.executeUpdate("SET TRANSACTION READ ONLY");
}
}
}
2.2.4 当前有事务,根据事务传播机制判断是否需要新建事务
上面介绍了在没有事务时的情况,如果当前线程已经存在事务了,又会怎么处理呢
如果当前ThreadLocal中存在数据库连接,则调用handleExistingTransaction()方法来处理已存在的事务
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// transaction.getConnectionHolder().isTransactionActive()
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
……
}
在handleExistingTransaction()方法中,就按照事务传播机制来决定是否新建事务或无事务执行或抛异常
PROPAGATION_NEVER:已经有事务存在,直接抛异常
PROPAGATION_NOT_SUPPORTED:挂起当前事务,以无事务方式运行
PROPAGATION_REQUIRES_NEW:挂起当前事务,创建新的事务
PROPAGATION_NESTED:如果以savePoint来解决这种事务,就在当前事务创建SavePoint,回滚时可以局部回滚;如果不使用SavePoint,就新建一个事务。但PlatformTransactionManager类型的事务管理理器的useSavepointForNestedTransaction()方法固定返回true
如果没有创建新的事务,最后调用prepareTransactionStatus()方法,为当前事务构建一个新的TransactionStatus对象
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
return startTransaction(definition, transaction, debugEnabled, null);
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
protected boolean useSavepointForNestedTransaction() {
return true;
}
三、提交/回滚事务
3.1 恢复事务
在创建新的事务之前,都会将现在的事务挂起,如果在创建新事务的过程中出现了异常,还需要把这些挂起的事务恢复
或者当前事务已经提交或回滚了,也需要去恢复已经挂起的事务
所有恢复事务的方法都会去调用resume()方法,在该方法中,会去调用doResume()方法,而doResume()方法中,就是直接把之前已经挂起的事务,重新放入到当前线程ThreadLocal的resource中
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
3.2 回滚事务
现在我们回到最开始的invokeWithinTransaction()方法,在执行proceedWithInvocation()方法时,如果出现异常,会调用completeTransactionAfterThrowing()方法进行回滚
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
}
在completeTransactionAfterThrowing()方法中,首先调用rollback()方法判断抛出的异常与@Transactional中回滚规则中定义的异常是否匹配,如果匹配则直接回滚
如果不匹配,说明手动强制回滚,调用下面的commit()方法
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
3.2.1 强制回滚
在commit()方法中,首先会判断当前事务的rollbackOnly属性是否为true
public final void commit(TransactionStatus status) throws TransactionException {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
processRollback(defStatus, false);
return;
}
……
}
rollbackOnly的用途用下:
test()方法和testB()方法中的SQL可以正常执行,但调用完testB()方法后,抛出了异常,同时为了能让返回的异常清晰明细,进行捕获,然后构建一个简单的异常信息,因为异常被捕获了,导致事务就可以正常提交。为了能让前面的SQL回滚,我们可以通过setRollbackOnly()方法设置当前事务的rollbackOnly为true,这样在提交时,会强制回滚
@Transactional(rollbackFor = Exception.class)
public Map<String,Object> test(){
jdbcTemplate.execute("insert into user values (1,'lizhi',24)");
userService.testB();
try {
throw new NullPointerException();
} catch (Exception e) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return new HashMap<String,Object>();
}
}
@Transactional(rollbackFor = NullPointerException.class)
public void testB(){
jdbcTemplate.execute("insert into user values (2,'lizhi',24)");
}
在processRollback()方法中,调用triggerBeforeCompletion()方法,去执行所有TransactionSynchronization的beforeCompletion()方法
如果当前事务有安全点,则直接回滚到安全点
如果当前事务是一个新的事务,则直接回滚
如果当前事务设置了rollbackOnly属性,或者globalRollbackOnParticipationFailure属性为true,就把当前事务的rollbackOnly属性设置为true,当事务提交的时候,会判断,然后进行回滚
globalRollbackOnParticipationFailure属性表示部分异常,全局回滚,默认为true,比如a()方法调用b()方法,如果b()方法出现了异常,a()方法中的SQL也会被回滚
然后调用triggerAfterCompletion()执行所有TransactionSynchronization的afterCompletion()方法
最后调用cleanupAfterCompletion()方法去清除当前事务的信息,以及恢复挂起的事务
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
doSetRollbackOnly(status);
}
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}
3.3 提交事务
事务提交的核心方法位于processCommit()方法中,都是都是去调用TransactionSynchronization的一些BeforeCommit()和BeforeCompletion()方法
如果当前事务有安全点,提交时,只是释放安全点,只提交安全点之后的SQL
如果当前事务是一个新的事务,直接调用doCommit()方法,调用数据库连接的commit()方法进行提交
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
}
……
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 恢复被挂起的资源到当前线程中
cleanupAfterCompletion(status);
}
}
注:Spring事务提交和回滚这一块,涉及到事务的传播关系,会出现多层级执行的问题,代码稍微显得有些复杂
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153649.html