前言

之前实现过使用redis的分布式锁,分布式锁解决的是主机与主机之间的临界资源的保护。单机系统内部需要使用单机锁来完成对临界资源的保护。Go中单机锁是由sync.Mutex包实现。设计思路清晰,编码高度凝练,因为Go本身就是为高并发而设计,所以锁的设计上也是尽可能保证并发获取锁的效率和公平性上达到一个相对的平衡。整个代码实现加注释也才250多行,资源上只使用了一个int32stateuint32sema来实现整个锁的功能。出自大师手笔确实有很多值得借鉴和学习的地方。


设计模式

atomic

原子指得是一个操作是最小的不可分割的。cpu有一级缓存二级缓存三级缓存才会到内存中,如何保证多核cpu读取或者写入内从中的数据是一致的呢,这里会设计到内存模型,数据的写入会通过状态机来和cpu指令来完成同步。

cas

cas是(Compare-And-Swap)的缩写,意思是比较并且赋值。成功的条件是当前变量的值和比较的值一致时才可以写入成功,并且整个过程是原子操作。

获取锁的方式或者行为

在线程如果尝试加锁,如果没有获取到锁我们会如何处理。通常会有两种方式:

  1. 采用自旋的方式,就是不停去尝试获取锁,redis分布式锁中获取锁就是采用此种方式。
  2. 获取不到锁则将自己挂起,当其它线程释放锁后来将我唤醒再去执行后面的操作。

乐观锁和悲观锁

上面两种行为其实就是对应乐观和悲观。自旋锁乐观的认为当前环境不是很忙或者没有很多并发再给我争抢锁,我将很快获取锁,所以通过自旋几次便可以获取锁。而悲观锁认为当前环境已经比较忙了,我需要将自己挂起来减少系统负担。

饥饿模式和正常模式

如果我们系统采用乐观锁模式,所有的线程都是自旋获取锁,可能会导致最早到达的线程一直获取不到锁,或者再悲观一点永远获取不到锁导致业务一直不会被执行。那么此时长时间获取不到锁的线程就会进入饥饿模式,嗷嗷待哺。

如果我们只使用悲观锁,有获取锁的线程我们就将线程进行入队,锁释放后每次都从队首获取要执行的线程,此种就是饥饿模式。再饥饿模式下是可以完全保证公平性但是所有的线程都要执行入队出队,挂起和唤醒本身会有一定的性能损耗。

综上Go的单机锁Mutex兼顾饥饿模式和正常,可以动态探测当前系统的负载和线程的饥饿状态来动态切换正常模式和饥饿模式。


源码实现

类型定义

type Mutex struct {
    state int32 // 状态量,内部被分成三个区
    sema  uint32 // 信号量,用于协程唤醒
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken 
    mutexStarving
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

Mutex肚子里只有两个4字节长度的变量。但是state内部被分成多个位置来传递不同信息,如图:

state

结合常量定义

  • mutexLocked:值等于1,state的最低位,表示锁状态标志
  • mutexWoken:值等于2,state的第二位,表示是否有协程在唤醒标志,在unlock是决定要不要进行协程的唤醒
  • mutexStarving:等于4,state的第三位,表示是否是饥饿模式
  • mutexWaiterShift:值等于3,用于直接二进制偏移来计算当前等待的协程数量
  • starvationThresholdNs:协程等待时间阈值,如果超过1ms则转变为饥饿模式

加锁流程

直接获取锁

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

通过cas判断m.state是否为0,如果为0则写入mutexLocked表示加锁成功,直接返回,否则进入lockSlow,进行慢加锁。

这一段逻辑比较容易理解,在state的四个区中所有状态都为0,说明当前锁没有被任何线程占用,此时直接获取锁即可。

慢加锁逻辑

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    //...
}

局部变量

  • waitStartTime:用于计算协程等待的时间
  • starving:饥饿模式
  • awoke:当前协程是否处于唤醒状态
  • iter:递增次数,用于结束自旋
  • old:局部变量保存state的值

自旋逻辑

for {
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
    
        // ...
}

判断成立的条件,old中不是饥饿模式,并且当前是加锁状态并且runtime_canSpin(iter)可以进入自旋。

  • old&(mutexLocked|mutexStarving) == mutexLocked:这里要排除饥饿模式目的是,饥饿模式所有线程是要进入阻塞队列采用FIFO的方式进行唤醒出队,所以如果是饥饿模式就不要进行自旋尝试获取锁,如果此时锁已经释放也不需要标记意向,直接获取锁即可。
  • runtime_canSpin(iter):底层函数,用来判断是否可以进入自旋。这里不可以进入自旋的条件:

    1.是我们传递的iter循环次数,大于某个值会不满足自旋,因为不可能一直自旋下去

    2.函数内部的条件如果当前系统是单线程,也没有比较进行自旋。直接释放cpu让其它获取锁的协程运行,运行完释放锁之后我们才有运行的意义。

    3.如果GPM中的P队列还有排队的协程,说明当前系统比较繁忙也不适合进入自旋。

    4.如果一个协程被一直阻塞,后面一直有通过自旋获取的协程,可能也会返回falst(这一点是我猜的,不一定准确)

进入自旋后继续判断!awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)

  • !awoke:如果当前已经是被唤醒了则不需要再重复写标志。
  • old&mutexWoken == 0:当前没有协程在唤醒
  • old>>mutexWaiterShift != 0:已经有协程在排队等待获取锁了,如果等待队列是空的,也就意味着只有自己在获取锁,只有自己在获取锁的情况下解锁时就不用考虑再唤醒后面的协程。
  • atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken):cas写入唤醒标志成功

