UP | HOME

Golang 中的 channel 学习笔记

目录

channel

channel 是典型的管道,可以通过 channel 操作符(<-)来进行发送和接收值。它是有顺序的,先发送的数据,肯定会先出来。

non-buffered channel

c := make(chan int)

非缓冲 buffer 。它会在发送端和接收端同时阻塞,直到另一方已经准备好了。

详细点说两个情况

  1. 发送端在发送时会一直阻塞,直到接收端已经接收了数据,否则会一直阻塞下去。
  2. 接收端也会一直阻塞,直接发送端已经发送了数据,否则会一直阻塞下去。

比如下面的代码,就不能正常进行,而会导致死锁:

package main

import (
  "fmt"
)

func main() {
  hello := make(chan string)
  hello <- "hello world"
  fmt.Println(<-hello)
}

运行上面的代码,会报如下错误:

[Running] go run "/Users/emacsist/go/src/test/main.go"
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
	/Users/emacsist/go/src/test/main.go:9 +0x78
exit status 2

所以,一般来说,这种非缓冲channel一般用在如下情况:

  • 线程进行等待,直到channel准备好了数据。典型的情况是,在应用程序中,我们想程序一直运行,直到收到系统的 kill 信号。比如:

    // Handle SIGINT and SIGTERM.
    ch := make(chan os.Signal)
    signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    log.Println(<-ch)
    fmt.Println("收到Kill信号,关闭程序")
    
  • 一些第三方库,通过使用 channel 来发送事件通知。比如 golang 中的 rabbitmq 的代码,可以注册一个 Connection 关闭的通知:

    // Conn *amqp.Connection
    // CloseEventChan chan *amqp.Error = make(chan *amqp.Error)
    Conn.NotifyClose(CloseEventChan)
    

    以上的代码的意思,就是当 Conn 对象已经关闭时,就会发送一条消息到 CloseEventChan 这个 channel 里,然后我们自己写一条线程来监听这个事件,然后进行相应的逻辑,比如自动重连。

  • 其他的情况,可以自行想像一下。

buffered channel

既然有 non-buffered channel ,那就会有相应的 buffered channel 。这种情况下,对于接收端和发送端也有两种情况:

c := make(chan int, 10)
  1. 发送端,只会在缓冲的channel满了之后,才会阻塞。即 len(c) == bufferSize (即上面make方法中的第二个参数中的10),后面的数据再进行发送时就会阻塞。
  2. 接收端,只有在缓冲的channel是空,即 len(c)==0 时才会阻塞。即读取时,它会一直阻塞,直到该 channel 上有数据后就可以继续接收数据并处理了。

例子:

例子1

正常运行

  func main() {
    hello := make(chan string, 2)
    hello <- "hello world"
    fmt.Println(<-hello)
  }

[Running] go run "/Users/emacsist/go/src/test/main.go"
hello world

例子2

下面的会阻塞,但会导致死锁

func main() {
  hello := make(chan string, 2)
  hello <- "hello world"
  fmt.Println(<-hello)
  fmt.Println(<-hello)
}


[Running] go run "/Users/emacsist/go/src/test/main.go"
hello world
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
/Users/emacsist/go/src/test/main.go:11 +0x155
exit status 2

因为虽然缓冲的大小为2,但实际上的数据只有一条数据。所以,读取一次就没问题,但在主线程里读取第二次,就会导致死锁了。

例子3

关闭 channel 再读取:

  func main() {
    hello := make(chan string, 2)
    hello <- "hello world"
    close(hello)
    fmt.Println(<-hello)
    fmt.Println(<-hello)
  }

[Running] go run "/Users/emacsist/go/src/test/main.go"
1==>hello world
2==>

常用情况

比如控制 生产-消费者 的缓冲大小。缓和生产者与消费者的速率。再具体一点的例子就是自己在产生环境中遇到的,DSP环境中,收到到 win 的事件通知对象,为了快速响应,我们使用了 channel 作为一个缓冲池,下面是示例代码:

