10S向MySQL中插入10W+数据

👨‍🏫 10w+数据10S 打向MySQL.

其实这里也是可以横向扩展到100W+数据,只是把goroutine调大一点即可.

💡 1.  简单实现思路:

当我们使用一个 goroutine 的去进行增加的时候,不难会发现,会增加的很慢,我当初在用 Java 尝试使用一个 Conn 增加了1w 个数据(那个时候是测试上万数据查询效率问题),那个过程可想而知,等了我好久。。。

这几天,在写需求的时候,需要一次性(一次HTTP请求)打向 MySQL 中,我首先是通过事务来写入的,但是发现真的好慢,所以我改成了如下模式,通过在当前线程分发指定的数据给一个 goroutine,让该协程负责打向MySQL指定数量的数据。但是这个度要把握好,什么意思呢?总价下来就是两个问题:1. 一共开多少 goroutine 2. 每个 goroutine负责多少数据量呢?这里我的依据是一个 goroutine负责 1000条数据,那么根据当前需要插入多少条数据就可以计算出一共需要多少个 goroutine了.  代码如下.

💡  2. 代码.

我把生成MD5的xcommon和日志组件xlogging删掉了,需要的可以自行补全.

package cdkmod

import (
    "errors"
    "strconv"
    "time"
)

// CDKItem 表示一个CDK:  t_cd_key_store.
// 对应数据库中的表. 
// 表明随意,我主要是为了切合场景. 
type CDKItem struct {
    Id            uint64 `json:"id" xorm:"id"`
    BillNo        string `json:"bill_no" xorm:"bill_no"`
    Md5CdKey      string `json:"md5_cd_key" xorm:"md5_cd_key"`
    ExchangeActID string `json:"exchange_act_id" xorm:"exchange_act_id"`
    StatusUsed    uint64 `json:"status_used" xorm:"status_used"`
    CreateTime    string `json:"create_time" xorm:"create_time"`
    LastModifier  string `json:"last_modifier" xorm:"last_modifier"`
    ModTime       string `json:"mod_time" xorm:"mod_time"`
}

const (
    CDKItemTable = "t_cd_key_store"
    OneExecSQL   = 1000
)

// BatchInsertCDK 将生成的所有数据进行返回.
func (c *CDKItem) BatchInsertCDK(data map[int64]string, log *xlogging.Entry) ([][]string, error) {

    log.Info("BatchInsertCDK start.. Parameter len(map): "len(data))
    if len(data) <= 0 {
        return nil,errors.New("cdk 数量为0")
    }
    start := time.Now().Unix()
    rows := make([][]stringlen(data))
    index := 0
    datas := make(map[int64]string10000)
    // error同步.
    ch := make(chan int5)
    // 记录1w中的下标.
    i := 0
    // 开的协程数.
    nums := 0

    for cdkNum, cdk := range data {

        datas[cdkNum] = cdk
        num := strconv.Itoa(int(cdkNum))
        row := []string{num, xcommon.GetMD5(cdk)}
        rows[index] = row
        index++
        i++
        if i == OneExecSQL {
            go batch(datas, ch, log)
            log.Info("开启第[" + strconv.Itoa(nums) + "]个goroutine")
            datas = make(map[int64]string, OneExecSQL)
            i = 0
            nums++
            continue
        }

        if len(data)-(nums*OneExecSQL) > 0 && len(data)-(nums*OneExecSQL) < OneExecSQL {
            go batch(datas, ch, log)
            log.Info("开启第[" + strconv.Itoa(nums) + "]个goroutine")
            nums++
            datas = make(map[int64]string)
        }

    }

    count := 0
    for {
        err := <-ch
        count++
        if err == 1 {
            // 异常.
            return nil, errors.New("batch insert error")
        }
        if count == nums {
            break
        }
    }

    end := time.Now().Unix()
    log.Info("向MySQL中插入数据耗费时间: ", end-start)
    return rows, nil
}

func batch(data map[int64]string, ch chan int, log *xlogging.Entry) {

    session := db.NewSession()
    defer session.Close()
    session.Begin()
    start := time.Now().Unix()
    for cdkNum, cdk := range data {
        item := &CDKItem{
            BillNo:     strconv.Itoa(int(cdkNum)),
            Md5CdKey:   xcommon.GetMD5(cdk),
            CreateTime: xcommon.GetNowTime(),
            ModTime:    xcommon.GetNowTime(),
        }
        _, err := session.Table(CDKItemTable).Insert(item)
        if nil != err {
            session.Rollback()
            log.Info("batch insert cdk item error session.Rollback: ", err.Error())
            ch <- 1
            return
        }
    }
    err := session.Commit()
    if err != nil {
        log.Info("batch insert cdk item error session.Commit: ", err.Error())
        ch <- 1
        return
    }
    // 正常.
    ch <- 0
    end := time.Now().Unix()
    log.Info("batch goroutine 执行耗时: ", end-start)
}

效果图:

10S向MySQL中插入10W+数据
image-20210702161919521
  • StartTime

10S向MySQL中插入10W+数据
image-20210702161959635
  • EndTime

10S向MySQL中插入10W+数据
image-20210702161941555
💡 3.  总结:

其实以后在面对这种大数据量增加的情况下,我们只需要协调 goroutine 的数量以及每个 goroutine 负责的数据部分即可。只要调理好了,10S百万数据也不是不可以的哈~

如果要搞10S百万数据的话,我们可以分10个goroutine去处理,每个分10w条数据,那么当10个 goroutine同时进行的时候,就相当于一个goroutine在进行,所以把时间进行压缩、CPU利用率提高就可以达成10S百万数据的存储了。


原文始发于微信公众号(社恐的小马同学):10S向MySQL中插入10W+数据

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/269778.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!