热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

扒一扒Kotlin协程的底裤

0.前言Kotlin1.3开始,协程从experimental变成了release,前些日子看了看简单的用法,今天就从源码的角度来看看Kotlin的协程究竟是怎样形成的.1.问题看
0.前言

Kotlin1.3开始,协程从experimental变成了release,前些日子看了看简单的用法,今天就从源码的角度来看看Kotlin的协程究竟是怎样形成的.

1.问题

看源码要带着问题,我决定从以下三个问题来进行分析

1.1协程是如何创建的

1.2协程间是如何切换的

1.3协程是如何绑定到指定线程的

2.分析

2.1协程是如何创建的

启动一个协程的方法

GlobalScope.launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}

这段代码就是启动一个协程,并启动,延迟1秒后打印world,就从这个launch方法进行切入

public fun CoroutineScope.launch(
context: CoroutineCOntext= EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newCOntext= newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

代码很清楚,根据CoroutineStart是不是CoroutineStart.LAZY对象,创建不同的Job实现类,默认我们传入的start参数为CoroutineStart.DEFAULT,这时我们创建的是一个StandaloneCoroutine对象,调用它的start方法启动,然后对它进行返回。

2.2协程间是如何切换的

GlobalScope.launch(Dispatchers.Default){
println("Current thread is ${Thread.currentThread().name}")
launch {
delay(1000)
println("now")
}
println("next")
}

看一下这段代码,这段代码先打印出next,然后延迟1秒钟后打印出now,有没有一种感觉,这像是android里handler的post和postDelay方法。首先看一下delay方法


@InternalCoroutinesApi
public interface Delay {
suspend fun delay(time: Long) {
if (time <= 0) return // don't delay
return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
}
fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation)
fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block)
}
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

delay方法在Delay.kt文件里,可以看到,这里定义了一个Delay接口,scheduleResumeAfterDelay是用来重新把任务恢复调度的,invokeOnTimeout显然是调度过程中发现时间到了以后要恢复执行的方法体。Delay是一个接口,看一它的实现类是如何实现scheduleResumeAfterDelay方法的。

internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) =
schedule(DelayedResumeTask(timeMillis, continuation))

...

先看DelayResumeTask

private inner class DelayedResumeTask(
timeMillis: Long,
private val cont: CancellableContinuation
) : DelayedTask(timeMillis) {
init {
// Note that this operation isn't lock-free, but very short
cont.disposeOnCancellation(this)
}
override fun run() {
with(cont) { resumeUndispatched(Unit) }
}
}

这个类继承自DelayTask,而DelayedTask实现了runnable接口,这里复写了run方法,调用了CancellableContinuation的resumeUndispatched方法。通过方法名可以看出经过等待时间后就会恢复执行。CancellableContinuation的实现类是CancellableContinuationImp跟进去看一看这个类

@PublishedApi
internal open class CancellableContinuationImpl(
delegate: Continuation,
resumeMode: Int
) : AbstractContinuation(delegate, resumeMode), CancellableContinuation, Runnable {
...
override fun completeResume(token: Any) = completeStateUpdate(token as NotCompleted, state, resumeMode)
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
...
}

resumeUndispatched方法里调用了resumeImp方法,这是继承自AbstractContinuation的方法

protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
loopOnState { state ->
when (state) {
is NotCompleted -> {
if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
}
is CancelledContinuation -> {
/*
* If continuation was cancelled, then all further resumes must be
* ignored, because cancellation is asynchronous and may race with resume.
* Racy exception are reported so no exceptions are lost
*
* :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt.
*/
if (proposedUpdate is CompletedExceptionally) {
handleException(proposedUpdate.cause)
}
return
}
else -> error("Already resumed, but proposed with update $proposedUpdate")
}
}
}

这里会根据不同的状态调用不同的方法.

private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
...
completeStateUpdate(expect, proposedUpdate, mode)
return true
}protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
...
dispatchResume(mode)
} private fun dispatchResume(mode: Int) {
if (tryResume()) return // completed before getResult invocation -- bail out
// otherwise, getResult has already commenced, i.e. completed later or in other thread
dispatch(mode)
}internal fun DispatchedTask.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val cOntext= delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.resumeUndispatched(this)
}
} else {
resume(delegate, mode)
}
}

