【经验项】GO协程应该怎么使用

问题发现

在生产环境下,微服务时不时直接panic,导致服务重启。而在k8s下服务重启5次后,会进入惩罚模式重启时长变5分钟(可以修改,但是治标不治本)。 最终要的降低程序的影响面。
根因:微服务都是HTTP/grpc服务,在底层实现都是通过go协程来处理http请求,并且未进行panic的recover操作。从而导致整个服务的崩溃。

解决方式
在grpc的server拦截器中添加github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery的使用。并且配合Prometheus的上报,捕获程序的panic次数,进行快速告警。

关键字GO存在什么问题

可以参考之前的文章:
【100 Mistakes】golang并发的坑-1
【100 Mistakes】golang并发的坑-2.md
主要的原因有以下几点:

  1. go协程的panic未处理,会引发整个程序的panic。
  2. go协程的泄漏,如果goroutine启动后没有正确退出或没有及时终止,可能会导致goroutine泄漏。
  3. go协程滥用,如果goroutine在使用完资源后不正确地释放资源,可能会导致资源泄漏。资源泄漏可能包括内存泄漏、文件描述符泄漏等,最终会耗尽系统资源。

本篇文章主要针对1,3的解决方案:https://github.com/sourcegraph/conc。

conc库

短小精悍的go协程库(相比较来说,用起来比较舒服),由Sourcegraph开源的,该组织在github还没有WebIDE时,提供过一个github网页代码查看插件。
具体的使用例子可以查看:

  • GITHUB
  • 官网

一个优秀的开源框架,往往代码单测以及举例都会比较详细。 conc就是这样的代码。

使用场景 协程池

当有集合对象需要进行处理,满足以下2个条件可以使用:

  1. 处理时间,比较耗时的时候。
  2. 各个对象的处理相对独立。

这样的情况下,可以使用conc.Pool 来并发处理。

官方推荐:池是高效的,但不是零成本。不应该使用很短的时间任务。启动和拆卸的开销约为 1μs,并且每个任务的开销约为 300ns。

简单的使用:代码

     g := pool.New()
  var completed atomic.Int64
  for i := 0; i < 100; i++ {
   g.Go(func() {
    time.Sleep(1 * time.Millisecond)
    completed.Add(1)
   })
  }
  g.Wait()

问题来了,这跟waitGroup有啥区别?
区别在于:

  1. pool调用的Go函数,内部自己封装了panic相关处理。
  2. 方便使用,调用goroutine时,不用再使用Done, Add。
  3. 可以通过WithMaxGoroutines,直接指定最大协程数。

推荐的最重要的理由:它补充了一个场景—-往往函数是有错误返回的,那么该如何处理?
代码

func ExampleErrorPool() {
 p := pool.New().WithErrors()
 for i := 0; i < 3; i++ {
  i := i
  p.Go(func() error {
   if i == 2 {
    return errors.New("oh no!")
   }
   return nil
  })
 }
 err := p.Wait()
 fmt.Println(err)
 // Output:
 // oh no!
}

在Wait的时候,会返回所有协程的错误,如果有或者若干个的时候,会进行换行。其内部是通过joinError进行组合的error切片。
另外还有一些常用的功能,比如

WithContext(context.Background())
WithCancelOnError()
NewWithResults()

使用场景  迭代增强

切片提供了2个强力方法ForEach()ForEachIdx(),有过其他语言基础,应该能够从名字就能看出来;
iter_test.go

input := []int{1234}
iterator := iter.Iterator[int]{
    MaxGoroutines: len(input) / 2,
}

iterator.ForEach(input, func(v *int) {
    if *v%2 != 0 {
        *v = -1
    }
})

fmt.Println(input)

// Output:
// [-1 2 -1 4]
  ints := make([]int10000)
  iter.ForEachIdx(ints, func(i int, val *int) {
   *val = i
  })

如果需要对切片内数据进行操作的时候,它会通过多协程的方式来,进行操作。并且协程安全的。很有意思的源码。
这里只是对切片进行原地修改,当我们需要返回值或者返回错误的时候,可以使用iter.Map 方法进行。

使用场景 有序CallBack

什么是有序callBack?
场景:当需要对一组数据进行操作后,需要有序调用callback方法来做进一步处理时,可以使用conc.Stream来进行操作;
stream_test.go

func ExampleStream() {
 times := []int{20521645480}

 s := stream.New()
 for _, millis := range times {
  dur := time.Duration(millis) * time.Millisecond
  s.Go(func() stream.Callback {
   time.Sleep(dur)
   // This will print in the order the tasks were submitted
   return func() { fmt.Println(dur) }
  })
 }
 s.Wait()

 // Output:
 // 20ms
 // 52ms
 // 16ms
 // 45ms
 // 4ms
 // 80ms
}

