content/discuss/2019-03-08-wechat-discuss.md
有下面一段程序,面试官问这段程序有什么问题?
type Store struct {
a string
b string
sync.RWMutex
}
func (s *Store) GetA() string {
fmt.Println("get a")
s.RLock()
fmt.Println("get a2")
defer s.RUnlock()
return s.a
}
func (s *Store) GetAB() (string, string) {
fmt.Println("get ab")
s.RLock()
fmt.Println("get ab2")
defer s.RUnlock()
return s.GetA(), s.b
}
func (s *Store) Write(a, b string) {
fmt.Println("write")
s.Lock()
defer s.Unlock()
fmt.Println("write2")
s.a = a
s.b = b
}
s.RUnlock 前通过调用 s.GetA,又做了一次读写锁上锁 s.RLock,但是读锁可以多次上锁,所以单看这里没什么问题;Write 和 GetAB 并发调用的时候会存在问题呢?思考了一会,觉得没问题,就放弃了。以上,是面试时整个思路。
回头,越想越觉得这里哪里有问题,就在夜读群里求教了一下,群里大神发了一篇读写锁优先级的文章,然后给了一段测试样例,瞬间豁然开朗。
main函数逻辑如下:
func main() {
store := Store{}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for i := 1; i < 10000; i += 1 {
fmt.Println("main write ", i)
store.Write("111", "1111")
}
}()
go func() {
defer wg.Done()
for i := 1; i < 10000; i += 1 {
fmt.Println("main get ab", i)
store.GetAB()
}
}()
wg.Wait()
}
执行结果
main get ab 12 //main函数读取ab
get ab //进入 s.GetAB 函数
main write 1 //main 函数写数据
write //进入 s.Write 函数
write2 //获取写锁
main write 2
.... //写锁一直抢占
....
main write 13 //main 函数写数据
write //进入 s.Write 函数
write2 //获取写锁
main write 14
write
write2
get ab2 //之前 get ab 12 才获得读锁
get a //进入 GetA
get a2 //获取读锁
main get ab 13 //main函数 get ab
get ab //进入 s.GetAB 函数
get ab2 //获取读锁
main write 15 //注意⚠️ 这个时候写数据开始了
write //进入 Write 函数,后面尝试获取写锁
get a //这个时候 GetAB 进入了 GetA,尝试获取读锁
fatal error: all goroutines are asleep - deadlock! //出现了死锁
分析:
GetAB | GetA | Write
| |
r0 占用读锁 | |
| | w0 尝试获取写锁 等待r0释放读锁
| r1 尝试获取读锁,排在w0后面 |
由于读写锁的优先级,读锁和写锁同时竞争时,读锁要排在写锁后面,导致了 r1 竞争 w0的锁,w0竞争r0,r0执行不下去,最后死锁。
读写锁前置条件:
可以思考下,如果让你设计一个这样的锁,你会怎么设计?
go中读写锁的结构,如下:
type RWMutex struct {
w Mutex // 用来保证同一时间只有一个写锁能够抢到锁
writerSem uint32 // 写锁信号量,在读锁全部解锁时通知阻塞的写锁
readerSem uint32 // 读锁信号量,在写锁解锁时通知阻塞的读操作
readerCount int32 // 等待、已上锁的读锁数量
readerWait int32 // 写锁获得锁前,已经上锁的读锁数量
}
首先,看一下读上锁逻辑:
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false)
}
...
}
上面,上读锁逻辑获试图获取读锁数量原子性加一: atomic.AddInt32(&rw.readerCount, 1)。自增操作返回值如果小于0,则阻塞等待信号量 readerSem 唤醒。
疑问:
readerCount 小于0;runtime_SemacquireMutex 不会造成读读互斥么?再来看一下,读解锁逻辑:
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
...
}
解锁逻辑:先对 atomic.AddInt32(&rw.readerCount, -1) 进行原子性减一操作。
atomic.AddInt32(&rw.readerWait, -1) 针对 readerWait 原子性减一后判断是否为 0,为 0 则唤起写锁信号量;与读加锁类似,同样有 atomic.AddInt32(&rw.readerCount, -1) 小于 0 判断。可以有结论 rw.readerCount 小于 0,为写锁上锁的充要条件,后面分析写锁时进行验证。
解决了的问题:
疑问:
readerCount 修改成一个负数?如何保证这个负数足够小呢?先上代码:
func (rw *RWMutex) Lock() {
...
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
...
}
写上锁逻辑:
readerCount 原子性减去 rwmutexMaxReaders(这是个常量,具体定义 const rwmutexMaxReaders = 1 << 30)。这里可以验证之前猜想,rw.readerCount 小于0,是持有锁的充要条件。
atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders 返回结果是在写锁获取前,已持有读锁的数量 r。
读解锁数量>读加锁数量,或写锁多次时发生;第一个情况,读解锁会 check;第二种情况,mutex 保证同时只有一个写锁;atomic.AddInt32(&rw.readerWait, r),记录需要等待的读锁数量,然后等待writerSem唤醒。最终,保证:1. 写锁唯一性;2. 等待读锁完全释放;3. 阻塞后面读锁的获取;
再来看一下,写锁解锁逻辑:
func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}
解锁逻辑:
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)。这里,能够看到两个隐含的点:
写锁释放锁时,读锁要比写锁优先级高;runtime_Semrelease 唤起阻塞着的读锁。
runtime_Semrelease > runtime_SemacquireMutex 会不会存在问题?验证过不会。通过分析,可以得出结论:
读锁已经上锁,或没有持有读写锁的协程条件下,读写锁都有机会获取锁;所以,针对之前的面试题,读锁嵌套读锁,在有写锁的时候,依据结论2会发生死锁。
通过上面分析,存在待验证问题:
一个协程个已获取读锁,另个协程试图获取写锁,还有一个协程在完全获取写锁前调用Unlock,再一个协程释放读锁,按顺序进行流程。会发生死锁具体可以自己分析(写锁信号量永远阻塞);
一个协程已上写锁锁,一个协程试图获取读锁,然后另一个协程释放读锁,最后一个协程释放写锁,同样会发生死锁(读信号量永远阻塞);
在以后用锁的时候不管有没有优先级,都要时刻记住死锁的四个必要条件: