需求背景
开发中,经常有这样一种场景:需要查询很多的数据量出来,然后去做各种业务操作。对于刚入行的小伙伴来说,通常就是查询过多的数据加载到内存,没有对数据量做限制。
表面上看,在该表数据量不大,在机器配置足够高的情况下短时间内不会有什么大问题,但随着业务的增长,数据的增加,莫名其妙的在线上出现了OOM,很多人为之疑惑。
博主在企业工作的多年经历中,配合开发运维排查了大量线上OOM问题,由于开发人员水平的参次不齐,很多这种扫全表或一次查询过多数据到内存的操作,导致了线上机器 的OOM。
问题分析
所谓jvm的OOM,就是Out Of Memory,也就是在我们的内存中加载了过多对象,但又没办法回收,对于查询一张超大的表,不做任何限制,很容易就导致OOM。
不规范操作
select * from table;
然后查询到内存中,处理各种业务。
所以比较推荐类似的操作应该是:
select * from table limit 0,200;
通常情况能做到这一步的程序员才算是入了行的,但这种情况在数据量不大的情况下可能也不会有什么太大的问题,但处理过一定大数据量的同学会可能会发现一个问题,使用limit分页,在数据量大的情况下这种limit的性能是极低的。
select * from table limit 10000000,200;
如何你表的id主键使用这种连续自增id,在这种情况下可以通过这种方式处理:
select * from table where id > 0 and id < 201
代码实现
基于上面的分析思路,代码实现的差异不大,以下代码基于目前大家使用比较多的MyBatis-Plus提供代码示例:
/**
* 定义所需要的参数
*/
@Data
@Builder
@RequiredArgsConstructor
public class BulkExecutorParam<T> {
/**
* batch size
*/
@Builder.Default
private Integer batchSize = 500;
/**
* start page
*/
@Builder.Default
private Integer start = 1;
/**
* query wrapper
*/
private Wrapper<T> queryWrapper;
/**
* query service
*/
private IService<T> service;
/**
* is need multiple threads
*/
@Builder.Default
private Boolean isMultiThreaded = Boolean.TRUE;
/**
* parallelism for executors
*/
@Builder.Default
private Integer parallelism = 0;
/**
* count: if present,use it else use service.count()
*/
@Builder.Default
private Integer count = 0;
/**
* query function
*/
private Function<Integer, List<T>> queryFunc;
/**
* execute consumer
*/
private Consumer<List<T>> execConsumer;
/**
* thread pool
*/
private ExecutorService executors;
public ExecutorService getExecutors() {
return this.parallelism > 0 ? Executors.newWorkStealingPool(this.parallelism) : Executors.newWorkStealingPool();
}
}
核心执行类抽象:
/**
* @author caoyong
* @version 1.0.0
**/
public class BulkExecutorUtil {
/**
* execute batch
*
* @param param parameters that need
* @param consumer consumer
* @param <T> execute type
*/
public static <T> void execute(BulkExecutorParam<T> param, Consumer<List<T>> consumer) {
param.setExecConsumer(consumer);
submit(param, null);
}
/**
* batch submit with result
*
* @param param parameters that need
* @param execFunc execute function
* @param <T> type that param
* @param <R> type that result
* @return result of future
*/
public static <T, R> List<Future<R>> submit(BulkExecutorParam<T> param, Function<List<T>, R> execFunc) {
List<Future<R>> futures = new ArrayList<>();
//query service
IService<T> service = param.getService();
//query wrapper
Wrapper<T> queryWrapper = param.getQueryWrapper();
int count = param.getCount() > 0 ? param.getCount() : service.count(queryWrapper);
if (count == 0) {
return futures;
}
Integer batchSize = param.getBatchSize();
int pageCount = (count + batchSize - 1) / batchSize;
//iterate all page
IntStream.rangeClosed(param.getStart(), pageCount)
.forEach(currentPage -> {
//support multi thread and can alternate execute thread pool
if (param.getIsMultiThreaded()) {
if (execFunc == null) {
//execute only consumer type task
param.getExecutors().execute(() -> {
List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
if (CollectionUtil.isEmpty(records)) {
return;
}
param.getExecConsumer().accept(records);
});
} else {
Future<R> submit = param.getExecutors().submit(() -> {
//execute submit task with futures return
List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
return execFunc.apply(records);
});
futures.add(submit);
}
} else {
//with main thread execute
List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
if (CollectionUtil.isEmpty(records)) {
return;
}
param.getExecConsumer().accept(records);
}
});
return futures;
}
/**
* get records
*
* @param queryFunc query function
* @param service query service
* @param queryWrapper query wrapper
* @param batchSize batch execute size
* @param currentPage current page
* @param <T> param type
* @return return type
*/
private static <T> List<T> getRecords(Function<Integer, List<T>> queryFunc,
IService<T> service,
Wrapper<T> queryWrapper,
Integer batchSize,
int currentPage) {
//execute records:
//if customizing supply current page for it else using mybatis-plus service and query wrapper
return service == null ? queryFunc.apply(currentPage) :
service.page(new Page<>(currentPage, batchSize), queryWrapper).getRecords();
}
}
execute使用示例:
private void batchProcessor(List<T> records) {
//实现你的处理逻辑
}
//bulk executor
LambdaQueryWrapper<T> wrapper = Wrappers.lambdaQuery(T.class);
BulkExecutorParam<T> param = BulkExecutorParam.<T>builder()
.service(this).queryWrapper(wrapper).build();
BulkExecutorUtil.execute(param, this::batchProcessor);
submit使用示例:
private File batchProcessor(List<T> records) {
//实现你的处理逻辑
File file = new File("/your-path");
return file;
}
//IO密集型的任务,核心线程数使用 CPU核数*2 + 1
int parallelism = Runtime.getRuntime().availableProcessors() * 2 + 1;
BulkExecutorParam<T> exeParam = BulkExecutorParam.<T>builder()
.queryWrapper(param.getQueryWrapper())
.batchSize(param.getBatchSize())
.service(this)
.parallelism(parallelism)
.build();
List<Future<File>> futures = BulkExecutorUtil.submit(exeParam, this::batchProcessorFile);
if (CollectionUtil.isEmpty(futures)) {
return downloadFilePath;
}
//iterator all future file
for (Future<File> future : futures) {
try {
//处理你的相关逻辑
File file = future.get();
} catch (Exception e) {
log.error("future get error:{}", e.getMessage(), e);
}
}
来源:blog.csdn.net/m0_37797991/
article/details/123015082
构建高质量的技术交流社群,欢迎从事编程开发、技术招聘HR进群,也欢迎大家分享自己公司的内推信息,相互帮助,一起进步!
文明发言,以
交流技术
、职位内推
、行业探讨
为主
广告人士勿入,切勿轻信私聊,防止被骗

原文始发于微信公众号(Java面试题精选):分批查询超大数据量,避免JVM出现OOM,这样做就对了!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/146718.html