在我们业务开发过程中,经常会有使用分页批量查询数据。例如:我想获取某个商品所有的订单,根据订单数据做出相应的逻辑处理。这时我们需要分批量去获取数据,处理完一批后,如果还有的那么再去获取,就这样循环执行。问题就出现在这个循环上。
1.方法论设计
那接下来从Sql查询的着手,通常情况下有两种分页查询获取数据的方式:
-
select * from table_name where id = #{id} limit #{offset}, #{pageSize} -
select * from table_name where id = #{id} and created_time> {nextPageToken}
对于第一种情况而言,我们不用关心返回数据的具体内容是什么,我们只需要知道返回数据的数量。例如:一批查询10条数据,返回结果还是10条,这时是不能判断是否还有数据,所以得再查询一下。如果查询的结果小于10条数据,可以肯定认为满足条件的数据已经全部获取,这时候得停止查询。
第二种情况和第一种情况最大的区别在于进行下一次批量查询的时候,第二种情况需要知道最后一条数据的start_time(此处仅作为案例,还可以为一些id),拿到created_time后作为nextPageToken传入,该nextPageToken可以理解成下一批数据的起点。
基于上述所述,实现业务方不感知整个循环的流程,需要做到两点:
-
定义一个Function函数作为方法入参,用于承载业务逻辑的主体; -
定义一个查询条件类作为方法入参,因为在循环体中需要对该查询条件做出修改,用于获取下一批数据; -
定义一个返回结果Payload接口,用于获取业务方返回的nextPageToken; -
定义一个顶层的接口PageQuery,主要用于做类型检查,防止业务方传参数出错;
死循环中断保护器
由于在开发者使用过程中会因为数据等一些其他的问题,造成一些死循环。举一个例子,也是真实发生过:数据迁移的时候使用脚本往数据库中插入数据时,这些数据的created_time差距很小的,如果该字段的单位精确到秒的话,那么一秒内插入的数据是不少的(至少是多于20条),这时候根据created_time来查询的话,最后一条的数据和第一条数据的created_time是一样的。这样就会导致查询的数据原地踏步,每次查询拿到的数据都是一样的,这就导致死循环的产生。为了规避死循环的风险,提出使用循环保护的方式来完成。
Negative数据记忆
虽然存在了死循环的保护,例如在订单业务中的1234test的订单号发生了死循环,当达到阈值保护的时候是可以跳出循环,防止循环的发生,但是当下一次请求还是这个1234test的订单号过来的,又得达到阈值保护才能跳出循环,这是需要对程序添加对Negative数据的记忆功能,对于发生过达到阈值的循环直接返回,因为该订单号对于整个系统而言是一种损害。
2.方法论实现
-
定义顶层接口,用于泛型校验,避免业务方出错(此处暂时理解不了后文有解释)
public interface PageQuery<T> {
int SECURITY_THRESHOLD = 1_000_000;
}
上述定义的常量是一个默认的安全阈值,在业务方没有重新设置的情况下就使用该默认值,该默认值是全局,业务方不能随便修改。
-
定义返回主体的接口,主要用于获取nextPageToken
public interface PageResultPayload<T> extends PageQuery<T>{
default T retrieveNextPageToken(){
return null;
};
}
这里使用了接口中的默认方法,如果你的版本不支持的话就直接定义成接口。该接口中待实现的方法就是为了去寻找下一页,也是上文中分页查询的第二种方式。实际上,抛开该情况不看,大多数循环获取数据时,在返回当前批次数据后,都会对查询参数做出修改,再继续获取下一批数据,那么此处的retrieveNextPageToken方法就是查询参数中需要修改的内容。当然对于上文第一种情况,我们只需要改变pageNum就可以了,比第二种容易很多。
-
定义查询参数的抽象类
@Data
public abstract class QueryParam<T> implements PageQuery<T>{
private int protectThreshold = SECURITY_THRESHOLD;
private int protectCounter;
private int pageNum = 1;
private int pageSize = 20;
private T nextPageToken;
public int getOffset() {
return (pageNum - 1) * pageSize;
}
public void increasePageNum() {
++pageNum;
}
public void increaseProtectCounter() {
++protectCounter;
}
public abstract String retrieveBizKey();
}
retrieveBizKey抽象方法是为了让业务方提供该业务下的查询唯一条件,主要用死循环时存储该key,那么接下来再来获取的时候直接返回,这么描述可能很难理解。例如:业务方需要获取某个产品下所有订单数据,这时bizKey可定义成Product_{productId} 如果在循环获取数据的过程中,发生死循环达到阈值后,就会把Product__{productId}存储,在后续访问的时候直接返回即可,不会再次循环达到安全阈值,可以理解该数据在系统中留下前科了。(我这里只是在当前机器存储,实际上可以存储到redis中,注意线程安全)
protectThreshold是用于发生死循环时,有效帮助跳出死循环的安全阈值,业务方可自己定义;
protectCounter保护累加器,是用于和protectThreshold做比较的,该字段在一次循环后会累加一次;
pageSize默认从第一页开始,pageNum默认是20,业务方可根据需要自行修改;
nextPageToken是在没循环一次,如果需要进行下一次循环时,需要修改该值,对应上述的第二种情况:select * from table_name where id = #{id} limit #{offset}, #{pageSize};
getOffset()方法主要和nextPageToken作用一样,都是为了获取下一页数据而存在的,对应上述的第一种情况:select * from table_name where id = #{id} limit #{offset}, #{pageSize};
-
循环主体
public class PageQueryHelper {
private static final Set<String> NEGATIVE_ID_SET = new CopyOnWriteArraySet<>();
public static <T> void pageQuery(Function<QueryParam<T>, List<? extends PageResultPayload<T>>> function, QueryParam<T> queryParam){
int size = 0;
String bizKey = queryParam.retrieveBizKey();
if(StringUtils.isNotBlank(bizKey) && NEGATIVE_ID_SET.contains(bizKey)){
return;
}
do {
List<? extends PageResultPayload<T>> result = function.apply(queryParam);
if(null != result && (size = result.size()) >= queryParam.getPageSize()){
PageResultPayload<T> pageQueryPayload = result.get(queryParam.getPageSize() - 1);
queryParam.setNextPageToken(pageQueryPayload.retrieveNextPageToken());
queryParam.increasePageNum();
}
queryParam.increaseProtectCounter();
} while (size >= queryParam.getPageSize() && queryParam.getProtectCounter() < queryParam.getProtectThreshold());
if(queryParam.getProtectCounter() >= queryParam.getProtectThreshold() && StringUtils.isNoneBlank(bizKey)){
NEGATIVE_ID_SET.add(bizKey);
}
}
}
pageQuery方法中对泛型T传值是要保持一致的,这个泛型也就是nextPageToken的类型,通常情况下可为某个id或者时间戳,因此查询参数务必和返回主体中该类型保持一致,不然会出错。为了规避业务方使用出错的风险,上述中加了一个顶层接口做类型检查。如果在调用pageQuery方法时,QueryParam和PageResultPayload定义的泛型不一致是,此处会报编译错误;
3.应用
-
定义业务查询参数类继承QueryParam
@Data
public class MyQueryParam extends QueryParam<Instant> {
private String productId;
private Instant created_time;
public String retrieveBizKey() {
return "product_" + productId;
}
}
-
定义业务返回主体实现PageResultPayload
@Data
public class MyQueryPayload implements PageResultPayload<Instant> {
private String productId;
private String orderId;
private Instant createdTime;
public Instant retrieveNextPageToken(){
return createdTime;
};
}
对于第一种case改变offset用于获取下一页的话,该泛型显得多余,此处可以优化;
-
Demo演示
public class MyQueryDemo {
public static void main(String[] args) {
MyQueryParam queryParam = new MyQueryParam();
queryParam.setProductId("productId");
//queryParam.setProtectThreshold(2);
List<String> list = new ArrayList<>();
PageQueryHelper.pageQuery(query -> {
List<MyQueryPayload> result = getPayloadList(query.getOffset(), query.getPageSize());
for (MyQueryPayload myQueryPayload : result) {
list.add(myQueryPayload.getOrderId());
}
return result;
}, queryParam);
System.out.println(list.size());
}
private static List<MyQueryPayload> getPayloadList(int offset, int pageSize) {
if(offset == 100){
return Collections.emptyList();
}
return IntStream.range(offset, offset + pageSize).mapToObj(index -> {
MyQueryPayload myQueryPayload = new MyQueryPayload();
myQueryPayload.setProductId("productId");
myQueryPayload.setOrderId("orderId_" + index);
myQueryPayload.setCreatedTime(Instant.now().plusMillis(index * 1000L));
return myQueryPayload;
}).collect(Collectors.toList());
}
}
演示的是第一种case,根据offset来获取下一页数据,定义了一个私有的静态方法用于获取返回的数据。这时业务方只需要把获取数据后所作的一些逻辑操作放在function中,不用关心具体是怎么个循环,并且该循环中做了一些保护。
原文始发于微信公众号(处女座码农):循环器的设计:再见死循环
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/251642.html