目录

Mutex 扩展

如何基于 Mutex 实现一个可重入锁

1. Mutex 的扩展

上一节我们介绍了 Mutex 的实现原理,这一节我们来看看如何基于标准库的 Mutex 来扩展 Mutex 提供的并发原语,包括:

  1. 实现一个可重入锁
  2. TryLock
  3. 获取等待者的数量
  4. 实现一个线程安全的队列

2. 可重入锁

实现可重入锁的关键是要锁能记住当前哪个 goroutine 持有锁,这里有两个方案:

  1. 通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口
  2. 调用 Lock/Unlock 方法时,由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id,但是,这样一来,就不满足 Locker 接口了

可重入锁(递归锁)解决了代码重入或者递归调用带来的死锁问题,同时它也带来了另一个好处: 只有持有锁的 goroutine 才能 unlock 这个锁。接下来我们来看看这两个方案具体如何实现。

2.1 goroutine id

这个方案的关键第一步是获取 goroutine id,方式有两种,分别是

  1. 简单方式:通过 runtime.Stack 方法获取栈帧信息,栈帧信息里包含 goroutine id
  2. hacker 方式:
    • 原理: 我们获取运行时的 g 指针,反解出对应的 g 的结构。每个运行的 goroutine 结构的 g 指针保存在当前 goroutine 的一个叫做 TLS 对象中
    • 第一步:我们先获取到 TLS 对象
    • 第二步:再从 TLS 中获取 goroutine 结构的 g 指针
    • 第三步:再从 g 指针中取出 goroutine id。
    • 需要注意的是,不同 Go 版本的 goroutine 的结构可能不同,所以需要根据 Go 的不同版本进行调整。没必要重复造轮子,直接使用第三方库就可以: petermattis/goid
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/petermattis/goid" // 使用第三方包通过 hacker 方式获取 goroutine id 
)

// 简单方式
func GoID() int {
	var buffer [64]byte
	n := runtime.Stack(buffer[:], false)
	idField := strings.Fields(strings.TrimPrefix(string(buffer[:n]), "goroutine "))[0]
	d, err := strconv.Atoi(idField)
	if err != nil {
		panic(fmt.Sprintf("can not get goroutine id %v", err))
	}
	return d
}


// RecursiveMutex 包装一个Mutex,实现可重入
type RecursiveMutex struct {
    sync.Mutex
    owner     int64 // 当前持有锁的goroutine id
    recursion int32 // 这个goroutine 重入的次数
}

func (m *RecursiveMutex) Lock() {
    gid := goid.Get()
    // 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
    if atomic.LoadInt64(&m.owner) == gid {
        m.recursion++
        return
    }
    m.Mutex.Lock()
    // 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
    atomic.StoreInt64(&m.owner, gid)
    m.recursion = 1
}

func (m *RecursiveMutex) Unlock() {
    gid := goid.Get()
    // 非持有锁的goroutine尝试释放锁,错误的使用
    if atomic.LoadInt64(&m.owner) != gid {
        panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
    }
    // 调用次数减1
    m.recursion--
    if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
        return
    }
    // 此goroutine最后一次调用,需要释放锁
    atomic.StoreInt64(&m.owner, -1)
    m.Mutex.Unlock()
}

2.2 token

通过 token 的实现方式需要调用者自己提供一个 token,获取锁的时候把这个 token 传入,释放锁的时候也需要把这个 token 传入。通过用户传入的 token 替换方案一中 goroutine id,其它逻辑和方案一一致。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// Token方式的递归锁
type TokenRecursiveMutex struct {
    sync.Mutex
    token     int64
    recursion int32
}

// 请求锁,需要传入token
func (m *TokenRecursiveMutex) Lock(token int64) {
    if atomic.LoadInt64(&m.token) == token { //如果传入的token和持有锁的token一致,说明是递归调用
        m.recursion++
        return
    }
    m.Mutex.Lock() // 传入的token不一致,说明不是递归调用
    // 抢到锁之后记录这个token
    atomic.StoreInt64(&m.token, token)
    m.recursion = 1
}

