go语言进阶-并发

0. 前言

优雅的并发编程范式,完善的并发支持,出色的并发性能是Go语言区别于其他语言的一大特色。

接下来,我们将从原理到应用,深入了解go并发。

1. 并发与并行

这些经典概念,对于学过操作系统的同学或许并不陌生。

  • 并发:多个代码片段(进程、线程)轮流在一个物理处理器(单核 CPU)上执行,通过快速的上下文切换,营造一种同时执行的假象,是虚假的同时执行。
  • 并行:多个代码片段(进程、线程)同时在不同的物理执行器上执行,是真正的同时执行。

2. 进程 线程 协程

  • 进程:一个正在运行的程序,系统进行资源分配的基本单位,拥有独立的内存空间。
  • 线程系统调度的基本单位,每个进程可以有一个到多个线程(进程的初始线程为主线程),线程有自己的寄存器和栈空间等,但也共享着进程的内存空间。这里的线程指内核级线程
  • 协程:一种用户态的轻量级线程,协程的调度完全由用户控制,没有内核的开销。类似于用户级线程

3. Goroutine

goroutine 是 Go 语言并发的关键部分,它是 Go 语言中的轻量级线程,也可以叫做用户级线程/协程。它是 Go 语言中的多线程

在 Go 语言中,创建并启动 goroutine 十分便捷,只需使用 go 关键字调用一个函数即可。如下,调用匿名函数启动一个 Goroutine:

1
2
3
go func() {
fmt.Println("hello goroutine")
}()

同时,它的轻量体现在:

  • 上下文切换代价更小。不必经过内核。
  • 栈空间占用小。线程栈空间通常是 2M,goroutine 栈空间最小 2K。

而轻量级线程搭配 Go 语言并发调度器,Go 语言可以轻松支持上万 goroutine 并发运行。

4. Go 并发调度器模型 G-P-M

多进程/多线程的第一大问题,便是调度问题

底层多进程和内核级多线程的调度,操作系统已经帮我们解决了。但作为用户级线程,goroutine 的调度问题,当然要靠 Go 语言自己解决。而这,就是 Go 调度器模型 G-P-M 的设计初衷。

介绍它之前,我们必须知道:内核级线程是系统调度的基本单位,这意味着系统只会给内核级线程分配物理处理器(CPU);而用户级线程,操作系统是不可见的。

在 G-P-M 模型中,有三大部分:

  • G(Goroutine):每个 goroutine 对应一个 G 结构体,G 存储 goroutine 的运行栈、状态以及任务函数。
  • P(Processor):逻辑处理器,Go 语言会在逻辑处理器上调度 goroutine 运行。
  • M(Machine):内核级线程,由系统调度。

相互之间的关系如下图:

1

  • M 内核级线程,运行时绑定在一个处理器(CPU 核)上;
  • P 逻辑处理器需要绑定在一个 M 内核级线程上,才能运行;
  • 一个 P 逻辑处理器可以调度多个 G(Goroutine),但同一个时间只能有一个 GP 上运行。

既然知道了三者的关系,那么调度器如何进行统筹兼顾,实现高效率并发呢?调度器有以下几种行为(算法):

  • 创建 G 时:当创建一个 goroutine 并准备运行时,它会被调度器放到 全局运行队列(GRQ) 中。之后,调度器会把它分配给一个逻辑处理器 P,即将它放到这个逻辑处理器的 本地运行队列(LRQ) 中。

  • 及时切换 P 上的 G:与操作系统并发调度类似,当一个 goroutine 在逻辑处理器上运行时间过长(毫秒)时,调度器会停止当前 goroutine ,并给 LRQ 上其它 goroutine 运行机会。
    1

  • 拒绝摸鱼的 P:当一个逻辑处理器空闲时,调度器会从其它逻辑处理器或者全局队列上,为它分配 goroutine (这也叫任务窃取)。

  • P 自动分离因系统调用而阻塞的 M:当 goroutine 需要执行一个阻塞的系统调用时(如:打开文件),调度器会将线程和 goroutine 从逻辑处理器上分离,并为逻辑处理器分配一个新线程,阻塞的 goroutine 返回后,会回到该逻辑处理器的本地队列中,而线程则继续使用。
    1

  • 将进行网络I/O的 G 交给网络轮询器(NetPoller):一旦该轮询器指示某个网络读或者写就绪,对应的 goroutine 就会重新回到之前的逻辑处理器上来完成操作。