// StartWinNoticeWorker : 启动一个线程来处理 win notice 任务
func StartWinNoticeWorker(waitGroup *sync.WaitGroup) {
 go func() {
   channel, err := service.Conn.Channel()
   if err != nil {
     panic(err.Error())
   }
   defer channel.Close()
   log.Infof("start win notice worker ok....")
   for winLog := range WinLogChannel {
     if len(winLog) == 0 {
       continue
     }
     service.PutMessageWithChannel(channel, config.WinNoticeQueue, winLog)
   }
   waitGroup.Done()
   log.Infof("win notice worker done.")
 }()
}

controller中将接收到的消息,放到缓冲channel的 WinLogChannel 对象中,即可立即返回,然后这个 worker 再进行实际的入队操作。(这里使用了 waitGroup 是为了在关闭应用时,优雅关闭,即等待缓冲channel的数据都落地了,才正式退出程序)

注意事项

  • 读取一个未关闭的 channel,会导致接收方会一直阻塞。
  • 读取一个已经关闭的 channel,它会收到该 channel 的数据类型的零值。
  • 千万要记得,关闭channel 的操作,要在发送端关闭。
  • 判断一个channel是否已经关闭
    1. v, isClosed := <- hello 。如果 isClosed 为 true,则表示已经关闭。
    2. 使用for

      for d := range hello {
      
      }
      

      它会在 hello 关闭的时候,读取完所有数据后就会自动退出该 for 循环。

  • 千万不要向一个已经关闭的 channel 再发送数据,否则会报: panic: send on closed channel

channel 结合 select

select 可以让一个 goroutine 在多个通信操作上等待。 select 语句会一直阻塞,直到其中的一个 case 可以运行,然后它就会执行这个 case 的代码;如果有多个都可以准备执行的话,那么 select 会随机选择其中的一个 case 来执行。

注意, select 语句中的每个 case 的条件,必须是IO操作的。

一般情况下,select 会和 for {} 这种死循环结合使用。

## 一个比较完整的例子代码

package main

import (
  "fmt"
  "strconv"
  "sync"
  "time"
)

func main() {
  var wg sync.WaitGroup
  bufferSize := 100000
  dataChannel := make(chan string, bufferSize)
  quitChannel := make(chan bool)
  for i := 0; i < bufferSize; i++ {
    dataChannel <- "hello" + strconv.Itoa(i)
  }
  close(dataChannel)
  fmt.Println("Hello world")
  wg.Add(1)
  go consumer(dataChannel, quitChannel, &wg)
  wg.Add(1)
  go crond(quitChannel, &wg)
  wg.Wait()
  fmt.Println("exit program.")
}

func crond(quitChannel chan<- bool, wg *sync.WaitGroup) {
  timeout := time.After(time.Second * 5)
  <-timeout
  //5秒后,发送退出通知
  fmt.Println("5秒时间到,发送准备退出通知")
  quitChannel <- true
  close(quitChannel)
  wg.Done()
}

func consumer(dataChannel <-chan string, quitChannel <-chan bool, wg *sync.WaitGroup) {
  for {
    select {
    case d := <-dataChannel:
      fmt.Printf("consum data %v\n", d)
      time.Sleep(time.Millisecond * 700)
    case <-quitChannel:
      fmt.Println("收到退出的通知")
      wg.Done()
      return
    }
  }
}


下面是输出的结果例子:
[Running] go run "/Users/emacsist/go/src/test/main.go"
Hello world
consum data hello0
consum data hello1
consum data hello2
consum data hello3
consum data hello4
consum data hello5
consum data hello6
consum data hello7
5秒时间到,发送准备退出通知
consum data hello8
收到退出的通知
exit program.

  [Done] exited with code=0 in 6.585 seconds

可以注意到,什么时候该关闭channel,以及使用 sync.WaitGroup ,再结合 for select 的。

作者: emacsist

Created: 2017-04-12 Wed 23:58