golang的信号量的实现原理
概述
我们前面讲过 操作系统的信号量,以及 golang中的Mutex原理解析,就抛出了一个问题,操作系统的信号量的管理对象是线程,而 Mutex 中使用的信号量是针对协程的,那么这就意味着golang需要重新实现一套基于协程的信号量,随着对golang源码的研究,我发现golang的 runtime 就像一个微型的操作系统,功能非常强大。
go version go1.18.3 windows/amd64
// src/runtime/sema.go
// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// https://swtch.com/semaphore.pdf
具体的用法是提供 sleep 和 wakeup 原语
以使其能够在其它同步原语中的竞争情况下使用
因此这里的 semaphore 和 Linux 中的 futex 目标是一致的
只不过语义上更简单一些也就是说,不要认为这些是信号量
把这里的东西看作 sleep 和 wakeup 实现的一种方式
每一个 sleep 都会和一个 wakeup 配对
即使在发生 race 时,wakeup 在 sleep 之前时也是如此
上面提到了和futex
作用一样,关于futex
。
futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具。
Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。
go中的semaphore
作用和futex
目标一样,提供sleep
和wakeup
原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine
需要休眠时,将其进行集中存放,当需要wakeup
时,再将其取出,重新放入调度器中。
主要源码
// src/sync/runtime.go
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// ----------------------------------------------------------------
// src/runtime/sema.go
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
// 获取当前协程
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// Easy case.
if cansemacquire(addr) {
return
}
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4 skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3 skipframes)
}
releaseSudog(s)
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
cansemacquire()
,此函数通过原子操作来修改和判断信号量的值。此处加载的包是runtime/internal/atomic
,对应的函数。
//go:noescape
func Cas(ptr *uint32, old, new uint32) bool
// src/runtime/internal/atomic/atomic_amd64.s
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr 0(FP), BX
MOVL old 8(FP), AX
MOVL new 12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret 16(FP)
RET
addr 为 uint32 类型,那么atomic.Cas(addr, v, v-1)
最低只能将其值修改到0,如果v-1 < 0
,那么就会返回 false,并放弃修改,这就实现了比较和修改的原子化。
golang中的信号量没有做初始化,默认值是0,那么在阅读函数cansemacquire()
的时候肯定会有疑惑。实际上,在充分理解了 Mutex 和 RWMutex 源码之后才会知道,golang中不对 sema 做初始化,它们的使用规范是先释放信号量,再获取信号量,如果还不理解可以看看golang中的Mutex原理解析。
这里的Easy case 和 Harder case
就是Fast path 和 slow path
,golang源码中对于循环代码块都喜欢这个干。
skipframe
参数是用作trace跟踪性能分析用的,包括releasetime
,acquiretime
。
数据结构
看到这里要先停下来搞清楚semtable, semaRoot, sudug
的关系。
addr 为一个信号量的地址,在一个程序中可能存在多个信号量,那么这些 addr 会被放入 semtable 数组中,采用取模的方式,semtable 长度为251,在声明的时候就做了初始化,每一个元素中包含一个 semaRoot,而 semaRoot 中包含一个平衡二叉数结构,用来存储着竞争信号量的协程 sudug。
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
取模的过程肯定会存在冲突,类似于哈希冲突,因此不同的 addr 可能会被定位到同一个 semaRoot,那么在操作 semaRoot 的时候依然还需要带上 addr 参数,并将 addr 参数填充到 sudug 的 elem 字段,比如root.queue(addr) 和 root.dequeue(addr)
操作。
lifo
为后进先出模式,fifo
为先进先出。
sudug 的结构比较丰富,即可以通过它来构造一个平衡二叉树(parent, prev, next)
,又可以构造一个单向链表(waitlink, waittail)
,并且可以同时存在。二叉树的查找是为了满足多个 addr 通过取模后落到了同一个位置,提高查询效率,二叉树的每一个节点都意味着不同的 addr,所以相同的 addr 进来之后发现在二叉树上存在这个 addr 的节点,那么就会作为单向链表节点挂在这个节点下面。
sudug 的waittail
都指向链表的最后一个元素。
关于sleep和wakeup协程
与线程的挂起和唤醒原理类似,在前面成功的将协程加入到 semaRoot 之后,只需要将协程的状态设置为 Gwaiting 就可以实现挂起,而唤醒的过程是将其移出 semaRoot ,修改状态,加入到就绪队列。
// src/runtime/sema.go
func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}
// src/runtime/proc.go
// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
// goyield is like Gosched, but it:
// - emits a GoPreempt trace event instead of a GoSched trace event
// - puts the current G on the runq of the current P instead of the globrunq
func goyield() {
checkTimeouts()
mcall(goyield_m)
}
readyWithTime()
把 sudog 对应的 g 唤醒,并且放到P本地队列的下一个执行位置。
goyield()
是调度控制,让出执行权,并放到P本地队列的的队尾,并不会挂起。
runtime.Gosched()
是调度控制,让出执行权,并放到全局队列的的队尾,并不会挂起。
关于lock
在对二叉树做操作的时候肯定是要加锁的,显然这个锁是要加在 semaRoot 上的,而采用 semtable 分散化在一定程度上可以降低锁的粒度。
golang通过 sema 来实现 sync.Mutex,然后在实现 sema 的时候又用了 mutex,那么这里的 mutex 是什么呢?
相关函数
// Mutual exclusion locks. In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
// Initialization is helpful for static lock ranking, but not required.
type mutex struct {
// Empty struct if lock ranking is disabled, otherwise includes the lock rank
lockRankStruct
// Futex-based impl treats it as uint32 key,
// while sema-based impl as M* waitm.
// Used to be a union, but unions break precise GC.
key uintptr
}
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4 skipframes)
lockWithRank(&root.lock, lockRankRoot)
unlock(&root.lock)
// src/runtime/lock_sema.go
func lock2(l *mutex) {
gp := getg()
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
gp.m.locks
// Speculative grab for lock.
if atomic.Casuintptr(&l.key, 0, locked) {
return
}
semacreate(gp.m)
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin := 0
if ncpu > 1 {
spin = active_spin
}
Loop:
for i := 0; ; i {
v := atomic.Loaduintptr(&l.key)
if v&locked == 0 {
// Unlocked. Try to lock.
if atomic.Casuintptr(&l.key, v, v|locked) {
return
}
i = 0
}
if i < spin {
procyield(active_spin_cnt)
} else if i < spin passive_spin {
osyield()
} else {
// Someone else has it.
// l->waitm points to a linked list of M's waiting
// for this lock, chained through m->nextwaitm.
// Queue this M.
for {
gp.m.nextwaitm = muintptr(v &^ locked)
if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {
break
}
v = atomic.Loaduintptr(&l.key)
if v&locked == 0 {
continue Loop
}
}
if v&locked != 0 {
// Queued. Wait.
semasleep(-1)
i = 0
}
}
}
}
//go:nowritebarrier
// We might not be holding a p in this code.
func unlock2(l *mutex) {
gp := getg()
var mp *m
for {
v := atomic.Loaduintptr(&l.key)
if v == locked {
if atomic.Casuintptr(&l.key, locked, 0) {
break
}
} else {
// Other M's are waiting for the lock.
// Dequeue an M.
mp = muintptr(v &^ locked).ptr()
if atomic.Casuintptr(&l.key, v, uintptr(mp.nextwaitm)) {
// Dequeued an M. Wake it.
semawakeup(mp)
break
}
}
}
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}
golang中能同时并行执行的G的个数其实就是逻辑CPU的个数,也就是GMP模型中的M个数,此时每一个M上正在运行一个G,而这些G同时都在抢 mutex 来操作二叉树,通过源码可以大致判断出,此处是直接对M加的锁,通过atomic.Casuintptr(&l.key, 0, 1)
来限制只能是第一个G能操作成功,从而能获得锁,其他的G则要继续往下走,先是自旋一定次数获取锁,还是不行的话就调用操作系统的信号量来对线程M进行阻塞,自然G也就没法执行了,要知道,这个锁只发生在对二叉树的操作前后,时间很短,当然如果要抢锁的G过多肯定会造成M被锁的时间变长。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgfibii
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01