SpringCloud-Alibaba之分布式事务Seata

导读:本篇文章讲解 SpringCloud-Alibaba之分布式事务Seata,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

SpringCloud-Alibaba之分布式事务Seata

目录

一、Seata介绍

二、使用Seata实现分布式事务控制

2.1 订单微服务实现下单接口

2.2 商品微服务实现扣减库存接口

2.3 暴露出分布式事务问题

2.3.1 正常测试

2.3.2 异常测试

2.4 Seata解决分布式事务问题

2.4.1 下载Seata

2.4.2 修改配置文件

2.4.3 初始化seata在nacos的配置

2.4.4 启动seata服务

2.4.5 添加undo_log表

2.4.6 微服务集成seata事务控制

2.4.7 总结


一、Seata介绍

Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。它把一个分布式事务理解成一个包含了若干分支事务的全局事务。如下所示:

img

全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。

Seata主要由三个重要组件组成:

  • TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。也就是Seata独立运行的服务端。
  • TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
  • RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。

img

Seata的执行流程如下:

  1. A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
  2. A服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
  3. A服务执行分支事务,向数据库做操作
  4. A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
  5. B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
  6. B服务执行分支事务,向数据库做操作
  7. 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
  8. TC协调其管辖之下的所有分支事务,决定提交还是回滚

Seata实现2PC与传统2PC的差别:

  1. 架构层次不同,传统2PC方案的RM实际上是在数据库层,即RM本质上就是数据库本身。而Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。
  2. 保持资源锁时间不同,传统的2PC需要将资源锁保持到第二阶段完成后才释放。而Seata的做法是第一阶段就将本地事务提交了,缩短了保持资源锁的时间。

二、使用Seata实现分布式事务控制

下面示例通过Seata中间件实现分布式事务,模拟电商中的下单和减库存的过程。

我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣减库存。如下:

img

2.1 订单微服务实现下单接口

controller层:

package cn.jack.controller;
 
import cn.jack.domain.Order;
import cn.jack.service.impl.OrderServiceImpl5;
 import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
@Slf4j
public class OrderController5 {
 
    @Autowired
    private OrderServiceImpl5 orderServiceImpl5;
 
    /**
     * 下单
     * @param pid   商品id
     * @return
     */
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Long pid) {
        return this.orderServiceImpl5.createOrder(pid);
    }
}

service层:

package cn.jack.service.impl;
import cn.jack.dao.OrderDao;
 import cn.jack.domain.Order;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class OrderServiceImpl5 {

    @Autowired
    private OrderDao orderDao;
    @Autowired
    private ProductService productService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public Order createOrder(Long pid) {
        log.info("收到下单请求,准备查询商品信息。pid={}", pid);

        // TODO 通过Feign调用商品微服务,查询商品信息
        Product product = this.productService.findByPid(pid);
        log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));

        // 进行容错判断
        if (product.getPid() == -100) {
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }

        // TODO 生成订单信息保存
        Order order = new Order();
        order.setNumber(1);
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setUid(1);
        order.setUsername("陈家宝");

        this.orderDao.save(order);
        log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));

        // TODO 调用商品微服务,扣减库存
        this.productService.reduceInventory(pid, order.getNumber());

        // TODO 下单成功之后,mq发送订单内容消息
        rocketMQTemplate.convertAndSend("jack-topic", order);

        return order;
    }
}


2.2 商品微服务实现扣减库存接口
controller层:

package cn.jack.controller;

import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class ProductController {

    @Autowired
    private ProductService productService;
 
    @RequestMapping("/product/{pid}")
    public Product product(@PathVariable("pid") Integer pid) {
        log.info("开始查询商品信息。pid={}", pid);
 
           Product product = productService.findByPid(pid);
           log.info("商品信息查询成功:{}", JSON.toJSONString(product));
 
           return product;
    }

    /**
     * 扣减商品库存
     * @param pid   商品ID
     * @param number    扣除数量
     */
    @RequestMapping("/product/reduceInventory")
    public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
         this.productService.reduceInventory(pid, number);
    }
}


 Service层:

