一篇文章搞定Seata源码

一、环境准备

1.源码下载

官方地址:https://seata.io/zh-cn/blog/download.html

一篇文章搞定Seata源码
image20220217165653477.png

通过idea打开seata-1.4.2版本的源码

一篇文章搞定Seata源码
image20220217165735950.png

2.回顾AT模式

  其实在之前的应用课程中,我们已经用过AT模式,同时也写过一个小的Demo,那么这里其实我们主要要分析的是AT模式官方文档中的一些内容

  官方文档:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

2.1写隔离

  • 一阶段本地事务提交前,需要确保先拿到「全局锁」
  • 拿不到「全局锁」 ,不能提交本地事务。
  • 「全局锁」 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

图解:

一篇文章搞定Seata源码
image20220221173230146.png

  如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

  此时,如果 tx2 仍在等待该数据的 「全局锁」,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 「全局锁」 等锁超时,放弃 「全局锁」 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

  因为整个过程 「全局锁」 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 「脏写」 的问题。

2.2 读隔离

  在数据库本地事务隔离级别 「读已提交(Read Committed)」 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 「读未提交(Read Uncommitted)」

  如果应用在特定场景下,必需要求全局的 「读已提交」 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

图解:

一篇文章搞定Seata源码
image20220221175333911.png

  SELECT FOR UPDATE 语句的执行会申请 「全局锁」 ,如果 「全局锁」 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 「全局锁」 拿到,即读取的相关数据是 「已提交」 的,才返回。

  出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

2.3 AT二阶段

一阶段:

  1. 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
  2. 查询前镜像(改变之前的数据):根据解析得到的条件信息,生成查询语句,定位数据。
  3. 执行业务 SQL:更新这条数据。
  4. 查询后镜像(改变后的数据):根据前镜像的结果,通过「主键」 定位数据。
  5. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到UNDO_LOG 表中。
  6. 提交前,向 TC 注册分支:申请「全局锁」
  7. 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
  8. 将本地事务提交的结果上报给 TC。

二阶段-回滚:

  1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
  2. 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
  3. 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
  4. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

二阶段-提交:

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
  2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

二、Seata源码剖析

1. Seata客户端启动

首先一个Seata的客户端启动一般分为几个流程:

  1. 自动加载各种Bean及配置信息
  2. 初始化TM
  3. 初始化RM(具体服务)
  4. 初始化分布式事务客户端完成,代理数据源
  5. 连接TC(Seata服务端),注册RM,注册TM
  6. 开启全局事务

  在这个其中,就会涉及到几个核心的类型,首先我们需要来看配置类型GlobalTransactionAutoConfiguration

  所以我们直接通过官方案例引入的Seata包,找到SpringBoot项目在启动的时候自动扫描加载类型的spring.factories,然后找到GlobalTransactionAutoConfiguration(Seata自动配置类)

2.全局事务扫描类源码

  这个类型的核心点,就是加载配置,注入相关的Bean

/**
 * seata自动配置类
 */
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {

 private final ApplicationContext applicationContext;

 private final SeataProperties seataProperties;

 public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext,
   SeataProperties seataProperties) {
  this.applicationContext = applicationContext;
  this.seataProperties = seataProperties;
 }
 // 注入全局事务扫描器
 @Bean
 public GlobalTransactionScanner globalTransactionScanner() {

  String applicationName = applicationContext.getEnvironment()
    .getProperty("spring.application.name");

  String txServiceGroup = seataProperties.getTxServiceGroup();

  if (StringUtils.isEmpty(txServiceGroup)) {
   txServiceGroup = applicationName + "-fescar-service-group";
   seataProperties.setTxServiceGroup(txServiceGroup);
  }
  // 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器
  return new GlobalTransactionScanner(applicationName, txServiceGroup);
 }
}

3.GlobalTransactionScanner全局事务扫描器

  在这其中我们要关心的是GlobalTransactionScanner这个类型,这个类型扫描@GlobalTransactional注解,并对代理方法进行拦截增强事务的功能。

