多线程处理数据二

上一期写了怎么多线程处理数据,其实算不上数据处理,只能算一半:拉取数据。这不最近又要处理数据了。需要处理三批数据,要说数据量也不大,思路也很清晰,然后就评估了2天的时间,结果啃哧啃哧的加班带熬夜干了5天,现在总算处理完成了。

使用Java处理大数据避坑指南


一、开始

思考一下何为:处理数据,应当是对数据做一些增、删、改操作对不对。

举一个很简单的例子,要对表A的中user_type = 1的数据,去修改它name = '小道仙',这样的处理你很容易就写出如下脚本

UPDATE table_name FROM SET name = '小道仙' WHERE user_type = 1


上面这个脚本当然是没有问题的,但是如果逻辑稍微复杂一些,复杂到你无法直接使用SQL完成:

比如A表里面有100w数据,B表里面有200w数据

使用A表的A_1、A_2,去匹配B表里面的B_1、B_2

然后让B表的B_3、B_4,去更新A表的A_3、A_4

同时要删除无法匹配B表里面的数据

面对上述这种场景,单纯的SQL脚本已经无法满足了,我们必须得写代码脚本来实现。

处理逻辑可能也很简单吧,获取A表里面的数据,然后循环A表的数据,依次去匹配B表中的数据,可以匹配更新、无法匹配删除。这个思路是可行的,但有如下几个问题

  • 100w的数据去匹配200w的数据,再写入100w表里面,这得要多长时间?
  • 万一脚本出了一点点错误,如何恢复?重新执行脚本又面临第一步的问题


抛去业务逻辑,任何一个数据处理脚本都只要解决上述两个问题就好了

二、改进前

数据量我们无法去改变,那么只能从速度来下文章了,多线程是此为一解决办法。

多线程的写法,和上一篇文章一样的,我们只需要知道一点:线程安全来自共享变量,我们只需要保证每个线程只去处理自己的数据,这样即便是你开100个线程理论上也不会出现问题的。(所以说理论上不会出问题,是因为当我开启了50个线程的时候,数据库已经扛不住了)

实现上面逻辑的伪代码

public void xdxFun() {
   final int pageSize = 100;
    // 获取A表里面数据的数量
    Integer totalCount = yxxBatchDataMapper.getAllCount();
    // 计算好分页,每个线程单独去处理每页的数据
    final int pageCount = totalCount % pageSize == 0 ?  totalCount / pageSize : totalCount / pageSize + 1;
    for (int i = 1; i <= pageCount; i++) {
        // 这里是个坑,i 是一个共享变量,是会不断变化的,但是page不是,因为它是每次 new 出来的
        Integer page = new Integer(i);
        threadPoolExecutor.execute(() -> {
            // 获取A每页的数据 MYSQL里面的 LIMIT 分页第一个参数是偏移量,这里我就不处理了
            List<Object> datas = yxxBatchDataMapper.getAllPage(page, pageSize);

            for (Object item : datas) {
                // 通过A表去匹配B表的数据
                Object bData = yxxBatchDataMapper.equalsB(item);
                if (bData != null) {
                    // 更新A表里面的数据
                    yxxBatchDataMapper.updateA(item, bData);
                }else {
                    // 删除A表里面的数据
                    yxxBatchDataMapper.deleteA(item);
                }
            }
        });
    }
}

代码逻辑并不复杂,但考虑到两张频繁操作的表都是上百万的数据量。单线程的话可能要几个小时吧,如果是多线程或许二十分钟左右可以处理完(考虑到数据库的瓶颈,开15个线程吧)

上面的处理逻辑的确是一直最简单最快捷的方法,但这意味着高风险。

  • 频繁操作数据库,会导致其它正常的操作变得很慢(当然你可以在凌晨执行)。
  • 数据无法核对,上面的代码虽然看似没有问题,但是数据库里面的数据可不会按照你预想的那样,可能存在一些异常数据导致你的数据中断,一旦中断就很麻烦了,要先恢复数据,然后再去处理。
  • 如果基于某种原因,你无法访问线上数据库(这很正常),那你的脚本就必须先发布再执行,这样每次修改的代价大到无法承担。
  • 频繁的备份、恢复数据也是很麻烦的,如果一旦出错,也是凉凉。


三、改进后

为了解决上述种种问题,我提供一种这样的思路:在本地执行把所有的操作都写成SQL脚本,然后去执行这个SQL脚本

