Golang调度器(4)—goroutine调度
0. 简介
1. 协程调度发生的时机
在以下情形中,goroutine
可能会发生调度:
情形 | 说明 |
---|---|
go func(){} |
使用go 关键字创建一个新的goroutine ,调度器会考虑调度 |
GC |
由于GC 也需要在系统线程M上执行,且其中需要所有的goroutine 都停止运行,所以也会发生调度 |
系统调用 | 发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine 也会被调度上来 |
同步内存访问 | mutex 、channel 等操作会使得goroutine 阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行 |
2. 创建协程时的调度
其中,使用go
关键字创建协程时的调度分析,上篇博客做了初步的分析,特别是有关调度循环的分析,但是我们没有具体分析,当创建协程时,系统是怎么发生调度的。
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
})
}
我们还记得,go
关键字在创建协程时,Go
的编译器会将其转换为runtime.newproc
函数,上篇我们详细分析了main goroutine
的创建过程,在runtime.main
函数中,全局变量mainStarted
会被置为true
,之后普通协程的创建,则会调用runtime.wakep
函数尝试唤醒空闲的P。
func wakep() {
if atomic.Load(&sched.npidle) == 0 {
return
}
// be conservative about spinning threads
if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
wakep
函数首先确认是否有其他线程正在处于spinning
状态,即M是否在找工作,如果没有的话,则调用startm
函数创建一个新的、或者唤醒一个处于睡眠状态的工作线程出来工作。
func startm(_p_ *p, spinning bool) {
// Disable preemption.
//
// Every owned P must have an owner that will eventually stop it in the
// event of a GC stop request. startm takes transient ownership of a P
// (either from argument or pidleget below) and transfers ownership to
// a started M, which will be responsible for performing the stop.
//
// Preemption must be disabled during this transient ownership,
// otherwise the P this is running on may enter GC stop while still
// holding the transient P, leaving that P in limbo and deadlocking the
// STW.
//
// Callers passing a non-nil P must already be in non-preemptible
// context, otherwise such preemption could occur on function entry to
// startm. Callers passing a nil P may be preemptible, so we must
// disable preemption before acquiring a P from pidleget below.
mp := acquirem() // 保证在此期间不会发生栈扩展
lock(&sched.lock)
if _p_ == nil { // 没有指定p,那么需要从空闲队列中取一个p
_p_ = pidleget()
if _p_ == nil {// 如果没有空闲的p,直接返回
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
releasem(mp)
return
}
}
nmp := mget() // 如果有空闲的p,那么取出一个空闲的m
if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回
// No M is available, we must drop sched.lock and call newm.
// However, we already own a P to assign to the M.
//
// Once sched.lock is released, another G (e.g., in a syscall),
// could find no idle P while checkdead finds a runnable G but
// no running M's because this new M hasn't started yet, thus
// throwing in an apparent deadlock.
//
// Avoid this situation by pre-allocating the ID for the new M,
// thus marking it as 'running' before we drop sched.lock. This
// new M will eventually run the scheduler to execute any
// queued G's.
id := mReserveID()
unlock(&sched.lock)
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_, id)
// Ownership transfer of _p_ committed by start in newm.
// Preemption is now safe.
releasem(mp)
return
}
unlock(&sched.lock)
if nmp.spinning {
throw("startm: m is spinning")
}
if nmp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
nmp.spinning = spinning
nmp.nextp.set(_p_)
notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m
// Ownership transfer of _p_ committed by wakeup. Preemption is now
// safe.
releasem(mp)
}
startm
函数首先判断是否有空闲的P,如果没有则直接返回;如果有,则判断是否有空闲的M,如果没有,则新建一个;如果有空闲的M,则唤醒这个M。说白了,wakep
函数就是为了更大程度的利用P,利用CPU资源。
说到这里,我们就需要重温一下上篇博客讲到的,调度中获取goroutine
的规则是:
- 每调度61次就需要从全局队列中获取
goroutine
; - 其次优先从本P所在队列中获取
goroutine
; - 如果还没有获取到,则从其他P的运行队列中窃取
goroutine
;
其中,从其他P队列中窃取goroutine
,调用的是findrunnable
函数,这个函数很长,为了简化说明,我们删除一些不是很重要的代码:
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
...
// local runq
// 再从本地队列找找
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
// 再看看全局队列
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
...
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine
now = tnow
if gp != nil {
// Successfully stole.
return gp, inheritTime
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}
...
// return P and block
// 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
...
_g_.m.spinning = false // m即将睡眠,状态不再是spinning
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
...
stopm() // 休眠
goto top
}
从上面的代码可以看出,工作线程会反复尝试寻找运行的goroutine
,实在找不到的情况下才会进入到睡眠。需要注意的是,工作线程M从其他P的本地队列中盗取goroutine
时的状态称之为自旋(spinning)状态,而前面讲到wakep
调用startm
函数,也是优先从自旋状态的M中选取,实在没有才去唤醒休眠的M,再没有就创建新的M。
窃取算法stealWork
我们就不分析了,有兴趣的同学可以看看。下面具体分析下stopm
是怎么实现线程睡眠的。
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
lock(&sched.lock)
mput(_g_.m) // 把m放到sched.midle空闲队列
unlock(&sched.lock)
mPark()
acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作
_g_.m.nextp = 0
}
func mPark() {
gp := getg()
notesleep(&gp.m.park) // 进入睡眠状态
noteclear(&gp.m.park)
}
可以看出,stopm
主要是将m对象放到调度器的空闲线程队列,然后通过notesleep
进入睡眠状态。note
是go runtime
实现的一次性睡眠和唤醒机制,通过notesleep
进入睡眠状态,然后另一个线程可以通过notewakeup
唤醒这个线程。
2.1 小结
如果是创建一个新的工作线程,那么其开启执行的点也是mstart
函数(注意区分mstart
和startm
),然后在schedule
函数中会尝试去获取goroutine
,如果全局和本地的goroutine
队列都没有,则会去其他的P上窃取goroutine
,如果窃取不成功,则会休眠。
如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。
窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。
3. 主动挂起
在Go
中,有很多种情形会导致goroutine
阻塞,即其主动挂起,然后被调度走,等满足其运行条件时,还会被调度上来继续运行。比如channel
的读写,我们以通道的阻塞读为例,来介绍goroutine
的主动挂起的调度方式。
3.1 协程挂起
和前面介绍的Map
一样,channel
的读也有以下两种读取方式:
v := <- ch
v, ok := <- ch
分别对应以下chanrecv1
和chanrecv2
函数:
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
无论是哪个函数,最终调用的都是chanrecv
函数:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中
atomic.Store8(&gp.parkingOnChan, 1)
// 挂起这个goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
chanrecv
会先判断channel是否有数据可读,如果有则直接读取并返回,如果没有则将这个goroutine
放到channel
的recv
的queue
中,然后调用gopark
函数将当前goroutine
挂起并阻塞。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
gopark
函数则使用mcall
函数(前面分析过,主要作用是保存当前goroutine
现场,然后切换到g0
栈去调用作为参数传入的函数)取执行park_m
函数:
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
park_m
首先把当前goroutine
的状态设置为_Gwaiting
(因为它正在等待其它goroutine
往channel
里面写数据),然后调用dropg
函数解除g
和m
之间的关系,最后通过调用schedule
函数进入调度循环。
至此,一个goroutine
就被主动挂起了。
3.2 协程唤醒
我们继续以上例子,当另一个goroutine
对这个channel
发送数据的时候
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
goready(gp, skip 1)
}
channel
的发送流程和读取类似,当检查到接收队列中有等待着时,会调用send
函数然后调用goready
唤醒协程:
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
wakep()
releasem(mp)
}
这里发现,ready
函数和创建协程时一样,会触发wakep
来检查是否需要唤醒空闲P来执行。而在此之前,这个被唤醒的goroutine
会放到P的本地队列的下一个执行goroutine
,以提升时效性。
到这里,一个被挂起的协程也就被唤醒了。
4. 小结
以上,我们分析了创建协程时发生的调度,也介绍了以channel
读写为例子的主动挂起似的调度。而系统调用和GC触发的调度比较复杂,我们放在后面介绍。
5. 参考文献
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tangibkaj
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13