热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

golang源码分析调度概述

golang源码分析-调度过程概述本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行。在Linux

golang源码分析-调度过程概述

本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行。在Linux操作系统中,以往的多线程执行都是通过操作系统陷入内核来创建线程并提供给操作系统进行调度,在操作系统中的线程调度可以充分利用操作系统提供的各种资源,当线程执行到阻塞或者等待操作时,操作系统会休眠对应线程直到阻塞的事情来唤醒该线程继续执行,但是在通过操作系统创建的线程无论是阻塞还是调度都需要陷入内核,从而导致线程在这些过程中的开销较大。golang中的协程更多的是在用户态进行调度不需要陷入内核,但是同时这也限制了golang的调度策略并不能使用操作系统提供的阻塞唤醒或者抢占式调度的机制,本文主要就是探讨一下golang在用户态是如何进行调度执行。


golang的运行模型

golang主要根据CSP模型,通过通信进行数据交互,并且由于是实现的用户态的协程调度,但是本质上还是对应与操作系统的线程去详细执行对应的具体内容,故在golang中就设置了三种不同的模型分别为M,P和G。


Machine(M)操作系统线程

Machine即对应于真正的操作系统创建的线程,这个线程的创建调度与运行都是受操作系统所控制,如果golang执行的是一个阻塞操作,那么该线程还是会阻塞,知道阻塞完成之后被操作系统唤醒并继续执行。


Processor§

Processor就是虚拟的提供给g执行的上下文环境,该环境包括一个本地的g的队列,本地内存的对象等操作资源,只有M在绑定了P之后才能执行对应的G。


Groutine(G)

Groutine就是golang中对应的用户态的协程的具体内容,默认的用户态栈的大小是2KB,包括这执行任务的上下文的环境,在切换过程中保存执行的环境,调度器就是调度G到可执行的P中从而完成高效的并发调度操作。

三者整体的运行状态如图所示;

在这里插入图片描述

golang可能的一个运行状态图如上所示,从运行过程也可看出,G的调度过程都是在用户态进行的,接下来就分析一下调度的场景


golang的调度场景

在golang的初始化过程中,首先第一个M0就是初始化完成的M0,该M0就会在初始化完成之后调度执行对应的G,在golang的启动过程中可知,golang中的main函数其实也是对应的一个G来调度执行,如果在golang程序中启动协程来执行,并根据协程的执行情况或者现有的内核线程的工作情况来决定是否重新开启一个内核线程。


内核线程的启动过程

在拥有大量的G未执行的时候,或者是有的内核线程在执行系统调用阻塞的情况下,或者有些G长时间运行的情况,会根据情况来开启一个新的内核线程来执行可执行的G,从而确保G能够快速被执行。

在golang的启动过程中,会启动一个sysmon内核线程,该线程不知道具体的G内容,而是用来监控一些非阻塞的事件是否完成,监控各个正在被执行的G的运行时间,并从事抢占性调度的标志位的设置。

func newm(fn func(), _p_ *p) { // 生成内核工作线程mp := allocm(_p_, fn) // 申请对应的内存设置新的栈信息mp.nextp.set(_p_) mp.sigmask = initSigmaskif gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {// We're on a locked M or a thread that may have been// started by C. The kernel state of this thread may// be strange (the user may have locked it for that// purpose). We don't want to clone that into another// thread. Instead, ask a known-good thread to create// the thread for us.//// This is disabled on Plan 9. See golang.org/issue/22227.//// TODO: This may be unnecessary on Windows, which// doesn't model thread creation off fork.lock(&newmHandoff.lock)if newmHandoff.haveTemplateThread == 0 {throw("on a locked thread with no template thread")}mp.schedlink = newmHandoff.newmnewmHandoff.newm.set(mp)if newmHandoff.waiting {newmHandoff.waiting = falsenotewakeup(&newmHandoff.wake)}unlock(&newmHandoff.lock)return}newm1(mp) // 生成该工作线程
}func newm1(mp *m) {if iscgo {var ts cgothreadstartif _cgo_thread_start == nil {throw("_cgo_thread_start missing")}ts.g.set(mp.g0)ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))ts.fn = unsafe.Pointer(funcPC(mstart))if msanenabled {msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))}execLock.rlock() // Prevent process clone.asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))execLock.runlock()return}execLock.rlock() // Prevent process clone.newosproc(mp) // 系统调用线程 Linux主要是clone系统调用execLock.runlock()
}