package cn.jack.service.impl;

import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

 @Service
public class ProductServiceImpl implements ProductService {

    @Autowired
     private ProductDao productDao;

    @Override
    public Product findByPid(Integer pid) {
        return this.productDao.findById(pid).get();
    }

     @Override
    public void reduceInventory(Integer pid, Integer number) {
        // TODO 根据商品id,查询商品信息
         Product product = this.productDao.findById(pid).get();
        // TODO 省略校验,直接扣减库存
        product.setStock(product.getStock() - number);
        this.productDao.save(product);
     }
}


2.3 暴露出分布式事务问题
2.3.1 正常测试
访问商品下单接口,生成订单记录,商品库存扣除正常。

 

2.3.2 异常测试
模拟商品库存扣除出现异常,此时订单信息保存成功,而商品库存没有扣除。出现分布式事务问题。

商品微服务service层修改如下:

package cn.jack.service.impl;

 import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
 import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

 @Service
public class ProductServiceImpl implements ProductService {

     @Autowired
    private ProductDao productDao;

    @Override
     public Product findByPid(Integer pid) {
        return this.productDao.findById(pid).get();
    }

    @Override
    public void reduceInventory(Integer pid, Integer number) {
         // TODO 根据商品id,查询商品信息
        Product product = this.productDao.findById(pid).get();
        // TODO 模拟异常
        int i = 1 / 0;
        // TODO 省略校验,直接扣减库存
        product.setStock(product.getStock() - number);
        this.productDao.save(product);
    }
}
访问下单链接,进行测试:
 


2.4 Seata解决分布式事务问题
 seata官方demo地址:http://seata.io/zh-cn/docs/user/quickstart.html

客户端集成:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md
 
2.4.1 下载Seata
下载链接:https://github.com/seata/seata/releases/tag/v1.3.0,根据需要进行下载,这里我下载zip版本。

 

2.4.2 修改配置文件
 registry.conf该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上。将下载到的压缩包进行解压,进入到conf目录,修改配置文件:

registry.conf
 
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "serverAddr"    # seata-server
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"    # SEATA_GROUP
    namespace = ""
    cluster = "default"
     username = ""
    password = ""
  }
}

 config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
     namespace = ""
    group = "SEATA_GROUP"
    username = ""
     password = ""
  }
}
 
注意:我微服务中引入的seata依赖spring-cloud-starter-alibaba-seata版本是2.1.1.RELEASE,这里配置文件中的registry.nacos.application必须是serverAddr,group必须是DEFAULT_GROUP,因为该版本的seata客户端获取注册在nacos的seata服务时,写死的参数。所以我们需要把seata注册在nacos的默认分组下,服务名称为serverAddr。

2.4.3 初始化seata在nacos的配置
通过看seata提供的README.md得知,使用nacos作为注册、配置中心。需要将一些数据初始化到nacos上。
 
config.txt文件地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt

初始化脚本地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh

为了后续查看方便,贴上两个文件的内容。

config.txt

transport.type=TCP
 transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroup_mapping.service-product=default
 service.vgroup_mapping.service-order=default
service.vgroup_mapping.my_test_tx_group=default
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:9000
service.enableDegrade=false
service.disableGlobalTransaction=false
 client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
 client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
 client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
 client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
 store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
 server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
 transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
 metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
注意:

我这里让seata监听9000端口,默认的是8091。修改配置service.default.grouplist=127.0.0.1:9000
有多个微服务,就需要添加多少条此配置service.vgroup_mapping.微服务名称=default。例如,我有个微服务名称为service-product,则需要加条配置:service.vgroup_mapping.service-product=default。后续会在微服务的bootstrap.yml中指定事务服务分组名称为微服务名称,tx-service-group: ${spring.application.name}
初始化脚本nacos-config.sh

#!/usr/bin/env bash
 # Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

