Featured image of post Go 中 Mutex 实现原理

Go 中 Mutex 实现原理

概述

其实 Mutex 到现在已经优化了很多版本了,总结了一下演进的过程可分为四个阶段,分别是:

  • 初版 Mutex:使用一个 flag 变量表示锁是否被持有;
  • 给新人机会:照顾新来的 goroutine 先获取到锁;
  • 多给些机会:照顾新来的和被唤醒的 goroutine 获取到锁;
  • 解决饥饿:存在竞争关系,有饥饿情况发生,需要解决。

初版 Mutex

设置一个 flag 变量,当 flag 为 1 时,就认为锁已被 goroutine 持有,其他的竞争的 goroutine 只能等待。当 flag 为 1 时,通过 CAS 将这个 flag 设置为 1,此时就被当前这个 goroutine 获取到锁了。

官方源码如下,点击这里浏览官方源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// CAS 操作,当时还没有抽象出 atomic 包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)

// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
    key int32;// 锁是否被持有的标识
    sema int32;// 信号量专用,用以阻塞 / 唤醒 goroutine
}

// 保证成功在 val 上增加 delta 的值
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val;
        if cas(val, v, v+delta) {
            return v+delta;
        }
    }
    panic("unreached")
}

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    if xadd(&m.key, 1) == 1 {
        // changed from 0 to 1; we hold lock
        return;
    }
    semacquire(&m.sema);// 阻塞等待
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    if xadd(&m.key, -1) == 0 {
        // changed from 1 to 0; no contention
        return;
    }
    semrelease(&m.sema);// 唤醒其他等待的 goroutine
}

这是 2008 年 Russ Cox 提交的第一版 Mutex 源码,很简洁!

简单说下 CAS,全称(compare-and-swap,或者 compare-and-set)。CAS 是指将给定的值和内存地址中的值比较,如果相同就用新值替换内存地址中的值,这个过程是原子性的。所谓原子性,就是操作的值总是最新的,不会出现其他线程同时修改了这个值,若有,CAS 会返回失败。这个在操作系统中也有实现,有兴趣的可以搜索相关资料学习一下。

上述代码中最核心的 Mutex 结构体中包含着两个字段:

  • key:用于标识是否被某个 goroutine 占用,若果大于 1,则表示已被占用;
  • sema:一个信号的变量,用于控制等待 goroutine 的阻塞与控制 goroutine;

用一张图表达一下文字,看看能不能加深大家的印象:

在使用 Mutex 的时候,需要严格遵循 “谁申请,谁释放” 原则

调用 Lock 请求获取锁时,我们通过 xadd 进行 CAS 操作(line: 29),通过循环执行 CAS 一直到成功,保证 key + 1 操作完成,当锁没有被任何 goroutine 占用,Lock 方法就会被改变为 1 ,此时这个 goroutine 就拥有了这个锁;若锁已经被其他的 goroutine 占用了,那么当前这个 goroutine 会把 key + 1,并且会调用 semacquire 方法(line:33), 然后使用信号量设置自己休眠,等待锁释放的时候,信号量就会将它再次唤醒

当前拿到锁的 goroutine 在进行 Unlock 释放锁操作时,会把 Key - 1(line:43)。若此时没有其他的 goroutine 在等待锁,直接返回了。如果还有其他的 goroutine 在等待锁,那么会调用 semrelease 方法(line:47),然后用信号量唤醒其他等待着的 goroutine。

所以我们总计一下,在初版的 Mutex 是利用 CAS 进行原子操作设置 key 的。key 标记锁被 goroutine 占用的同时也记录了当前获取锁和等待锁的 goroutine 数量,设计非常简洁。

我们会发现,Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计在现在也一直在用。所以一定记住:

在使用 Mutex 的时候,需要严格遵循 “谁申请,谁释放” 原则。

给新人机会

在 2011 年 6 月 30 日的 commit 中对 Mutex 进行了一次大的调整,点此浏览官方源码,改动后的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
    state int32
    sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime.Semacquire(&m.sema)
            awoke = true
        }
    }
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        panic("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        // If there are no waiters or a goroutine has already
        // been woken or grabbed the lock, no need to wake anyone.
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        // Grab the right to wake someone.
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime.Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}

在这个版本中,state 不再仅表示等待的 goroutine 数量,而是分为了 waiters、Woken 和 Locked,如下:

