#Golang# Golang与并发编程(4) channel使用

#Golang# Golang与并发编程(4) channel使用

本文主要讨论 channel 使用的相关内容。


目录 Table of Contents


前情提要

为了实现 goroutine 通信,有两种常见并发模型:

  • 共享内存:使用共享内存方式,Gosync 库包提供了多种同步的机制。
  • 消息队列:使用类似管道和消息队列的方式,各个并发单元数据相互独立,通过消息进行数据交换,Gochannel 类型模拟了这种同步的模式。

让我们再一次来复读 Go 社区的并发口号——“不要通过共享内存来通信,而应该通过通信来共享内存”

本文将讨论 Go 并发编程中的通信桥梁 channel的使用,如有错漏,欢迎指出 ;P

概念引申

管道 Pipe

Pipe
  • 管道 (Pipe) 是操作系统中进程间通信的一种方式。管道的本质是一个存在于内存或文件系统缓冲有限的特殊文件,进程以先进先出的方式从缓冲区存取数据,管道一端的进程顺序地将数据写入缓冲区,另一端的进程则顺序地读出数据。
  • 匿名管道没有名字,是半双工的;匿名管道对于管道两端的进程而言是一个存在于内存的特殊文件;匿名管道只能用于父子进程或兄弟进程之间通信。
  • 有名管道拥有名字,是全双工的;有名管道对于管道两端的进程而言是一个存在于文件系统的特殊文件;有名管道可以用于本机任意两个进程之间通信。
  • 管道传送无格式字节流,这就要求管道的读出方和写入方必须事先约定好数据的格式。

消息队列 Message

Message
  • 消息队列 (Message) 是操作系统中进程间通信的一种方式。管道的本质是一个存放在内核中的长度不限的消息链表,一般遵循先进先出规则,也可以随机或按消息的类型查询,并且允许一个或多个进程向它写入或读取消息。
  • 消息队列由消息队列标识符标识,提供有格式字节流并且具有类型

信道 Channel

Channel
  • 信道 (Channel)Golang协程间通信的一种方式。信道的本质是一个线程安全的先进先出队列,可选择无缓冲或有缓冲,任意时刻同时只能有一个 goroutine 访问 channel
  • 信道可以指定类型
  • Golang不支持无限容量channel,原因是如果生产速率远远大于消费速率,那么 channel 内的数据不断累积将会爆掉内存。

channel 使用

make

1
2
3
4
5
6
7
8
// 双向channel
ch := make(chan T, cap)

// 只读channel(但是没什么意义,双向channel可以赋值单向channel)
ch_or := make(<-chan T, cap)

// 只写channel(但是没什么意义,双向channel可以赋值单向channel)
ch_ow := make(chan<- T, cap)
  • 引用类型,使用 make 创建,零值为 nil
  • 可选择数据类型
  • 可选择缓冲容量
    • 无缓冲cap = 0,读写同步
    • 有缓冲cap > 0,读写异步
  • 可选择通信方向
    • 只读<-chan
    • 只写chan<-

read-write

1
data := <- ch
  • channel 中读取数据:

    • 无缓冲:没有 goroutine 写入,channel 阻塞
    • 有缓冲channel 容量变空,channel 阻塞
1
ch <- data
  • channel 里写入数据:
    • 无缓冲:没有 goroutine 读取,channel 阻塞
    • 有缓冲channel 容量变满,channel 阻塞

close

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 关闭 channel
// close 手动关闭或等待自动 GC
close(ch)

// 尝试写入已关闭的 channel
// 导致 panic 异常
ch <- d

// 返回数据或零值
// 还有剩余数据则返回数据
// 没有剩余数据则返回零值
d := <-ch

// 返回数据和布尔值
// true 表示成功从 channel 接收到值
// false 表示 channel 已经被关闭并且里面没有值可接收
d, ok := <-ch

// 尝试重复关闭 channel
// 导致 panic 异常
close(ch)
  • 关闭 channel 可以通过显式的代码关闭或隐式的垃圾回收
  • 对关闭后的 channel 进行写入操作
    • 导致 panic 异常
  • 对关闭后的 channel 进行读取操作
    • 存在已经发送成功的数据:返回数据
    • 不存在已经发送成功的数据:返回零值
  • 对关闭后的 channel 进行重复关闭
    • 导致 panic 异常
  • close 常常与 selectrangedefer 一起使用

range

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ch := make(chan struct{})

// for-range 写法
for item := range ch {
// ...
}

// for-break 写法
for {
item, ok := <-ch
// ...
if !ok {
break
}
}
  • for...range 从信道中读出数据,会一直迭代到信道关闭,当 channel 中没有数据时会阻塞当前 goroutine

select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 处理一般逻辑,case 为一般表达式
switch {
case Case1:
// ...
case Case2:
// ...
case Case3:
// ...
default:
// ...
}

// 处理通信逻辑,case 为通信表达式
select {
case item1, ok := <-ch1:
// ...
case item2, ok := <-ch2:
// ...
case item3, ok := <-ch3:
// ...
default:
// ...
}
  • 如果所有信道堵塞,有 default 语句:按照默认逻辑处理
  • 如果所有信道堵塞,无 default 语句:等待至某一个信道可以处理
  • 如果某个信道可以处理:直接处理该信道的逻辑
  • 如果多个信道可以处理:随机处理某信道的逻辑
  • nil channel 上操作将会一直阻塞;使用 select{} 语句将会一直阻塞

应用实例

单生产者-单消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"time"
)

const BUFLEN = 5

// 生产者
func producer(ch chan<- int) {
d := 1
for {
ch <- d
fmt.Println("Produce:", d)
d++
time.Sleep(1 * time.Second) // 生产者速率不宜过快
}
}

// 消费者
func consumer(ch <-chan int) {
for {
d := <-ch
fmt.Println("Consume:", d)
time.Sleep(2 * time.Second) // 消费者速率不宜过快
}
}

func main() {
// 生产消费的缓冲区
ch := make(chan int, BUFLEN)

// 启动生产者协程
go producer(ch)

// 启动消费者协程
go consumer(ch)

// 防止主函数的退出
for {

}
}

多生产者-多消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

const BUFLEN = 5

var cond *sync.Cond = sync.NewCond(&sync.Mutex{})

// 生产者
func producer(ch chan<- int) {
for {
cond.L.Lock()
for len(ch) == BUFLEN {
cond.Wait()
}
data := rand.Intn(1000)
ch <- data
fmt.Println("Produce:", data)
cond.L.Unlock()

cond.Signal()
time.Sleep(2 * time.Second) // 生产者速率不宜过快
}
}

// 消费者
func consumer(ch <-chan int) {
for {
cond.L.Lock()
for len(ch) == 0 {
cond.Wait()
}
data := <-ch
fmt.Println("Consume:", data)
cond.L.Unlock()

cond.Signal()
time.Sleep(1 * time.Second) // 消费者速率不宜过快
}
}

func main() {
rand.Seed(time.Now().UnixNano())

// 生产消费的缓冲区
ch := make(chan int, BUFLEN)

// 启动 10 个生产者协程
for i := 0; i < 10; i++ {
go producer(ch)
}

// 启动 10 个消费者协程
for i := 0; i < 10; i++ {
go consumer(ch)
}

// 防止主函数的退出
for {

}
}

规律总结

操作 channel 规律

参考链接


Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×