一篇文章搞定Seata源码
image20220221231318290.png

  这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,其实这个方法就是判断Bean对象是否需要代理,是否需要增强

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (beanName != null && this.targetSourcedBeans.contains(beanName)) {
        return bean;
    }
    if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
        return bean;
    }
    if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }

    // Create proxy if we have advice.
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    if (specificInterceptors != DO_NOT_PROXY) {
        this.advisedBeans.put(cacheKey, Boolean.TRUE);
        Object proxy = createProxy(
            bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
        this.proxyTypes.put(cacheKey, proxy.getClass());
        return proxy;
    }

    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
}

  当然这是父类提供的方法,那子类继承之后重写此方法,完成了定制化的效果

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        // 加锁防止并发
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            // 检查是否是TCC模式
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                // 如果是:添加TCC拦截器
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                                     (ConfigurationChangeListener)interceptor);
            } else {
                // 不是TCC模式
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 判断是否有相关事务注解,如果没有就不代理
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                // 当发现存在全局事务注解标注的Bean,添加拦截器
                if (globalTransactionalInterceptor == null) {
                    // 添加拦截器
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 检查是否是代理对象
            if (!AopUtils.isAopProxy(bean)) {
                // 不是调用Spring代理(父级)
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 已经是代理对象,反射获取代理类中的已经存在的拦截器组合,然后添加到该集合当中
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            // 将Bean添加到Set中
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

  图解地址:https://www.processon.com/view/link/6213d58f1e0853078013c58f

三、Seata源码分析-2PC核心源码解读

1.2PC提交源码流程

  上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器,一旦执行拦截器,我们就会进入到其中的invoke方法,在这其中会做一些@GlobalTransactional注解的判断,如果有注解以后,会执行全局事务和本地事务,那么在执行全局事务的时候会调用handleGlobalTransaction全局事务处理器,这里主要是获取事务信息

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                               final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            // 获取事务名称,默认获取方法名
            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            /**
                 * 解析GlobalTransactional注解属性,封装为对象
                 * @return
                 */
            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                // 获取超时时间,默认60秒
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }

                // 构建事务信息对象
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);// 超时时间
                transactionInfo.setName(name()); // 事务名称
                transactionInfo.setPropagation(globalTrxAnno.propagation());// 事务传播
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());// 校验或占用全局锁重试间隔
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());// 校验或占用全局锁重试次数
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                // 其他构建信息
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // 执行异常
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                succeed = false;
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                succeed = false;
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}

  在这其中,我们要关注一个重点方法execute()

其实这个方法主要的作用就是,执行事务的流程,大概一下几点:

  1. 获取事务信息
  2. 开始执行全局事务
  3. 发生异常全局回滚,各个数据通过undo_log表进行事务补偿
  4. 全局事务提交
  5. 清除所有资源

  这个位置是非常核心的一个位置,因为我们所有的业务进来以后都会走这个位置。

  这其中的第三步和第四步就是在想TC(Seata-Server)发起全局事务的提交/回滚

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    // 获取事务信息
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前事务,主要获取Xid
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 根据配置的不同事务传播行为,执行不同的逻辑
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                      , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        // 当前没有事务,则创建一个新的事务
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开始执行全局事务
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行当前业务逻辑:
                // 1. 在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据
                // 2. 执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log表中
                // 3. 远程调用其他应用,远程应用接收到xid,也会注册分支事务,写入branch_table及本地undo_log表
                // 4. 会在lock_table表中插入全局锁数据(一个分支一条)
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 发生异常全局回滚,各个数据通过undo_log表进行事务补偿
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局提交事务
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 清除所有资源
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

2.如何发起全局事务

  这个位置我们就看当前这个代码中的 beginTransaction(txInfo, tx);方法

// 想TC发起请求,这里采用了模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        // 对TC发起请求
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
                                                           TransactionalExecutor.Code.BeginFailure);
    }
}   

  那我们向下来看begin方法,那要注意,这里调用begin方法的是DefaultGlobalTransaction

@Override
public void begin(int timeout, String name) throws TransactionException {
    //判断调用者是否是TM
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
                                        " can't begin a new global transaction, currentXid = " + currentXid);
    }
    // 获取Xid
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}

  在向下来看begin方法,这时候使用的是(默认事务管理者)DefaultTransactionManager.begin,来真正的获取xid,其中就是传入事务的相关信息,最终TC端返回对应的全局事务Xid。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 发送请求得到响应
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    //返回Xid
    return response.getXid();
}

  这里采用的是Netty的通讯方式

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    try {
        // 通过Netty发送请求
        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
}

四、Seata源码分析-数据源代理

  上节课我们分析了整体的Seata-AT模式的2PC执行流程,那么这节课我们要分析的就是在AT模式中的关键点,数据源代理

1.AT模式的核心点:

  1. 获取全局锁、开启全局事务
  2. 解析SQL并写入undolog

  那么上节课其实我们已经把第一步分析清楚了,那么这节课我们就要分析的是AT模式如何解析SQL并写入undolog,首先我们要先明确实际上Seata其中采用了数据源代理的模式。

  那么这个就需要我们在回顾一下GlobalTransactionScanner这个类型,在这个类型中继承了一些的接口和抽象类,比较关键的几个:

  • AbstractAutoProxyCreator
  • InitializingBean
  • ApplicationContextAware
  • DisposableBean

这里给大家回顾一下:

  1. 继承ApplicationContextAware类型以后,需要实现对应的方法:

    void setApplicationContext(ApplicationContext applicationContext) throws BeansException

    当spring启动完成后,会自动调用这个类型,把ApplicationContext给bean。也就是说,GlobalTransactionScanner天然能拿到Spring的环境。

  2. 继承了InitializingBean接口,需要实现一个方法:

    void afterPropertiesSet() throws Exception;

    凡是继承该接口的类,在初始化bean的时候,当所有properties都设置完成后,会执行该方法。

  3. 继承DisposableBean,需要实现一个方法:

    void destroy() throws Exception;

    和InitializingBean接口相反,这个是在销毁的时候会调用这个方法。

  4. AbstractAutoProxyCreator就比较复杂了,它Spring实现AOP的一种方式。本质上是一个BeanPostProcessor,他在bean初始化之前,调用内部的createProxy方法,创建一个bean的AOP代理bean并返回,对Bean的增强。

