上一期写了怎么多线程处理数据,其实算不上数据处理,只能算一半:拉取数据。这不最近又要处理数据了。需要处理三批数据,要说数据量也不大,思路也很清晰,然后就评估了2天的时间,结果啃哧啃哧的加班带熬夜干了5天,现在总算处理完成了。
一、开始
思考一下何为:处理数据,应当是对数据做一些增、删、改操作对不对。
举一个很简单的例子,要对表A的中user_type = 1
的数据,去修改它name = '小道仙'
,这样的处理你很容易就写出如下脚本
UPDATE table_name FROM SET name = '小道仙' WHERE user_type = 1;
上面这个脚本当然是没有问题的,但是如果逻辑稍微复杂一些,复杂到你无法直接使用SQL完成:
比如A表里面有100w数据,B表里面有200w数据
使用A表的A_1、A_2,去匹配B表里面的B_1、B_2
然后让B表的B_3、B_4,去更新A表的A_3、A_4
同时要删除无法匹配B表里面的数据
面对上述这种场景,单纯的SQL脚本已经无法满足了,我们必须得写代码脚本来实现。
处理逻辑可能也很简单吧,获取A表里面的数据,然后循环A表的数据,依次去匹配B表中的数据,可以匹配更新、无法匹配删除。这个思路是可行的,但有如下几个问题
-
100w的数据去匹配200w的数据,再写入100w表里面,这得要多长时间? -
万一脚本出了一点点错误,如何恢复?重新执行脚本又面临第一步的问题
抛去业务逻辑,任何一个数据处理脚本都只要解决上述两个问题就好了
二、改进前
数据量我们无法去改变,那么只能从速度来下文章了,多线程是此为一解决办法。
多线程的写法,和上一篇文章一样的,我们只需要知道一点:线程安全来自共享变量,我们只需要保证每个线程只去处理自己的数据,这样即便是你开100个线程理论上也不会出现问题的。(所以说理论上不会出问题,是因为当我开启了50个线程的时候,数据库已经扛不住了)
实现上面逻辑的伪代码
public void xdxFun() {
final int pageSize = 100;
// 获取A表里面数据的数量
Integer totalCount = yxxBatchDataMapper.getAllCount();
// 计算好分页,每个线程单独去处理每页的数据
final int pageCount = totalCount % pageSize == 0 ? totalCount / pageSize : totalCount / pageSize + 1;
for (int i = 1; i <= pageCount; i++) {
// 这里是个坑,i 是一个共享变量,是会不断变化的,但是page不是,因为它是每次 new 出来的
Integer page = new Integer(i);
threadPoolExecutor.execute(() -> {
// 获取A每页的数据 MYSQL里面的 LIMIT 分页第一个参数是偏移量,这里我就不处理了
List<Object> datas = yxxBatchDataMapper.getAllPage(page, pageSize);
for (Object item : datas) {
// 通过A表去匹配B表的数据
Object bData = yxxBatchDataMapper.equalsB(item);
if (bData != null) {
// 更新A表里面的数据
yxxBatchDataMapper.updateA(item, bData);
}else {
// 删除A表里面的数据
yxxBatchDataMapper.deleteA(item);
}
}
});
}
}
代码逻辑并不复杂,但考虑到两张频繁操作的表都是上百万的数据量。单线程的话可能要几个小时吧,如果是多线程或许二十分钟左右可以处理完(考虑到数据库的瓶颈,开15个线程吧)
上面的处理逻辑的确是一直最简单最快捷的方法,但这意味着高风险。
-
频繁操作数据库,会导致其它正常的操作变得很慢(当然你可以在凌晨执行)。 -
数据无法核对,上面的代码虽然看似没有问题,但是数据库里面的数据可不会按照你预想的那样,可能存在一些异常数据导致你的数据中断,一旦中断就很麻烦了,要先恢复数据,然后再去处理。 -
如果基于某种原因,你无法访问线上数据库(这很正常),那你的脚本就必须先发布再执行,这样每次修改的代价大到无法承担。 -
频繁的备份、恢复数据也是很麻烦的,如果一旦出错,也是凉凉。
三、改进后
为了解决上述种种问题,我提供一种这样的思路:在本地执行把所有的操作都写成SQL脚本,然后去执行这个SQL脚本
我们可以把线上需要操作的数据库备份到本地来,虽然这也是一项很麻烦的事情,但是安全,后期调试也快。
public void xdxFun() {
final int pageSize = 100;
// 获取A表里面数据的数量
Integer totalCount = yxxBatchDataMapper.getAllCount();
// 计算好分页,每个线程单独去处理每页的数据
final int pageCount = totalCount % pageSize == 0 ? totalCount / pageSize : totalCount / pageSize + 1;
for (int i = 1; i <= pageCount; i++) {
// 这里是个坑,i 是一个共享变量,是会不断变化的,但是page不是,因为它是每次 new 出来的
Integer page = new Integer(i);
threadPoolExecutor.execute(() -> {
// 创建两个用来生成SQL的变量
Map<String, Object> params = new HashMap<>();
Map<String, Object> wheres = new HashMap<>();
// 获取A每页的数据 MYSQL里面的 LIMIT 分页第一个参数是偏移量,这里我就不处理了
List<Object> datas = yxxBatchDataMapper.getAllPage(page, pageSize);
for (Object item : datas) {
// 通过A表去匹配B表的数据
Object bData = yxxBatchDataMapper.equalsB(item);
if (bData != null) {
// 更新A表里面的数据
params.clear(); wheres.clear();
params.put("A_3", bData.getB_3());
params.put("A_4", bData.getB_4());
wheres.put("id", item.getId());
String updateSql = updateSql(params, wheres, "table_A");
}else {
// 删除A表里面的数据
wheres.clear();
wheres.put("id", item.getId());
String deleteSql = deleteSql(wheres, "table_A");
}
}
});
}
}
这里的 updateSql
、和deleteSql
就是SQL脚本,现在我们已经生成了(后面会附上这个生成SQL脚本的方法,其实就是字符串拼接)现在我们要把它存入文件里去。
我们可以是一个并发队列LinkedBlockingQueue
,把每个SQL存入到queue里面,然后统一写入文件。
这里又引入了三个问题
-
多线程同时写入一个LinkedBlockingQueue是否有问题呢?我只能说我测试过50个线程并发是没有问题的。 -
全部写入了一个queue里面,然后由一个线程写入文件是否慢呢?这个写入文件是顺序写入的,它的速度只比内存慢一点点(如果了解MySQL的各种日志会很清楚这一点),8w条数据写入是秒级别的。 -
什么时候写入文件呢?我们知道要等全部线程都运行完了再写入文件,但什么时候全部线程运行完呢?这里我引入一个变量 CountDownLatch
,在new它的时候可以传入一个数值,每个线程结束计数+1,直到计数等于这个数值,就可以执行后面的代码了。
完整代码如下:
public void xdxFun() {
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
final int pageSize = 100;
// 获取A表里面数据的数量
Integer totalCount = yxxBatchDataMapper.getAllCount();
// 计算好分页,每个线程单独去处理每页的数据
final int pageCount = totalCount % pageSize == 0 ? totalCount / pageSize : totalCount / pageSize + 1;
CountDownLatch latch = new CountDownLatch(pageCount);
for (int i = 1; i <= pageCount; i++) {
// 这里是个坑,i 是一个共享变量,是会不断变化的,但是page不是,因为它是每次 new 出来的
Integer page = new Integer(i);
threadPoolExecutor.execute(() -> {
// 创建两个用来生成SQL的变量
Map<String, Object> params = new HashMap<>();
Map<String, Object> wheres = new HashMap<>();
// 获取A每页的数据 MYSQL里面的 LIMIT 分页第一个参数是偏移量,这里我就不处理了
List<Object> datas = yxxBatchDataMapper.getAllPage(page, pageSize);
for (Object item : datas) {
// 通过A表去匹配B表的数据
Object bData = yxxBatchDataMapper.equalsB(item);
if (bData != null) {
// 更新A表里面的数据
params.clear(); wheres.clear();
params.put("A_3", bData.getB_3());
params.put("A_4", bData.getB_4());
wheres.put("id", item.getId());
String updateSql = updateSql(params, wheres, "table_A");
}else {
// 删除A表里面的数据
wheres.clear();
wheres.put("id", item.getId());
String deleteSql = deleteSql(wheres, "table_A");
}
}
latch.countDown();
});
}
try {
latch.await();
}catch (Exception e){
log.info("多线程异常了:{}", e);
}
// 写入文件
try {
BufferedWriter out = new BufferedWriter(new FileWriter("/Users/xiaodaoxian/Desktop/数据处理脚本SQL.sql"));
while (!linkedBlockingQueue.isEmpty()) {
out.write(linkedBlockingQueue.poll() + "rn");
}
out.close();
log.info("写入文件结束:{}", DateFormatUtils.format(new Date(), "YYYY-MM-DD hh:mm:ss"));
}catch (Exception e) {
log.error("写入文件异常", e);
}
}
N、后续
对于这种大数据处理真的很头疼,写代码的时候一定要仔细仔细,再仔细,即便是写完了,对于产生的SQL也要抽样检查。数据一但处理错了先要回滚还要重新处理,又因为大数据,每次处理都是极为耗时的,身心折磨。
N-1、如果真遇到了queue和文件写入瓶颈呢?
如果真就遇到了队列和文件的瓶颈,也并不是没有办法的。
思路一
每个线程写入单独的queue,然后使用有返回的多线程(callable),把每个返回的队列再合并。
思路二
每个线程都写入单独的文件,然后再做文件合并。
N-2、多线程
算上实习经历,我已经工作了2年半了,在实际工作中也并未使用到多线程,一直以来对多线程都是比较恐惧的。但这次处理数据,让我真实使用上了多线程,并无想象中可怕。
理论先行,实践在后。可以先去理解多线程里面的这些概念,不管你后续是面试还是要使用多线程,都是很有帮助的。
N-3、updateSql、deleteSql、insertSql
这是我当时写的零时处理sql的方法,勉强能用,是有bug的,比如你传入的参数是带 ` 或者 ‘ 的,会产生SQL注入,真实遇到了。
我们是在每个线程里面去调用这个方法的,所以这个方法并不需要考虑线程安全。也就不必使用 StringBuffer
了
public String insertSql(Map<String, Object> params, String tableName) {
if (params == null || params.isEmpty()) {
return "insertSql错误的参数 : " + tableName;
}
StringBuilder insertSql = new StringBuilder("INSERT INTO " + tableName + " ");
StringBuilder cloumSql = new StringBuilder();
StringBuilder paramsSql = new StringBuilder();
for (String key : params.keySet()) {
cloumSql.append(key + ",");
paramsSql.append( "'" + params.get(key) + "' ,");
}
insertSql.append("(" + cloumSql.substring(0, cloumSql.length()-1) + ")");
insertSql.append(" VALUES (" + paramsSql.substring(0, paramsSql.length() - 1) + ");");
return insertSql.toString();
}
// UPDATE table SET name = '张三', age = 18 WHERE id = 10;
public String updateSql(Map<String, Object> params, Map<String, Object> wheres ,String tableName) {
if (params == null || params.isEmpty()) {
return "updateSql错误的参数 : " + tableName;
}
StringBuilder updateSql = new StringBuilder("UPDATE " + tableName + " SET ");
StringBuilder setSql = new StringBuilder();
for (String key : params.keySet()) {
setSql.append(key + " = '" + params.get(key) + "' ,");
}
StringBuilder whereSql = new StringBuilder();
for (String key : wheres.keySet()) {
whereSql.append(" AND " + key + " = '" + wheres.get(key) + "'");
}
updateSql.append(setSql.substring(0, setSql.length() - 1));
updateSql.append(" WHERE " + whereSql.substring(4, whereSql.length()) + ";");
return updateSql.toString();
}
// DELETE FROM table WHERE id = 1 AND name = '张三';
public String deleteSql(Map<String, Object> wheres ,String tableName) {
if (wheres == null || wheres.isEmpty()) {
return "delete错误的参数 : " + tableName;
}
StringBuilder deleteSql = new StringBuilder("DELETE FROM " + tableName + " WHERE ");
StringBuilder paramsSql = new StringBuilder();
for (String key : wheres.keySet()) {
paramsSql.append( " AND " + key + " = '" + wheres.get(key) + "'");
}
deleteSql.append(paramsSql.substring(4,paramsSql.length()) + ";");
return deleteSql.toString();
}
原文始发于微信公众号(小道仙97):多线程处理数据二
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41330.html