👨🏫 10w+数据10S 打向MySQL.
其实这里也是可以横向扩展到100W+数据,只是把goroutine调大一点即可.
💡 1. 简单实现思路:
当我们使用一个 goroutine
的去进行增加的时候,不难会发现,会增加的很慢,我当初在用 Java
尝试使用一个 Conn
增加了1w
个数据(那个时候是测试上万数据查询效率问题),那个过程可想而知,等了我好久。。。
这几天,在写需求的时候,需要一次性(一次HTTP请求)打向 MySQL
中,我首先是通过事务来写入的,但是发现真的好慢,所以我改成了如下模式,通过在当前线程分发指定的数据给一个 goroutine
,让该协程负责打向MySQL指定数量的数据。但是这个度要把握好,什么意思呢?总价下来就是两个问题:1. 一共开多少 goroutine
2. 每个 goroutine
负责多少数据量呢?这里我的依据是一个 goroutine
负责 1000条数据,那么根据当前需要插入多少条数据就可以计算出一共需要多少个 goroutine
了. 代码如下.
💡 2. 代码.
我把生成MD5的xcommon和日志组件xlogging删掉了,需要的可以自行补全.
package cdkmod
import (
"errors"
"strconv"
"time"
)
// CDKItem 表示一个CDK: t_cd_key_store.
// 对应数据库中的表.
// 表明随意,我主要是为了切合场景.
type CDKItem struct {
Id uint64 `json:"id" xorm:"id"`
BillNo string `json:"bill_no" xorm:"bill_no"`
Md5CdKey string `json:"md5_cd_key" xorm:"md5_cd_key"`
ExchangeActID string `json:"exchange_act_id" xorm:"exchange_act_id"`
StatusUsed uint64 `json:"status_used" xorm:"status_used"`
CreateTime string `json:"create_time" xorm:"create_time"`
LastModifier string `json:"last_modifier" xorm:"last_modifier"`
ModTime string `json:"mod_time" xorm:"mod_time"`
}
const (
CDKItemTable = "t_cd_key_store"
OneExecSQL = 1000
)
// BatchInsertCDK 将生成的所有数据进行返回.
func (c *CDKItem) BatchInsertCDK(data map[int64]string, log *xlogging.Entry) ([][]string, error) {
log.Info("BatchInsertCDK start.. Parameter len(map): ", len(data))
if len(data) <= 0 {
return nil,errors.New("cdk 数量为0")
}
start := time.Now().Unix()
rows := make([][]string, len(data))
index := 0
datas := make(map[int64]string, 10000)
// error同步.
ch := make(chan int, 5)
// 记录1w中的下标.
i := 0
// 开的协程数.
nums := 0
for cdkNum, cdk := range data {
datas[cdkNum] = cdk
num := strconv.Itoa(int(cdkNum))
row := []string{num, xcommon.GetMD5(cdk)}
rows[index] = row
index++
i++
if i == OneExecSQL {
go batch(datas, ch, log)
log.Info("开启第[" + strconv.Itoa(nums) + "]个goroutine")
datas = make(map[int64]string, OneExecSQL)
i = 0
nums++
continue
}
if len(data)-(nums*OneExecSQL) > 0 && len(data)-(nums*OneExecSQL) < OneExecSQL {
go batch(datas, ch, log)
log.Info("开启第[" + strconv.Itoa(nums) + "]个goroutine")
nums++
datas = make(map[int64]string)
}
}
count := 0
for {
err := <-ch
count++
if err == 1 {
// 异常.
return nil, errors.New("batch insert error")
}
if count == nums {
break
}
}
end := time.Now().Unix()
log.Info("向MySQL中插入数据耗费时间: ", end-start)
return rows, nil
}
func batch(data map[int64]string, ch chan int, log *xlogging.Entry) {
session := db.NewSession()
defer session.Close()
session.Begin()
start := time.Now().Unix()
for cdkNum, cdk := range data {
item := &CDKItem{
BillNo: strconv.Itoa(int(cdkNum)),
Md5CdKey: xcommon.GetMD5(cdk),
CreateTime: xcommon.GetNowTime(),
ModTime: xcommon.GetNowTime(),
}
_, err := session.Table(CDKItemTable).Insert(item)
if nil != err {
session.Rollback()
log.Info("batch insert cdk item error session.Rollback: ", err.Error())
ch <- 1
return
}
}
err := session.Commit()
if err != nil {
log.Info("batch insert cdk item error session.Commit: ", err.Error())
ch <- 1
return
}
// 正常.
ch <- 0
end := time.Now().Unix()
log.Info("batch goroutine 执行耗时: ", end-start)
}
效果图:

-
StartTime

-
EndTime

💡 3. 总结:
其实以后在面对这种大数据量增加的情况下,我们只需要协调 goroutine
的数量以及每个 goroutine
负责的数据部分即可。只要调理好了,10S百万数据也不是不可以的哈~
如果要搞10S百万数据的话,我们可以分10个goroutine
去处理,每个分10w条数据,那么当10个 goroutine
同时进行的时候,就相当于一个goroutine在进行,所以把时间进行压缩、CPU利用率提高就可以达成10S百万数据的存储了。
原文始发于微信公众号(社恐的小马同学):10S向MySQL中插入10W+数据
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/269778.html