while getopts ":h:p:g:t:u:w:" opt
do
  case $opt in
  h)
    host=$OPTARG
    ;;
  p)
    port=$OPTARG
    ;;
  g)
    group=$OPTARG
    ;;
  t)
    tenant=$OPTARG
    ;;
  u)
    username=$OPTARG
    ;;
  w)
    password=$OPTARG
    ;;
  ?)
    echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
    exit 1
    ;;
  esac
done

if [[ -z ${host} ]]; then
    host=localhost
fi
if [[ -z ${port} ]]; then
    port=8848
fi
if [[ -z ${group} ]]; then
    group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
    tenant=""
fi
if [[ -z ${username} ]]; then
    username=""
fi
if [[ -z ${password} ]]; then
    password=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

failCount=0
tempLog=$(mktemp -u)
function addConfig() {
  curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  if [[ -z $(cat "${tempLog}") ]]; then
    echo " Please check the cluster status. "
    exit 1
  fi
  if [[ $(cat "${tempLog}") =~ "true" ]]; then
    echo "Set $1=$2 successfully "
  else
    echo "Set $1=$2 failure "
    (( failCount++ ))
  fi
}

count=0
#for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
# 在windows环境下读取路径失败,我这里是直接写死了config.txt路径
for line in $(cat C:/Develop/seata-1.3.0/conf/config.txt | sed s/[[:space:]]//g); do
  (( count++ ))
  key=${line%%=*}
    value=${line#*=}
  addConfig "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then
  echo " Init nacos config finished, please start seata-server. "
else
  echo " init nacos config fail. "
fi

read -n 1
注意:此脚本在windows环境下,读取config.txt文件路径失败,我这里是直接写死路径。

执行以下命令进行初始化:

# 将seata的配置写入nacos中
# 注意:这里要保证nacos已经正常运行
cd conf
nacos-config.sh 127.0.0.1
正常输出结果如图所示:



在nacos中就可以看到初始化过来的配置:
 


2.4.4 启动seata服务
cd bin
seata-server.bat -p 9000
启动完成,在nacos的服务列表可以看到seata-server的服务。



2.4.5 添加undo_log表
在我们的数据库中加入一张undo_log表,这是seata记录事务日志要用到的表。表结构如下:

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.4.6 微服务集成seata事务控制
 第一步:添加依赖

        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
第二步:添加代理数据源

Seata是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxyBean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。

package cn.jack.config;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
 
@Configuration
public class DataSourceProxyConfig {
     @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
         return new DruidDataSource();
    }

    @Primary
    @Bean
    public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}
第三步:在微服务的resources下添加Seata的配置文件registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
     application = "serverAddr"    # 不起作用,源码中写死了:serverAddr
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"       # 不起作用,源码中写死了:DEFAULT_GROUP
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
}

 config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

   nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
   }
}
第四步:bootstrap.yml中添加seata配置

spring:
  application:
    name: service-product
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848   # Nacos配置中心的地址
        file-extension: yaml  # 配置的格式
        shared-dataids: all-service.yaml  # 要引入的配置
        refreshable-dataids: all-service.yaml # 动态刷新时也刷新引入的配置
    alibaba:
      seata:
        tx-service-group: ${spring.application.name}    # 和服务名称相同
  profiles:
    active: test # 环境标识
第五步:下单接口服务层开启全局事务

package cn.jack.service.impl;

import cn.jack.dao.OrderDao;
import cn.jack.domain.Order;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class OrderServiceImpl5 {

    @Autowired
    private OrderDao orderDao;
    @Autowired
    private ProductService productService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GlobalTransactional    // 全局事务控制
    public Order createOrder(Long pid) {
        log.info("收到下单请求,准备查询商品信息。pid={}", pid);

        // TODO 通过Feign调用商品微服务,查询商品信息
        Product product = this.productService.findByPid(pid);
        log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));

        // 进行容错判断
        if (product.getPid() == -100) {
             Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }

        // TODO 生成订单信息保存
         Order order = new Order();
        order.setNumber(1);
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setUid(1);
        order.setUsername("陈家宝");

            this.orderDao.save(order);
        log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));

        // TODO 调用商品微服务,扣减库存
        this.productService.reduceInventory(pid, order.getNumber());

        // TODO 下单成功之后,mq发送订单内容消息
        rocketMQTemplate.convertAndSend("jack-topic", order);

        return order;
    }
}
 第六步:测试

