本文主要讨论 channel
使用的相关内容。
目录 Table of Contents
前情提要
为了实现 goroutine
通信,有两种常见并发模型:
- 共享内存:使用共享内存方式,
Go
中 sync
库包提供了多种同步的机制。
- 消息队列:使用类似管道和消息队列的方式,各个并发单元数据相互独立,通过消息进行数据交换,
Go
中 channel
类型模拟了这种同步的模式。
让我们再一次来复读 Go
社区的并发口号——“不要通过共享内存来通信,而应该通过通信来共享内存”。
本文将讨论 Go
并发编程中的通信桥梁 channel
的使用,如有错漏,欢迎指出 ;P
概念引申
管道 Pipe
- 管道 (Pipe) 是操作系统中进程间通信的一种方式。管道的本质是一个存在于内存或文件系统的缓冲有限的特殊文件,进程以先进先出的方式从缓冲区存取数据,管道一端的进程顺序地将数据写入缓冲区,另一端的进程则顺序地读出数据。
- 匿名管道没有名字,是半双工的;匿名管道对于管道两端的进程而言是一个存在于内存的特殊文件;匿名管道只能用于父子进程或兄弟进程之间通信。
- 有名管道拥有名字,是全双工的;有名管道对于管道两端的进程而言是一个存在于文件系统的特殊文件;有名管道可以用于本机任意两个进程之间通信。
- 管道传送无格式字节流,这就要求管道的读出方和写入方必须事先约定好数据的格式。
消息队列 Message
- 消息队列 (Message) 是操作系统中进程间通信的一种方式。管道的本质是一个存放在内核中的长度不限的消息链表,一般遵循先进先出规则,也可以随机或按消息的类型查询,并且允许一个或多个进程向它写入或读取消息。
- 消息队列由消息队列标识符标识,提供有格式字节流并且具有类型。
信道 Channel
- 信道 (Channel) 是
Golang
中协程间通信的一种方式。信道的本质是一个线程安全的先进先出队列,可选择无缓冲或有缓冲,任意时刻同时只能有一个 goroutine
访问 channel
。
- 信道可以指定类型。
Golang
并不支持无限容量的 channel
,原因是如果生产速率远远大于消费速率,那么 channel
内的数据不断累积将会爆掉内存。
channel
使用
make
1 2 3 4 5 6 7 8
| ch := make(chan T, cap)
ch_or := make(<-chan T, cap)
ch_ow := make(chan<- T, cap)
|
- 引用类型,使用
make
创建,零值为 nil
- 可选择数据类型
- 可选择缓冲容量
- 无缓冲:
cap = 0
,读写同步
- 有缓冲:
cap > 0
,读写异步
- 可选择通信方向
read-write
从 channel
中读取数据:
- 无缓冲:没有
goroutine
写入,channel
阻塞
- 有缓冲:
channel
容量变空,channel
阻塞
- 向
channel
里写入数据:
- 无缓冲:没有
goroutine
读取,channel
阻塞
- 有缓冲:
channel
容量变满,channel
阻塞
close
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
close(ch)
ch <- d
d := <-ch
d, ok := <-ch
close(ch)
|
- 关闭
channel
可以通过显式的代码关闭或隐式的垃圾回收
- 对关闭后的
channel
进行写入操作:
- 对关闭后的
channel
进行读取操作:
- 存在已经发送成功的数据:返回数据
- 不存在已经发送成功的数据:返回零值
- 对关闭后的
channel
进行重复关闭:
close
常常与 select
、range
和 defer
一起使用
range
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ch := make(chan struct{})
for item := range ch { }
for { item, ok := <-ch if !ok { break } }
|
for...range
从信道中读出数据,会一直迭代到信道关闭,当 channel
中没有数据时会阻塞当前 goroutine
。
select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| switch { case Case1: case Case2: case Case3: default: }
select { case item1, ok := <-ch1: case item2, ok := <-ch2: case item3, ok := <-ch3: default: }
|
- 如果所有信道堵塞,有
default
语句:按照默认逻辑处理
- 如果所有信道堵塞,无
default
语句:等待至某一个信道可以处理
- 如果某个信道可以处理:直接处理该信道的逻辑
- 如果多个信道可以处理:随机处理某信道的逻辑
- 在
nil channel
上操作将会一直阻塞;使用 select{}
语句将会一直阻塞
应用实例
单生产者-单消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package main
import ( "fmt" "time" )
const BUFLEN = 5
func producer(ch chan<- int) { d := 1 for { ch <- d fmt.Println("Produce:", d) d++ time.Sleep(1 * time.Second) } }
func consumer(ch <-chan int) { for { d := <-ch fmt.Println("Consume:", d) time.Sleep(2 * time.Second) } }
func main() { ch := make(chan int, BUFLEN)
go producer(ch)
go consumer(ch)
for {
} }
|
多生产者-多消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package main
import ( "fmt" "math/rand" "sync" "time" )
const BUFLEN = 5
var cond *sync.Cond = sync.NewCond(&sync.Mutex{})
func producer(ch chan<- int) { for { cond.L.Lock() for len(ch) == BUFLEN { cond.Wait() } data := rand.Intn(1000) ch <- data fmt.Println("Produce:", data) cond.L.Unlock()
cond.Signal() time.Sleep(2 * time.Second) } }
func consumer(ch <-chan int) { for { cond.L.Lock() for len(ch) == 0 { cond.Wait() } data := <-ch fmt.Println("Consume:", data) cond.L.Unlock()
cond.Signal() time.Sleep(1 * time.Second) } }
func main() { rand.Seed(time.Now().UnixNano())
ch := make(chan int, BUFLEN)
for i := 0; i < 10; i++ { go producer(ch) }
for i := 0; i < 10; i++ { go consumer(ch) }
for {
} }
|
规律总结
参考链接