让sync.WaitGroup支持并发数量限制
重新定义WaitGroup
重新定义WaitGroup,目的就是为了支持并发数量限制,跟以往不确定并发相比,重新构造的WaitGroup可以限制并发数量和查看pending数量。
代码
// WaitGroup 实现一个简单的goroutine池
type WaitGroup struct {
size int
pool chan byte
waitCount int64
waitGroup sync.WaitGroup // 底层还是利用sync.WaitGroup去做并发控制
}
// NewWaitGroup 创建一个带有size的并发池 当size为<=0时,直接走sync.WaitGroup逻辑
func NewWaitGroup(size int) *WaitGroup {
wg := &WaitGroup{
size: size,
}
if size > 0 {
wg.pool = make(chan byte, size)
}
return wg
}
// BlockAdd 将“1”推入并发池中,如果池已满,则阻塞,这样就打到了限制并发的目的
func (wg *WaitGroup) BlockAdd() {
if wg.size > 0 {
wg.pool <- 1
}
wg.waitGroup.Add(1)
}
// Done 代表一个并发结束
func (wg *WaitGroup) Done() {
if wg.size > 0 {
<-wg.pool
}
wg.waitGroup.Done()
}
// Wait 等待所有并发goroutine结束
func (wg *WaitGroup) Wait() {
wg.waitGroup.Wait()
}
// PendingCount 返回所有pending状态的goroutine数量
func (wg *WaitGroup) PendingCount() int64 {
return int64(len(wg.pool))
}
func main() {
urls := []string{
"https://www.a.com/",
"https://www.b.com",
"https://www.c.com",
"https://www.d.com/",
"https://www.e.com",
"https://www.f.com",
}
wg := NewWaitGroup(2)
for _, url := range urls {
wg.BlockAdd()
go func(url string) {
defer wg.Done()
res, err := http.Get(url)
if err != nil {
fmt.Printf("%s: result: %vn", url, err)
return
}
defer res.Body.Close()
}(url)
}
wg.Wait()
fmt.Println("Finished")
}
结果

小结
其实原则上我们用sync.WaitGroup
可以满足并发控制,但是有些业务场景需要限制并发数量,而语言本身提供的又无法满足,所以这里就简单封装,实现了WaitGroup的并发限制,当然,这只是一小步,如果大家有兴趣,可以在其之上加上context,这样就可以满足超时控制和取消了。
– END –
原文始发于微信公众号(堆栈future):让sync.WaitGroup支持并发数量限制
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103504.html