一、请求合并功能介绍
1、常见的使用场景:
在我们平时业务中,经常会遇到一些情况,请求频率很高,需要频繁请求第三方接口,或者需要频繁操作数据库。
比如,如下几个例子:
(1)电商系统,秒杀场景 ,需要频繁的去数据库修改库存。
(2)业务场景,当前接口需要频繁的调用三方接口,当三方接口有反爬虫,或者有固定时间请求次数限制的话,就会导致请求报错或者超时。
(3)消息队列:在处理消息队列时,可以将多个消息合并到一个批次中,然后由一个线程来处理这个批次中的所有消息。这样可以减少消息处理的延迟和并发控制开销,提高消息处理的效率和可靠性。
(4)分布式系统:在分布式系统中,通常需要将多个请求发送到不同的服务器节点进行处理。使用请求合并功能可以将多个请求合并到一个批次中,然后由一个线程将这个批次中的所有请求发送到不同的服务器节点进行处理。这样可以减少网络传输次数和并发控制开销,提高分布式系统的处理效率和响应速度。
2、请求合并的要点:
将一段短暂时间内的请求,先进行阻塞,进行合并之后,一次性去处理,然后在拆分结果,最后唤醒被阻塞的请求。
3、请求合并的前提:
(1)如果是数据库操作,如果是插入、修改、删除,需要支持批量操作的sql语句,并且如果修改失败了,支持回滚;如果是查询,需要支持结果和请求的拆分,也就是要能够将查询结果进行拆分,可以将结果分配给每个请求。
(2)如果是请求第三方接口,三方接口要支持批量操作,同时请求和响应也需要有能够标识区分的字段,以便可以将结果进行拆分。
二、项目实际使用场景描述
项目中商品表访问很频繁,尤其是商品销售、采购入库、商品出库等场景,需要不断的查询单个商品详情。假设有三个人浏览商品详情,要查询商品的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?
我们可以在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。
思考这种场景的问题及解决方案:
1、如何存放一段时间内的请求?这里我们可以用队列。
2、如何每隔一段时间执行任务?用定时任务线程池。
3、每个请求都是单独的线程,如何保证各个请求能得到自己的查询结果?这里我们使用callable返回查询结果,在没有查到结果前阻塞线程。
二、使用 CompletableFuture 实现请求合并功能
1、首先我们定义一个用户请求类Request:
public class RequestTest {
//请求条件
private Long key;
//异步编程类
private CompletableFuture<String> future;
public CompletableFuture<String> getFuture() {
return future;
}
public void setFuture(CompletableFuture<String> future) {
this.future = future;
}
public Long getKey() {
return key;
}
public void setKey(Long key) {
this.key = key;
}
}
2、接着是请求合并的主要代码:
@RequestMapping("/request")
@RestController
public class RequestController {
//存放请求的队列
LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>();
@Autowired
private ProductInfoService productInfoService;
//初始化方法
@PostConstruct
public void init() {
//定时执行的线程池,每隔5毫秒执行一次(间隔时间可以由业务决定),把所有堆积的请求
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
//在这里具体执行批量查询逻辑
int size = queue.size();
if (size == 0) {
//若没有请求堆积,直接返回,等10毫秒再执行一次
return;
}
//若有请求堆积把所有请求都拿出来
List<RequestTest> requestTests = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
//把请求拿出来
RequestTest poll = queue.poll();
requestTests.add(poll);
}
//至此请求已经被合并了
System.out.print("##############################################n");
System.out.printf("请求合并了" + requestTests.size() + "条!n");
//组装批量查询条件
List<Long> idList = Lists.newArrayListWithCapacity(requestTests.size());
for (RequestTest requestTest : requestTests) {
idList.add(requestTest.getKey());
}
//进行批量查询
List<ProductInfo> productInfoList = productInfoService.listByIds(idList);
if (CollectionUtils.isEmpty(productInfoList)) {
return;
}
//把批查结果放入一个map
Map<Long, String> map = Maps.newHashMapWithExpectedSize(productInfoList.size());
for (ProductInfo productInfo : productInfoList) {
map.put(productInfo.getId(), productInfo.getProductName());
}
for (RequestTest requestTest : requestTests) {
//把放在map中的结果集放回给对应的线程
//future是对应每个请求的,因为是每个请求线程都传了自己的future是对应的过来
requestTest.getFuture().complete(map.get(requestTest.getKey()));
}
}, 0, 5, TimeUnit.MILLISECONDS);
}
//请求合并
@SneakyThrows
@GetMapping("/requestMerge/getProductInfo")
public String getProductInfoMerge(Long key) throws InterruptedException, ExecutionException {
long currentMillis = System.currentTimeMillis();
//CompletableFuture可以使一个线程执行操作后,主动返回值给另一个线程
CompletableFuture<String> future = new CompletableFuture<>();
RequestTest requestTest = new RequestTest();
//把future(把future可以认为是线程间的"传话人")放到等待队列中去,让定时调度的线程池执行并返回值
requestTest.setFuture(future);
requestTest.setKey(key);
//把requestTest加入等待队列(LinkedBlockingDeque)
queue.add(requestTest);
//future(传话人)阻塞2s没获取到值会抛异常
String productName = future.get(2,TimeUnit.SECONDS);
System.out.printf("商品名为:" + productName + "---线程名为:" + Thread.currentThread().getName() +
"---执行时间为:" + (System.currentTimeMillis() - currentMillis) + "n");
return productName;
}
}
3、测试:
我们在ApiPost上访问接口地址,进行压测,结果如下:
控制台打印日志如下:可以看到对请求进行了合并,并把查询数据进行了分发。分配给了指定的请求,得到了正确的响应结果。至此,我们就完成了利用请求合并发挥高并发下批量处理的优势。
当然,上述代码并不完善。而且请求合并也有一些弊端,比如:
1.如果定时线程池的间隔时间比较长,反而会造成请求堆积时间太长,用户不能快速得到响应。
2.同时在请求数量比较小时,请求合并的场景也是没有必要的。
三、Hystrix 请求合并的功能
1、介绍
要实现请求合并,可以使用【@HystrixCollapser】注解和【@HystrixCommand】注解。@HystrixCollapser 注解:指定某个方法需要开启请求合并功能。@HystrixCommand 注解:标记某个方法作为 Hystrix 的命令。
@HystrixCollapser:该注解的作用是标识当前的方法是一个的合并请求的方法,并且此方法内的逻辑是不会被执行的。
-
batchMethod:请求合并完毕的后触发的方法 -
scope:请求合并的模式 -
collapserProperties:请求合并的设置 -
timerDelayInMilliseconds:请求合并的等待的时间 -
maxRequestsInBatch:指定时间内对请求合并的请求的最大数 -
@retun: Future:注意请求的合并的方法的返回值必须为Future
1、引入pom文件
<!-- Hystrix -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>2.2.10.RELEASE</version>
</dependency>
2、修改启动类代码
在启动类上面加上注解@EnableHystrix
@EnableHystrix
public class OrgApp {
public static void main(String[] args) {
}
}
3、详细代码如下
@Slf4j
@Service
public class BatchService {
@Autowired
private ProductInfoService productInfoService;
@HystrixCollapser(batchMethod = "getProductInfoBatch", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
//请求时间间隔在 20ms 之内的请求会被合并为一个请求,默认为 10ms
collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "20"),
//设置触发批处理执行之前,在批处理中允许的最大请求数。
@HystrixProperty(name = "maxRequestsInBatch", value = "200")
})
public Future<ProductInfo> getProductInfoById(Long id) {
return null;
}
/**
* 1.@HystrixCommand:表示当前的方法开启熔断
2.请求合并完毕后触发的方法,要和batchMethod 内的名字一致
3.在请求合并完毕后会将合并的参数的使用list集合的方式进行传递
*
* @return
*/
@HystrixCommand
public List<ProductInfo> getProductInfoBatch(List<Long> ids) {
log.info("批处理,[{}]", ids);
return productInfoService.listByIds(ids);
}
}
编写测试的ProductController
/**
* 请求合并接口
*
* @param id 主键id
* @return ProductInfo
*/
@SneakyThrows
@GetMapping("/requestMergeHystrix/getProductInfoById")
public ProductInfo getProductInfoById(Long id) {
Future<ProductInfo> byId = batchService.getProductInfoById(id);
return byId.get();
}
同样,我们用线程池结合 CountDownLatch 模拟 20 个并发请求接口。
@Autowired
private RestTemplate restTemplate;
/**
* 查询数据
*/
@GetMapping("/requestProductInfo")
public void requestProductInfo() {
//模拟20个并发请求
ExecutorService executorService = Executors.newFixedThreadPool(20);
CountDownLatch countDownLatch = new CountDownLatch(20);
for (int i = 1; i <= 20; i++) {
int finalI = i;
executorService.execute(() -> {
try {
countDownLatch.await();
ProductInfo forObject = restTemplate.getForObject("http://127.0.0.1:3456/request/requestMergeHystrix/getProductInfoById?id=" + finalI, ProductInfo.class);
System.out.println("finalI" + finalI + forObject);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
countDownLatch.countDown();
}
}
测试结果如下:
我们可以看到接受到了20个请求只执行了一条sql语句。
四、总结:
请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。本文讲解了使用java8提供的 CompletableFuture 和Hystrix两种方式如何实现了请求合并功能。
1.CompletableFuture 使用了CompletableFuture.get(2,TimeUnit.SECONDS)方法阻塞当前线程,直到异步计算完成或者超时,然后返回计算结果。我们使用CompletableFuture.complete()标记异步计算已完成,并且其结果将被设置为其参数的值。
2.Hystrix的请求合并就是把重复的请求批量的用一个HystrixCommand命令去执行,以减少通信消耗和线程数的占用。Hystrix的请求合并用到了HystrixCollapser这个抽象类,它在HystrixCommand之前前放置一个合并处理器,将处于一个很短的时间窗(默认10ms)内对同一依赖服务的多个请求进行整合并以批量方式发起请求的功能。
原文始发于微信公众号(明月予我):面试官问:说一说高并发场景下的接口请求合并方案?|
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/272788.html