func newosproc(mp *m) {stk :&#61; unsafe.Pointer(mp.g0.stack.hi) // 设置栈/** note: strace gets confused if we use CLONE_PTRACE here.*/if false {print("newosproc stk&#61;", stk, " m&#61;", mp, " g&#61;", mp.g0, " clone&#61;", funcPC(clone), " id&#61;", mp.id, " ostk&#61;", &mp, "\n")}// Disable signals during clone, so that the new thread starts// with signals disabled. It will enable them in minit.var oset sigsetsigprocmask(_SIG_SETMASK, &sigset_all, &oset)ret :&#61; clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) // 系统调用生成线程并设置g0堆栈开始执行mstart函数&#xff0c;从而重新开启一个线程执行sigprocmask(_SIG_SETMASK, &oset, nil)if ret <0 {print("runtime: failed to create new OS thread (have ", mcount(), " already; errno&#61;", -ret, ")\n")if ret &#61;&#61; -_EAGAIN {println("runtime: may need to increase max user processes (ulimit -u)")}throw("newosproc")}
}

从流程可知&#xff0c;生成一个工作线程主要通过系统调用生成一个&#xff0c;生成完成之后再重新从mstart函数开始执行任务&#xff0c;重新开始去调度执行G。新增工作内核线程可能会在系统调用的过程中触发检查也可能在监控线程中通过retake函数触发。


schedule调度过程

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {_g_ :&#61; getg() if _g_.m.locks !&#61; 0 {throw("schedule: holding locks")}if _g_.m.lockedg !&#61; 0 {stoplockedm()execute(_g_.m.lockedg.ptr(), false) // Never returns.}// We should not schedule away from a g that is executing a cgo call,// since the cgo call is using the m&#39;s g0 stack.if _g_.m.incgo {throw("schedule: in cgo")}top:if sched.gcwaiting !&#61; 0 {gcstopm()goto top}if _g_.m.p.ptr().runSafePointFn !&#61; 0 {runSafePointFn()}var gp *gvar inheritTime boolif trace.enabled || trace.shutdown {gp &#61; traceReader()if gp !&#61; nil {casgstatus(gp, _Gwaiting, _Grunnable)traceGoUnpark(gp, 0)}}if gp &#61;&#61; nil && gcBlackenEnabled !&#61; 0 {gp &#61; gcController.findRunnableGCWorker(_g_.m.p.ptr()) // 进行GC模式}if gp &#61;&#61; nil { // Check the global runnable queue once in a while to ensure fairness.// Otherwise two goroutines can completely occupy the local runqueue// by constantly respawning each other.if _g_.m.p.ptr().schedtick%61 &#61;&#61; 0 && sched.runqsize > 0 { // 为了公平每隔61个检查一下全局列表中是否有可执行的G如果有则执行lock(&sched.lock)gp &#61; globrunqget(_g_.m.p.ptr(), 1) // 从全局列表中获取一个Gunlock(&sched.lock)}}if gp &#61;&#61; nil { // 如果全局没有获取到或者没从全局获取gp, inheritTime &#61; runqget(_g_.m.p.ptr()) // 从本地的p的队列中获取Gif gp !&#61; nil && _g_.m.spinning {throw("schedule: spinning with local work") // 检查是否是自选}}if gp &#61;&#61; nil {gp, inheritTime &#61; findrunnable() // blocks until work is available 从其他地方获取G如果获取不到则阻塞在这里直到找到}// This thread is going to run a goroutine and is not spinning anymore,// so if it was marked as spinning we need to reset it now and potentially// start a new spinning M.if _g_.m.spinning {resetspinning()}if sched.disable.user && !schedEnabled(gp) {// Scheduling of this goroutine is disabled. Put it on// the list of pending runnable goroutines for when we// re-enable user scheduling and look again.lock(&sched.lock)if schedEnabled(gp) {// Something re-enabled scheduling while we// were acquiring the lock.unlock(&sched.lock)} else {sched.disable.runnable.pushBack(gp)sched.disable.n&#43;&#43;unlock(&sched.lock)goto top}}if gp.lockedm !&#61; 0 {// Hands off own p to the locked m,// then blocks waiting for a new p.startlockedm(gp)goto top}execute(gp, inheritTime) // 找到之后就执行该G
}

调度函数主要执行的流程就是&#xff1b;


  1. 如果隔了61次调度&#xff0c;则本次去全局G列表中去查找一个可执行的G&#xff1b;
  2. 如果不是61次或者61次去查找全局G列表的时候未能找到&#xff0c;则获取本地P中的G列表中的G&#xff1b;
  3. 如果本地都还没有找到则通过findrunnable函数去查找&#xff0c;该函数会分别从全局、poll列表中或者其他的P中去尝试获取可运行的G&#xff0c;如果还没有找到则进入休眠。

G的执行过程