// 释放锁
func (m *TokenRecursiveMutex) Unlock(token int64) {
    if atomic.LoadInt64(&m.token) != token { // 释放其它token持有的锁
        panic(fmt.Sprintf("wrong the owner(%d): %d!", m.token, token))
    }
    m.recursion-- // 当前持有这个锁的token释放锁
    if m.recursion != 0 { // 还没有回退到最初的递归调用
        return
    }
    atomic.StoreInt64(&m.token, 0) // 没有递归调用了,释放锁
    m.Mutex.Unlock()
}

3. TryLock

我们可以为 Mutex 添加一个 TryLock 请求锁的方法:

  1. 如果 goroutine 获取锁成功,则持有锁,并返回 true
  2. 如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么 TryLock 直接返回 false,不会阻塞在方法调用上

TryLock 在不是必需要获取锁去更新共享资源时非常有用,它可以提升程序的并发度。具体实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

// 复制Mutex定义的常量
const (
    mutexLocked = 1 << iota // 加锁标识位置
    mutexWoken              // 唤醒标识位置
    mutexStarving           // 锁饥饿标识位置
    mutexWaiterShift = iota // 标识waiter的起始bit位置
)

// 扩展一个Mutex结构
type Mutex struct {
    sync.Mutex
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    // 如果能成功抢到锁
    if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
        return true
    }

    // 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
    old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
        return false
    }

    // 尝试在竞争的状态下请求锁
    new := old | mutexLocked
    return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}

4. 获取等待者的数量

4.1 应用场景

如果我们要监控锁的竞争情况,一个监控指标就是,等待这把锁的 goroutine 数量。我们可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如 Grafana)展示出来。要知道,锁是性能下降的“罪魁祸首”之一,所以,有效地降低锁的竞争,就能够很好地提高性能。因此,监控关键互斥锁上等待的 goroutine 的数量,是我们分析锁竞争的激烈程度的一个重要指标。

4.2 实现

Mutex 结构中的 state 字段有很多个含义,通过 state 字段,你可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息。但是要想获取这些信息,我们需要将他们从 state 字段中一一解析出来,代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

type Mutex struct {
    sync.Mutex
}

func (m *Mutex) Count() int {
    // 获取state字段的值
    
    v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    isLock = v & mutexLocked
    v = v >> mutexWaiterShift //得到等待者的数值
    v = v + isLock //再加上锁持有者的数量,0或者1
    return int(v)
}


// 锁是否被持有
func (m *Mutex) IsLocked() bool {
    state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    return state&mutexLocked == mutexLocked
}

// 是否有等待者被唤醒
func (m *Mutex) IsWoken() bool {
    state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    return state&mutexWoken == mutexWoken
}

// 锁是否处于饥饿状态
func (m *Mutex) IsStarving() bool {
    state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    return state&mutexStarving == mutexStarving
}

需要注意的是在获取 state 字段的时候,并没有通过 Lock 获取这把锁,所以获取的这个 state 的值是一个瞬态的值。

5. 实现一个线程安全的队列

Mutex 经常会和其他非线程安全(对于 Go 来说,我们其实指的是 goroutine 安全)的数据结构一起,组合成一个线程安全的数据结构。新数据结构的业务逻辑由原来的数据结构提供,而 Mutex 提供了锁的机制,来保证线程安全。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type SliceQueue struct {
    data []interface{}
    mu   sync.Mutex
}

func NewSliceQueue(n int) (q *SliceQueue) {
    return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
    q.mu.Lock()
    q.data = append(q.data, v)
    q.mu.Unlock()
}

// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
    q.mu.Lock()
    if len(q.data) == 0 {
        q.mu.Unlock()
        return nil
    }
    v := q.data[0]
    q.data = q.data[1:]
    q.mu.Unlock()
    return v
}

参考

本文内容摘录自:

  1. 极客专栏-鸟叔的 Go 并发编程实战