5. Go 并发应用

一个 goroutine 的简单使用示例如下。创建两个 goroutine ,一个输出字符 A ,一个输出字符 B ,两个 goroutine 并发运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"runtime"
"sync"
"time"
)

func main() {
// 分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)

// wg 用来等待 goroutine 结束,Add(2) 表示要等待两个 goroutine
var wg sync.WaitGroup
wg.Add(2)

// 声明一个匿名函数,并创建一个 goroutine
go func() {
// 在函数退出时调用 Done 来通知 main 函数工作已经完成
defer wg.Done()
for count := 0; count < 10; count++ {
fmt.Printf("A")
time.Sleep(1)
}
}()

// 声明一个匿名函数,并创建一个 goroutine
go func() {
// 在函数退出时调用 Done 来通知 main 函数工作已经完成
defer wg.Done()
for count := 0; count < 10; count++ {
fmt.Printf("B")
time.Sleep(1)
}
}()

// 等待 goroutine 结束
wg.Wait()
}
// 输出(可能不相同):BAABBAABBAABBAABBAAB

6. Go 并发同步模型 CSP

多进程/多线程的第二大问题,则是同步问题(互斥是一种特殊的同步)

Go 语言提供了传统的同步机制,包括:原子函数(sync/atomic)、互斥锁(sync.Mutex)、读写锁(sync.RWMutex)、条件变量(sync.Cond 等。

同时,Go 语言也提出了独特的并发同步机制——来自一个叫作通信顺序进程(Communicating Sequential Processes,CSP)泛型。CSP 是一种消息传递模型,通过在 goroutine 之间传递数据来传递消息,而不是对数据进行加锁来实现同步访问。

Go 语言中的通道 channel 结构则实现了这一同步机制。

7. channel 通道

channel 是 Go 语言中的引用类型结构,声明后初值为 nil

通道的基本操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 声明一个整型 channel,为零值 nil
var channel chan int

// 声明一个只能发送的整型单通道(单通道多用于函数参数)
var send chan<- int

// 声明一个只能接受的整型单通道
var receive <-chan int

// 为无缓冲的整型单通道分配内存
send = make(chan<- int)

// 创建无缓冲的整型 channel
unbuffered := make(chan int)

// 创建有缓冲的字符串 channel
buffered := make(chan string, 10)

fmt.Println(channel, send, receive, unbuffered, buffered) // <nil> 0xc000048060 <nil> 0xc0000480c0 0xc000044060

// 向通道发送数据(向字符串型通道发送字符串)
buffered <- "hello goroutine"

// 从通道接收数据(从字符串型通道中接收一个字符串)
value := <-buffered
fmt.Println(value) // hello goroutine

// 关闭通道
close(buffered)
fmt.Println(buffered) // 0xc000044060

关于 close() 关闭通道函数的具体细节,可参考源码注释:

1
2
3
4
5
6
7
8
9
// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
// x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)

接下来,主要介绍无缓冲通道和有缓冲通道,以及通道在 goroutine 同步中的应用。

7.1. 无缓冲通道

无缓冲通道,是指通道中不能缓存数据。换句话说,发送方接收方必须都进入通道,才能完成数据传输,不然一方必须等待另一方。

发送方发送数据时,如果接收方还没准备好,则发送方会阻塞等待接收方;接收方准备好接收数据时,如果发送方还没来,则接受方也会阻塞。

如下图,第2步中发送方进入通道,但接收方还未进入,则发送方阻塞;第5、6步中接收方接收到数据,双方同时离开通道。

1

7.2. 有缓冲通道

有缓冲通道,是指通道中能缓存数据。

那么,通道并不要求双方必须同时完成发送和接受,发送方把数据放到通道中就可以离开,接收方从通道中拿到数据也可以离开

但是,当通道中没有多余空间时,发送方需要阻塞;当通道中没有缓存数据时,接收方需要阻塞。

如下图,发送方放完数据就可以进行下一步,接收方拿到数据也可以进行下一步:

1

8. Go 实现同步

8.1. 实现互斥

互斥是同步的一种特殊情况,也是一种重要情况。

从 goroutine 的角度来看,互斥是指:不允许两个以上的并发 goroutine 同时进入访问共享资源的临界区。其中,临界区是指:访问临界资源的程序片段;临界资源是指:并发 goroutine 必须互斥访问的共享资源。

简单来说,互斥就是不允许多个 goroutine 同时读写共享变量。

8.1.1. 未实现互斥的后果

看如下例子,运行两个 goroutine 分别对全局变量 count 加10次1和10次2。如果程序不保证互斥,那么就会出现超乎预期的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"runtime"
"sync"
)

