100个任务起10个goroutine去消费
一. 按照正常思路来一个架构设计
1. 基础知识必备
-
sync.WaitGroup -
chan -
go func
2. 原始消费架构模式设计
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
const concurrencyProcesses = 10 // 限制最大并发处理数量
const jobCount = 100 // 有多少任务
var wg sync.WaitGroup
wg.Add(jobCount)
found := make(chan int)
limitCh := make(chan struct{}, concurrencyProcesses) // 用chan去控制并发数量
for i := 0; i < jobCount; i++ { //处理100个job
limitCh <- struct{}{} // 控制并发数量
go func(val int) {
defer func() {
wg.Done()
<-limitCh
}()
waitTime := rand.Int31n(1000)
fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
time.Sleep(time.Duration(waitTime) * time.Millisecond)
found <- val
}(i)
}
go func() { // 等待goroutine结束并且关闭结果chan
wg.Wait()
close(found)
}()
var results []int
for p := range found { // 从结果chan获取数据
fmt.Println("Finished job:", p)
results = append(results, p)
}
fmt.Println("result:", results)
}
大家猜猜结果是怎么样的?
3. 公布结果

为什么deadlock?
4. 究其原因
这就是Limit Concurrency问题,即限制并发问题
在读取第一个job的时候,现将空的struct放进limitCh中,这个时候limitCh中就只剩下9个可以继续处理,接着重复这个步骤继续放入job,直到10个job装满limitCh。但是当第11个job需要处理的时候,程序就直接停止在limitCh <- struct{}{}
处,因为前10个goroutine不知道什么时候才能处理结束,这样第11个job它后面的代码就完全没机会执行,造成整个系统deadlock
。
但是如果你的job数量小于等于10,是完全看不出来任何问题的,系统可以正常运行,只有job数量大于并发数量的时候才会出这个BUG。
二. 按照BUG再设计一版
相信大家想到了一种解决方式,既然程序是卡在limitCh <- struct{}{}
,那么就直接将这段处理逻辑丢到go func中处理就好了。
...
for i := 0; i < jobCount; i++ {
go func() { //丢到go func中去处理
limitCh <- struct{}{}
}()
go func(val int) {
...
发现100个job同时处理直到结束,并没有达到限制并发处理的要求,虽然满足最终把这些job处理完成,但是我们要的可是Limit Concurrency
三. 最佳实践,也是大家拿来就可以用的限制并发框架模式
既然要限制并发数量,那么就建立特定数量的worker,每个worker读取chan就可以了,所以
第一步就是先建立queue的通道,将所有job都放入queue中,但是以go func方式去处理,避免阻塞main程序。
第二步就是建立特定的worker数量来消化全部的job。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
const concurrencyProcesses = 10 // 限制最大并发处理数量
const jobCount = 100 // 有多少任务
var wg sync.WaitGroup
wg.Add(jobCount)
found := make(chan int)
queue := make(chan int)
go func(queue chan<- int) { // 生产端负责生产任务到queue
for i := 0; i < jobCount; i++ {
queue <- i
}
close(queue)
}(queue)
for i := 0; i < concurrencyProcesses; i++ { // 当前只有10个并发 他们做的事情就是处理任务 并且将结果set in found
go func(queue <-chan int, found chan<- int) {
for val := range queue {
defer wg.Done()
waitTime := rand.Int31n(1000)
fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
time.Sleep(time.Duration(waitTime) * time.Millisecond)
found <- val
}
}(queue, found) // 传递进去 防止data race
}
go func() { // 等待goroutine结束并且关闭结果chan
wg.Wait()
close(found)
}()
var results []int
for p := range found { // 从结果chan获取数据
fmt.Println("Finished job:", p)
results = append(results, p)
}
fmt.Println("result:", results)
}
可以看到这里的for循环是以concurrencyProcesses为并发数量去处理job了。10个goroutine通过内层for循环不断读取chan中的job,直到queue中没有job为止,这样生产端生产完job之后会关闭queue,那么goroutine最后收到关闭消息后退出for循环,结束goroutine。
小结
其实还有很多常见的其他限制并发处理模式,但是我喜欢用上面这种处理模式。这里有个细节需要大家注意哈,就是控制并发的时候要处理好数据竞争(data race
)问题,如果处理不好,可能最后结果不一定是对的。
– END –
原文始发于微信公众号(堆栈future):Limit Concurrency你真的会吗
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103546.html