应对大模型API限流:高效策略助你轻松突破QPS限制

分享技术,用心生活


背景

国内的公有大模型想要调用,基本都会有QPS的限制。那么如何实现我们在请求时候,有针对性的进行限流?被限流的请求又如何知道它的状态?是在队列中排队还是已经被消费在生成内容?本文将通过Redis的lua、zset+guava的RateLimiter来实现。

目的

  • 针对大模型的QPS限制,在用户发起请求时,在业务侧可以满足QPS的要求

  • 在超出QPS时,剩余请求进入队列排队,且方便页面展示请求状态(排队中或正在生成中)

实现原理

  • 当请求大模型API时
  1. 请求进入队列,并在redis中记录一个k-v;key为业务唯一id,value为当前队列的请求排队数。
  2. 新增当前业务请求的计数器zset:业务唯一id作为zset的value和初始化score=0(score业务上的含义为:从请求的时间点起,已经消费请求的个数。)
  • 用户侧展示请求状态时
  1. 获取第一步骤中的k-v值,记为rateLimitPointCount
  2. 获取第二步骤中的score值,记为rateLimitReduceCount
  3. 比较rateLimitPointCountrateLimitReduceCount大小,即可判断出请求此时处于已消费还是排队中状态
  • 通过redis的zset来存储每个业务请求排队点

  • 通过redis的incr实现消费自增

  • 通过guava的RateLimit实现令牌桶中令牌的管理

实现过程

定义大模型限流工具类

@Component
publicclass LLMRateLimiter {
    // 最大QPS
    privatestaticfinalint MAX_QPS = 5;
    // 请求队列最大长度
    privatestaticfinalint MAX_QUEUE_SIZE = 1000;
    privatestaticfinal RateLimiter rateLimiter = RateLimiter.create(MAX_QPS);
    privatestaticfinal ExecutorService executorService = Executors.newFixedThreadPool(2);
    publicstatic LinkedBlockingQueue<Runnable> requestQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);

设置qps=5,并初始化guava的RateLimitRateLimiter.create(5)表示创建一个令牌容量为5的令牌桶,且每秒新增5个令牌;初始化线程池用于消费;初始化请求队列,设置最大长度。

请求放入队列(LLMRateLimiter工具类)

public static Boolean addRequest(LLMChatParam llmChatParam, Consumer<LLMChatParam> doChat) throws InterruptedException {
    // 队列已满,新请求丢弃
    if (requestQueue.size() >= MAX_QUEUE_SIZE) {
        returnfalse;
    }
    // 请求放入队列
    requestQueue.put(() -> doChat.accept(llmChatParam));
    // 初始化zset中元素-业务唯一id
    StringRedisUtils.add("rate_counter", llmChatParam.getRateLimitReduceKey(), 0);
    // 消费队列
    processTasks();
    returntrue;
}

标记当前队列大小(Redis工具类)

public static void rateLimit(String key) {
    int size = LLMRateLimiter.requestQueue.size();
    // 大模型队列大小标记点key
    String rateLimitKey = SystemConstants.RedisKeyEnum.RATE_LIMIT_POINT.getKey(key);
    // 记录当前大模型队列大小
    set(rateLimitKey, Integer.toString(size));
}

以上为在请求入口处,使用Redis工具类使用rateLimit标记当前队列大小 同时使用LLMRateLimiter.addRequest将请求放入队列。

消费(LLMRateLimiter工具类)

private static void processTasks() {
    executorService.submit(() -> {
boolean acquire = rateLimiter.tryAcquire(51, TimeUnit.SECONDS);
if (acquire) {
    Runnable task = requestQueue.poll();
    if (task != null) {
        task.run();
    }
    // 队列消费后,所有计数器加1
    StringRedisUtils.incrementAllScoresByLua("rate_counter");
}
    });
}

通过自定义的线程池来异步消费请求,guava的rateLimiter.tryAcquire在1s内获取5个令牌。

更新所有计数器(Redis工具类)

所谓的计数器,即redis中维护的zset,1个请求=1个计数器,更新计数器即更新zset中元素的score值。有3种实现方式

  1. 循环zset实现:性能较差,不建议使用
  2. Lua脚本实现:能够保证原子性,且效率最高
  3. redis的pipeline实现:减少命令次数,效率较高

