goroutine Pool池化技术
池化技术介绍
池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。
在系统开发过程中,我们经常会用到池化技术。通俗的讲,池化技术就是:把一些资源预先分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。这样做带来几个明显的好处:
-
资源重复使用, 减少了资源分配和释放过程中的系统消耗。比如,在IO密集型的服务器上,并发处理过程中的子线程或子进程的创建和销毁过程,带来的系统开销将是难以接受的。所以在业务实现上,通常把一些资源预先分配好,如线程池,数据库连接池,Redis连接池,HTTP连接池等,来减少系统消耗,提升系统性能。
-
可以对资源的整体使用做限制。这个好理解,相关资源预分配且只在预分配是生成,后续不再动态添加,从而限制了整个系统对资源的使用上限。类似一个令牌桶的功能。
-
池化技术分配对象池,通常会集中分配,这样有效避免了碎片化的问题。
池化技术简单点来说,就是提前保存大量的资源,以备不时之需。池化技术有两个特点,提前创建和重复利用。
应用场景
-
线程池 -
内存池 -
连接池 -
协称池
协称池代码实现
package main
import (
"fmt"
"log"
"sync"
"time"
)
// 创建真正执行任务的worker
type worker struct {
workerPool chan *worker
jobChannel chan Job
stop chan struct{}
}
// 开始执行任务
func (w *worker) start() {
go func() {
var job Job
for {
// worker free, add it to pool
w.workerPool <- w
select {
case job = <-w.jobChannel:
runJob(job)
case <-w.stop:
w.stop <- struct{}{}
return
}
}
}()
}
// 真正执行任务
func runJob(f func()) {
defer func() {
if err := recover(); err != nil {
log.Printf("gpool Job panic err: %v", err)
}
}()
f()
}
// 初始化worker
func newWorker(pool chan *worker) *worker {
return &worker{
workerPool: pool,
jobChannel: make(chan Job),
stop: make(chan struct{}),
}
}
// 接受来自客户端的任务,并等待第一个空闲的worker交付工作
type dispatcher struct {
workerPool chan *worker
jobQueue chan Job
stop chan struct{}
}
// 分配任务
func (d *dispatcher) dispatch() {
for {
select {
case job := <-d.jobQueue:
worker := <-d.workerPool
worker.jobChannel <- job
case <-d.stop:
for i := 0; i < cap(d.workerPool); i++ {
worker := <-d.workerPool
worker.stop <- struct{}{}
<-worker.stop
}
d.stop <- struct{}{}
return
}
}
}
// 初始化分配器
func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
d := &dispatcher{
workerPool: workerPool,
jobQueue: jobQueue,
stop: make(chan struct{}),
}
for i := 0; i < cap(d.workerPool); i++ {
worker := newWorker(d.workerPool)
worker.start()
}
go d.dispatch()
return d
}
type Job func()
type Pool struct {
JobQueue chan Job
dispatcher *dispatcher
wg sync.WaitGroup
}
// NewPool 初始化goroutine Pool
func NewPool(numWorkers int, jobQueueLen int) *Pool {
jobQueue := make(chan Job, jobQueueLen)
workerPool := make(chan *worker, numWorkers)
pool := &Pool{
JobQueue: jobQueue,
dispatcher: newDispatcher(workerPool, jobQueue),
}
return pool
}
// 打包任务
func (p *Pool) wrapJob(job func()) func() {
return func() {
defer p.JobDone()
job()
}
}
func (p *Pool) SendJobWithTimeout(job func(), t time.Duration) bool {
select {
case <-time.After(t):
return false
case p.JobQueue <- p.wrapJob(job):
p.WaitCount(1)
return true
}
}
func (p *Pool) SendJobWithDeadline(job func(), t time.Time) bool {
s := t.Sub(time.Now())
if s <= 0 {
s = time.Second // timeout
}
select {
case <-time.After(s):
return false
case p.JobQueue <- p.wrapJob(job):
p.WaitCount(1)
return true
}
}
// SendJob 发送任务
func (p *Pool) SendJob(job func()) {
p.WaitCount(1)
p.JobQueue <- p.wrapJob(job)
}
func (p *Pool) JobDone() {
p.wg.Done()
}
func (p *Pool) WaitCount(count int) {
p.wg.Add(count)
}
// WaitAll 等待所有goroutine退出
func (p *Pool) WaitAll() {
p.wg.Wait()
}
// Release 释放资源
func (p *Pool) Release() {
p.dispatcher.stop <- struct{}{}
<-p.dispatcher.stop
}
func main() {
// 初始化 10个worker(goroutine) 任务队列长度是1000
var pool = NewPool(10, 1000)
pool.SendJobWithTimeout(func() {
fmt.Println("SendJobWithTimeout")
}, 2*time.Second)
// 发送任务
pool.SendJob(func() {
fmt.Println("send job")
})
pool.SendJobWithDeadline(func() {
fmt.Println("SendJobWithDeadline")
}, time.Now().Add(time.Second*3))
// 等待资源释放和退出
pool.WaitAll()
pool.Release()
}
原文始发于微信公众号(堆栈future):goroutine Pool池化技术
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103526.html