如果在上一步找到了可执行的G&#xff0c;则此时就会执行execute(gp, inheritTime)函数&#xff0c;执行该任务。


G的任务正常执行流程

func execute(gp *g, inheritTime bool) {_g_ :&#61; getg()casgstatus(gp, _Grunnable, _Grunning) // 设置该G位运行可调用可运行状态gp.waitsince &#61; 0gp.preempt &#61; false // 是否抢占式调度标志位gp.stackguard0 &#61; gp.stack.lo &#43; _StackGuard // 设置堆栈if !inheritTime {_g_.m.p.ptr().schedtick&#43;&#43;}_g_.m.curg &#61; gpgp.m &#61; _g_.m// Check whether the profiler needs to be turned on or off.hz :&#61; sched.profilehzif _g_.m.profilehz !&#61; hz {setThreadCPUProfiler(hz)}if trace.enabled {// GoSysExit has to happen when we have a P, but before GoStart.// So we emit it here.if gp.syscallsp !&#61; 0 && gp.sysblocktraced {traceGoSysExit(gp.sysexitticks)}traceGoStart()}gogo(&gp.sched) // 执行G对应的内容
}

主要就是进行了检查和设置标志位之后&#xff0c;再就调用gogo执行&#xff1b;

TEXT runtime·gogo(SB), NOSPLIT, $16-8MOVQ buf&#43;0(FP), BX // gobufMOVQ gobuf_g(BX), DXMOVQ 0(DX), CX // make sure g !&#61; nilget_tls(CX)MOVQ DX, g(CX)MOVQ gobuf_sp(BX), SP // restore SP 将gobuf中保存的现场内容回复MOVQ gobuf_ret(BX), AXMOVQ gobuf_ctxt(BX), DXMOVQ gobuf_bp(BX), BPMOVQ $0, gobuf_sp(BX) // clear to help garbage collectorMOVQ $0, gobuf_ret(BX)MOVQ $0, gobuf_ctxt(BX)MOVQ $0, gobuf_bp(BX)MOVQ gobuf_pc(BX), BX // 将要执行的地址放入BXJMP BX // 跳转执行该处代码

此时我们回到newproc1函数中创建G的过程中的时候&#xff0c;在G执行完成之后的执行地址设置成了goexit函数处。

newg.sched.pc &#61; funcPC(goexit) &#43; sys.PCQuantum // &#43;PCQuantum so that previous instruction is in same function

此时查看goexit函数的执行过程&#xff1b;

// The top-most function running on a goroutine
// returns to goexit&#43;PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0BYTE $0x90 // NOPCALL runtime·goexit1(SB) // does not return 调用goexit1// traceback from goexit1 must hit code range of goexitBYTE $0x90 // NOP

func goexit1() {if raceenabled {racegoend()}if trace.enabled {traceGoEnd()}mcall(goexit0) // 切换到g0释放该执行完成的g
}TEXT runtime·mcall(SB), NOSPLIT, $0-8MOVQ fn&#43;0(FP), DIget_tls(CX)MOVQ g(CX), AX // save state in g->schedMOVQ 0(SP), BX // caller&#39;s PCMOVQ BX, (g_sched&#43;gobuf_pc)(AX)LEAQ fn&#43;0(FP), BX // caller&#39;s SPMOVQ BX, (g_sched&#43;gobuf_sp)(AX)MOVQ AX, (g_sched&#43;gobuf_g)(AX)MOVQ BP, (g_sched&#43;gobuf_bp)(AX)// switch to m->g0 & its stack, call fn 切换栈MOVQ g(CX), BXMOVQ g_m(BX), BXMOVQ m_g0(BX), SICMPQ SI, AX // if g &#61;&#61; m->g0 call badmcallJNE 3(PC)MOVQ $runtime·badmcall(SB), AXJMP AXMOVQ SI, g(CX) // g &#61; m->g0MOVQ (g_sched&#43;gobuf_sp)(SI), SP // sp &#61; m->g0->sched.sp 调用g0的sched.sp的内容PUSHQ AXMOVQ DI, DXMOVQ 0(DI), DI CALL DI // 执行该函数POPQ AXMOVQ $runtime·badmcall2(SB), AXJMP AXRET// goexit continuation on g0.
func goexit0(gp *g) {_g_ :&#61; getg()casgstatus(gp, _Grunning, _Gdead) // 设置状态为执行完成if isSystemGoroutine(gp, false) {atomic.Xadd(&sched.ngsys, -1)}gp.m &#61; nil // 设置m为空locked :&#61; gp.lockedm !&#61; 0 // 值重新置空gp.lockedm &#61; 0_g_.m.lockedg &#61; 0gp.paniconfault &#61; falsegp._defer &#61; nil // should be true already but just in case.gp._panic &#61; nil // non-nil for Goexit during panic. points at stack-allocated data.gp.writebuf &#61; nilgp.waitreason &#61; 0gp.param &#61; nilgp.labels &#61; nilgp.timer &#61; nilif gcBlackenEnabled !&#61; 0 && gp.gcAssistBytes > 0 {// Flush assist credit to the global pool. This gives// better information to pacing if the application is// rapidly creating an exiting goroutines.scanCredit :&#61; int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)gp.gcAssistBytes &#61; 0}// Note that gp&#39;s stack scan is now "valid" because it has no// stack.gp.gcscanvalid &#61; truedropg() // 将该G与M的关系if GOARCH &#61;&#61; "wasm" { // no threads yet on wasmgfput(_g_.m.p.ptr(), gp)schedule() // never returns}if _g_.m.lockedInt !&#61; 0 {print("invalid m->lockedInt &#61; ", _g_.m.lockedInt, "\n")throw("internal lockOSThread error")}gfput(_g_.m.p.ptr(), gp) // 放入到空余列表中if locked {// The goroutine may have locked this thread because// it put it in an unusual kernel state. Kill it// rather than returning it to the thread pool.// Return to mstart, which will release the P and exit// the thread.if GOOS !&#61; "plan9" { // See golang.org/issue/22227.gogo(&_g_.m.g0.sched)} else {// Clear lockedExt on plan9 since we may end up re-using// this thread._g_.m.lockedExt &#61; 0}}schedule() // 重新调度
}

