Golang最大的特点就是Goroutine多协程高并发。
但只要一有多协程就可能会出现Data Race的状况。
如果不好好处理Data Race的问题,就会导致数据乱码等等未知错误。
而锁,就是避免这种情况的工具,所以在多线程环境中非常重要。golang自然考虑到这一点,搞了两种锁。
一种是Mutex,互斥锁
一种是RWMutex,读写互斥锁
而他们的实现都是基于CPU中的atomic指令操作(也叫原子指令)。
原子指令的实现,我就不细嗦了,大概就是等待一个读写完,还没有读写完出现竞争状况就把数据存入缓存,类似做了个队列。
不过那不是我们讨论的。
我现在是讨论Golang的Mutex锁。
Golang是如何实现Mutex的
相关算法
Golang Mutex锁的算法很简单,甚至不能叫算法,它简称为CAS(Compare And Swap)
什么叫CAS呢
其实你看它英文也很好理解。首先compare,也就是我比较一下这个值的状态,如果它满足我想要的,我就把它换成(Swap)新的值。
尽管很简单,但CAS却是最常见的无锁(lock-free)算法之一
golang也用代码解释了这个过程
if *addr == old {
*addr = new
return true
}
return false
相关库
runtime race
这是golang内部的一个race detect库。
什么叫race detect呢。顾名思义,就是检测有没有goroutine竞争的。
1.上锁
golang中维护了一个锁的全局状态变量(state)。
并规定如下状态参数
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
解释一下,mutexLocked,就是上锁了
0,就是没锁。
而mutexWoken和mutexStarving都和Go调度器调度goroutine相关。
当一个goroutine取得锁后,它会牢牢护住这把锁,其他的goroutine抢不了,怎么办呢。go调度器就把他们暂时休眠,还能节省CPU资源。
锁的工作模式
Mutex有两种工作模式。
一种是normal,一种是starvation。
normal就是一般的上锁,然后另一个goroutine等待解锁
starvation则是另一个goroutine等待解释途中突然有新goroutine进来抢锁,我上面说过,go调度器已经将等待锁的goroutine休眠了,但是这个新goroutine刚进来,还是没被休眠的,它自然有更高的优先级去和之前到的goroutine抢锁,这样一来,就出现了不公平的情况。
为了保证公平,才有了starvation状态。
因此才会有mutexWoken和mutexStarving状态,这就是饥饿竞争状态。什么叫饥饿竞争呢,顾名思义,就是多个goroutine去抢着上锁,锁只有一把,不够用了,这就是饥饿竞争,在golang中,抢着上锁是非常常见的状况。
那么golang如何判断starvation呢。
非常简单。
当这个goroutine被Go调度器唤醒后,它开始去抢锁,结果发现怎么抢都抢不到,直到1ms后,它就认为有别的goroutine在和它竞争,于是转入starvation状态。
不多BB,直接贴代码
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
什么意思呢,就是首先判断有没有进入starving,然后再判断等待时间是否超过了1ms
Mutex上锁分两步
第一步
第一次上锁,也就不存在竞争状况,那很简单啊,直接CAS,把锁状态值用atomic操作改成上锁状态。
因为是atomic,所以会有一个排队状况。第二个来的肯定会发现,啊,怎么上锁了。
贴代码。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
race.Enabled是检查是否存在竞争状态,存在就用atomic.Load类似的函数(race.Acquire)去取得锁
但如果已经上锁了的话怎么办。
等。
Normal模式
我之前说到休眠,具体是如何实现呢。
通过自旋。什么叫自选?就是等啊等啊等啊,像转圈一样,自己旋转,很形象。
英文叫Spining
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
它会尝试去用CAS去看看有没有解锁,没解锁,继续自旋休眠,但如果检测到有新goroutine进来抢锁,就会进入starvation状态,或者已经进入。并更新其值为mutexStarving(3,二进制11),怎么判断和得到的呢。old&mutexWoken这行代码说明什么,我们来假设以下,如果正位于锁定状态,那么锁状态值为1(二进制01),mutexWoken为2(二进制10),与操作后就是00。意思就是位于上锁,还没有解锁。但是mutexWaiterShift告诉它有人来和你竞争了。于是它将值做CAS, 01|10=11=3=mutexStarving。CAS作用应该是保护当前状态不被改变,确保其正确进入starvation模式
而runtime_canSpin函数还有一个功能,也就是判断自旋是否达到现在,iter为迭代次数
它的实现是这样的
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
什么意思呢。大概就是如果单核心cpu不自旋,无论如何。多核心的话开始判断是否大于活跃自旋个数。
runq和P大概就是和调度器有关的东东了,暂时没去了解。
所以以下代码有两种情况。
第一,解锁了。
第二,进入了Starvation模式,开始抢锁竞争。
进入starvation或不需要自旋后进入以下代码。
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
starvation判断过程先不细嗦,先细嗦解锁了。它是如何判断解锁的呢
if old&mutexStarving == 0 {
new |= mutexLocked
}
这一行代码,mutexStarving二进制为11,也就是能和它与操作为0的只能是解锁状态(00),那么new的值会尝试被修改成01(mutexLocked),也就是准备抢锁
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
这一行代码说明了什么。与(mutexLocked|mutexStarving)=11不等于0,意思是锁目前状态不为00,意思就是没有解锁,或者那么new会加上 1 << mutexWaiterShift=3,意思就是新增等待的,告诉正在准备下一轮自旋的goroutine马上进入starvation模式。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
这一行就开始判断是否已经为starvation模式了。
代码开始执行到另一个CAS。确保目前状态能被new顺利替换。替换成功,如果成功抢锁就直接break,跳出循环。但如果替换失败呢,说明目前原子操作被别的goroutine抢了,那么就记录下新状态,下一轮循环继续。
抢锁失败,则开始记录waitTime,剩下的就是Starvation模式了。
Starvation模式
前面剖析代码途中,有如下参数我没有嗦。
awoke。
这个awoke是干啥用的捏。
其实是告诉自旋的goroutine,你锁要被抢了。
靠,有人抢我锁?我不干他丫的。
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
看到这行代码。它要开始狂暴了。&^是先与后异或操作。
什么意思呢。大概就是重置吧。
然后就是starvation饥饿竞争了。
首先记录下waitTime。第一个进入先-8。虽然goroutine是无需的,但是此时awoke,也就是刚睡醒的goroutine,发现有人要抢他锁后,他已经去CAS换掉了此时的状态值,如果没换到就继续尝试换,不行就继续自旋。
换掉状态值后,通过runtime_SemacquireMutex把该睡醒的goroutine优先级调到最高。
这样新来的goroutine便抢不过它了。它便有机会去把delta加上8。但万一有goroutine抢得过它呢。它就继续滚回去自旋了。再重设,再来抢。
暂无评论