1 背景
当时有个业务涉及多表操作,并且需要发送多条MQ消息。结果发现:事务回滚了,消息却已经发出去了。
2 TransactionalEventListener
@TransactionalEventListener 可以监听事务所处的阶段(例如提交之后、回滚之后),并在对应阶段执行指定的操作。
3 实战
3.1 环境
1 mysql
2 rabbitmq
3 spring boot
3.2 数据库
-- Department table used by the demo.
CREATE TABLE `t_department` (
-- Primary key; value is generated by AUTO_INCREMENT.
`id` int(11) NOT NULL AUTO_INCREMENT,
-- Department name; nullable, at most 50 characters.
`department_name` varchar(50) DEFAULT NULL,
PRIMARY KEY (`id`)
-- The AUTO_INCREMENT counter is seeded at 3.
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
3.3 pom
<!-- Inherit from the Spring Boot 2.3.0.RELEASE parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<!-- Java version property for the build. -->
<java.version>1.8</java.version>
</properties>
<!-- Dependencies that declare no version here must get it from inherited dependency management. -->
<dependencies>
<!-- Spring Boot web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Test starter; limited to the test scope. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Lombok, used for the @Data / @Slf4j / constructor annotations in the demo classes. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- fastjson, used by the transaction listener to convert the message payload. -->
<!-- NOTE(review): fastjson 1.2.x releases before 1.2.83 have published deserialization security advisories; consider upgrading after confirming compatibility. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
<!-- Apache Commons Collections 4; not referenced by the code shown in this article. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<!-- Apache Commons Lang 3; not referenced by the code shown in this article. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Spring Boot logging starter. -->
<!-- NOTE(review): presumably already pulled in transitively by the web starter; confirm before removing. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- Spring AMQP starter, providing the RabbitMQ integration used by the demo. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- MyBatis-Plus starter, providing BaseMapper / ServiceImpl used by the demo. -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<!-- MySQL JDBC driver (5.1.x line, matching the com.mysql.jdbc.Driver class configured in application.yml). -->
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin, pinned to the same version as the parent POM. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
</plugin>
<!-- Compiler plugin, set to Java 1.8 source and target. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3.4 application.yml
# Port the application listens on.
server:
  port: 9999
spring:
  # JDBC connection settings for the MySQL database.
  # NOTE(review): credentials are hard-coded here for the demo; move them to
  # environment variables or an external secret store before real use.
  datasource:
    url: jdbc:mysql://192.168.110.35:3306/test_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  # RabbitMQ broker connection settings.
  # NOTE(review): broker host and password are also hard-coded; externalize them.
  rabbitmq:
    host: rabbit.stream.greycdn.com
    port: 5672
    username: admin
    password: Nosmoking3311
    listener:
      simple:
        # Listener containers use manual acknowledgement.
        acknowledge-mode: manual
mybatis-plus:
  # Default value is classpath*:/mapper/**/*.xml
  mapper-locations: classpath:/mapper/*.xml
  # Global primary-key strategy: auto-increment IDs
  global-config:
    db-config:
      id-type: auto
  configuration:
    # MyBatis logging implementation that writes to standard output.
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
3.5 department 业务代码
/**
 * Entity mapped to the {@code t_department} table.
 */
@Data
@TableName("t_department")
public class DepartmentEntity {
/**
* Primary key of the department row.
*/
@TableId
private Integer id;
/**
* Name of the department.
*/
private String departmentName;
}
/**
 * Mapper for {@link DepartmentEntity}. Declares no methods of its own; everything comes from
 * {@code BaseMapper}.
 */
@Mapper
public interface DepartmentMapper extends BaseMapper<DepartmentEntity> {
}
3.6 集成MQ
/**
 * Names of the RabbitMQ exchange, queue and routing key used by the transactional-message demo.
 *
 * <p>Holds constants only; it is not meant to be instantiated.
 */
public final class RabbitMqConstant {

    /** Name of the exchange. */
    public static final String TRANSACTION_EXCHANGE = "transaction_exchange";

    /** Name of the queue. */
    public static final String TRANSACTION_QUEUE = "transaction_queue";

    /** Routing key used when binding the queue to the exchange. */
    public static final String TRANSACTION_KEY = "transaction_key";

    /** Private so that no instances are created from outside. */
    private RabbitMqConstant() {
    }
}
/**
 * Declares the exchange, the queue and the binding between them as Spring beans.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * @return a direct exchange named {@link RabbitMqConstant#TRANSACTION_EXCHANGE}
     */
    @Bean
    public DirectExchange transactionExchange() {
        final String exchangeName = RabbitMqConstant.TRANSACTION_EXCHANGE;
        return new DirectExchange(exchangeName);
    }

    /**
     * @return a queue named {@link RabbitMqConstant#TRANSACTION_QUEUE}
     */
    @Bean
    public Queue transactionQueue() {
        final String queueName = RabbitMqConstant.TRANSACTION_QUEUE;
        return new Queue(queueName);
    }

    /**
     * Binds the queue to the exchange using {@link RabbitMqConstant#TRANSACTION_KEY} as routing key.
     *
     * @param transactionQueue    the bean qualified as {@code transactionQueue}
     * @param transactionExchange the bean qualified as {@code transactionExchange}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingLiveLog(@Qualifier("transactionQueue") Queue transactionQueue,
            @Qualifier("transactionExchange") DirectExchange transactionExchange) {
        final String routingKey = RabbitMqConstant.TRANSACTION_KEY;
        return BindingBuilder
                .bind(transactionQueue)
                .to(transactionExchange)
                .with(routingKey);
    }
}
3.7 RabbitMqTransactionEvent【事务事件】
/**
 * Describes one outgoing RabbitMQ message: the exchange and routing key to send to, plus the payload.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class RabbitmqMessageDto {
/** Name of the exchange the message is sent to. */
private String exchange;
/** Routing key used when sending the message. */
private String routeKey;
/** Message payload. */
private Object data;
}
/**
 * Application event that carries a {@link RabbitmqMessageDto} from the publisher to the listener.
 */
