前言
如果说goroutine是Go语言程序的并发体的话,那么channel则是它们之间的通信机制。一个channel就是一个通讯机制,可以让一个goroutine通过它给另一个goroutine发送值信息。每一个channel都有一个特殊的类型,也就是channel可发送数据的类型。
与map类似,channel也对应一个make创建的底层数据结构的引用。当我们复制一个channel或者用于函数参数传递时,我们只是拷贝了一个channel引用,因此调用者和被调用者将引用同一个channel对象。
两个相同类型的channel可以使用==运算符比较
用法
channel又可以分为无缓存channel和带缓存channel
无缓存Channel
ch:=make(chan int)
ch:=make(chan int,0)
一个基于无缓存的channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作
反之,如果接收操作先发生,那么接受者goroutine也将阻塞
在无缓存channel发送数据时,接收者收到数据发生在唤醒发送者goroutine之前(happens before)
带缓存Channel
ch:=make(chan int,1)
内部持有一个元素队列,队列的最大的容量是在调用make函数创建channel时通过第二个参数指定的
向缓存channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素
如果内存队列满了,那么发送操作将会阻塞直到另一个goroutine执行接收操作
反之,如果channel为空,接收操作将会阻塞直到另一个goroutine执行发送操作
向 Channel 中发送内容
ch<-1
从 Channel 中接收内容
var i:=<-ch
如果channel中已经没有数据,将会产生一个零值数据
关闭Channel
close(ch)
关闭后,即使channel为空,接收者也不会再阻塞
关闭后,接收操作依然可以接收到之前已经成功发送的数据
注意:重复关闭一个channel会导致panic异常,试图关闭一个nil值的channel也会导致panic异常
源码分析
注意:我会把源码中每个方法的作用都注释出来,可以参考注释进行理解。
func main() {
ch:=make(chan int,1) // 4
ch<-1 //5
close(ch) //6
<-ch //7
}
首页,我们通过汇编语言查看底层函数,我们主要看CALL指令就可以了
0x0024 00036 (channel.go:4) PCDATA $2, $1
0x0024 00036 (channel.go:4) PCDATA $0, $0
0x0024 00036 (channel.go:4) LEAQ type.chan int(SB), AX
0x002b 00043 (channel.go:4) PCDATA $2, $0
0x002b 00043 (channel.go:4) MOVQ AX, (SP)
0x002f 00047 (channel.go:4) MOVQ $1, 8(SP)
0x0038 00056 (channel.go:4) CALL runtime.makechan(SB)
0x003d 00061 (channel.go:4) PCDATA $2, $1
0x003d 00061 (channel.go:4) MOVQ 16(SP), AX
0x0042 00066 (channel.go:4) PCDATA $0, $1
0x0042 00066 (channel.go:4) MOVQ AX, "".ch+24(SP)
0x0047 00071 (channel.go:5) PCDATA $2, $0
0x0047 00071 (channel.go:5) MOVQ AX, (SP)
0x004b 00075 (channel.go:5) PCDATA $2, $2
0x004b 00075 (channel.go:5) LEAQ "".statictmp_0(SB), CX
0x0052 00082 (channel.go:5) PCDATA $2, $0
0x0052 00082 (channel.go:5) MOVQ CX, 8(SP)
0x0057 00087 (channel.go:5) CALL runtime.chansend1(SB)
0x005c 00092 (channel.go:6) PCDATA $2, $1
0x005c 00092 (channel.go:6) MOVQ "".ch+24(SP), AX
0x0061 00097 (channel.go:6) PCDATA $2, $0
0x0061 00097 (channel.go:6) MOVQ AX, (SP)
0x0065 00101 (channel.go:6) CALL runtime.closechan(SB)
0x006a 00106 (channel.go:7) PCDATA $2, $1
0x006a 00106 (channel.go:7) PCDATA $0, $0
0x006a 00106 (channel.go:7) MOVQ "".ch+24(SP), AX
0x006f 00111 (channel.go:7) PCDATA $2, $0
0x006f 00111 (channel.go:7) MOVQ AX, (SP)
0x0073 00115 (channel.go:7) MOVQ $0, 8(SP)
0x007c 00124 (channel.go:7) CALL runtime.chanrecv1(SB)
结构体
首先我们看一下channel结构体
runtime\chan.go
type hchan struct {
qcount uint // 队列中的元素总数 len
dataqsiz uint // 队列的大小 cap
buf unsafe.Pointer // 指向dataqsiz 的指针
elemsize uint16 // sizeof(chan) 中的数据
closed uint32 //是否关闭
elemtype *_type // 元素类型
sendx uint //发送下标
recvx uint // 接收下标
recvq waitq // 等待读消息的goroutine队列
sendq waitq //等待写消息的goroutine队列
//锁保护hchan中的所有字段,以及几个
//在此通道上被阻止的sudogs中的字段。
//按住此锁定时不要更改另一个G的状态
//(特别是不要准备G),因为这可能会导致死锁
lock mutex
}
初始化makechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
//编译器检查元素类型
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//获取需要分配的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 如果 hchan 中的元素不包含有指针,那么就没什么和 GC 相关的信息了
var c *hchan
//多实用switch少用if else
switch {
//队列或元素大小为0
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
//元素中不包含指针
case elem.kind&kindNoPointers != 0:
//元素不包含指针。
//一次调用分配hchan和buf。
//这种情况,gc不会对 channel 中的元素进行 scan
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指针
//区别:调用了两次分配空间的函数
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
发送数据chansend1
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//如果channel为nil
if c == nil {
if !block {
return false
}
// nil channel 发送数据会永远阻塞下去
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
//检查是否阻塞
//检查是否关闭,channel关闭之后不能发送
//检查channel容量&&检查channel 接收队列是否为空
//检查channel队列是否满了
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// channel 已被关闭,panic异常
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//找到等待的接收者
if sg := c.recvq.dequeue(); sg != nil {
//直接把要发的数据拷贝给这个 receiver
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//如果channel队列还没满
if c.qcount < c.dataqsiz {
// 通道缓冲区中有可用空间。发送的元素入队。
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
//将 goroutine 的数据拷贝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
// 将发送 index 加一
c.sendx++
//满了,重置index
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//长度加1
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
//在 channel 上阻塞,receiver 会帮我们完成后续的工作
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中
c.sendq.enqueue(mysg)
// 将这个发送 goroutine 从 Grunning -> Gwaiting
// 进入休眠
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
//确保发送的值保持活动状态,直到接收者将其复制出来
KeepAlive(ep)
// 唤醒
//如果不是合法的唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
//唤醒
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
//唤醒字后发现channel被关闭了
panic(plainError("send on closed channel"))
}
//可唤醒
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
上面代码看着复杂,我们在总结一下发送数据的过程:
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出goroutine,并把数据写入,最后把该G唤醒,结束发送过程;
- 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
- 如果缓冲区中没有空余位置,将待发送数据写入goroutine,将当前goroutine加入sendq,进入睡眠,等待被读goroutine唤醒;
接收数据chanrecv1
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
//如果channel为空
if c == nil {
if !block {
return
}
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//同上
// 非阻塞且没内容可收的情况下要直接返回
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//加锁
lock(&c.lock)
//当前channel没有数据可读
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
//释放锁
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// sender 队列中有 sudog 在等待
// 直接从该 sudog 中获取数据拷贝到当前 g 即可
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//还有可读数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 直接从 buffer 里拷贝数据
typedmemclr(c.elemtype, qp)
//接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//buffer 元素计数 -1// buffer 元素计数 -1
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
//没有可用的发送者,阻塞当前channel
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 进入 recvq 队列
c.recvq.enqueue(mysg)
//等待 Grunning -> Gwaiting
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 非正常唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
//唤醒
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
读取数据过程总结如下:
- 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出goroutine,把goroutine中数据读出,最后把goroutine唤醒,结束读取过程;
- 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把goroutine中数据写入缓冲区尾部,把goroutine唤醒,结束读取过程;
- 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
- 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;
关闭closechan
func closechan(c *hchan) {
//channel为空,不能关闭
if c == nil {
panic(plainError("close of nil channel"))
}
//加锁
lock(&c.lock)
// 如果 channel 已经关闭过了
// 直接触发 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
//释放所有的接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将 goroutine 入 glist
glist.push(gp)
}
// 释放所有的发送者
//可能会出现panic
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将 goroutine 入 glist
glist.push(gp)
}
//释放锁
unlock(&c.lock)
//把所有的G重置
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
//使 g 的状态切换到 Grunnable
goready(gp, 3)
}
}
关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
除此之外,panic出现的常见场景还有:
- 关闭值为nil的channel
- 关闭已经被关闭的channel
- 向已经关闭的channel写数据
Goroutine泄漏
例如:
func query()string{
ch:=make(chan string)
go func() {ch<-"A"}()
go func() {ch<-"B"}()
return <-ch
}
我们使用了无缓存channel,那么慢的那个goroutine将会因为没有人接收而被永远卡住,这种情况称为goroutine泄露。和垃圾变量不同,泄露的goroutine并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是非常重要的。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/12907.html