删掉了不相关的代码,只保留dispatch这条主线,相信很容易个看明白最终又把这个任务放回到Dispatcher里面去了。那个else分支的resume其实内部调用的是Continuation.resume扩展方法,最终一样要调用到resumeImpl中,又回到上面已经分析的流程里了,这是处理有Continuation代理的情况。以上就是当delay时间到达后协程是如何重新恢复的。

接下来看一看延时是如何实现的,协程里有个默认的DefaultExecutor线程用来执行协程代码

override fun run() {
timeSource.registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!notifyStartup()) return
while (true) {
Thread.interrupted() // just reset interruption flag
var parkNanos = processNextEvent()
if (parkNanos == Long.MAX_VALUE) {
// nothing to do, initialize shutdown timeout
if (shutdownNanos == Long.MAX_VALUE) {
val now = timeSource.nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
}
if (parkNanos > 0) {
// check if shutdown was requested and bail out in this case
if (isShutdownRequested) return
timeSource.parkNanos(this, parkNanos)
}
}
} finally {
_thread = null // this thread is dead
acknowledgeShutdownIfNeeded()
timeSource.unregisterTimeLoopThread()
// recheck if queues are empty after _thread reference was set to null (!!!)
if (!isEmpty) thread() // recreate thread if it is needed
}
}

override fun processNextEvent(): Long {
if (!isCorrectThread()) return Long.MAX_VALUE
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = timeSource.nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
dequeue()?.run()
return nextTime
}

DefaultExecutor不断获取task并执行,而这些task事件就是存储在_delayed里的,这里可以将_delayed理解为一个队列。简述这两段代码做的事情就是就是死循环遍历task队列该执行的就执行并出队,没到执行时间的就留在队列。
总结一下,协程就是维持了一个类似android Looper和MessageQueuen的东西,将要执行的代码封装成Coroutine放入队列,然后通过循环并根据一定条件不停的取出执行。

2.3协程是如何绑定到指定线程的

回到launch方法

public fun CoroutineScope.launch(
context: CoroutineCOntext= EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newCOntext= newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

看一下StandaloneCoroutine的start方法

public fun start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}

start(block, receiver, this)调用的就是CoroutineStart里的invoke方法,这里其实是CoroutineStart对操作符进行了复写,并不是递归调用,这个start就是launch方法传进来的,默认是CoroutineStart.DEFAULT,这是一个枚举对象

@InternalCoroutinesApi
public operator fun invoke(block: suspend R.() -> T, receiver: R, completion: Continuation) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}

internal fun (suspend () -> T).startCoroutineCancellable(completion: Continuation) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)internal fun Continuation.resumeCancellable(value: T) = when (this) {
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
if (dispatcher.isDispatchNeeded(context)) {
_state = value
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
}
}
}

总之到了这里,就是通过 dispatcher.dispatch(&#8230;)把这个任务分发给线程/线程池去执行了,分发方式根据CoroutineStart对象有关。

3.总结一下

上面说了很多源码上的东西,画张图,方便理解

《扒一扒Kotlin协程的底裤》 image

Continuation存放着协程要执行的代码块,协程要执行时放入EventLoop的队列里,根据一定规则从里面取出Continuation来执行。同时EventLoop里指定了Continuation执行时所在的线程

《扒一扒Kotlin协程的底裤》 image

关注我的公众号


推荐阅读
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 标题: ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
  • Android实战——jsoup实现网络爬虫,糗事百科项目的起步
    本文介绍了Android实战中使用jsoup实现网络爬虫的方法,以糗事百科项目为例。对于初学者来说,数据源的缺乏是做项目的最大烦恼之一。本文讲述了如何使用网络爬虫获取数据,并以糗事百科作为练手项目。同时,提到了使用jsoup需要结合前端基础知识,以及如果学过JS的话可以更轻松地使用该框架。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
author-avatar
王一飞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有