var count int
var wg sync.WaitGroup

// work 对 count 全局变量进行读写操作,未保护临界资源
func work(add int) {
defer wg.Done()
for i := 0; i < 10; i++ {
temp := count + add
runtime.Gosched() // 当前 goroutine 从线程退出,并放回到队列
count = temp
fmt.Printf("add %d: %d\n", add, count)
}
}

func main() {
fmt.Println("init:", count)

// 要等待两个 goroutine
wg.Add(2)

go work(1)
go work(2)

// 等待 goroutine 结束
wg.Wait()
fmt.Println("result:", count)
}

运行这段代码,result 时而输出 23、时而输出 10,各种各样奇怪的结果,总之很难得到正确答案 30。具体原因就不在此赘述了,学过操作系统、玩过多线程的肯定都清楚。

正如上文所说的,在 Go 语言中,实现互斥(同步)有很多种方法(机制),接下来将主要介绍 通道 两种方法。

8.1.2. 锁实现互斥

对于上述例子,我们可以使用 互斥锁(sync.Mutex) 来保证互斥,用加锁解锁将临界区保护起来,从而保证程序的正确性。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"runtime"
"sync"
)

var count int
var wg sync.WaitGroup
var mutex sync.Mutex

// workWithMutex 对 count 全局变量进行读写操作,使用 mutex 互斥锁保护临界资源
func workWithMutex(add int) {
defer wg.Done()
for i := 0; i < 10; i++ {
// 用互斥锁保护临界区
mutex.Lock()
temp := count + add
runtime.Gosched() // 当前 goroutine 从线程退出,并放回到队列
count = temp
mutex.Unlock() // 释放锁
fmt.Printf("add %d: %d\n", add, temp)
}
}

func main() {
count = 0
fmt.Println("init:", count)

// 要等待两个 goroutine
wg.Add(2)

go workWithMutex(1)
go workWithMutex(2)

// 等待 goroutine 结束
wg.Wait()
fmt.Println("result:", count)
}

不管你怎样运行该程序,result 结果都是 30。

8.1.3. 通道实现互斥

用互斥锁实现互斥的方法,在很多语言中都很常见。

接下来,将介绍如何使用通道来实现互斥。同样是上述例子,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"fmt"
"runtime"
"sync"
)

var count int
var wg sync.WaitGroup
var countChan chan int

// workWithChannel 对 count 全局变量进行读写操作,使用 channel 保护临界资源
func workWithChannel(add int) {
defer wg.Done()
for i := 0; i < 10; i++ {
// 用 channel 保护临界资源
temp := <-countChan + add
runtime.Gosched() // 当前 goroutine 从线程退出,并放回到队列
countChan <- temp
fmt.Printf("add %d: %d\n", add, temp)
}
}

func main() {
count = 0
fmt.Println("init:", count)

// 创建通道,必须是有缓冲通道,无缓冲会导致死锁
countChan = make(chan int, 1)

wg.Add(2)

go workWithChannel(1)
go workWithChannel(2)

// 将初始数据传入通道
countChan <- count

// 等待 goroutine 结束
wg.Wait()

// 将最终数据取出
count = <-countChan
fmt.Println("result:", count)
}

运行也是可以得到正确结果 30。