思考:这里的自旋只是写入唤醒标志,而不是自旋获取锁,就是说每次进来都要固定自旋几次这是为什么?注释中说的也很清楚加唤醒的目的是告诉解锁线程不需要唤醒其它协程,目的是减少锁竞争,因为此时我已经再尝试获取锁了就没有必要再唤醒其它协程跟我一起竞争锁了。

退出自旋后准备获取锁

        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
  • new := old:保存old的值,准备新值
  • old&mutexStarving:如果不是饥饿模式才会尝试加锁,饥饿模式要进行队尾入队的不可以直接获取锁
  • old&(mutexLocked|mutexStarving) != 0:当前锁以经被占用或者是饥饿模式就要给排队计数加1了
  • starving && old&mutexLocked != 0:如果自己上次被唤醒时间超过阈值,并且当前锁是锁定状态那么就要进入饥饿模式,如果从上次出来直接可以获取锁也不需要进入饥饿模式。
  • if awoke:如果是自己增加的期望获取锁标志,此时需要清除,因为接下来就要进行挂起或者获取锁了

获取锁或者挂起

        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
  • atomic.CompareAndSwapInt32(&m.state, old, new):一旦cas成功,接下来只能获取锁或者进入挂起逻辑
  • old&(mutexLocked|mutexStarving) == 0:如果当前没有锁或者不是饥饿模式则获得锁,饥饿模式需要入队
  • queueLifo := waitStartTime != 0:查看我们是首次获取还是已经被排队然后被重新唤醒的,如果首次则要进队尾,如果是已经被唤醒的那么需要进队首
  • runtime_SemacquireMutex(&m.sema, queueLifo, 1):将自己挂起,等待被唤醒
  • starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs:唤醒之后检查一下睡眠时间是否超过阈值,如果超过需要将局部饥饿模式标志置位,至于要不要真正进入饥饿模式还要看下次获取锁的是不是自己,如果是自己与不需要进入饥饿模式。
  • old = m.state:获取最新状态
  • old&mutexStarving != 0:如果当前是饥饿模式(注意虽然是old,但是也是表示当前的状态)
  • old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0:这个可能不太容易想,首先在饥饿模式被唤醒的协程此时已经是因为unlock时被唤醒,唤醒之后锁被其它协程占有了,此种情况肯定是不可能的所以mutexLocked一定是0,在饥饿模式下所有协程都是排队的,不会有自旋自然mutexWoken这个值也不能被置1。old>>mutexWaiterShift == 0也就是我被唤醒之后发现等待队列是空的,那我自己是啥?所以发生以上情况就要报错了。
  • delta := int32(mutexLocked - 1<<mutexWaiterShift):被唤醒之后就要从等待对列减1了
  • !starving || old>>mutexWaiterShift == 1:如果当前协程模式不是饥饿模式,或者我自己是队尾了,那么就可以退出饥饿模式了

解锁流程

直接解锁

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}
  • new != 0:解锁之后如果不是0,表示我可能需要唤醒其它协程

慢解锁逻辑

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}
  • (new+mutexLocked)&mutexLocked == 0:不允许多次解锁
  • new&mutexStarving == 0:如果是饥饿模式,释放信号唤醒队首协程即可
  • if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0:在非饥饿模式下old>>mutexWaiterShift == 0表示没有等待的协程或者old&(mutexLocked|mutexWoken|mutexStarving) != 0表示已经有其它协程获取锁或者有已经唤醒的协程,获取进入饥饿模式那么直接返回即可。
  • 以上都不是:需要进行唤醒操作了将排队减1,然后释放信号即可

思考:下面

        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }

再加锁时,if old&(mutexLocked|mutexStarving) != 0成立则会在队列中加1,但是减1的位置只有在饥饿模式,或者解锁时不满足情况才会减1,这个会不会有泄露的情况,或者说加了但是没有减,或者多次减的情况。

这个确实能看出作者的功底和深思熟虑,首先执行new += 1 << mutexWaiterShift的条件是old&(mutexLocked|mutexStarving) != 0,就是当前处于锁状态或者饥饿模式,饥饿模式这个加1没有问题,再被唤醒后会自己减去1,如果是非饥饿模式,如果当前锁已经被获取了,那么好在下面if atomic.CompareAndSwapInt32(&m.state, old, new) {cas成立的条件是old从自旋出来就没有发生任何变化,也就是说如果是锁定状态进行的等待数量加1,如果此时有唤醒的协程,意味着锁已经被唤醒协程获取,唤醒的协程不会执行new += 1 << mutexWaiterShift,所以本次解锁不需要进行唤醒,交给当前唤醒的协程负责唤醒下一个协程即可。如果当前唤醒的协程一直是最新的,也就是通过自旋获取的那么队列中的协程就一直没有被唤醒的机会。但是Go中还有是否满足自旋条件的一个内部函数,可以帮助睡眠的线程被唤醒唤醒后进行判断是否需要进入饥饿模式来进行调节。

并发关系

如图:

并发锁关系

总结:被唤醒的协程有两条路要么获取锁,要么加入阻塞队列。获取锁时不需要将等待系数相加,如果没有获取锁需要增加队列计数和挂起。再解锁时如果当前有唤醒协程则不需要额外进行唤醒,交给唤醒的协程即可,同时唤醒的协程因为获取锁不会增加队列计数,当他解锁时唤醒标志是0所以要帮忙唤醒一下阻塞的协程,并且帮忙将唤醒标志置1。

--EOF

最后修改:2025 年 02 月 27 日
如果觉得我的文章对你有用,请随意赞赏