可以看到,在异步做处理后,进一步调用回调方法,其输出的结果还是有序的。

源码分析

对于conc这种短小精悍的库,我们可以使用goplantuml直接来查看其内部的对象封装情况。

Pool包的ER图

【经验项】GO协程应该怎么使用
plantuml中的类图,可以帮我们快速的查看对象的依赖关系。

好玩的知识

堆栈捕获

当goroutine出现panic后,我们具体需要做什么操作,conc源码中给了一个非常好的实现方式:

type Recovered struct {
 //  panic的原始值
 Value any
 // 当恐慌发生时,runtime.Callers 返回的调用者列表
 // 恢复了。可用于生成更详细的堆栈信息
 // 运行时.CallersFrames。
 Callers []uintptr
 // 来自恢复恐慌的 goroutine 的格式化堆栈跟踪。
 // 比Callers更容易使用。
 Stack []byte
}

定义了一个Recovered的结构体,在Catcher结构体中,调用Catcher.Try()来封装传入的结构体,如果出现了panic后,创建Recovered对象来记录panic的详细信息。pool或conc.WaitGroup底层都使用Catcher来调用函数。
ER图:
【经验项】GO协程应该怎么使用
可以直接下载源码执行:panics_test.go

iter的实现

在conc中iter的实现也比较有趣,通过golang的atomic库+WaitGroup即可实现,实现代码+注释才75行左右;
关键步骤如下:

var idx atomic.Int64
 // Create the task outside the loop to avoid extra closure allocations.
 task := func() {
  i := int(idx.Add(1) - 1)
  for ; i < numInput; i = int(idx.Add(1) - 1) {
   f(i, &input[i])
  }
 }

 var wg conc.WaitGroup
 for i := 0; i < iter.MaxGoroutines; i++ {
  wg.Go(task)
 }
 wg.Wait()

当切片对象传入后,声明一个idx对象,用来保存执行索引(原子操作),然后通过WaitGroup创建goroutine来进行操作。

Stream的实现

stream最有趣的地方在于其异步并且有序的执行回调函数。异步比较容易实现,在异步后并保持回调的有序性,这里可以思考下如何实现?
在goroutine中,通过chan来保证执行的有序性。那么就可以在执行goroutine前,创建一个chan与其通过闭包的方式进行一一绑定,在后续回调的时候,实现同步方式。
ER图如下:
【经验项】GO协程应该怎么使用
从图中,我们可以看到定义了三个类型:

type callbackCh chan func()   // 保存回调函数的通道
type Callback func()  //  定义回调函数
type Task func() Callback   // 定义任务类型

var callbackChPool = sync.Pool{   
 New: func() any {
  return make(callbackCh, 1)
 },
}

func (s *Stream) Go(f Task) {
 s.init()

 // 获取一个channel类型, 即callbackCh
 ch := getCh()
    
 // 将ch 放入到队列中, 这里转同步了  queue的类型为  chan callbackCh 
    // 在 callBackCh 上 加上了一层 作为channel。
    // s.queue 在callbacker()函数中循环等待
 s.queue <- ch

 // Submit the task for execution.
 s.pool.Go(func() {
  defer func() {
   // In the case of a panic from f, we don't want the callbacker to
   // starve waiting for a callback from this channel, so give it an
   // empty callback.
   if r := recover(); r != nil {
    ch <- func() {}
    panic(r)
   }
  }()

  // 最后将回调函数插入到s.queue中
  callback := f()
  ch <- callback
 })
}

func (s *Stream) callbacker() {
 var panicCatcher panics.Catcher
 defer panicCatcher.Repanic()

 // !!!!!!For every scheduled task, read that tasks channel from the queue.
 for callbackCh := range s.queue {
  // Wait for the task to complete and get its callback from the channel.
  callback := <-callbackCh

  // Execute the callback (with panic protection).
  if callback != nil {
   panicCatcher.Try(callback)
  }

  // Return the channel to the pool of unused channels.
  putCh(callbackCh)
 }
}

在这个代码里面还有一个有趣的点, s.queue 通过在Wait函数中 close(s.queue)来进行break循环。所以channel的特征,活学活用!!!!

小结

conc的代码实现非常漂亮,并且其单测的代码也可以借鉴。


原文始发于微信公众号(小唐云原生):【经验项】GO协程应该怎么使用

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/284933.html

(0)
葫芦侠五楼的头像葫芦侠五楼

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!