文章目录
背景
缓存存在大量数据要写入数据库或保存到存储服务,这个过程中可能会发生:
- 数据库多次开闭导致数据库超时
- 短时间大量数据写入可能打爆存储
针对问题一思路是分批batch存储,对于总量为m的数据,既保证一定的写入效率(一次写入n条),又保证一定的安全性(把一个事物拆成m / n + 1个事务)。而这种方案恰好也能解决问题二,因为分批执行,时间长度就被拉长了,实际上就是一种流量削峰的效果。
实现
思路有了,执行就很简单,这里采用mybatis batchUpdate进行批处理,采用CompletableFuture以及递归实现分事务多线程执行。
batchUpdate可以参阅此文档 batchUpdate在mybatis的应用。这里主要说一说CompletableFuture.
CompletableFuture作为一个Java异步包提供的多线程工具类,功能很强大,不需要我们自己配置线程池,直接使用CompletableFuture.runAsync就可以达到多线程执行的目的。它也不止可以异步执行,参考官方文档可知,对CompletableFuture的返回值可以调用whenComplete方法,在上一批任务执行结束后再执行下一批:
public void execAsync(List<List<User>> all, int i) {
if(i < all.size()) {
CompletableFuture<Void> future = CompletableFuture.runAsync(()->userService.batchInsert(all.get(i)));
if(i + 1 < all.size()) {
future.whenComplete(() -> execAsync(all, i + 1));
}
}
}
附录:zip和gz文件的解压与存储
public class FileUtil {
public static InputStream getNewInputStream(InputStream inputStream, String fileName, String subfix) {
return handleInputStream(inputStream, fileName, subfix);
}
private static InputStream handleInputStream(InputStream inputStream, String fileName, String subfix) {
Assert.isTrue(inputStream != null, "输入流为null");
File file = null;
FileOutputStream fos = null;
try {
file = File.createTempFile(fileName, subfix);
if (!file.exists()) {
file.createNewFile();
}
fos = new FileOutputStream(file);
byte[] bytes = new byte[1024];
while ((inputStream.read(bytes)) != -1) {
fos.write(bytes);
}
} catch (IOException e) {
log.error("创建临时文件失败,原因为:{}", e.getMessage());
} finally {
Assert.isTrue(fos != null, "文件写入流为null");
try {
inputStream.close();
fos.flush();
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (subfix.equals(SubFixConfig.ZIP)) {
return unzipFile(file, fileName);
}
if (subfix.equals(SubFixConfig.GZ)) {
return ungzFile(file);
}
return null;
}
private static InputStream unzipFile(File file, String filename) {
InputStream is = null;
try {
ZipFile zipFile = new ZipFile(file);
is = zipFile.getInputStream(zipFile.getEntry(filename));
} catch (IOException e) {
log.error("获取解压zip后文件流出现错误,原因为:{}", e.getMessage());
}
return is;
}
private static InputStream ungzFile(File file) {
GZIPInputStream gzipInputStream = null;
try {
gzipInputStream = new GZIPInputStream(new FileInputStream(file));
} catch (IOException e) {
log.error("获取解压gz后文件流出现错误,原因为:{}", e.getMessage());
}
return gzipInputStream;
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153508.html