简介
为了防止消息重复消费导致业务处理异常,消息队列RocketMQ版的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。
什么是消息幂等
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100美元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100美元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。
适用场景
在互联网应用中,尤其在网络不稳定的情况下,消息队列RocketMQ版的消息有可能会出现重复。如果消息重复会影响您的业务处理,请对消息做幂等处理。
消息重复的场景如下
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同但Message ID不同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
负载均衡时消息重复
包括但不限于网络抖动、Broker重启以及消费者应用重启
当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到少量重复消息。
处理伪方案
因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。
以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的Key做幂等处理。
}
});
实战案例
方案一:使用Redis的set命令
/**
* @desc 第一种处理方式
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverSetCommand(string $orderSn): bool
{
return self::_redis()->set($orderSn, 'true', ['nx', 'ex' => 120]);
}
说明:上述代码和前面描述的原理一致,但实际上存在问题,在高并发场景下依然会有幂等性问题,这是因为没有充分利用redis的原子性。
方案二:使用Redis原子性
使用Redis的原子性操作,比如SETNX
和EXPIRE
来实现更可靠的幂等性控制。
/**
* @desc 第二种处理方式
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverSetnxCommand(string $orderSn): bool
{
$result = self::_redis()->setnx($orderSn, 'true');
if ($result) {
self::_redis()->expire($orderSn, 120);
}
return $result;
}
使用
SETNX
命令尝试将业务唯一标识保存到Redis中,如果返回1
表示设置成功,说明是第一次提交;否则返回0
,表示重复提交。
方案三:使用Redis + Lua脚本
伪代码
/**
* @desc 第三种处理方式
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverLuaScript(string $orderSn): bool
{
// TODO 使用Lua脚本执行原子性操作
$script = <<<tinywan
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
return true
else
return false
end
tinywan;
// TODO 将业务唯一标识保存到Redis中,并设置过期时间(120秒)
return (bool)self::_redis()->eval($script, [$orderSn, 'true', 120], 1);
}
Redis中使用Lua的好处
-
减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延 -
原子操作。redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。因此在编写脚本的过程中无需担心会出现竞态条件,无需使用事务。 -
复用。客户端发送的脚步会永久存在redis中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。
Redis Lua脚本与事务
从定义上来说, Redis 中的脚本本身就是一种事务, 所以任何在事务里可以完成的事, 在脚本里面也能完成。 并且一般来说, 使用脚本要来得更简单,并且速度更快。
Lua 脚本命令参数
首先定义了一个字符串变量
$script
,用于存储Lua脚本的内容。
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then:
使用 Redis 的 SETNX
命令,在键 KEYS[1]
中设置值为 ARGV[1]
(ARGV
是一个参数数组)。如果 SETNX
返回值为 1(表示设置成功)
,则执行以下代码块。
redis.call('EXPIRE', KEYS[1], ARGV[2]):
使用 Redis 的 EXPIRE
命令,在键 KEYS[1]
设置过期时间为 ARGV[2]
秒。
return true
:返回布尔值 true
给调用方,表示设置和过期时间设置都成功。
else
:如果 SETNX
返回值不为 1,则执行以下代码块。
return false
:返回布尔值 false
给调用方,表示设置失败。
使用 evalSha
命令方法替换 eval
命令方法
/**
* @desc 第三种方式 Plus
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverLuaScriptPlus(string $orderSn): bool
{
$redis = self::_redis();
$scriptShaKey = 'REDIS:SCRIPT:SHA';
$scriptSha = $redis->get($scriptShaKey);
if (!$scriptSha) {
// TODO 使用Lua脚本执行原子性操作
// TODO 使用SETNX命令尝试将$orderSn保存到Redis中,如果返回1表示设置成功,说明是第一次提交;否则返回0,表示重复提交
$script = <<<tinywan
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
return true
else
return false
end
tinywan;
$scriptSha = $redis->script('load', $script);
$redis->set($scriptShaKey, $scriptSha);
}
// TODO 将orderSn保存到Redis中,并设置过期时间(120秒)
return (bool)self::_redis()->evalSha($scriptSha, [$orderSn, 'true', 12000], 1);
}
evalSha
和eval
两者的差异
-
eval直接执行上传的脚本不从缓存拿 -
evalsha直接从缓存中取sha中脚本信息执行
生产环境中,推荐使用EVALSHA,相较于EVAL的每次发送脚本主体、浪费带宽,会更高效。
核心代码如下
<?php
/**
* @desc MQDeliver
* @author Tinywan(ShaoBo Wan)
* @date 2023/9/17 13:02
*/
declare(strict_types=1);
class MQDeliver
{
/**
* @desc: 获取Redis实例
* @return Redis
* @author Tinywan(ShaoBo Wan)
*/
protected static function _redis()
{
return server_redis();
}
/**
* @desc 第一种处理方式
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverSetCommand(string $orderSn): bool
{
return self::_redis()->set($orderSn, 'true', ['nx', 'ex' => 120]);
}
/**
* @desc 第二种处理方式
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverSetnxCommand(string $orderSn): bool
{
$result = self::_redis()->setnx($orderSn, 'true');
if ($result) {
self::_redis()->expire($orderSn, 120);
}
return $result;
}
/**
* @desc 第三种处理方式
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverLuaScript(string $orderSn): bool
{
// TODO 使用Lua脚本执行原子性操作
$script = <<<tinywan
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
return true
else
return false
end
tinywan;
// TODO 将业务唯一标识保存到Redis中,并设置过期时间(120秒)
return (bool)self::_redis()->eval($script, [$orderSn, 'true', 12000], 1);
}
/**
* @desc 第三种方式 Plus
* @param string $orderSn
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
protected static function saveDeliverLuaScriptPlus(string $orderSn): bool
{
$redis = self::_redis();
$scriptShaKey = 'REDIS:SCRIPT:SHA';
$scriptSha = $redis->get($scriptShaKey);
if (!$scriptSha) {
// TODO 使用Lua脚本执行原子性操作
// TODO 使用SETNX命令尝试将$orderSn保存到Redis中,如果返回1表示设置成功,说明是第一次提交;否则返回0,表示重复提交
$script = <<<tinywan
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
return true
else
return false
end
tinywan;
$scriptSha = $redis->script('load', $script);
$redis->set($scriptShaKey, $scriptSha);
}
// TODO 将orderSn保存到Redis中,并设置过期时间(120秒)
return (bool)self::_redis()->evalSha($scriptSha, [$orderSn, 'true', 12000], 1);
}
/**
* @desc: 全局消息ID投递
* @param string $messageId
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
public static function messageIdPost(string $messageId): bool
{
return self::_redis()->setnx($messageId, $messageId);
}
/**
* @desc: 业务处理
* @param string $orderSn
* @return string
* @author Tinywan(ShaoBo Wan)
*/
public static function businessHandle(string $orderSn): string
{
// 检查是否重复投递消息
if (self::isRepeatedDeliver($orderSn)) {
return '消息幂等投注失败';
}
// TODO 投递消息ID(全局唯一),业务唯一标识作为幂等处理的关键依据,如订单号、交易号、流水号等
// $execResult = self::saveDeliverSetCommand($orderSn);
// $execResult = self::saveDeliverSetnxCommand($orderSn);
// $execResult = self::saveDeliverLuaScript($orderSn);
$execResult = self::saveDeliverLuaScriptPlus($orderSn);
if (false === $execResult) {
// TODO 重复投递消息,直接返回
return '消息幂等投注失败';
}
try {
// TODO 根据业务唯一标识的Key做幂等处理
sleep(1);
} catch (Exception $e) {
return '处理失败 ' . $e->getMessage();
} finally {
// TODO 使用完毕后删除
self::deleteMessageId($orderSn);
}
return '处理完成';
}
/**
* @desc 是否重复投递消息
* @param string $messageId
* @return bool
* @author Tinywan(ShaoBo Wan)
*/
private static function isRepeatedDeliver(string $messageId): bool
{
return (bool)self::_redis()->exists($messageId);
}
/**
* @desc: 删除消息
* @param string $messageId
* @return int
* @author Tinywan(ShaoBo Wan)
*/
public static function deleteMessageId(string $messageId): int
{
return self::_redis()->del($messageId);
}
}
上述业务代码中删除业务全局唯一$messageId
的操作在finally
块中执行,无论是否重复消费处理逻辑成功与否都会确保删除业务全局唯一$messageId
。
redis lua 脚本相关命令
这一小节的内容是基本命令,可粗略阅读后跳过,等使用的时候再回来查询
redis 自 2.6.0 加入了 lua 脚本相关的命令,EVAL
、EVALSHA
、SCRIPT EXISTS
、SCRIPT FLUSH
、SCRIPT KILL
、SCRIPT LOAD
,自 3.2.0 加入了 lua 脚本的调试功能和命令SCRIPT DEBUG
。这里对命令做下简单的介绍。
-
EVAL
执行一段lua脚本,每次都需要将完整的lua脚本传递给redis服务器。 -
SCRIPT LOAD
将一段lua脚本缓存到redis中并返回一个tag串,并不会执行。 -
EVALSHA
执行一个脚本,不过传入参数是「2」中返回的tag,节省网络带宽。 -
SCRIPT EXISTS
判断「2」返回的tag串是否存在服务器中。 -
SCRIPT FLUSH
清除服务器上的所有缓存的脚本。 -
SCRIPT KILL
杀死正在运行的脚本。 -
SCRIPT DEBUG
设置调试模式,可设置同步、异步、关闭,同步会阻塞所有请求。
生产环境中,推荐使用EVALSHA
,相较于EVAL
的每次发送脚本主体、浪费带宽,会更高效。这里要注意SCRIPT KILL
,杀死正在运行脚本的时候,如果脚本执行过写操作了,这里会杀死失败,因为这违反了 redis lua 脚本的原子性。调试尽量放在测试环境完成之后再发布到生产环境,在生产环境调试千万不要使用同步模式,原因下文会详细讨论。
Redis 中 lua 脚本的书写和调试
redis lua 脚本是对其现有命令的扩充,单个命令不能完成、需要多个命令,但又要保证原子性的动作可以用脚本来实现。脚本中的逻辑一般比较简单,不要加入太复杂的东西,因为 redis 是单线程的,当脚本执行的时候,其他命令、脚本需要等待直到当前脚本执行完成。
因此,对 lua 的语法也不需完全了解,了解基本的使用就足够了,这里对 lua 语法不做过多介绍,会穿插到脚本示例里面。
原文始发于微信公众号(开源技术小栈):Redis+Lua 实现消息和接口幂等性
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/248404.html