我们可以把线上需要操作的数据库备份到本地来,虽然这也是一项很麻烦的事情,但是安全,后期调试也快。

public void xdxFun() {
    final int pageSize = 100;
    // 获取A表里面数据的数量
    Integer totalCount = yxxBatchDataMapper.getAllCount();
    // 计算好分页,每个线程单独去处理每页的数据
    final int pageCount = totalCount % pageSize == 0 ?  totalCount / pageSize : totalCount / pageSize + 1;
    for (int i = 1; i <= pageCount; i++) {
        // 这里是个坑,i 是一个共享变量,是会不断变化的,但是page不是,因为它是每次 new 出来的
        Integer page = new Integer(i);
        threadPoolExecutor.execute(() -> {
            // 创建两个用来生成SQL的变量
            Map<String, Object> params = new HashMap<>();
            Map<String, Object> wheres = new HashMap<>();

            // 获取A每页的数据 MYSQL里面的 LIMIT 分页第一个参数是偏移量,这里我就不处理了
            List<Object> datas = yxxBatchDataMapper.getAllPage(page, pageSize);

            for (Object item : datas) {
                // 通过A表去匹配B表的数据
                Object bData = yxxBatchDataMapper.equalsB(item);
                if (bData != null) {
                    // 更新A表里面的数据
                    params.clear(); wheres.clear();
                    params.put("A_3", bData.getB_3());
                    params.put("A_4", bData.getB_4());
                    wheres.put("id", item.getId());

                    String updateSql = updateSql(params, wheres, "table_A");
                }else {
                    // 删除A表里面的数据
                    wheres.clear();
                    wheres.put("id", item.getId());
                    String deleteSql = deleteSql(wheres, "table_A");
                }
            }
        });
    }
}

这里的 updateSql、和deleteSql 就是SQL脚本,现在我们已经生成了(后面会附上这个生成SQL脚本的方法,其实就是字符串拼接)现在我们要把它存入文件里去。

我们可以是一个并发队列LinkedBlockingQueue,把每个SQL存入到queue里面,然后统一写入文件。

这里又引入了三个问题

  • 多线程同时写入一个LinkedBlockingQueue是否有问题呢?我只能说我测试过50个线程并发是没有问题的。
  • 全部写入了一个queue里面,然后由一个线程写入文件是否慢呢?这个写入文件是顺序写入的,它的速度只比内存慢一点点(如果了解MySQL的各种日志会很清楚这一点),8w条数据写入是秒级别的。
  • 什么时候写入文件呢?我们知道要等全部线程都运行完了再写入文件,但什么时候全部线程运行完呢?这里我引入一个变量 CountDownLatch,在new它的时候可以传入一个数值,每个线程结束计数+1,直到计数等于这个数值,就可以执行后面的代码了。

完整代码如下:

public void xdxFun() {
    LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
    final int pageSize = 100;
    // 获取A表里面数据的数量
    Integer totalCount = yxxBatchDataMapper.getAllCount();
    // 计算好分页,每个线程单独去处理每页的数据
    final int pageCount = totalCount % pageSize == 0 ?  totalCount / pageSize : totalCount / pageSize + 1;
    CountDownLatch latch = new CountDownLatch(pageCount);
    for (int i = 1; i <= pageCount; i++) {
        // 这里是个坑,i 是一个共享变量,是会不断变化的,但是page不是,因为它是每次 new 出来的
        Integer page = new Integer(i);
        threadPoolExecutor.execute(() -> {
            // 创建两个用来生成SQL的变量
            Map<String, Object> params = new HashMap<>();
            Map<String, Object> wheres = new HashMap<>();

            // 获取A每页的数据 MYSQL里面的 LIMIT 分页第一个参数是偏移量,这里我就不处理了
            List<Object> datas = yxxBatchDataMapper.getAllPage(page, pageSize);

            for (Object item : datas) {
                // 通过A表去匹配B表的数据
                Object bData = yxxBatchDataMapper.equalsB(item);
                if (bData != null) {
                    // 更新A表里面的数据
                    params.clear(); wheres.clear();
                    params.put("A_3", bData.getB_3());
                    params.put("A_4", bData.getB_4());
                    wheres.put("id", item.getId());

                    String updateSql = updateSql(params, wheres, "table_A");
                }else {
                    // 删除A表里面的数据
                    wheres.clear();
                    wheres.put("id", item.getId());
                    String deleteSql = deleteSql(wheres, "table_A");
                }
            }

            latch.countDown();
        });
    }


    try {
        latch.await();
    }catch (Exception e){
        log.info("多线程异常了:{}", e);
    }

    // 写入文件
    try {
        BufferedWriter out = new BufferedWriter(new FileWriter("/Users/xiaodaoxian/Desktop/数据处理脚本SQL.sql"));
        while (!linkedBlockingQueue.isEmpty()) {
            out.write(linkedBlockingQueue.poll() + "rn");
        }
        out.close();
        log.info("写入文件结束:{}", DateFormatUtils.format(new Date(), "YYYY-MM-DD hh:mm:ss"));
    }catch (Exception e) {
        log.error("写入文件异常", e);
    }
}