用通道实现互斥的思路是:由于互斥是同步的一种特殊情况,那么我们可以将临界资源放入通道中,当一个 goroutine 从通道中取出资源使用时,下一个 goroutine 必须等待当前 goroutine 使用完资源放回通道后,才可以使用,这就相当于一种同步。

8.1.4. 用通道实现互斥性能并不佳

虽然在 Go 语言中,可以使用通道实现互斥,但是使用通道方式的性能并不佳,请看如下示例。

我们定义两个分别使用锁和通道实现互斥的简单函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// channel.go
package main

import (
"sync"
)

var count int
var countChan chan int
var mutex sync.Mutex

// workWithMutex 对 count 全局变量进行读写操作,使用 mutex 互斥锁保护临界资源
func workWithMutex(add int) {
for i := 0; i < 10; i++ {
// 用互斥锁保护临界区
mutex.Lock()
temp := count + add
count = temp
mutex.Unlock() // 释放锁
}
}

// workWithChannel 对 count 全局变量进行读写操作,使用 channel 保护临界资源
func workWithChannel(add int) {
for i := 0; i < 10; i++ {
// 用 channel 保护临界资源
temp := <-countChan + add
countChan <- temp
}
}

然后,我们定义两个性能测试函数(Go语言自带性能测试,测试函数需要以 Benchmark 开头),分别测试 workWithMutexworkWithChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//channel_test.go
package main

import (
"testing"
)

func BenchmarkWorkWithMutex(b *testing.B) {
count = 0
for i := 0; i < b.N; i++ {
workWithMutex(1)
}
}

func BenchmarkWorkWithChannel(b *testing.B) {
countChan = make(chan int, 1)
countChan <- 0
for i := 0; i < b.N; i++ {
workWithChannel(1)
}
<-countChan
}

使用 Go 自带的性能测试工具进行测试,结果如下:

1
2
3
4
5
6
7
8
D:\my_code\go\src\study\goroutine_test\channel_mutex_cmp>go test -bench .
goos: windows
goarch: amd64
pkg: goroutine_test/channel_mutex_cmp
BenchmarkWorkWithChannel-8 2264170 543 ns/op
BenchmarkWorkWithMutex-8 8005128 141 ns/op
PASS
ok goroutine_test/channel_mutex_cmp 4.166s

可以看出:使用锁的性能比使用通道的性能高 2~3 倍

其中原因,我猜测是:通道的本质跟锁的本质是一样的,都是原子操作 + goroutine 的阻塞唤醒,但锁的实现更加简单,而通道的实现较为复杂。所以,在实现互斥中,使用通道不如直接使用锁

既然通道性能不如锁,那 Go 语言为何要提出通道呢?因为,Go 语言中,通道主要用于更方便地解决 goroutine 之间的同步通信问题,也就是同步问题中经典的生产者消费者问题,而不是互斥问题。

8.2. 同步通信

生产者消费者问题是并发同步中的经典问题。

在学习操作系统过程中,我们知道生产者消费者问题可以用信号量PV解决,但过程相对复杂,既要检测资源数以实现同步,又要实现互斥。在 C 语言中,我们可以用 互斥锁+条件变量 的方式解决。

而在 Go 语言中,可以用通道轻松实现,如下就是一个生产者消费者的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

// 缓冲区
var productChan chan int
var wg sync.WaitGroup

// 生产者
func produce() {
rand.Seed(time.Now().Unix())
for i := 0; i < 10; i++ {
product := rand.Intn(10)
productChan <- product
fmt.Printf("produce: %d\n", product)
}
wg.Done()
}

// 消费者
func consume() {
for i := 0; i < 10; i++ {
product := <-productChan
fmt.Printf("consume: %d\n", product)
}
wg.Done()
}

func main() {
productChan = make(chan int, 3)
wg.Add(2)
go produce()
go consume()
wg.Wait()
}

9. 总结

极其简单的 goroutine 线程创建运行,而通道又大大简化了 goroutine 线程间的通信,再加上 goroutine 高性能的调度模型。这样的并发编程,谁不喜欢呢?

参考