《分布式事务系列教程-第四章-XA分布式事务解决方案》

追求适度,才能走向成功;人在顶峰,迈步就是下坡;身在低谷,抬足既是登高;弦,绷得太紧会断;人,思虑过度会疯;水至清无鱼,人至真无友,山至高无树;适度,不是中庸,而是一种明智的生活态度。

导读:本篇文章讲解 《分布式事务系列教程-第四章-XA分布式事务解决方案》,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

《分布式事务系列教程-第四章-XA分布式事务解决方案》

一、 XA解决方案

XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器(TM)和资源管理器(RM)。其中资源管理器(RM)由数据库实现,比如Oracle、DB2这些数据库都实现了XA规范的接口,而TM作为全局的调度者,负责各个RM的提交和回滚。XA协议也分为2PC和3PC,本章讨论的是XA协议的2PC;

1.1 MySQL的XA实现

MySQL XA是基于DTP分布式事务处理模型标准实现的,支持多数据源的分布式事务。

命令如下:

  • 开启一个全局事务:
xa start xid
xa start "001";
  • 将全局事务置于空闲状态(此状态时所有的RM资源已经锁定):
xa end xid
xa start "001";
  • 准备阶段:
xa prepare xid
xa prepare "001"
  • 提交阶段:
xa commit xid
xa commit "001";
  • 不进入两段式直接提交:
xa commit xid one phase;
xa commit "001" one phase;

XA执行流程:

在这里插入图片描述

创建一张测试表:

create table user(
	id int,
    username varchar(30)
);

insert into user values(1,'zs');

案例:

mysql> xa start '001';
Query OK, 0 rows affected (0.00 sec)

mysql> update user set username='ls';
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> xa end '001';			
Query OK, 0 rows affected (0.00 sec)

mysql> xa prepare '001';
Query OK, 0 rows affected (0.00 sec)

mysql> xa commit '001';			# 释放锁资源
Query OK, 0 rows affected (0.00 sec)

mysql>

Tips:在MySQL控制台中演示不出一个全局事务中执行多个分支事务,我们可以在下一小结使用Java客户端在一个全局事务操作多个分支事务。

1.2 Atomikos实现分布式事务

JTA(Java Transaction API): Java事务API(编程接口),是XA在Java上的一种实现。JTA底层采用的是2PC(两阶段提交协议)

在JTA中存在以下三种角色:

  • TransactionManager: 事务管理器

  • XAResource: 资源管理器。代表每一个数据源(RM)

  • XID: 事务ID,每一个独立的数据源都会分配一个事务ID(前面的xa start xid)

JTA只是Java提供的一个分布式多数据源的一套编程接口,来帮助我们解决分布式事务,我们具体的实现采用Atomikos,Atomikos是一个为Java平台提供增值服务的并且开源类事务管理器

搭建工程

创建订单数据库、库存数据库:

create database orders;
use orders;
CREATE TABLE `t_orders` (
  `id` varchar(30) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert  into `t_orders`(`id`,`count`) values ('1',101);

create database store;
use store;
CREATE TABLE `t_store` (
  `id` varchar(30) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert  into `t_store`(`id`,`count`) values ('1',100);

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.nc</groupId>
    <artifactId>xa_transaction</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>xa_transaction</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.17</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

DataSourceConfig:

package com.lscl.config;

import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;

import java.util.Properties;

@Configuration
public class DataSourceConfig {


    /**
     * t_orders数据源
     *
     * @return
     */
    @Bean(name = "ordersDS")
    @Qualifier("ordersDS")
    public AtomikosDataSourceBean ordersDS() {
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("ordersDS");
        atomikosDataSourceBean.setXaDataSourceClassName(
                "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        Properties properties = new Properties();
        properties.put("URL","jdbc:mysql://localhost:3306/orders");
        properties.put("user", "root");
        properties.put("password", "admin");
        atomikosDataSourceBean.setXaProperties(properties);
        return atomikosDataSourceBean;
    }


    /**
     * t_store数据源
     *
     * @return
     */
    @Bean(name = "storeDS")
    @Qualifier("storeDS")
    public AtomikosDataSourceBean storeDS() {
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("storeDS");
        atomikosDataSourceBean.setXaDataSourceClassName(
                "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        Properties properties = new Properties();
        properties.put("URL", "jdbc:mysql://localhost:3306/store");
        properties.put("user", "root");
        properties.put("password", "admin");
        atomikosDataSourceBean.setXaProperties(properties);
        return atomikosDataSourceBean;
    }

    /**
     * transaction manager
     *
     * @return
     */
    @Bean
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }

    /**
     * jta transactionManager
     *
     * @return
     */
    @Bean
    public JtaTransactionManager transactionManager() {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager());
        return jtaTransactionManager;
    }

}

Controller:

package com.lscl.controller;

import com.lscl.service.OrdersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrdersController {

    @Autowired
    private OrdersService ordersService;

    @RequestMapping("/addOrders/{flag}")
    public String add(@PathVariable Integer flag) throws Exception{
        ordersService.add(flag);;
        return "ok";
    }
}

Service:

package com.lscl.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.stereotype.Service;

import javax.transaction.Transactional;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

@Service
public class OrdersService {

    @Autowired
    @Qualifier("ordersDS")
    private AtomikosDataSourceBean ordersDS;            //RM1

    @Autowired
    @Qualifier("storeDS")
    private AtomikosDataSourceBean storeDS;             //RM2

    @Transactional
    public void add(Integer flag) throws Exception {
        Connection ordersConn = null;
        Connection storeConn = null;

        try {
            ordersConn = ordersDS.getConnection();
            storeConn = storeDS.getConnection();

            // 订单+1
            String ordersSQL = "update t_orders set count=count+1";

            // 库存-1
            String storeSQL = "update t_store set count=count-1";

            // 执行订单+1
            Statement ordersState = ordersConn.createStatement();		// prepare 阶段
            ordersState.execute(ordersSQL);

            if (flag == 500) {
                int i = 1 / 0;          // 模拟异常
            }

            // 执行库存-1
            Statement storeState = storeConn.createStatement();			// prepare 阶段
            storeState.execute(storeSQL);
            
            // 代码没有问题准备发起全局提交  commit 阶段
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {

            if (ordersConn != null) {
                ordersConn.close();
            }

            if (storeConn != null) {
                storeConn.close();
            }
        }
    }
}

调用图解:

在这里插入图片描述

1.3 总结

1)在执行分支事务时,会将RM资源锁住,需要等到所有的RM响应,等到第二阶段执行完毕时(提交/回滚),RM的锁才会释放,在高并发场所不适用

2)XA方案依赖于本地数据库对XA协议的支持,如果本地数据库不支持XA协议那么第三方程序(Java)将操作不了。例如许多非关系型数据库并没有支持XA。

3)MySQL对XA方案支持的不太友好,MySQL的XA实现,没有记录prepare阶段日志。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131763.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!