Go实现后台任务调度系统

Go实现后台任务调度系统

一、背景

平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验。针对这种场景我们一般都是后台异步处理,不需要前端等待所有的都执行完才返回。为了解决这一问题,需要我们自己实现后台任务调度系统。

二、任务调度器实现

Go实现后台任务调度系统

poll.go

package poller

import (
 "context"
 "fmt"
 "log"
 "sync"
 "time"
)

type Poller struct {
 routineGroup *goroutineGroup // 并发控制
 workerNum    int // 记录同时在运行的最大goroutine数

 sync.Mutex
 ready  chan struct{} // 某个goroutine已经准备好了
 metric *metric // 统计当前在运行中的goroutine数量
}

func NewPoller(workerNum int) *Poller {
 return &Poller{
  routineGroup: newRoutineGroup(),
  workerNum:    workerNum,
  ready:        make(chan struct{}, 1),
  metric:       newMetric(),
 }
}

// 调度器
func (p *Poller) schedule() {
 p.Lock()
 defer p.Unlock()
 if int(p.metric.BusyWorkers()) >= p.workerNum {
  return
 }

 select {
 case p.ready <- struct{}{}: // 只要满足当前goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine执行任务
 default:
 }
}

func (p *Poller) Poll(ctx context.Context) error {
 for {
  // step01
  p.schedule() // 调度

  select {
  case <-p.ready: // goroutine准备好之后 这里就会有消息
  case <-ctx.Done():
   return nil
  }

 LOOP:
  for {
   select {
   case <-ctx.Done():
    break LOOP
   default:
    // step02
    task, err := p.fetch(ctx) // 获取任务
    if err != nil {
     log.Println("fetch task error:", err.Error())
     break
    }
    fmt.Println(task)
    p.metric.IncBusyWorker() // 当前正在运行的goroutine+1
    // step03
    p.routineGroup.Run(func() { // 执行任务
     if err := p.execute(ctx, task); err != nil {
      log.Println("execute task error:", err.Error())
     }
    })
    break LOOP
   }
  }
 }
}

func (p *Poller) fetch(ctx context.Context) (string, error) {
 time.Sleep(1000 * time.Millisecond)
 return "task", nil
}

func (p *Poller) execute(ctx context.Context, task string) error {
 defer func() {
  p.metric.DecBusyWorker() // 执行完成之后 goroutine数量-1
  p.schedule() // 重新调度下一个goroutine去执行任务 这一步是必须的
 }()
 return nil
}

metric.go

package poller

import "sync/atomic"

type metric struct {
 busyWorkers uint64
}

func newMetric() *metric {
 return &metric{}
}

func (m *metric) IncBusyWorker() uint64 {
 return atomic.AddUint64(&m.busyWorkers, 1)
}

func (m *metric) DecBusyWorker() uint64 {
 return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}

func (m *metric) BusyWorkers() uint64 {
 return atomic.LoadUint64(&m.busyWorkers)
}

goroutine_group.go

package poller

import "sync"

type goroutineGroup struct {
 waitGroup sync.WaitGroup
}

func newRoutineGroup() *goroutineGroup {
 return new(goroutineGroup)
}

func (g *goroutineGroup) Run(fn func()) {
 g.waitGroup.Add(1)

 go func() {
  defer g.waitGroup.Done()
  fn()
 }()
}

func (g *goroutineGroup) Wait() {
 g.waitGroup.Wait()
}

三、测试

package main

import (
 "context"
 "fmt"
 "ta/poller"
 "go.uber.org/goleak"
 "testing"
)

func TestMain(m *testing.M)  {
 fmt.Println("start")
 goleak.VerifyTestMain(m)
}

func TestPoller(t *testing.T) {
 producer := poller.NewPoller(5)
 producer.Poll(context.Background())
}

结果:Go实现后台任务调度系统

四、总结

大家用别的方式也可以实现,核心要点就是控制并发节奏,防止大量请求打到task service,在这里起到核心作用的就是schedule,它控制着整个任务系统的调度。同时还封装了WaitGroup,这在大多数开源代码中都比较常见,大家可以去尝试。另外就是test case一定得跟上,防止goroutine泄漏。


原文始发于微信公众号(堆栈future):Go实现后台任务调度系统

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103423.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!