再次下单测试。扣减库存失败后,商品订单数据回滚,达到了全局事务的一致性。

2.4.7 总结
Seata运行流程分析




服务之间调用的问题
 服务之间的调用使用了Feign,如果配置了容错类(fallback)或者容错工厂(fallbackFactory),则全局事务“不生效”。即商品微服务中扣减库存出现异常后,订单微服务的订单不会回滚。



 可以理解为Seata决定全局事务是否回滚的方式,就是看开启全局事务的方法(加了@GlobalTransactional注解的方法)是否抛出异常,如果该方法抛了异常,则回滚所有分支事务。而如果Feign调用商品微服务(ProductService)配置了容错类的话,虽然调用的商品微服务方法抛出了异常,但是进入了容错的方法,也就是说开启全局事务的方法(此处为下订单的方法createOrder)并没有异常,所以订单不会回滚。

2.2 商品微服务实现扣减库存接口

controller层:

package cn.jack.controller;
 
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
 import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
@Slf4j
public class ProductController {
 
    @Autowired
    private ProductService productService;
 
    @RequestMapping("/product/{pid}")
    public Product product(@PathVariable("pid") Integer pid) {
        log.info("开始查询商品信息。pid={}", pid);
 
        Product product = productService.findByPid(pid);
        log.info("商品信息查询成功:{}", JSON.toJSONString(product));
 
        return product;
    }
 
    /**
     * 扣减商品库存
     * @param pid   商品ID
     * @param number    扣除数量
     */
    @RequestMapping("/product/reduceInventory")
    public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
        this.productService.reduceInventory(pid, number);
    }
}

Service层:

package cn.jack.service.impl;
 
import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
 import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
public class ProductServiceImpl implements ProductService {
 
    @Autowired
    private ProductDao productDao;
 
    @Override
    public Product findByPid(Integer pid) {
        return this.productDao.findById(pid).get();
    }
 
    @Override
    public void reduceInventory(Integer pid, Integer number) {
        // TODO 根据商品id,查询商品信息
        Product product = this.productDao.findById(pid).get();
        // TODO 省略校验,直接扣减库存
        product.setStock(product.getStock() - number);
        this.productDao.save(product);
    }
}

2.3 暴露出分布式事务问题

2.3.1 正常测试

访问商品下单接口,生成订单记录,商品库存扣除正常。

img

2.3.2 异常测试

模拟商品库存扣除出现异常,此时订单信息保存成功,而商品库存没有扣除。出现分布式事务问题。

商品微服务service层修改如下:

package cn.jack.service.impl;
 
import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
public class ProductServiceImpl implements ProductService {
 
    @Autowired
    private ProductDao productDao;
 
    @Override
    public Product findByPid(Integer pid) {
        return this.productDao.findById(pid).get();
    }
 
    @Override
    public void reduceInventory(Integer pid, Integer number) {
        // TODO 根据商品id,查询商品信息
        Product product = this.productDao.findById(pid).get();
        // TODO 模拟异常
        int i = 1 / 0;
        // TODO 省略校验,直接扣减库存
        product.setStock(product.getStock() - number);
        this.productDao.save(product);
    }
}

访问下单链接,进行测试:

img

2.4 Seata解决分布式事务问题

seata官方demo地址:http://seata.io/zh-cn/docs/user/quickstart.html

客户端集成:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md

2.4.1 下载Seata

下载链接:https://github.com/seata/seata/releases/tag/v1.3.0,根据需要进行下载,这里我下载zip版本。

img

2.4.2 修改配置文件