state 一个字段多个意义,方便我们利用较少的内存实现互斥锁。state 由三个部分组成,分别是:

  • 第一位:锁是否被占有;
  • 第二位:是否有被唤醒的 goroutine,当 goroutine 被唤醒之前,mutexWoken 被置为 1,在 goroutine 被唤醒以后,会尝试获取锁,无论是否获取成功,其都会将 mutexWoken 置为 0;
  • 其他位:是等待此锁的 goroutine 数量。

在这个版本中,当一个 goroutine 被唤醒时,不代表该 goroutine 一定能获得锁,它需要与其他新执行 Lock() 的 goroutine 竞争锁,这种方式使得锁的获取不再是强制的 FIFO,新到的 goroutine 由于已经在 CPU 上执行了,因此其更有可能获取到锁。

接下来我们来解析一下本版本中 Lock 方法的实现,该方法的实现源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 {// 如果锁已经被其他 goroutine 获取
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            // 如下代码等同于 new = new & (^mutexWoken)
            // 即先将 mutexWoken 按位取反,然后与 new 进行按位与,
            // 最后将结果赋值给 new
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime.Semacquire(&m.sema)// 进入休眠等待
            awoke = true
        }
    }
}

首先看下代码第 6 行 Fast path 的解释,当没有 goroutine 占有锁且没有其他的 goroutine,直接获得锁。

相反,state 不为 0,进行循环检查,若当前的 goroutine 没有获取到锁,就会进行休眠等待锁的释放,当被唤醒后,需要和正在请求锁的 goroutine 进行竞争获得锁。代码中的第 13 行将要设置的 state 的新值(new 变量)设置为加锁状态,如果能成功地通过 CAS 把这个新值赋予 state(即 atomic.CompareAndSwapInt32(&m.state, old, new) 返回 true),就代表抢夺锁的操作成功了。

此时需要注意,若成功的改变了 state 值,但是之前的 state 是有锁的状态(即第 14 行 if 条件为真),那么 state 只是清除 mutexWoken 标志或者增加一个 waiter 而已

请求锁的 goroutine 有两类,

  • 一类是新来请求锁的 goroutine;
  • 另一类是被唤醒的等待请求锁的 goroutine。

锁的状态也有两种:加锁和未加锁。下面的表格说明每种 goroutine 在两种锁状态下的处理逻辑。

请求锁的 goroutine 类型 当前锁被持有 当前锁未被持有
新来的 gorutine waiter++;休眠 获取到锁
被唤醒的 gorutine 清除 mutextWoken 标志;加入等待队列,重新休眠 清除 mutextWoken 标志;获取到锁

接下来我们看下释放锁的 Unlock 方法,源代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        panic("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        // If there are no waiters or a goroutine has already
        // been woken or grabbed the lock, no need to wake anyone.
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        // Grab the right to wake someone.
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime.Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}

其中,第 9 行是通过减 1 将占有锁的标识状态设置成未加锁,第 10 到 12 行会检测原来锁的状态是否是未加锁,如果是直接回抛 pannic。

释放锁还需要一些额外的操作,因为可能还会存在一些等待这个锁的 goroutine(也称为 waiter),需要通过信号量的方式唤醒其中的一个,会出现以下两种情况

  • 当没有其他 goroutine(waiter),此时竞争锁的 goroutine 只有一个,Unlock 后就可以直接返回了;当有唤醒的 goroutine,或者被别人加了锁,此时当前的 goroutine Unlock 后也可以直接返回。
  • 若有等待的 goroutine 并且没有唤醒的 waiter,那么就需要唤醒一个等待的 waiter。在这之前,我们需要将 waiter 的数量减 1,并且标记 mutexWoken,这样 Unlock 就可以返回了。

通过这样复杂的检查、判断、设置,我们就可以安全地将互斥锁 Unlock 了。

这一次的改动总结一下,就是新来的 goroutine 也有机会获取到锁,甚至一个 goroutine 会连续获得,和之前的设计不太一样,从代码复杂度就能看的出来。这一版需要竞争获得 Mutex。

没有空闲的锁或者竞争失败才加入到等待队列中。但是其实还可以进一步优化。我们接着往下看。

多给些机会

在大多数情况下,goroutine 获取锁的时间都是非常短的(例如仅是执行 i++),其比唤醒 goroutine 加 goroutine 调度的时间还要短。