总结一下:总体的逻辑就是,GlobalTransactionScanner扫描有注解的bean,做AOP增强。

2.数据源代理

  关于数据源代理这里我们

  全局事务拦截成功后最终还是执行了业务方法的,但是由于Seata对数据源做了代理,所以sql解析与undolog入库操作是在数据源代理中执行的,箭头处的代理就是Seata对DataSource,Connection,Statement做的代理封装类

一篇文章搞定Seata源码
image20220226142501746.png

  数据源代理是非常重要的一个环节。我们知道,在分布式事务运行过程中,undo log等的记录、资源的锁定等,都是用户无感知的,因为这些操作都在数据源的代理中完成了。

3.数据源代理DataSourceProxy

  DataSourceProxy的主要功能为,它在构造方法中调用了一个自定义的init方法,主要做了以下能力的增强:

  1. 为每个数据源标识了资源组ID
  2. 如果配置打开,会有一个定时线程池定时更新表的元数据信息并缓存到本地
  3. 生成代理连接ConnectionProxy

那我们先来看init方法

private void init(DataSource dataSource, String resourceGroupId) {
    //资源组ID,默认是“default”这个默认值
    this.resourceGroupId = resourceGroupId;
    try (Connection connection = dataSource.getConnection()) {
        //根据原始数据源得到JDBC连接和数据库类型
        jdbcUrl = connection.getMetaData().getURL();
        dbType = JdbcUtils.getDbType(jdbcUrl);
        if (JdbcConstants.ORACLE.equals(dbType)) {
            userName = connection.getMetaData().getUserName();
        }
    } catch (SQLException e) {
        throw new IllegalStateException("can not init dataSource", e);
    }
    DefaultResourceManager.get().registerResource(this);
    if (ENABLE_TABLE_META_CHECKER_ENABLE) {
        //如果配置开关打开,会定时线程池不断更新表的元数据信息
        /**
        *每分钟查询一次数据源的表结构信息并缓存,在需要查询数据库结构时会用到,不然每次去数据库查询结构效率会很低。
        */

        tableMetaExcutor.scheduleAtFixedRate(() -> {
            try (Connection connection = dataSource.getConnection()) {
                TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                    .refresh(connection, DataSourceProxy.this.getResourceId());
            } catch (Exception ignore) {
            }
        }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
    }

    //Set the default branch type to 'AT' in the RootContext.
    RootContext.setDefaultBranchType(this.getBranchType());
}

  这3个增强里面,前两个都比较容易理解,第三是最重要的。我们知道在AT模式里面,会自动记录undo log、资源锁定等等,都是通过ConnectionProxy完成的。

  另外,DataSourceProxy重写了几个方法。

  重点是getConnection,此时会返回一个ConnectionProxy,而不是原生的Connection

@Override
public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    return new ConnectionProxy(this, targetConnection);
}

@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
    Connection targetConnection = targetDataSource.getConnection(username, password);
    return new ConnectionProxy(this, targetConnection);
}

4.ConnectionProxy分析

  ConnectionProxy继承了AbstractConnectionProxy。一看到Abstract,就知道它的父类封装了很多通用工作。它的父类里面还使用了PreparedStatementProxy、StatementProxy、DataSourceProxy。

一篇文章搞定Seata源码
image20220226172114629.png

  我们先来分析AbstractConnectionProxy

5.AbstractConnectionProxy

  在这个抽象连接对象中,定义了很多通用的逻辑,所以在这其中我们要关注的主要在于PreparedStatementProxy和StatementProxy,其实这里的通用逻辑就是数据源连接的步骤,获取连接,创建执行对象等等这些

@Override
public Statement createStatement() throws SQLException {
    //调用真实连接对象获得Statement对象
    Statement targetStatement = getTargetConnection().createStatement();
    //创建Statement的代理
    return new StatementProxy(this, targetStatement);
}

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    //数据库类型,比如mysql、oracle等
    String dbType = getDbType();
    // support oracle 10.2+
    PreparedStatement targetPreparedStatement = null;
    //如果是AT模式且开启全局事务,那么就会进入if分支
    if (BranchType.AT == RootContext.getBranchType()) {
        List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
        if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                //得到表的元数据
                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                                                                                                   sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                //得到表的主键列名
                String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
            }
        }
    }
    if (targetPreparedStatement == null) {
        targetPreparedStatement = getTargetConnection().prepareStatement(sql);
    }
    // 创建PreparedStatementProxy代理
    return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}

6.分布式事务SQL执行

  在这两个代理对象中,执行SQL语句的关键方法如下:

@Override
public ResultSet executeQuery(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
}

@Override
public int executeUpdate(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
}

@Override
public boolean execute(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
}

  其他执行SQL语句的方法与上面三个方法都是类似的,都是调用ExecuteTemplate.execute方法,未完待续….


原文始发于微信公众号(波哥带你学编程):一篇文章搞定Seata源码

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/58803.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!