Featured image of post Golang 并发编程之同步原语

Golang 并发编程之同步原语

概述

当提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁的主要作用就是保证多个线程或者 Goroutine 在访问同一片内存时不会出现混乱的问题,锁其实是一种并发编程中的同步原语(Synchronization Primitives)。

在这一节中我们就会介绍 Go 语言中常见的同步原语 MutexRWMutexWaitGroupOnceCond 以及扩展原语 ErrGroupSemaphoreSingleFlight 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。

基本原语

Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的互斥锁 Mutex 与读写互斥锁 RWMutex 以及 OnceWaitGroup

这些基本原语的主要作用是提供较为基础的同步功能,我们应该使用 Channel 和通信来实现更加高级的同步机制,我们在这一节中并不会介绍标准库中全部的原语,而是会介绍其中比较常见的 MutexRWMutexOnceWaitGroupCond,我们并不会涉及剩下两个用于存取数据的结构体 MapPool

Mutex

Go 语言中的互斥锁在 sync 包中,它由两个字段 statesema 组成,state 表示当前互斥锁的状态,而 sema 真正用于控制锁状态的信号量,这两个加起来只占 8 个字节空间的结构体就表示了 Go 语言中的互斥锁。

1
2
3
4
type Mutex struct {
    state int32
    sema  uint32
}

状态

互斥锁的状态是用 int32 来表示的,但是锁的状态并不是互斥的,它的最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置都用来表示当前有多少个 Goroutine 等待互斥锁被释放:

互斥锁在被创建出来时,所有的状态位的默认值都是 0

  • 当互斥锁被锁定时 mutexLocked 就会被置成 1
  • 当互斥锁被在正常模式下被唤醒时 mutexWoken 就会被被置成 1
  • mutexStarving 用于表示当前的互斥锁进入了状态,最后的几位是在当前互斥锁上等待的 Goroutine 个数

饥饿模式

在了解具体的加锁和解锁过程之前,我们需要先简单了解一下 Mutex 在使用过程中可能会进入的饥饿模式,饥饿模式是在 Go 语言 1.9 版本引入的特性,它的主要功能就是保证互斥锁的获取的『公平性』(Fairness)。

互斥锁可以同时处于两种不同的模式,也就是正常模式和饥饿模式,在正常模式下,所有锁的等待者都会按照先进先出的顺序获取锁,但是如果一个刚刚被唤起的 Goroutine 遇到了新的 Goroutine 进程也调用了 Lock 方法时,大概率会获取不到锁,为了减少这种情况的出现,防止 Goroutine 被『饿死』,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换到饥饿模式。

在饥饿模式中,互斥锁会被直接交给等待队列最前面的 Goroutine,新的 Goroutine 在这时不能获取锁、也不会进入自旋的状态,它们只会在队列的末尾等待,如果一个 Goroutine 获得了互斥锁并且它是队列中最末尾的协程或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。

相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的主要作用就是避免一些 Goroutine 由于陷入等待无法获取锁而造成较高的尾延时,这也是对 Mutex 的一个优化。

加锁

互斥锁 Mutex 的加锁是靠 Lock 方法完成的,最新的 Go 语言源代码中已经将 Lock 方法进行了简化,方法的主干只保留了最常见、简单并且快速的情况;当锁的状态是 0 时直接将 mutexLocked 位置成 1

1
2
3
4
5
6
func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

但是当 Lock 方法被调用时 Mutex 的状态不是 0 时就会进入 lockSlow 方法尝试通过自旋或者其他的方法等待锁的释放并获取互斥锁,该方法的主体是一个非常大 for 循环,我们会将该方法分成几个部分介绍获取锁的过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

在这段方法的第一部分会判断当前方法能否进入自旋来等待锁的释放,自旋(Spinnig)其实是在多线程同步的过程中使用的一种机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真,在多核的 CPU 上,自旋的优点是避免了 Goroutine 的切换,所以如果使用恰当会对性能带来非常大的增益。

在 Go 语言的 Mutex 互斥锁中,只有在普通模式下才可能进入自旋,除了模式的限制之外,runtime_canSpin 方法中会判断当前方法是否可以进入自旋,进入自旋的条件非常苛刻:

  1. 运行在多 CPU 的机器上;

  2. 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;

  3. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列是空的;

官方的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const (
    mutex_unlocked = 0
    mutex_locked   = 1
    mutex_sleeping = 2

    active_spin     = 4
    active_spin_cnt = 30
    passive_spin    = 1
)

// Active spinning for sync.Mutex.
//
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
        return false
    }
    // 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列不为空则不进入自旋;
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

一旦当前 Goroutine 能够进入自旋就会调用 runtime_doSpin,它最终调用汇编语言编写的方法 procyield 并执行指定次数的 PAUSE 指令,PAUSE 指令什么都不会做,但是会消耗 CPU 时间,每次自旋都会调用 30PAUSE,下面是该方法在 386 架构的机器上的实现:

1
2
3
4
5
6
7
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET

处理了自旋相关的特殊逻辑之后,互斥锁接下来就根据上下文计算当前互斥锁最新的状态了,几个不同的条件分别会更新 state 中存储的不同信息 mutexLockedmutexStarvingmutexWokenmutexWaiterShift

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }

计算了新的互斥锁状态之后,我们就会使用 atomic 包提供的 CAS 函数修改互斥锁的状态,如果当前的互斥锁已经处于饥饿和锁定的状态,就会跳过当前步骤,调用 runtime_SemacquireMutex 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

runtime_SemacquireMutex 方法的主要作用就是通过 Mutex 的使用互斥锁中的信号量保证资源不会被两个 Goroutine 获取,从这里我们就能看出 Mutex 其实就是对更底层的信号量进行封装,对外提供更加易用的 API,runtime_SemacquireMutex 会在方法中不断调用 goparkunlock 将当前 Goroutine 陷入休眠等待信号量可以被获取。

一旦当前 Goroutine 可以获取信号量,就证明互斥锁已经被解锁,该方法就会立刻返回,Lock 方法的剩余代码也会继续执行下去了,当前互斥锁处于饥饿模式时,如果该 Goroutine 是队列中最后的一个 Goroutine 或者等待锁的时间小于 starvationThresholdNs(1ms),当前 Goroutine 就会直接获得互斥锁并且从饥饿模式中退出并获得锁。

Licensed under CC BY-NC-SA 4.0
最后更新于 2023/06/01 13:42:40
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计