第一种我们直接弃用,不采纳。那么可能有读者就疑问了,既然lua脚本效果最好,为什么不直接用呢,还要用pipeline。如果你使用阿里云Redis的话,那么这个坑我先帮你踩了。 首先笔者第一方案确实是用lua实现的,且在测试环境(自建redis服务)执行也是ok的;但是,上线后大量用户反馈所有的请求都是排队中状态,经过排查发现,用户的请求其实已经处理完毕,大模型也返回了响应的内容。继续排查发现所有的计数器score值都为0,也就是初始化后一直没有成功更新。查看日志后,发现lua脚本在生产环境执行失败了。 提工单给阿里云,反馈如下

应对大模型API限流:高效策略助你轻松突破QPS限制
工单

查看官方文档说明

应对大模型API限流:高效策略助你轻松突破QPS限制最终真相大白!既然有限制,又没有时间去研究文档中如何正确使用lua;所以最快的修复方式就是采用另一种方式去实现。下面把2种实现方式都展示出来,方便大家自取。

  • lua脚本实现
public static void incrementAllScoresByLua(String key) {
    RedisScript<Long> script = new DefaultRedisScript<>(
            "local key = KEYS[1]n" +
                    "n" +
                    "local elements = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES')n" +
                    "for i = 1, #elements,2 don" +
                    "    local member = elements[i]n" +
                    "    redis.call('ZINCRBY', key, 1, member)n" +
                    "endn");

    STRING_REDIS_TEMPLATE.execute(script, Collections.singletonList(key));

}
  • redis的pipeline实现
    public static void incrementAllScoresByLua(String key) {
        Set<String> members = STRING_REDIS_TEMPLATE.opsForZSet().range(key, 0, -1);
        STRING_REDIS_TEMPLATE.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) throws DataAccessException {
                ZSetOperations zSetOperations = operations.opsForZSet();
                for (String member : members) {
                    zSetOperations.incrementScore(key, member, 1.0);
                }
                return null;
            }
        });
    }

两种方式原理都是通过获取zset中所有元素->循环,incr命令计数器加1。

请求状态返回(Redis工具类)

public static Map<String, Object> rateLimitStatus(String key) {
    Map<String, Object> map = new HashMap<>(2);
    String rateLimitPointKey = SystemConstants.RedisKeyEnum.RATE_LIMIT_POINT.getKey(key);
    String rateLimitNeedReduceCountKey = SystemConstants.RedisKeyEnum.RATE_LIMIT_NEED_REDUCE_COUNT.getKey(key);
    Double rateLimitReduceCount = score("rate_counter", rateLimitNeedReduceCountKey) == null ? 0d : score("rate_counter", rateLimitNeedReduceCountKey);
    Double rateLimitPointCount = StringUtils.isEmpty(get(rateLimitPointKey)) ? 0d : Double.parseDouble(get(rateLimitPointKey));
    // 代表此次生成之前的队列已经消费完毕,开始消费本次生成的队列
    if (Double.compare(rateLimitReduceCount, rateLimitPointCount) >= 0) {
        map.put("rateLimit", Constants.RateLimitEnum.END.getType());
        // 删除计数器
        StringRedisUtils.removeZset("rate_counter", rateLimitNeedReduceCountKey);
        // 删除此次记录点
        StringRedisUtils.delete(rateLimitPointKey);
    } else {
        map.put("rateLimit", Constants.RateLimitEnum.ING.getType());
    }
    return map;
}

在页面中需要呈现状态时,使用此方法。通过比较标记点队列大小和计数器大小,可得出请求状态。同时在消费后,删除对应的计数器和标记点。

效果展示

  • 排队中状态
应对大模型API限流:高效策略助你轻松突破QPS限制
排队中
  • 消费中状态
应对大模型API限流:高效策略助你轻松突破QPS限制
消费中

总结

本文使用了queueRateLimiter实现限流、zsetlua脚本实现请求状态计算。但同时以上也存在着对代码侵入性较大。需要在每个请求处都要添加一个标记点,在每个查询状态处都要添加查询代码。读者可以自行通过自定义注解和AOP等其他方式实现,减少对代码的入侵。如果大家有更好的方案,欢迎在留言区讨论。


原文始发于微信公众号(指南针技术):应对大模型API限流:高效策略助你轻松突破QPS限制

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/308497.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!