在上一个版本中,如果 g1 在持有锁时,g2 尝试获取锁,那么当 g2 通过一次 CAS 获取不到时,其就会阻塞。然而,g1 持有的锁很快会被得到释放,此时会唤醒 g2,然后再调度到 g2,这时 g2 才能执行临界区代码。在 2015 年 2 月版本提交的改动中解决了这个问题,点此浏览官方本次提交,如果新来的 goroutine 或者被唤醒的 goroutine 获取不到锁,那么会通过自旋等待的方式,超过一定的自旋次数后,再执行原来的逻辑。

本次修改后的官方源码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
    state int32
    sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if raceenabled {
            raceAcquire(unsafe.Pointer(m))
        }
        return
    }

    awoke := false
    iter := 0
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 {
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    if raceenabled {
        _ = m.state
        raceRelease(unsafe.Pointer(m))
    }

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        panic("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        // If there are no waiters or a goroutine has already
        // been woken or grabbed the lock, no need to wake anyone.
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        // Grab the right to wake someone.
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}

对于临界区代码执行非常短的场景来说,这是一个非常高效的一次优化。因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine 不用通过休眠唤醒方式等待调度,只需要自旋转几次,就能获得锁。

解决饥饿

通过上面几次的优化迭代,Mutex 的底层源码越来越复杂,在高并发场景下获取锁也更加的公平竞争。但是细细回味,会发现新来的 goroutine 也参与到了竞争,会导致每次新来的 goroutine 能够拿到,在极端的情况,会出现等待中的 goroutine 一直在等待,拿不到锁。

之前的版本中也有这样的困境,goroutine 总是拿不到。

当然 Go 聪明的开发者不能允许这种事情发生,2016 年 Go 1.9 中 Mutex 增加了饥饿模式,点此浏览官方源码,让竞争锁变得更加公平,等待时间限制在 1ms,也修复了一个历史大 Bug。

以前的版本中总是把唤醒的 goroutine 放到等待队列的末尾,会导致增加不公平的等待时间。

之后,2018 年,Go 开发者将 fast path 和 slow path 拆成独立的方法,以便内联,提高性能。2019 年也有一个 Mutex 的优化,虽然没有对 Mutex 做修改,但是,对于 Mutex 唤醒后持有锁的那个 waiter,调度器可以有更高的优先级去执行,这已经是很细致的性能优化了。

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked:表示互斥锁的锁定状态;
  • mutexWoken:表示是否有被唤醒的 goroutine;
  • mutexStarving:当前的互斥锁处于饥饿状态;
  • waitersCount:当前互斥锁上等待的 goroutine 个数;

当然,你也可以暂时跳过这一段,源码有点多,以后喝茶慢慢品,但是要记住:

Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁。

加锁源码分析如下,引自 https://zhuanlan.zhihu.com/p/365552668

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
func (m *Mutex) Lock() {
    // 快速路径,加锁
    // CAS 比较 state 和 0 是否相等,如果相等则说明 mutex 锁未锁定。
    // 此时将 state 赋值为 1,表明加锁成功。
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // 竞态检测
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // 表明已经锁在使用中,则调用 goroutine 直到互斥锁可用为止
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64   // 标记当前 goroutine 的开始等待时间戳
    starving := false // 标记当前 goroutine 的饥饿模式,true: 饥饿模式,false: 正常模式
    awoke := false // 标记当前 goroutine 是否已唤醒,true: 被唤醒,flase: 未被唤醒
    iter := 0 // 自旋次数
    old := m.state // 保存当前对象锁状态赋值给 old 变量,做对比用
    for {
        // mutexLocked:      0001   锁定状态
        // mutexWoken:       0010   唤醒状态
        // mutexStarving:    0100   饥饿模式
        // mutexWaiterShift: 3      等待上锁的 goroutine 数量

        // 当 old&(mutexLocked|mutexStarving) == mutexLocked, old 只能处在如下两种状态之一:
        // 1) old = 0011 时,说明 mutex 锁被锁定并且有被唤醒的 goroutine,此时 0011&(0001|0100) = 0001
        // 2) old = 0001 时,说明 mutex 锁被锁定,此时 0001&(0001|0100) = 0001
        // 间接说明了 mutex 不能处于饥饿模式下。

        // runtime_canSpin:
        // 在 src/runtime/proc.go 中通过 sync_runtime_canSpin 实现;
        // 表示比较保守的自旋,golang 中自旋锁并不会一直自旋下去

        // 判断:被锁定状态;正常模式;可以自旋。(不要在饥饿模式下自旋)
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // !awoke: 当前 goroutine 未被唤醒
            // old&mutexWoken == 0:当前没有被唤醒的 goroutine。
            // old>>mutexWaiterShift != 0:查看当前 mutex 锁排队的 goroution 数量
            // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken):state 追加唤醒状态
            // 为 mutex 锁和当前 goroutine 追加唤醒状态
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true // 此时当前 goroutine 被唤醒
            }
            runtime_doSpin() //runtime_canSpin 见文末尾
            iter++ // 自旋次数 + 1
            old = m.state // 将 old 重新赋值
            continue // 跳出本次循环,开始下次循环
        }
        // 执行到这里说明以下情况至少出现一个
        // 1. runtime_canSpin(iter) 返回 false
        // 2. 锁被释放
        // 3. 锁处在饥饿模式
        new := old
        // old&mutexStarving == 0 即 old&(0100)=0
        if old&mutexStarving == 0 {
            // 当锁不处于饥饿模式时,尝试获取锁
            new |= mutexLocked
        }

        if old&(mutexLocked|mutexStarving) != 0 {
            // 进入这里说明 mutex 至少满足如下两个状态中的一个
            // 1. mutex 处于饥饿模式
            // 2. mutex 被其他 goroutine 获取
            // 只要出现上面两者之一,则进行等待,等待者加 1
            new += 1 << mutexWaiterShift
        }
        // 当前 goroutine 处于饥饿模式,并且当前锁被占用, 标记 new 变量为饥饿状态
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        // 当前 goroutine 被唤醒
        if awoke {
            // 一定要将 mutex 标识为唤醒状态,不然 panic
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }

        // 上面四个 if 语句构造本 goroutine 期望的 mutex 的下一个状态。

        // 将 state 与 old 比较,如果此时 mutex 的状态依旧是 old 记录的状态,
        // 则将 state 赋值为 new
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 只有 mutex 的上一个版本处于未加锁状态且在正常模式下,才算加锁成功。
            if old&(mutexLocked|mutexStarving) == 0 {
                break // 使用 cas 方法成功抢占到锁。
            }
            // waitStartTime != 0 说明当前 goroutine 是从等待状态被唤醒的 ,
            // 此时 queueLifo 为 true,反之为 false。
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                // 记录当前 goroutine 等待时间
                waitStartTime = runtime_nanotime()
            }
            // 阻塞等待锁的释放
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 当 goroutine 等待时间超过 starvationThresholdNs,设置 starving 为 true
            starving = starving || runtime_nanotime()-waitStartTime> starvationThresholdNs
            old = m.state // 得到 mutex 锁的状态
            if old&mutexStarving != 0 { // 当前 mutex 处于饥饿模式
                // 如果当前的 state 已加锁,已唤醒,或者等待的队列中为空, 那么 state 是一个非法状态,panic
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 等待状态的 goroutine - 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 如果本 goroutine 并不处于饥饿状态(等待时间小于 1ms),或者它是最后一个等待者
                if !starving || old>>mutexWaiterShift == 1 {
                    // 退出饥饿模式
                    delta -= mutexStarving
                }
                // 设置新 state, 因为已经获得了锁,退出、返回
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 修改本 goroutine 为唤醒状态,并且自旋次数清 0
            awoke = true
            iter = 06
        } else {
            // 如果 CAS 不成功,重新获取锁的 state, 从 for 循环开始处重新开始 继续上述动作
            old = m.state
        }
    }
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving // 从 state 字段中分出一个饥饿标记
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
    // Fast path: 幸运之路,一下就获取到了锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿 goroutine 竞争
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // 此 goroutine 的饥饿标记
    awoke := false // 唤醒标记
    iter := 0 // 自旋次数
    old := m.state // 当前的锁的状态
    for {
        // 锁是非饥饿状态,锁还没被释放,尝试自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
            continue
        }
        // 执行到这里说明当前 goroutine 不满足自旋条件或者 Mutex 处于如下两种状态之一
        // 1. 锁已经被释放
        // 2. 锁未被释放但是处于饥饿模式
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked // 非饥饿状态,加锁
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            // 1. Mutex 被其他 goroutine 持有
            // 2. Mutex 处于饥饿状态
            // 3. 以上两种情况的结合
            new += 1 << mutexWaiterShift // waiter 数量加 1
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving // 设置饥饿状态
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken // 新状态清除唤醒标记
        }
        // 成功设置新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 处理饥饿状态
            // 如果以前就在队列里面,加入到队列头
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 阻塞等待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 唤醒之后检查锁是否应该处于饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime> starvationThresholdNs
            old = m.state
            // 如果锁已经处于饥饿状态,直接抢到锁,返回
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 有点绕,加锁并且将 waiter 数减 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving // 最后一个 waiter 或者已经不饥饿了,清除饥饿标记
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

