WaitGroup 任务编排
1. WaitGroup 使用
WaitGroup 很简单,就是 package sync 用来做任务编排的一个并发原语。它要解决的就是并发 - 等待的问题: goroutine A 等待一组 goroutine 全部完成。
很多操作系统和编程语言都提供了类似的并发原语。比如,Linux 中的 barrier、Pthread(POSIX 线程)中的 barrier、C++ 中的 std::barrier、Java 中的 CyclicBarrier 和 CountDownLatch 等。
WaitGroup 非常适用于此类场景: 需要启动多个 goroutine 执行任务,主 goroutine 需要等待子 goroutine 都完成后才继续执行。
1.1 WaitGroup 使用
Go 标准库中的 WaitGroup 提供了三个方法:
1
2
3
|
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
|
- Add,用来设置 WaitGroup 的计数值;
- Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
- Wait,调用这个方法的 goroutine 会一直阻塞,直到 WaitGroup 的计数值变为 0
下面是 WaitGroup 的使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
// 线程安全的计数器
type Counter struct {
mu sync.Mutex
count uint64
}
// 对计数值加一
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 获取当前的计数值
func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// sleep 1秒,然后计数值加1
func worker(c *Counter, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
c.Incr()
}
func main() {
var counter Counter
var wg sync.WaitGroup
wg.Add(10) // WaitGroup的值设置为10
for i := 0; i < 10; i++ { // 启动10个goroutine执行加1任务
go worker(&counter, &wg)
}
// 检查点,等待goroutine都完成任务
wg.Wait()
// 输出当前计数器的值
fmt.Println(counter.Count())
}
|
2. WaitGroup 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
noCopy noCopy
// 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
// 另外32bit是用作信号量的
// 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
// 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
state1 [3]uint32
}
// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
// %8 表示 8 个字节,注意不是位
// 32位还是64位计算机不是关键点,关键点是 state1 有没有按照 64 位对齐,32位计算机上 state1 也可能刚好对齐到 64 位上
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
|
WaitGroup 的数据结构包括两个字段:
noCopy noCopy
: 辅助字段,主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例
state1 [3]uint32
:
- 一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。
- 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,处理方法在 state() 方法中
- 在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量
- 在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值
接下来我们来看 Add、Done 和 Wait 这三个方法的实现。在查看这部分源码实现时,除了这些方法本身的实现外,还会有一些额外的代码,主要是 race 检查和异常检查的代码。其中,有几个检查非常关键,如果检查不通过,会出现 panic。我们在介绍完这三个方法的实现之后再来统一介绍。
2.1 Add 方法实现
Add 方法主要操作的是 state 的计数部分,可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上,delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter count
if v > 0 || w == 0 {
return
}
// 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
// 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
// Add(-n) 的处理逻辑
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
|
2.2 Wait 方法实现
Wait 方法的实现逻辑是:
- 不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回
- 如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter的数量
if v == 0 {
// 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
return
}
// 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}
|
2.3 异常检测
前面我们分析 Add 和 Wait 实现的时候删除了异常检测的代码,这些异常检测的逻辑就是我们使用 WaitGroup 的避坑指南。通常使用 WaitGroup 时会出现以下三个问题:
- 计数器设置为负值:
- 检查点: WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。
- 错误点: 一般情况下,有两种方法会导致计数器设置为负数
- 调用 Add 的时候传递一个负数,导致计数器加上这个负值后小于 0
- 调用 Done 方法的次数过多,超过了 WaitGroup 的计数值
- 建议: 使用 WaitGroup 的正确姿势是,预先确定好 WaitGroup 的计数值,然后调用相同次数的 Done 完成相应的任务
- 不期望的 Add 时机: 在使用 WaitGroup 的时候,你一定要遵循的原则就是,等所有的 Add 方法调用之后再调用 Wait,否则就可能导致 panic 或者不期望的结果
- 前一个 Wait 还没结束就重用 WaitGroup
- WaitGroup 是可以重用的。只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用。
- 但是,如果我们在 WaitGroup 的计数值还没有恢复到零值的时候就重用,就会导致程序 panic
- 这个并发的 Data Race 在于: Add() 方法在将计数器归零时,需要唤醒所有被 Wait 住的协程,在这个过程中间是不能有并发的 Add 和 Wait 方法调用的
2.4 错误示例
在这个例子中,我们原本设想的是,等四个 goroutine 都执行完毕后输出 Done 的信息,但是它的错误之处在于,将 WaitGroup.Add 方法的调用放在了子 gorotuine 中。等主 goorutine 调用 Wait 的时候,因为四个任务 goroutine 一开始都休眠,所以可能 WaitGroup 的 Add 方法还没有被调用,WaitGroup 的计数还是 0,所以它并没有等待四个子 goroutine 执行完毕才继续执行,而是立刻执行了下一步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// 不期望的 Add 时机
func main() {
var wg sync.WaitGroup
// 解决方法一:
// wg.Add(4) // 预先设定WaitGroup的计数值
go dosomething(100, &wg) // 启动第一个goroutine
go dosomething(110, &wg) // 启动第二个goroutine
go dosomething(120, &wg) // 启动第三个goroutine
go dosomething(130, &wg) // 启动第四个goroutine
wg.Wait() // 主goroutine等待完成
fmt.Println("Done")
}
func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration) // 故意sleep一段时间
wg.Add(1)
fmt.Println("后台执行, duration:", duration)
wg.Done()
}
|
在下面的例子中,第 6 行虽然让 WaitGroup 的计数恢复到 0,但是因为第 9 行有个 waiter 在等待,如果等待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的冲突,所以就会出现 panic。
1
2
3
4
5
6
7
8
9
10
11
|
// 前一个 Wait 还没结束就重用 WaitGroup
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done() // 计数器减1
wg.Add(1) // 计数值加1
}()
wg.Wait() // 主goroutine等待,有可能和第7行并发执行
}
|
总结一下:WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。
2.4 WaitGroup 的 Race Check
WaitGroup 的异常检查,我看几遍,都没能彻底理解 WaitGroup 重用导致 panic 的根本原因。其实 WaitGroup 代码很少,直接看代码可能更容易帮助理解 panic 发生确切时机。
Add 的 Race Check
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
"internal/race"
"sync/atomic"
"unsafe"
)
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
|
从代码可以看到 Add 是触发 panic 有三个地方:
v < 0
: 即计数器设置为负值
w != 0 && delta > 0 && v == int32(delta)
:
- waiter 数不为 0, 此时添加了 Add,并且 state 字段记录的计数器等于此次 Add 的值,说明上一次 Wait() 还没有结束就 Add 了新值。这是并发修改的场景一
*statep != state
:
- 执行到此处,经过前面的判断条件过滤,此处 v 只能等于 0,此时需要做的只有唤醒所有 Wait 住的协程,不能有并发的 Add() 和 Wait() 方法调用,这是并发修改的场景二。
- 同时正如注释里面所说的这是一个"连接的检查",如果并发是发生在 这个检查和下面触发 Wait 住的协程之间是不会异常的,但是程序已经出现了意料之外的结果。
Wait 的 Race Check
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
|
从代码可以看到 Wait() 触发的异常只有一个地方: *statep != 0
:
- 因为在 Add 唤醒阻塞的 Wait 协程前,已经将
*statep = 0
,而如果此时 *statep != 0
说明在 Add() 触发被 Wait 住协程之后,有其他 Add() 和 Wait() 方法调用,是并发修改的场景。
2.5 noCopy 辅助 vet 检查
noCopy 就是指示 vet 工具在做检查的时候,这个数据结构不能做值复制使用。更严谨地说,是不能在第一次使用之后复制使用 ( must not be copied after first use)。noCopy 是一个通用的计数技术,其他并发原语中也会用到。
我们在前面学习 Mutex 的时候用到了 vet 工具。vet 会对实现 Locker 接口的数据类型做静态检查,一旦代码中有复制使用这种数据类型的情况,就会发出警告。WaitGroup 同步原语不就是 Add、Done 和 Wait 方法吗?vet 能检查出来吗?其实是可以的。通过给 WaitGroup 添加一个 noCopy 字段,我们就可以为 WaitGroup 实现 Locker 接口,这样 vet 工具就可以做复制检查了。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。
noCopy 字段的类型是 noCopy,它只是一个辅助的、用来帮助 vet 检查用的类型:
1
2
3
4
5
|
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
|
如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。
2.6 总结
关于如何避免错误使用 WaitGroup 的情况,我们只需要尽量保证下面 5 点就可以了:
- 不重用 WaitGroup。新建一个 WaitGroup 不会带来多大的资源开销,重用反而更容易出错
- 保证所有的 Add 方法调用都在 Wait 之前
- 不传递负数给 Add 方法,只通过 Done 来给计数值减 1
- 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是一样的
- 不遗漏 Done 方法的调用,否则会导致 Wait hang 住无法返回
参考
本文内容摘录自:
- 极客专栏-鸟叔的 Go 并发编程实战