至此一个正常的G的一个执行过程就完成了。函数的调用链路如下&#xff1b;


执行完成
schedule函数
execute函数
gogo函数
G协程的内容
goexit函数
mcall函数
goexit0函数


总结

本文只是简单的概述了一下golang中的一些基本场景&#xff0c;然后分析了一下G的调度执行过程&#xff0c;其中有大量的细节还未涉及&#xff0c;只是简单的把正常的G的创建过程和执行流程梳理了一下&#xff0c;具体的调度策略和实现还需要进一步学习与了解。由于本人才疏学浅&#xff0c;如有错误请批评指正。


推荐阅读
  • CentOS7.8下编译muduo库找不到Boost库报错的解决方法
    本文介绍了在CentOS7.8下编译muduo库时出现找不到Boost库报错的问题,并提供了解决方法。文章详细介绍了从Github上下载muduo和muduo-tutorial源代码的步骤,并指导如何编译muduo库。最后,作者提供了陈硕老师的Github链接和muduo库的简介。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
  • Day2列表、字典、集合操作详解
    本文详细介绍了列表、字典、集合的操作方法,包括定义列表、访问列表元素、字符串操作、字典操作、集合操作、文件操作、字符编码与转码等内容。内容详实,适合初学者参考。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 解决github访问慢的问题的方法集锦
    本文总结了国内用户在访问github网站时可能遇到的加载慢的问题,并提供了解决方法,其中包括修改hosts文件来加速访问。 ... [详细]
  • 本文介绍了利用ARMA模型对平稳非白噪声序列进行建模的步骤及代码实现。首先对观察值序列进行样本自相关系数和样本偏自相关系数的计算,然后根据这些系数的性质选择适当的ARMA模型进行拟合,并估计模型中的位置参数。接着进行模型的有效性检验,如果不通过则重新选择模型再拟合,如果通过则进行模型优化。最后利用拟合模型预测序列的未来走势。文章还介绍了绘制时序图、平稳性检验、白噪声检验、确定ARMA阶数和预测未来走势的代码实现。 ... [详细]
  • 第四讲ApacheLAMP服务器基本配置Apache的编译安装从Apache的官方网站下载源码包:http:httpd.apache.orgdownload.cgi今 ... [详细]
  • 原文地址http://balau82.wordpress.com/2010/02/28/hello-world-for-bare-metal-arm-using-qemu/最开始时 ... [详细]
  • Linux线程的同步和互斥
    目录1、线程的互斥2、可重入VS线程安全3、线程的同步1、线程的互斥 ... [详细]
  • const限定符全解一、const修饰普通变量  intconsta500;  constinta600;  上述两种情况相同,都是声明一个const型的变量,它们 ... [详细]
  • NotSupportedException无法将类型“System.DateTime”强制转换为类型“System.Object”
    本文介绍了在使用LINQ to Entities时出现的NotSupportedException异常,该异常是由于无法将类型“System.DateTime”强制转换为类型“System.Object”所导致的。同时还介绍了相关的错误信息和解决方法。 ... [详细]
author-avatar
七彩咩_131
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有