饥饿模式和正常模式

Mutex 有两种操作模式

  • 正常模式
  • 饥饿模式

我们需要在这里先了解正常模式和饥饿模式都是什么以及它们有什么样的关系。接下来我们分析一下 Mutex 对这两种模式的处理。

在调用 Lock 方法时,当前没有竞争直接获取锁返回。否则进入了 lockSlow 方法,

正常模式下,goroutine 都是进入先入先出到等待队列中,被唤醒的 goroutine 不会直接拿到锁,而是和新来的 goroutine 就行竞争。但是新来的 goroutine 有先天的优势拿到锁,因为他们正在 CPU 中运行。所以高并发下,被唤醒的 goroutine 可能拿不到锁,这时他就会被插入到队列的前面,此时如果 goroutine 获取不到锁的时间超过了设定的阈值 1 ms,那么此时 Mutex 就会进入到饥饿模式。

饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 goroutine,新来的 goroutine 会加入到等待队列的尾部,如果拥有锁的 goroutine 发现一下两种情况,会把 Mutex 转换成正常模式:

  • 此时新的 goroutine 的等待时间小于 1ms
  • 此 goroutine 已经是队列中的最后一个了,没有其它的等待锁的 goroutine 了

对比一下,正常模式拥有更好的性能,因为即使有等待抢锁的 goroutine, 也可以连续多次获取到锁。