registry.conf该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上。将下载到的压缩包进行解压,进入到conf目录,修改配置文件:

  • registry.conf
 
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
 
  nacos {
    application = "serverAddr"    # seata-server
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"    # SEATA_GROUP
    namespace = ""
    cluster = "default"
    username = ""
     password = ""
  }
}
 
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"
 
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
}
 

注意:我微服务中引入的seata依赖spring-cloud-starter-alibaba-seata版本是2.1.1.RELEASE,这里配置文件中的registry.nacos.application必须是serverAddr,group必须是DEFAULT_GROUP,因为该版本的seata客户端获取注册在nacos的seata服务时,写死的参数。所以我们需要把seata注册在nacos的默认分组下,服务名称为serverAddr。

2.4.3 初始化seata在nacos的配置

通过看seata提供的README.md得知,使用nacos作为注册、配置中心。需要将一些数据初始化到nacos上。

config.txt文件地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt

初始化脚本地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh

为了后续查看方便,贴上两个文件的内容。

config.txt

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroup_mapping.service-product=default
service.vgroup_mapping.service-order=default
service.vgroup_mapping.my_test_tx_group=default
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:9000
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

注意:

  1. 我这里让seata监听9000端口,默认的是8091。修改配置service.default.grouplist=127.0.0.1:9000
  2. 有多个微服务,就需要添加多少条此配置service.vgroup_mapping.微服务名称=default。例如,我有个微服务名称为service-product,则需要加条配置:service.vgroup_mapping.service-product=default。后续会在微服务的bootstrap.yml中指定事务服务分组名称为微服务名称,tx-service-group: ${spring.application.name}

初始化脚本nacos-config.sh

#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
while getopts ":h:p:g:t:u:w:" opt
do
  case $opt in
  h)
    host=$OPTARG
    ;;
  p)
    port=$OPTARG
    ;;
  g)
    group=$OPTARG
    ;;
  t)
    tenant=$OPTARG
    ;;
  u)
    username=$OPTARG
    ;;
  w)
    password=$OPTARG
    ;;
  ?)
    echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
    exit 1
    ;;
  esac
done
 
if [[ -z ${host} ]]; then
    host=localhost
fi
if [[ -z ${port} ]]; then
    port=8848
fi
if [[ -z ${group} ]]; then
    group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
    tenant=""
fi
if [[ -z ${username} ]]; then
     username=""
fi
if [[ -z ${password} ]]; then
    password=""
fi
 
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
 
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
 
  failCount=0
tempLog=$(mktemp -u)
function addConfig() {
  curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
    if [[ -z $(cat "${tempLog}") ]]; then
    echo " Please check the cluster status. "
    exit 1
  fi
     if [[ $(cat "${tempLog}") =~ "true" ]]; then
    echo "Set $1=$2 successfully "
  else
    echo "Set $1=$2 failure "
       (( failCount++ ))
  fi
}
 
  count=0
#for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
# 在windows环境下读取路径失败,我这里是直接写死了config.txt路径
for line in $(cat C:/Develop/seata-1.3.0/conf/config.txt | sed s/[[:space:]]//g); do
     (( count++ ))
  key=${line%%=*}
    value=${line#*=}
  addConfig "${key}" "${value}"
   done
 
echo "========================================================================="
echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
  echo "========================================================================="
 
if [[ ${failCount} -eq 0 ]]; then
  echo " Init nacos config finished, please start seata-server. "
   else
  echo " init nacos config fail. "
fi
 
   read -n 1

注意:此脚本在windows环境下,读取config.txt文件路径失败,我这里是直接写死路径。

执行以下命令进行初始化:

# 将seata的配置写入nacos中
# 注意:这里要保证nacos已经正常运行
cd conf
nacos-config.sh 127.0.0.1

正常输出结果如图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BOr0QIWK-1654484848475)(https://gitee.com/xiaoxiangyuan/mark_down_pic/raw/master/2021/images/20220516175319.png)]

在nacos中就可以看到初始化过来的配置:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K5Oj1Yg7-1654484848476)(https://gitee.com/xiaoxiangyuan/mark_down_pic/raw/master/2021/images/20220516175319.png)]

2.4.4 启动seata服务

cd bin

seata-server.bat -p 9000

启动完成,在nacos的服务列表可以看到seata-server的服务。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1EejM3iu-1654484848476)(/Users/apple/Downloads/%E6%88%AA%E5%9B%BE/iShot_2022-06-06_10.55.10.png)][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ayI6F725-1654484848476)(/Users/apple/Downloads/%E6%88%AA%E5%9B%BE/iShot_2022-06-06_10.55.27.png)]