public class RabbitMqTransactionEvent extends ApplicationEvent {

    /** Message description supplied at construction time; never reassigned. */
    private final RabbitmqMessageDto rabbitmqMessageDto;

    /**
     * Creates the event.
     *
     * @param source             the object the event is associated with (must not be {@code null})
     * @param rabbitmqMessageDto the message description carried by this event
     */
    public RabbitMqTransactionEvent(Object source, RabbitmqMessageDto rabbitmqMessageDto) {
        super(source);
        this.rabbitmqMessageDto = rabbitmqMessageDto;
    }

    /**
     * @return the message description passed to the constructor
     */
    public RabbitmqMessageDto getRabbitmqMessageDto() {
        return this.rabbitmqMessageDto;
    }
}
3.8 RabbitMqTransactionListener 【事务监听器】
/**
 * Listens for {@link RabbitMqTransactionEvent} and forwards the carried message to RabbitMQ.
 *
 * <p>The handler is bound to {@link TransactionPhase#AFTER_COMMIT}, so it runs only once the
 * publishing transaction has committed.
 */
@Configuration
@Slf4j
public class RabbitMqTransactionListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the event's payload and sends it to the exchange and routing key named in the event.
     *
     * @param rabbitMqTransactionEvent event holding the exchange, routing key and payload
     */
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void sendMqEvent(RabbitMqTransactionEvent rabbitMqTransactionEvent) {
        RabbitmqMessageDto rabbitmqMessageDto = rabbitMqTransactionEvent.getRabbitmqMessageDto();
        // Convert the payload once so the logged value and the sent value are the same object.
        Object payload = JSON.toJSON(rabbitmqMessageDto.getData());
        log.info("RabbitMqTransactionListener 发送消息:{}", payload);
        rabbitTemplate.convertAndSend(rabbitmqMessageDto.getExchange(), rabbitmqMessageDto.getRouteKey(), payload);
    }
}
3.9 DepartmentService
/**
 * Department service: stores a department and publishes an event describing the message to send.
 */
@Service("departmentService")
@Slf4j
public class DepartmentService extends ServiceImpl<DepartmentMapper, DepartmentEntity> {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * Inserts a new department and publishes a {@link RabbitMqTransactionEvent} carrying it.
     *
     * <p>Original author: rosh, 2021-12-03.
     *
     * @param departmentName name of the department to create
     */
    @Transactional
    public void createDepartment(String departmentName) {
        // Build and insert the department; input validation is deliberately skipped in this demo.
        DepartmentEntity department = new DepartmentEntity();
        department.setDepartmentName(departmentName);
        baseMapper.insert(department);

        // Describe the outgoing message, wrap it in an event and publish it.
        RabbitmqMessageDto message = new RabbitmqMessageDto(
                RabbitMqConstant.TRANSACTION_EXCHANGE, RabbitMqConstant.TRANSACTION_KEY, department);
        RabbitMqTransactionEvent event = new RabbitMqTransactionEvent("source", message);
        applicationEventPublisher.publishEvent(event);

        log.info("创建部门成功");
    }
}
3.10 测试
测试正常:
测试回滚:
发现事务回滚后,没有打印发送消息日志,查看数据库也没有数据。
3.11 码云地址
https://gitee.com/zhurongsheng/transaction-message
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15040.html