referenced: http://www.tapirgames.com/blog/golang-channel-closing
文章首先说明了一个问题:对于go语言中的channel,官方没有给出一个内建的fun closed(c chan T) bool
方法来检查一个管道是不是已经关闭了.然后还指出了即使有,等你得到结果的时候管道的状态很可能又改变了…你得到的永远是前一刻的状态(mmp啊).
作者提出了一个比较合理的关闭管道的原则:
不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了.这个对于concurrency的总结很有用.
然后作者按着这个规则,给不同的关管道方式做了评级…
低级关闭
下面这个函数在接收端关闭了管道
1
2
3
4
5
6
7
8
9
10
11func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
// assume ch != nil here.
close(ch) // panic if ch is closed
return true // <=> justClosed = true; return
}下面这个函数在多个发送端关闭管道
1
2
3
4
5
6
7
8
9
10
11
12func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
// The return result can be altered
// in a defer function call.
closed = true
}
}()
ch <- value // panic if ch is closed
return false // <=> closed = false; return
}
顺便分析一下这个函数的性能: channel没有关闭的时候,SafeSend()
的性能跟ch <- value
几乎一样;当channel关闭之后,每个sender的goroutine会执行1次recover()
,对性能的影响不是很大.然而你还是打破了原则啊
中级关闭
作者提出用golang中内建的Once/Mutex
结构实现管道的关闭.1
2
3
4
5
6
7
8
9
10
11
12
13
14type MyChannel struct{
ch chan T
once sync.Once
}
func NewMyChannel() *MyChannel{
return &MyChannel{ch: make(chan T)}
}
func (mych *MyChannel) SafeClose() {
mych.once.Do(func(){
close(mych.ch)
})
}
这里将T类型的channel和一个sync.Once
对象封装在一起,这样对于每一个channel,once.Do
使得close()
只会被调用一次.
1 | type MyChannel struct{ |
注意:不管用哪种方法实现,SafeClose()
都应该在Sender端被调用,这是之前提到的close principle.
高级关闭
这里讨论了三种情况:
这里的三种方法都是通过sync.WaitGroup
实现的.具体的用法在这里
1个发送者,M个接收者:由发送者告诉所有接受者”我再也不发东西了!”(关闭管道)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46package main
import (
"sync"
"log"
"time"
"math/rand"
)
func main() {
rand.Seed(time.Now.UnixNano())
log.SetFlags(0)
const MaxRandomNumber = 100000
const NumReceivers = 100
wgRecveivers := sync.WaitGroup{}
waitGroup.add(NumReceivers)
dataCh := make(chan int)
// sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber) == 0 {
close(dataCh)
return
}
else{
dataCh <- value
}
}
}()
// receiver
for i := 0; i < NumReceivers; i++ {
go func(){
defer wgRecveivers.Done()
for data := range dataCh {
log.Println(value)
}
}()
}
wgRecveivers.wait()
}N个发送者,1个接收者:有接受者跟所有发送者收”别再给我发东西了!”
- N个发送者,M个接收者:通过一个moderator进行信息交互
这个真的太难了…分为两种情况:- 所有的sender都会在某个时间点自动停止发送,像这个例子.这个时候使用WaitGroup可以完美解决这个问题
- 某个sender会在某个时间关闭管道.比如前面的例子:每个goroutine随机生成整数直到某个线程随机到0,该线程会把channel关闭.未解之谜…好像从设计的角度讲不应该有这种goroutine出现.
默认认为第一种情况比较常见,如果出现第二种情况,我觉得就应该重新设计模块了吧