如何基于 Mutex 实现一个可重入锁
1. Mutex 的扩展
上一节我们介绍了 Mutex 的实现原理,这一节我们来看看如何基于标准库的 Mutex 来扩展 Mutex 提供的并发原语,包括:
- 实现一个可重入锁
- TryLock
- 获取等待者的数量
- 实现一个线程安全的队列
2. 可重入锁
实现可重入锁的关键是要锁能记住当前哪个 goroutine 持有锁,这里有两个方案:
- 通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口
- 调用 Lock/Unlock 方法时,由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id,但是,这样一来,就不满足 Locker 接口了
可重入锁(递归锁)解决了代码重入或者递归调用带来的死锁问题,同时它也带来了另一个好处: 只有持有锁的 goroutine 才能 unlock 这个锁。接下来我们来看看这两个方案具体如何实现。
2.1 goroutine id
这个方案的关键第一步是获取 goroutine id,方式有两种,分别是
- 简单方式:通过 runtime.Stack 方法获取栈帧信息,栈帧信息里包含 goroutine id
- hacker 方式:
- 原理: 我们获取运行时的 g 指针,反解出对应的 g 的结构。每个运行的 goroutine 结构的 g 指针保存在当前 goroutine 的一个叫做 TLS 对象中
- 第一步:我们先获取到 TLS 对象
- 第二步:再从 TLS 中获取 goroutine 结构的 g 指针
- 第三步:再从 g 指针中取出 goroutine id。
- 需要注意的是,不同 Go 版本的 goroutine 的结构可能不同,所以需要根据 Go 的不同版本进行调整。没必要重复造轮子,直接使用第三方库就可以: petermattis/goid
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import (
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/petermattis/goid" // 使用第三方包通过 hacker 方式获取 goroutine id
)
// 简单方式
func GoID() int {
var buffer [64]byte
n := runtime.Stack(buffer[:], false)
idField := strings.Fields(strings.TrimPrefix(string(buffer[:n]), "goroutine "))[0]
d, err := strconv.Atoi(idField)
if err != nil {
panic(fmt.Sprintf("can not get goroutine id %v", err))
}
return d
}
// RecursiveMutex 包装一个Mutex,实现可重入
type RecursiveMutex struct {
sync.Mutex
owner int64 // 当前持有锁的goroutine id
recursion int32 // 这个goroutine 重入的次数
}
func (m *RecursiveMutex) Lock() {
gid := goid.Get()
// 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
if atomic.LoadInt64(&m.owner) == gid {
m.recursion++
return
}
m.Mutex.Lock()
// 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
atomic.StoreInt64(&m.owner, gid)
m.recursion = 1
}
func (m *RecursiveMutex) Unlock() {
gid := goid.Get()
// 非持有锁的goroutine尝试释放锁,错误的使用
if atomic.LoadInt64(&m.owner) != gid {
panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
}
// 调用次数减1
m.recursion--
if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
return
}
// 此goroutine最后一次调用,需要释放锁
atomic.StoreInt64(&m.owner, -1)
m.Mutex.Unlock()
}
|
2.2 token
通过 token 的实现方式需要调用者自己提供一个 token,获取锁的时候把这个 token 传入,释放锁的时候也需要把这个 token 传入。通过用户传入的 token 替换方案一中 goroutine id,其它逻辑和方案一一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// Token方式的递归锁
type TokenRecursiveMutex struct {
sync.Mutex
token int64
recursion int32
}
// 请求锁,需要传入token
func (m *TokenRecursiveMutex) Lock(token int64) {
if atomic.LoadInt64(&m.token) == token { //如果传入的token和持有锁的token一致,说明是递归调用
m.recursion++
return
}
m.Mutex.Lock() // 传入的token不一致,说明不是递归调用
// 抢到锁之后记录这个token
atomic.StoreInt64(&m.token, token)
m.recursion = 1
}
// 释放锁
func (m *TokenRecursiveMutex) Unlock(token int64) {
if atomic.LoadInt64(&m.token) != token { // 释放其它token持有的锁
panic(fmt.Sprintf("wrong the owner(%d): %d!", m.token, token))
}
m.recursion-- // 当前持有这个锁的token释放锁
if m.recursion != 0 { // 还没有回退到最初的递归调用
return
}
atomic.StoreInt64(&m.token, 0) // 没有递归调用了,释放锁
m.Mutex.Unlock()
}
|
3. TryLock
我们可以为 Mutex 添加一个 TryLock 请求锁的方法:
- 如果 goroutine 获取锁成功,则持有锁,并返回 true
- 如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么 TryLock 直接返回 false,不会阻塞在方法调用上
TryLock 在不是必需要获取锁去更新共享资源时非常有用,它可以提升程序的并发度。具体实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// 复制Mutex定义的常量
const (
mutexLocked = 1 << iota // 加锁标识位置
mutexWoken // 唤醒标识位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识waiter的起始bit位置
)
// 扩展一个Mutex结构
type Mutex struct {
sync.Mutex
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
// 如果能成功抢到锁
if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
return true
}
// 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
return false
}
// 尝试在竞争的状态下请求锁
new := old | mutexLocked
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}
|
4. 获取等待者的数量
4.1 应用场景
如果我们要监控锁的竞争情况,一个监控指标就是,等待这把锁的 goroutine 数量。我们可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如 Grafana)展示出来。要知道,锁是性能下降的“罪魁祸首”之一,所以,有效地降低锁的竞争,就能够很好地提高性能。因此,监控关键互斥锁上等待的 goroutine 的数量,是我们分析锁竞争的激烈程度的一个重要指标。
4.2 实现
Mutex 结构中的 state 字段有很多个含义,通过 state 字段,你可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息。但是要想获取这些信息,我们需要将他们从 state 字段中一一解析出来,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
)
type Mutex struct {
sync.Mutex
}
func (m *Mutex) Count() int {
// 获取state字段的值
v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
isLock = v & mutexLocked
v = v >> mutexWaiterShift //得到等待者的数值
v = v + isLock //再加上锁持有者的数量,0或者1
return int(v)
}
// 锁是否被持有
func (m *Mutex) IsLocked() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexLocked == mutexLocked
}
// 是否有等待者被唤醒
func (m *Mutex) IsWoken() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexWoken == mutexWoken
}
// 锁是否处于饥饿状态
func (m *Mutex) IsStarving() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexStarving == mutexStarving
}
|
需要注意的是在获取 state 字段的时候,并没有通过 Lock 获取这把锁,所以获取的这个 state 的值是一个瞬态的值。
5. 实现一个线程安全的队列
Mutex 经常会和其他非线程安全(对于 Go 来说,我们其实指的是 goroutine 安全)的数据结构一起,组合成一个线程安全的数据结构。新数据结构的业务逻辑由原来的数据结构提供,而 Mutex 提供了锁的机制,来保证线程安全。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
type SliceQueue struct {
data []interface{}
mu sync.Mutex
}
func NewSliceQueue(n int) (q *SliceQueue) {
return &SliceQueue{data: make([]interface{}, 0, n)}
}
// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
q.mu.Lock()
q.data = append(q.data, v)
q.mu.Unlock()
}
// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}
|
参考
本文内容摘录自:
- 极客专栏-鸟叔的 Go 并发编程实战