TransactionalEventListener 实现事务消息【SpringBoot + RabbitMQ + MybatisPlus】

导读:本篇文章讲解 TransactionalEventListener 实现事务消息【SpringBoot + RabbitMQ + MybatisPlus】,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1 背景

当时有个业务关系到多表操作,并且发送多条MQ。发现事务回滚,消息却发出去了。

2 TransactionalEventListener

可以监听事务的状态来执行某些操作。

3 实战

在这里插入图片描述

3.1 环境

1 mysql
2 rabbitmq
3 spring boot

3.2 数据库

在这里插入图片描述

CREATE TABLE `t_department` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `department_name` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

3.3 pom

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <properties>
        <java.version>1.8</java.version>
    </properties>




    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>


        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

    </dependencies>




    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.0.RELEASE</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

3.4 application.yml

server:
  port: 9999


spring:
  datasource:
    url: jdbc:mysql://192.168.110.35:3306/test_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
    host: rabbit.stream.greycdn.com
    port: 5672
    username: admin
    password: Nosmoking3311
    listener:
      simple:
        acknowledge-mode: manual




mybatis-plus:
  #默认值classpath*:/mapper/**/*.xml
  mapper-locations: classpath:/mapper/*.xml
  #配置全局自增ID
  global-config:
    db-config:
      id-type: auto
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3.5 department 业务代码

@Data
@TableName("t_department")
public class DepartmentEntity  {

	/**
	 * 
	 */
	@TableId
	private Integer id;
	/**
	 * 
	 */
	private String departmentName;

}


@Mapper
public interface DepartmentMapper extends BaseMapper<DepartmentEntity> {
	
}


3.6 集成MQ

public final class RabbitMqConstant {

    private RabbitMqConstant() {

    }


    public static final String TRANSACTION_EXCHANGE = "transaction_exchange";

    public static final String TRANSACTION_QUEUE = "transaction_queue";

    public static final String TRANSACTION_KEY = "transaction_key";


}


@Configuration
public class RabbitMqConfig {

    @Bean
    public DirectExchange transactionExchange() {
        return new DirectExchange(RabbitMqConstant.TRANSACTION_EXCHANGE);
    }

    @Bean
    public Queue transactionQueue() {
        return new Queue(RabbitMqConstant.TRANSACTION_QUEUE);
    }

    @Bean
    public Binding bindingLiveLog(@Qualifier("transactionQueue") Queue transactionQueue,
                                  @Qualifier("transactionExchange") DirectExchange transactionExchange) {
        return BindingBuilder.bind(transactionQueue).to(transactionExchange).with(RabbitMqConstant.TRANSACTION_KEY);
    }


}

3.7 RabbitMqTransactionEvent【 事务事件】

@AllArgsConstructor
@NoArgsConstructor
@Data
public class RabbitmqMessageDto {


    private String exchange;

    private String routeKey;

    private Object data;

}




public class RabbitMqTransactionEvent extends ApplicationEvent {


    private final RabbitmqMessageDto rabbitmqMessageDto;


    /**
     * Create a new {@code ApplicationEvent}.
     * @param source the object on which the event initially occurred or with
     * which the event is associated (never {@code null})
     * @param rabbitmqMessageDto 消息体
     */
    public RabbitMqTransactionEvent(Object source, RabbitmqMessageDto rabbitmqMessageDto) {
        super(source);
        this.rabbitmqMessageDto = rabbitmqMessageDto;
    }


    public RabbitmqMessageDto getRabbitmqMessageDto() {
        return rabbitmqMessageDto;
    }
}

3.8 RabbitMqTransactionListener 【事务监听器】

@Configuration
@Slf4j
public class RabbitMqTransactionListener {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void sendMqEvent(RabbitMqTransactionEvent rabbitMqTransactionEvent) {
        RabbitmqMessageDto rabbitmqMessageDto = rabbitMqTransactionEvent.getRabbitmqMessageDto();
        log.info("RabbitMqTransactionListener 发送消息:{}", JSON.toJSON(rabbitmqMessageDto.getData()));
        rabbitTemplate.convertAndSend(rabbitmqMessageDto.getExchange(), rabbitmqMessageDto.getRouteKey(), JSON.toJSON(rabbitmqMessageDto.getData()));
    }

}

3.9 DepartmentService

@Service("departmentService")
@Slf4j
public class DepartmentService extends ServiceImpl<DepartmentMapper, DepartmentEntity> {


    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;


    /**
     * @Description: 创建部门发送消息,创建员工发送消息
     * @Author:rosh
     * @Date: 2021-12-03
     * @param departmentName 部门名称
     */
    @Transactional
    public void createDepartment(String departmentName) {

        //创建部门,校验略过
        DepartmentEntity entity = new DepartmentEntity();
        entity.setDepartmentName(departmentName);
        baseMapper.insert(entity);
        //发送消息
        applicationEventPublisher.publishEvent(new RabbitMqTransactionEvent("source",
                new RabbitmqMessageDto(RabbitMqConstant.TRANSACTION_EXCHANGE, RabbitMqConstant.TRANSACTION_KEY, entity)));
        log.info("创建部门成功");

    }


}

3.10 测试

测试正常:

在这里插入图片描述
测试回滚:
在这里插入图片描述

发现事务回滚后,没有打印发送消息日志,查看数据库也没有数据。
在这里插入图片描述

3.11 码云地址

https://gitee.com/zhurongsheng/transaction-message

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15040.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!