Go让消费速度更快
可复用的生产消费逻辑
有时候我们从接口当中接受一批数据,想要它们在后台运行,或者说同步执行效率更快;有时候从缓存或者队列中消费数据,想要增加消费的速度;有时候跑一批数据,想要处理效率更高;那么遇到这些场景,这一套生产消费模式就足以应对了,来看下代码。
代码
func ConsumeTask(ctx context.Context) {
LOOP:
var total int
var success int
start := time.Now()
wg := sync.WaitGroup{}
gLock := sync.Mutex{}
taskChan := make(chan Task, 50)
wg.Add(1)
go func() {
defer wg.Done()
for {
// 获取的长度是0或者错误 直接break
// 生产 遇到错误continue
// 反序列化
total += 1
taskChan <- task
}
// 结束生产
close(taskChan)
}()
// 多个消费者
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if task, ok := <-taskChan; ok { // 消费
// 。。。。。。
gLock.Lock()
success += 1
gLock.Unlock()
}
} else {// chan关闭了 就退出消费
break
}
}
}()
}
log.Warn(ctx, fmt.Sprintf("消费中"))
wg.Wait()
log.Warn(ctx, fmt.Sprintf("消费结束"))
if success == 0 || total == 0 {
log.Warn(ctx, fmt.Sprintf("当前无待消耗的任务, sleep 10s"))
time.Sleep(10 * time.Second)
goto LOOP
}
larkText := requestcommon.NewLarkCustomBotContentRichText("消费", time.Now().Format("2006-01-02 15:04:05"))
totalText := fmt.Sprintf("总共待消费:%d", total)
failText := fmt.Sprintf("失败:%d", total-success)
successText := fmt.Sprintf("成功: %d", success)
takeText := fmt.Sprintf("耗时: %v", time.Since(start))
ipText := fmt.Sprintf("IP: %s", common.LocalIP())
// 增加各种指标预警
larkText.AddTextWithTag(totalText).AddTextWithTag(successText).AddTextWithTag(failText).AddTextWithTag(takeText).AddTextWithTag(ipText)
// 通过飞书hook url 发送出去
err := requestcommon.SendLarkCustomBotMsgRichText(ctx, "hook_url", *larkText)
log.Warn(ctx, fmt.Sprintf("飞书发送消费通知 err: %v", err))
goto LOOP
}
小结
这套模板大家可以拿来直接用,简单高效,欢迎有兴趣的同学一起交流哈。
– END –
原文始发于微信公众号(堆栈future):Go让消费速度更快
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103497.html