2.4.5 添加undo_log表

在我们的数据库中加入一张undo_log表,这是seata记录事务日志要用到的表。表结构如下:

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2.4.6 微服务集成seata事务控制

第一步:添加依赖

        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>

第二步:添加代理数据源

Seata是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy 的Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。

package cn.jack.config;
 
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
 
@Configuration
public class DataSourceProxyConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }
    
    @Primary
    @Bean
    public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

第三步:在微服务的resources下添加Seata的配置文件registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
 
  nacos {
    application = "serverAddr"    # 不起作用,源码中写死了:serverAddr
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"       # 不起作用,源码中写死了:DEFAULT_GROUP
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
}
 
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"
 
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
}

第四步:bootstrap.yml中添加seata配置

spring:
  application:
    name: service-product
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848   # Nacos配置中心的地址
        file-extension: yaml  # 配置的格式
        shared-dataids: all-service.yaml  # 要引入的配置
        refreshable-dataids: all-service.yaml # 动态刷新时也刷新引入的配置
    alibaba:
      seata:
        tx-service-group: ${spring.application.name}    # 和服务名称相同
  profiles:
    active: test # 环境标识

第五步:下单接口服务层开启全局事务

package cn.jack.service.impl;
 
import cn.jack.dao.OrderDao;
import cn.jack.domain.Order;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
@Slf4j
public class OrderServiceImpl5 {
 
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private ProductService productService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
 
    @GlobalTransactional    // 全局事务控制
    public Order createOrder(Long pid) {
        log.info("收到下单请求,准备查询商品信息。pid={}", pid);
 
        // TODO 通过Feign调用商品微服务,查询商品信息
        Product product = this.productService.findByPid(pid);
        log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
 
        // 进行容错判断
        if (product.getPid() == -100) {
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }
 
        // TODO 生成订单信息保存
        Order order = new Order();
        order.setNumber(1);
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setUid(1);
        order.setUsername("陈家宝");
 
        this.orderDao.save(order);
        log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
 
        // TODO 调用商品微服务,扣减库存
        this.productService.reduceInventory(pid, order.getNumber());
 
        // TODO 下单成功之后,mq发送订单内容消息
        rocketMQTemplate.convertAndSend("jack-topic", order);
 
        return order;
    }
}

第六步:测试

再次下单测试。扣减库存失败后,商品订单数据回滚,达到了全局事务的一致性。

2.4.7 总结

  • Seata运行流程分析
    在这里插入图片描述
要点说明:
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的
目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有
undo_log。
2、在第一阶段undo_Jog中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就己经将分
支事务提交,也就释放了锁资源。
3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个
分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各
各参与者只需要删除undo._log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过XID 和 Branch ID 找到相应的回滚日
志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操
作。
  • 服务之间调用的问题

服务之间的调用使用了Feign,如果配置了容错类(fallback)或者容错工厂(fallbackFactory),则全局事务“不生效”。即商品微服务中扣减库存出现异常后,订单微服务的订单不会回滚。

可以理解为Seata决定全局事务是否回滚的方式,就是看开启全局事务的方法(加了@GlobalTransactional注解的方法)是否抛出异常,如果该方法抛了异常,则回滚所有分支事务。而如果Feign调用商品微服务(ProductService)配置了容错类的话,虽然调用的商品微服务方法抛出了异常,但是进入了容错的方法,也就是说开启全局事务的方法(此处为下订单的方法createOrder)并没有异常,所以订单不会回滚。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/118695.html

(0)
seven_的头像seven_bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!