N、后续

对于这种大数据处理真的很头疼,写代码的时候一定要仔细仔细,再仔细,即便是写完了,对于产生的SQL也要抽样检查。数据一但处理错了先要回滚还要重新处理,又因为大数据,每次处理都是极为耗时的,身心折磨。


N-1、如果真遇到了queue和文件写入瓶颈呢?

如果真就遇到了队列和文件的瓶颈,也并不是没有办法的。

思路一

每个线程写入单独的queue,然后使用有返回的多线程(callable),把每个返回的队列再合并。

思路二

每个线程都写入单独的文件,然后再做文件合并。


N-2、多线程

算上实习经历,我已经工作了2年半了,在实际工作中也并未使用到多线程,一直以来对多线程都是比较恐惧的。但这次处理数据,让我真实使用上了多线程,并无想象中可怕。

理论先行,实践在后。可以先去理解多线程里面的这些概念,不管你后续是面试还是要使用多线程,都是很有帮助的。

SpringBoot多线程,线程池讲解


N-3、updateSql、deleteSql、insertSql

这是我当时写的零时处理sql的方法,勉强能用,是有bug的,比如你传入的参数是带 ` 或者 ‘ 的,会产生SQL注入,真实遇到了。

我们是在每个线程里面去调用这个方法的,所以这个方法并不需要考虑线程安全。也就不必使用 StringBuffer

public String insertSql(Map<String, Object> params, String tableName) {
   if (params == null || params.isEmpty())  {
       return "insertSql错误的参数 : " + tableName;
   }
   StringBuilder insertSql = new StringBuilder("INSERT INTO " + tableName + " ");
   StringBuilder cloumSql = new StringBuilder();
   StringBuilder paramsSql = new StringBuilder();
   for (String key : params.keySet()) {
       cloumSql.append(key + ",");
       paramsSql.append( "'" + params.get(key) + "' ,");
   }
   insertSql.append("(" + cloumSql.substring(0, cloumSql.length()-1)  + ")");
   insertSql.append(" VALUES (" + paramsSql.substring(0, paramsSql.length() - 1) + ");");
   return insertSql.toString();
}

// UPDATE table SET name = '张三', age = 18 WHERE id = 10;
public String updateSql(Map<String, Object> params, Map<String, Object> wheres ,String tableName) {
   if (params == null || params.isEmpty())  {
       return "updateSql错误的参数 : " + tableName;
   }
   StringBuilder updateSql = new StringBuilder("UPDATE " + tableName + " SET ");
   StringBuilder setSql = new StringBuilder();
   for (String key : params.keySet()) {
       setSql.append(key + " = '" + params.get(key) + "' ,");
   }
   StringBuilder whereSql = new StringBuilder();
   for (String key : wheres.keySet()) {
       whereSql.append(" AND " + key + " = '" + wheres.get(key) + "'");
   }
   updateSql.append(setSql.substring(0, setSql.length() - 1));
   updateSql.append(" WHERE " + whereSql.substring(4, whereSql.length()) + ";");
   return updateSql.toString();
}

// DELETE FROM table WHERE id = 1 AND name = '张三';
public String deleteSql(Map<String, Object> wheres ,String tableName) {
   if (wheres == null || wheres.isEmpty())  {
       return "delete错误的参数 : " + tableName;
   }
   StringBuilder deleteSql = new StringBuilder("DELETE FROM " + tableName + " WHERE ");
   StringBuilder paramsSql = new StringBuilder();
   for (String key : wheres.keySet()) {
       paramsSql.append( " AND " + key + " = '" + wheres.get(key) + "'");
   }
   deleteSql.append(paramsSql.substring(4,paramsSql.length()) + ";");
   return deleteSql.toString();
}


原文始发于微信公众号(小道仙97):多线程处理数据二

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41330.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!