如何优雅地关闭channel

referenced: http://www.tapirgames.com/blog/golang-channel-closing

文章首先说明了一个问题:对于go语言中的channel,官方没有给出一个内建的fun closed(c chan T) bool方法来检查一个管道是不是已经关闭了.然后还指出了即使有,等你得到结果的时候管道的状态很可能又改变了…你得到的永远是前一刻的状态(mmp啊).

作者提出了一个比较合理的关闭管道的原则:
不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了.这个对于concurrency的总结很有用.

然后作者按着这个规则,给不同的关管道方式做了评级…

低级关闭

  1. 下面这个函数在接收端关闭了管道

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
    if recover() != nil {
    justClosed = false
    }
    }()

    // assume ch != nil here.
    close(ch) // panic if ch is closed
    return true // <=> justClosed = true; return
    }
  2. 下面这个函数在多个发送端关闭管道

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
    if recover() != nil {
    // The return result can be altered
    // in a defer function call.
    closed = true
    }
    }()

    ch <- value // panic if ch is closed
    return false // <=> closed = false; return
    }

顺便分析一下这个函数的性能: channel没有关闭的时候,SafeSend()的性能跟ch <- value几乎一样;当channel关闭之后,每个sender的goroutine会执行1次recover(),对性能的影响不是很大.然而你还是打破了原则啊

中级关闭

作者提出用golang中内建的Once/Mutex结构实现管道的关闭.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type MyChannel struct{
ch chan T
once sync.Once
}

func NewMyChannel() *MyChannel{
return &MyChannel{ch: make(chan T)}
}

func (mych *MyChannel) SafeClose() {
mych.once.Do(func(){
close(mych.ch)
})
}

这里将T类型的channel和一个sync.Once对象封装在一起,这样对于每一个channel,once.Do使得close()只会被调用一次.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type MyChannel struct{
ch chan T
closed bool
mutex sync.Mutex
}

func NewMyChannel() *MyChannel{
return &MyChannel{ch: make(chan T)}
}

func (mych *MyChannel) SafeClose(){
mych.mutex.lock()
if !mych.closed {
close(mych.ch)
mych.closed = true
}
mych.mutex.unlock()
}

注意:不管用哪种方法实现,SafeClose()都应该在Sender端被调用,这是之前提到的close principle.

高级关闭

这里讨论了三种情况:
这里的三种方法都是通过sync.WaitGroup实现的.具体的用法在这里

  1. 1个发送者,M个接收者:由发送者告诉所有接受者”我再也不发东西了!”(关闭管道)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package main

    import (
    "sync"
    "log"
    "time"
    "math/rand"
    )

    func main() {
    rand.Seed(time.Now.UnixNano())
    log.SetFlags(0)

    const MaxRandomNumber = 100000
    const NumReceivers = 100

    wgRecveivers := sync.WaitGroup{}
    waitGroup.add(NumReceivers)

    dataCh := make(chan int)
    // sender
    go func() {
    for {
    if value := rand.Intn(MaxRandomNumber) == 0 {
    close(dataCh)
    return
    }
    else{
    dataCh <- value
    }
    }
    }()

    // receiver
    for i := 0; i < NumReceivers; i++ {
    go func(){
    defer wgRecveivers.Done()

    for data := range dataCh {
    log.Println(value)
    }
    }()
    }

    wgRecveivers.wait()
    }
  2. N个发送者,1个接收者:有接受者跟所有发送者收”别再给我发东西了!”

  3. N个发送者,M个接收者:通过一个moderator进行信息交互
    这个真的太难了…分为两种情况:
    • 所有的sender都会在某个时间点自动停止发送,像这个例子.这个时候使用WaitGroup可以完美解决这个问题
    • 某个sender会在某个时间关闭管道.比如前面的例子:每个goroutine随机生成整数直到某个线程随机到0,该线程会把channel关闭.未解之谜…好像从设计的角度讲不应该有这种goroutine出现.

默认认为第一种情况比较常见,如果出现第二种情况,我觉得就应该重新设计模块了吧