饥饿模式是一种平衡,他让一些等待很长的 goroutine,能够优先拿到锁。

我们已经从多个方面和历史版本分析了 Mutex 的实现原理,这里我们从 Lock 和 Unlock 个方面总结注意事项。

Mutex 的 Lock 过程比较复杂,目前使用的新版本中,它涉及自旋、信号量以及调度等概念:

  • 如果 Mutex 处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果 Mutex 处于 mutexLocked 状态并且在正常模式下工作,会进入自旋;
  • 如果当前 goroutine 等待锁的时间超过了 1ms,Mutex 就会切换到饥饿模式;
  • Mutex 在正常情况下会将尝试获取锁的 goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 goroutine 是 Mutex 上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将 Mutex 切换回正常模式;

Mutex 的 Unlock 过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当 Mutex 已经被解锁时,调用 Unlock 会直接抛出异常;
  • 当 Mutex 处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当 Mutex 处于正常模式时,如果没有 goroutine 等待锁的释放或者已经有被唤醒的 goroutine 获得了锁,会直接返回

总结

我们已经从多个方面和历史版本分析了 Mutex 的实现原理,这里我们从 Lock 和 Unlock 个方面总结注意事项。

Mutex 的 Lock 过程比较复杂,目前使用的新版本中,它涉及自旋、信号量以及调度等概念:

  • 如果 Mutex 处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果 Mutex 处于 mutexLocked 状态并且在正常模式下工作,会进入自旋;
  • 如果当前 goroutine 等待锁的时间超过了 1ms,Mutex 就会切换到饥饿模式;
  • Mutex 在正常情况下会将尝试获取锁的 goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 goroutine 是 Mutex 上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将 Mutex 切换回正常模式;

Mutex 的 Unlock 过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当 Mutex 已经被解锁时,调用 Unlock 会直接抛出异常;
  • 当 Mutex 处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当 Mutex 处于正常模式时,如果没有 goroutine 等待锁的释放或者已经有被唤醒的 goroutine 获得了锁,会直接返回

思考问题

Q:目前 Mutex 的 state 字段有几个意义,这几个意义分别是由哪些字段表示的?

A:state 字段一共有四个子字段,前三个 bit 是 mutexLocked(锁标记)、mutexWoken(唤醒标记)、mutexStarving(饥饿标记),剩余 bit 标示 mutexWaiter(等待数量)。

原文地址

Licensed under CC BY-NC-SA 4.0
最后更新于 2023/06/05 09:38:48
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计