Kotlin协程原理

前言

Java 19 引入了Virtual Threads并Preview,这个便是协程,对于它的实现原理还未做了解,理论上应该为此在JVM层面会有一些Support,但也有一个疑惑,就是在此之前,不少JVM语言已经有协程了,它们是如何在API层面来实现这个的呢(似乎JVM不支持这个的样子),所以对Kotlin的协程做了些了解,虽然当前还是很懵,但也算有些收获。文章主体引用了文末的第一篇文章。第一篇有些浅显,第二篇比较详细,有时间慢慢看一下。

总结下kotlin的协程实现原理:continuation 方法+ 状态机(每个continuation 风格的方法会创建一个状态机)

为什么使用Kotlin协程

在 Android 上,避免阻塞主线程是非常必要的。主线程是一个处理所有界面更新的线程,也是调用所有点击处理程序和其他界面回调的线程。因此,主线程必须顺畅运行才能确保出色的用户体验

在实际开发中我们会遇到这种场景

  • 创建子线程执行耗时操作,然后切换到主线程处理界面显示逻辑;但是如果我们在一个接口请求完成后,拿到这个接口返回的结果,在需要去请求另一个接口时,逻辑就会十分复杂,就会出现回调地狱;并且如果在需要考虑接口失败的场景呢?
  • 使用Rxjava优化上面的场景;没有内存泄漏、支持取消、正确的使用线程,但是它比较复杂,如 subscribeOnobserveOnmap 或者 subscribe,都需要学习

这个时候kotlin 协程就出场了,它可以帮我们解决上面的痛点

前置知识

在阅读 Kotlin 源码之前,可以先了解一些前置知识。

Function

Function 是 Kotlin 对函数类型的封装,对于函数类型,它会被编译成 FunctionX 系列的类:

// 0 个参数
public interface Function0<out R> : Function<R> {
    public operator fun invoke(): R
}

// 1 个参数
public interface Function1<in P1, out R> : Function<R> {
    public operator fun invoke(p1: P1): R
}

// X 个参数
public interface Function4<in P1, in P2, in P3, in P4, out R> : Function<R> {
    /** Invokes the function with the specified arguments. */
    public operator fun invoke(p1: P1, p2: P2, p3: P3, p4: P4): R
}

Kotlin 提供了从 Function0 到 Function22 之间的接口,这意味着我们的 lambda 函数最多可以支持 22 个参数,另外 Function 接口有一个 invoke 操作符重载,因此我们可以直接通过 () 调用 lambda 函数:

val sum = { a: Int, b: Int ->
    a + b
}
sum(10, 12)
sum.invoke(10, 12)

编译成 Java 代码后:

Function2 sum = (Function2)null.INSTANCE;
sum.invoke(10, 12);
sum.invoke(10, 12);

// lambda 编译后的类
final class KotlinTest$main$sum$1 extends Lambda implements Function2<Integer, Integer, Integer> {
    public static final KotlinTest$main$sum$1 INSTANCE = new KotlinTest$main$sum$1();

    KotlinTest$main$sum$1() {
        super(2);
    }

    @Override // kotlin.jvm.functions.Function2
    public /* bridge */ /* synthetic */ Integer invoke(Integer num, Integer num2) {
        return invoke(num.intValue(), num2.intValue());
    }

    public final Integer invoke(int a, int b) {
        return Integer.valueOf(a + b);
    }
}

可以看到对于 lambda 函数,在编译后会生成一个实现 Function 接口的类,并在使用 lambda 函数时创建一个单例对象来调用,创建对象的过程是编译器自动生成的代码

而对于协程里的 lambda 代码块,也会为其创建一个对象,它实现 FunctionX 接口,并继承 SuspendLambda 类,不一样的地方在于它会自动增加一个 Continuation 类型的参数。

Continuation Passing Style(CPS)

Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。回调函数 callback 被称为续体(Continuation),它决定了程序接下来的行为,整个程序的逻辑通过一个个 Continuation 拼接在一起。

Kotlin 协程本质就是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用)

  • Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换,这就是咱们常说的用看起来同步的方式写出异步的代码,消除回调地狱(callback hell)。
  • 另外为了避免栈空间过大的问题, Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型。每两个挂起点之间可以看为一个状态,每次进入状态机时都有一个当前的状态,然后执行该状态对应的代码;如果程序执行完毕则返回结果值,否则返回一个特殊值,表示从这个状态退出并等待下次进入。相当于创建了一个可复用的回调,每次都使用这同一个回调,根据不同状态来执行不同的代码。

Continuation

Kotlin 续体有两个接口: Continuation 和 CancellableContinuation, 顾名思义 CancellableContinuation 是一个可以取消的 Continuation。

Continuation 成员

  • val context: CoroutineContext: 当前协程的 CoroutineContext 上下文
  • fun resumeWith(result: Result<T>): 传递 result 恢复协程

CancellableContinuation 成员

  • isActive, isCompleted, isCancelled: 表示当前 Continuation 的状态
  • fun cancel(cause: Throwable? = null): 可选通过一个异常 cause 来取消当前 Continuation 的执行

可以将 Continuation 看成是在挂起点恢复后需要执行的代码封装(通过状态机实现的),比如说对如下逻辑:

suspend fun request() = suspendCoroutine<Response> {
    val response = doRequest()
    it.resume(response)
}

fun test() = runBlocking {
    val response = request()
    handle(response)
}

用下面的伪代码简单描述 Continuation 的工作:

// 假装是 Continuation 接口
interface Continuation<T> {
    fun resume(t: T)
}

fun request(continuation: Continuation<Response>) {
    val response = doRequest()
    continuation.resume(response)
}

fun test() {
    request(object :Continuation<Response>{
        override fun resume(response: Response) {
            handle(response)
        }
    })
}

对于 suspend 关键词修饰的挂起函数,编译器会为其增加一个 Continuation 续体类型的参数(相当于 CPS 中的回调),可以通过这个 Continuation 续体对象的 resume 方法返回结果值来恢复协程的执行

协程创建与启动

SuspendLambda

Kotlin 编译时会将 lambda 协程代码块编译成 SuspendLambda 的子类:

private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}

private suspend fun getAvatar(id: String): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "avatar-$id"
    }.await()
}

fun main() {
    GlobalScope.launch {
        val id = getId()
        val avatar = getAvatar(id)
        println("${Thread.currentThread().name} - $id - $avatar")
    }
}

对应的字节码可以看到:

BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
    (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    int label;

    public final Object invokeSuspend(@NotNull Object $result) {
        Object var10000;
        String id;
        label17: {
            CoroutineScope $this$launch;
            switch(this.label) {
            case 0: // a
                ResultKt.throwOnFailure($result);
                $this$launch = this.p$;
                this.label = 1; // label置为1
                var10000 = getId(this);
                if (var10000 == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                // 若此时已经有结果,则不挂起,直接break
                break;
            case 1: // b
                ResultKt.throwOnFailure($result);
                var10000 = $result;
                break;
            case 2: // d
                id = (String)this.L$1;
                ResultKt.throwOnFailure($result);
                var10000 = $result;
                break label17; // 退出label17
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            // c
            id = (String)var10000;
            this.L$1 = id; // 将id赋给L$1
            this.label = 2; // label置为2
            var10000 = getAvatar(id, this);
            if (var10000 == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
        }
        // e
        String avatar = (String)var10000;
        String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
        System.out.println(var5);
        return Unit.INSTANCE;
    }

    @NotNull
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkParameterIsNotNull(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        var3.p$ = (CoroutineScope)value;
        return var3;
    }

    public final Object invoke(Object var1, Object var2) {
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
    }
}


final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

SuspendLambda 实现了 Continuation 续体接口,其 resume 方法可以恢复协程的执行;另外它将协程体封装成 SuspendLambda 对象,其内以状态机的形式消除回调地狱,并实现逻辑的顺序执行

当我们调用 launch()withContext() 等协程构建器函数时,对应的协程代码块会被 Kotlin 编译器转换为一个匿名内部类,这个匿名内部类继承了 SuspendLambda ,实现了 Function2 接口。

SuspendLambda挂起 Lambda 表示式 的运行时类型,它重写了父类 BaseContinuationImplinvokeSuspend() 方法和 create() 方法,invokeSuspend() 方法中的代码就是协程代码块中的代码,而 create() 方法则是用来实例化代码块对应的匿名类。

我们定义的suspend相关的方法返回值变成了Object,并且多了个Continuation类型的参数,这是为什么呢?

  • 我们知道挂起函数必须在协程作用域或者挂起函数里面才能调用,就是因为编辑器通过CPS转换帮我们加入了入参Continuation,普通函数没有这个参数,所以无法调用挂起函数。
  • 用suspend修饰的方法也有可能没有真正挂起,当真正挂起时,这个方法会返回CoroutineSingletons.COROUTINE_SUSPENDED,,没有真正挂起时这块返回的就是我们方法需要返回的对象,所以这块用了Object作为函数的返回值
  • 可以看一下这个

这里我们根据上面的注释以及字母标签来看一下执行流程(invokeSuspend 方法会在协程体中的 suspend 函数得到结果后被调用,具体是在哪里被调用的稍后会讲到):

  • a: launch 协程体刚执行到 getId 方法时,getId 方法的返回值将是 COROUTINE_SUSPENDED, 此时直接 return, 则 launch 协程体中 getId 后面的代码暂时不会执行,即 launch 协程体被挂起(非阻塞, 该线程依旧会做其它工作)。这里将 label 置为了 1. 而若此时 getId 已经有结果(内部没有调用 delay 之类的 suspend 函数等),则不挂起,而是直接 break。
  • b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 则当 getId 有可用结果返回后,会重新执行 launch 协程体的 invokeSuspend 方法,根据上面的 label==1, 会执行到这里检查一下 result 没问题的话就 break, 此时 id 赋值给了 var10000。
  • c: 在 a 中若直接 break 或 在 b 中得到 getId 的结果然后 break 后,都会执行到这里,得到 id 的值并把 label 置为2。然后调用 getAvatar 方法,跟 getId 类似,若其返回 COROUTINE_SUSPENDED 则 return,协程被挂起,等到下次 invokeSuspend 被执行,否则离开 label17 接着执行后续逻辑。
  • d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 则当 getAvatar 有可用结果返回后会重新调用 launch 协程体的 invokeSuspend 方法,此时根据 label==2 来到这里并取得之前的 id 值,检验 result(即avatar),然后break label17。
  • e: c 中直接返回了可用结果 或 d 中 break label17 后,launch 协程体中的 suspend 函数都执行完毕了,这里会执行剩下的逻辑。

suspend 函数不会阻塞线程,且 suspend 函数不一定会挂起协程,如果相关调用的结果已经可用,则继续运行而不挂起,例如 async{} 返回值 Deferred 的结果已经可用时,await()挂起函数可以直接返回结果,不用再挂起协程。

继承关系

SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation, 因此走 create(receiver, completion) 方法,从上面反编译出的 Java 代码可以看到 create 方法创建了一个 Continuation 实例(create 方法创建了 Main$main$1 实例)

- Continuation: 续体,恢复协程的执行
    - BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义了 invokeSuspend 抽象方法
        - ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等
            - SuspendLambda: 封装协程体代码块
                - 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑

协程启动流程

CoroutineScope.launch

CoroutineScope.launch 开始跟踪协程启动流程:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // newContext = scope作用域上下文 + context参数上下文 + Dispatchers.Default(未指定则添加)
    val newContext = newCoroutineContext(context)
    // 创建协程对象
    val coroutine = if (start.isLazy) {
        LazyStandaloneCoroutine(newContext, block)
    } else {
        StandaloneCoroutine(newContext, active = true)
    }
    // 启动协程
    coroutine.start(start, coroutine, block)
    return coroutine
}

// 启动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

上面 coroutine.start 的调用涉及到运算符重载,实际上会调到 CoroutineStart.invoke() 方法:

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

我们可以注意下 completion 参数,它是一个续体 Continuation 类型,此时传入的实参为 StandaloneCoroutine/LazyStandaloneCoroutine 对象,在协程体的逻辑执行完后会调用到其 resume 方法(CPS),做一些收尾工作,比如说修改状态等

此时 receiver 和 completion 都是 launch() 中创建的 StandaloneCoroutine 协程对象。接着往下看:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
    // 重新创建 SuspendLambda 子类对象
    createCoroutineUnintercepted(receiver, completion)
        // 调用拦截器逻辑,进行线程调度等
        .intercepted()
        // 真正执行协程逻辑
        .resumeCancellableWith(Result.success(Unit), onCancellation)
}

创建SuspendLambda

看看上面 createCoroutineUnintercepted 中的代码:

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

我们在前面说过,这个协程体会被编译成 SuspendLambda 的子类,其也是 BaseContinuationImpl 的子类对象,因此会走上面的 create() 方法,通过 completion 续体参数创建一个新的 SuspendLambda 对象,这是下面的协程三层包装里的第二层包装,它持有的 completion 对象是第一层封装(AbstractCoroutine)。

所以在协程启动过程中针对一个协程体会创建两个 SuspendLambda 的子类对象:

  1. 调用 launch() 时创建第一个,传入 null 作为参数,作为一个普通的 Function 对象使用
  2. 调用 create() 时创建第二个,传入 completion 续体作为参数
BuildersKt.launch$default(/*...*/ (Function2)(new Function2((Continuation)null))

线程调度

接着调用 SuspendLambda.intercepted() 方法执行拦截器逻辑,从上下文中获取拦截器(Dispatcher调度器)拦截当前 continuation 对象,将其包装成 DispatchedContinuation 类型,这就是协程的第三层包装,封装了线程调度等逻辑,其 continuation 参数就是第二层包装(SuspendLambda)实例。

线程调度的具体逻辑

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // 如果是ContinuationImpl类型,则调用intercepted方法,否则返回自身
    // 这里的 this 是 Main$main$1 实例 - ContinuationImpl的子类
    (this as? ContinuationImpl)?.intercepted() ?: this

// ContinuationImpl
public fun intercepted(): Continuation<Any?> =
    // context[ContinuationInterceptor]是 CoroutineDispatcher 实例
    // 需要线程调度 - 返回 DispatchedContinuation,其 continuation 参数值为 SuspendLambda
    // 不需要线程调度 - 返回 SuspendLambda
    intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }

// CoroutineDispatcher
// continuation - SuspendLambda -> ContinuationImpl -> BaseContinuationImpl
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

启动协程

在通过 SuspendLambda 对象创建了 DispatchedContinuation 续体后,接着执行其 resumeCancellableWith() 方法,最终会调用到 continuation.resumeWith(result) 方法,而这个 continuation 就是之前传入的第二层封装 SuspendLambda 对象,其 resumeWith() 方法在父类 BaseContinuationImpl 中:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
        // 进行线程调度,最后也会执行到continuation.resumeWith方法
        is DispatchedContinuation -> resumeCancellableWith(result)
        // 直接执行continuation.resumeWith方法
        else -> resumeWith(result)
    }

    inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
        // 判断是否需要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 需要调度则先进行调度
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    // 不需要调度则直接在当前线程执行协程
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

    inline fun resumeUndispatchedWith(result: Result<T>) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWith(result)
        }
    }
}
  • 当需要线程调度时,则在调度后会调用 DispatchedContinuation.continuation.resumeWith 来启动协程,其中 continuation 是 SuspendLambda 实例;
  • 当不需要线程调度时,则直接调用 SuspendLambda.resumeWith 来启动协程

resumeWith 方法调用的是父类 BaseContinuationImpl 中的 resumeWith 方法:

internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        // ...
        val outcome = invokeSuspend(param)
        // ...
    }
}

协程的启动是通过 BaseContinuationImpl.resumeWith 方法调用到了子类 SuspendLambda.invokeSuspend 方法,然后通过状态机来控制顺序运行。

上面的 invokeSuspend() 是一个抽象方法,它的实现在编译器生成的 SuspendLambda 子类中,具体逻辑是通过状态机来执行协程体中的逻辑,具体见下章解析。

到这里我们 launch() 里的协程体逻辑就开始真正执行了。

协程挂起与恢复

协程的启动,挂起和恢复有两个关键方法: invokeSuspend()resumeWith(Result)invokeSuspend() 方法是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行。这就是整个协程的挂起和恢复过程。

接下来看具体解析。

协程的状态机

对如下代码:

fun main() = CoroutineScope(Dispatchers.Main).launch {
    println("label 0")
    val isLogin = checkLogin() // suspend

    println("label 1")
    println(isLogin)
    val login = login() // suspend

    println("label 2")
    println(login)
    val id = getId() // suspend

    println("label 3")
    println(id)
}

对于协程体中的代码,首个挂起点前的代码可看为初始状态, 其后每两个挂起点之间都是一个新的状态,最后一个挂起点到结束是最终的状态。其对应的状态机伪代码如下,协程体被编译成 SuspendLambda 子类,它实现父类中的 invokeSuspend() 方法,是协程的真正执行逻辑:

final class KotlinTest$main$1 extends SuspendLambda implements Function2 {
    int label = 0;  // 状态码

    public final Object invokeSuspend(Object result) {
        switch(this.label) {
            case 0:
                println("label 0");
                label = 1;
                result = checkLogin(this); // this 是编译器添加的续体参数
                if (result == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 1:
                // 此时传入的 result 是 checkLogin() 的结果
                println("label 1")
                val isLogin = result;
                println(isLogin)
                label = 2;
                result = login(this); // this 是编译器添加的续体参数
                if (result == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 2:
                // 此时传入的 result 是 login() 的结果
                println("label 2")
                val login = result;
                println(login)
                label = 3;
                result = getId(this); // this 是编译器添加的续体参数
                if (result == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 3:
                // 此时传入的 result 是 getId() 的结果
                println("label 3")
                val id = result;
                println(id)
                return;
        }
    }
}

看上面每次调用 suspend 函数时都会传一个 this 参数(continuation),这个参数是编译器添加的续体参数,表示的是协程体自身,在 suspend 挂起函数执行完毕后会调用 continuation.resumeWith() -> invokeSuspend(result) 来恢复该状态机的执行。

协程挂起

上面给出了协程体 SuspendLambda.invokeSuspend() 方法的状态机伪代码,那再看下 SuspendLambda 父类 BaseContinuationImpl 中的 resumeWith() 方法:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                val outcome: Result<Any?> = try {
                    // invokeSuspend() 执行续体下一个状态的逻辑
                    val outcome = invokeSuspend(param)
                    // 如果续体里调用到了挂起函数,则直接 return
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    // 对于 launch 启动的协程体,传入的 completion 是 AbstractCoroutine 子类对象
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

我们说过协程启动后会调用到上面这个 resumeWith() 方法,接着调用其 invokeSuspend() 方法:

  1. 当 invokeSuspend() 返回 COROUTINE_SUSPENDED 后,就直接 return 终止执行了,此时协程被挂起。
  2. 当 invokeSuspend() 返回非 COROUTINE_SUSPENDED 后,说明协程体执行完毕了,对于 launch 启动的协程体,传入的 completion 是 AbstractCoroutine 子类对象,最终会调用其 AbstractCoroutine.resumeWith() 方法做一些状态改变之类的收尾逻辑。至此协程便执行完毕了。

协程恢复

这里我们接着看上面第一条:协程执行到挂起函数被挂起后,当这个挂起函数执行完毕后是怎么恢复协程的,以下面挂起函数为例:

private suspend fun login() = withContext(Dispatchers.IO) {
    Thread.sleep(1000)
    return@withContext true
}

通过反编译可以看到上面挂起函数中的函数体也被编译成了 SuspendLambda 的子类,创建其实例时也需要传入 Continuation 续体参数(调用该挂起函数的协程所在续体)。贴下 withContext 的源码:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // always check for cancellation of new context
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // There are changes in the context, so this thread needs to be updated
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // SLOW PATH -- use new dispatcher
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

首先调用了 suspendCoroutineUninterceptedOrReturn 方法,看注释知道可以通过它来获取到当前的续体对象 uCont, 接着有几条分支调用,但最终都是会通过续体对象来创建挂起函数体对应的 SuspendLambda 对象,并执行其 invokeSuspend() 方法,在其执行完毕后调用 uCont.resume() 来恢复协程,具体逻辑大家感兴趣可以自己跟代码,与前面大同小异

至于其他的顶层挂起函数如 await(), suspendCoroutine(), suspendCancellableCoroutine() 等,其内部也是通过 suspendCoroutineUninterceptedOrReturn() 来获取到当前的续体对象,以便在挂起函数体执行完毕后,能通过这个续体对象恢复协程执行。

协程库没有直接提供创建续体对象的方式,一般都是通过 suspendCoroutineUninterceptedOrReturn() 函数获取的,感兴趣的同学可以看看这个方法的注释: Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension...

总结

Kotlin 协程本质就是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用)。

Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换;Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型,消除 callback hell, 解决栈空间占用问题。

即将协程代码块编译成 SuspendLambda 子类,实现 invokeSuspend() 方法。

invokeSuspend() 方法是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行。

Kotlin 协程中存在三层包装,每层包装都持有上层包装的引用,用来执行其 resumeWith() 方法做一些处理:

  • 第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;
  • 第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
  • 第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。

这三层包装都实现了 Continuation 续体接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。

下图的 resumeWith() 可能表示 resume(), 也可能表示 resumeCancellableWith() 等系列方法:

img

1. Kotlin 协程原理概述

Kotlin 协程执行流程_i3bg0pYTZG.png

Kotlin 协程的大致的执行流程如上图所示,这个流程是各种类型的协程执行时都大致遵循的流程,不是一个严格精确的执行流程。

下面先来看下协程执行过程中的一些关键类的介绍。

1. 协程代码块

当我们调用 launch()withContext() 等协程构建器函数时,对应的协程代码块会被 Kotlin 编译器转换为一个匿名内部类,这个匿名内部类继承了 SuspendLambda ,实现了 Function2 接口。

SuspendLambda挂起 Lambda 表示式 的运行时类型,它重写了父类 BaseContinuationImplinvokeSuspend() 方法和 create() 方法,invokeSuspend() 方法中的代码就是协程代码块中的代码,而 create() 方法则是用来实例化代码块对应的匿名类。

2. 任务

任务负责执行协程代码块,任务指的是 DispatchedTask 的子类,比如 DispatchedContinuationAwaitContinuaton 以及 CancellableContinuationImpl

协程分发器会把 SuspendLambda 封装为任务 DispatchedContinuation

CancellableContinuationImpl 是在内联函数 suspendCancellableCoroutine() 中创建的,Retrofitsuspend 函数的支持就是通过这个内联函数 ,CancellableContinuationImpl 中有一个简单的决策状态机,它的状态能从 UNDECIDED(未定) 迁移到 SUSPENDED(挂起) ,也能从 UNDECIDED(未定) 迁移到 RESUMED (恢复),但是 SUSPENDED 和 RESUMED 这两个状态之间无法相互迁移。

AwaitContinuationCancellableContinuationImpl 的子类。当我们调用 CoroutineScopeasync() 扩展函数后,这个函数会返回一个 Deferred 对象,这个对象的具体实现是 DeferredCoroutine ,当我们调用 Deferredawait() 方法等待协程的执行结果时,DeferredCoroutine 就会创建一个 AwaitContinuaton

3. 协程

协程主要负责维护工作节点,包括传播取消、完成和异常等事件给工作结点和父协程,工作结点的类型是 JobNode,它是对任务和子协程的封装。

当我们调用协程构建器函数启动创建协程时,这些函数的内部会创建协程 ,比如 runBlocking() 中会创建BlockingCoroutine ,launch() 中会创建 StandaloneCoroutine ,这些协程都是 JobSupport 的子类,JobSupport 实现了 Job 接口,也就是可以把协程理解为一个工作项,工作项中可以包含子工作项或子任务。

4. 协程分发器

协程分发器负责执行任务,协程启动后会通过协程分发器分发(执行)任务,比如用来执行阻塞任务的 BlockingEventLoop 和默认协程调度器 DefaultScheduler 都是协程分发器 CoroutineDispatcher 的子类。

CoroutineDispatcher实现了 CoroutineContext 接口,也就是协程分发器是一个协程上下文,在协程上下文会通过不同的 Key 保存上下文中的元素 Element,Key 和Element 是 CoroutineContext 中定义的接口, ContinuationInterceptorCoroutineDispatcher 都声明了实现了 Key 接口的伴生对象,而 DefaultScheduler 和 BlockingEventLoop 则实现了 CoroutineContext 中的 Element 接口。

runBlocking() 函数使用的分发器BlockingEventLoop 中有 3 个队列,分别是无限制(优先)任务队列延迟任务队列以及普通任务队列

而默认协程调度器 DefaultScheduler 中有 2 个全局队列和 1 个局部队列。全局队列分别是全局阻塞任务队列全局 CPU 任务队列,它们的类型都是 GlobalQueue 。DefaultScheduler 实现了 Executor 接口,也就是它是一个线程池,在它的工作者线程 Worker 类中,有一个局部任务队列 ,这个队列的类型为WorkQueue ,当 Worker 运行时,会优先从局部任务队列中获取任务执行。

CoroutineDispatcher 实现了 ContinuationInterceptor 接口,启动协程需要通过协程启动选项 CoroutineStart 启动,当 CoroutineStart 通过 startCoroutineCancellable() 等方法创建任务时,会让协程分发器拦截这个任务,所谓拦截就是把接收到的 Continuation 封装为 DispatchedContinuation

5. 挂起点后的延续操作 Continuation

Continuation 接口实现类_sTN9q21OwN.png

Continuation 是 Kotlin 协程中非常重要的一个概念,它表示一个挂起点之后的延续操作

可以把它理解为一个步骤,而挂起点就是这些步骤之间的边界。Continuation 中声明了一个协程上下文 context 常量和一个用于执行延续操作的 resumeWith() 方法。

继续与挂起_pZkUYtx8pP.png

如上图所示,协程代码块会根据挂起点分为一段段的Continuation ,Continuation 是以一个接一个 resume 的方式连续执行的,不同的 label 代表代码块的不同段落。

Continuation 的实现类有很多,比如 DispatchdContinuationSuspendLambda 以及各个协程,也就是它们都是一个挂起点之后的延续操作。协程的 resumeWith() 方般是在协程代码块中的任务都完成后,最后调用的。

任务会执行协程构建器函数对应的代码块的代码,Kotlin 编译器会把这些代码转化为继承了 SuspendLambda 的匿名内部类,在匿名内部类的 invokeSuspend() 方法中,就包含了协程代码块的代码,当 DispatchedContinuation 执行时,invokeSuspend() 就会以 completion 的形式被 DispatchedContinuation 调用,也就是completion 表示当前任务完成后,后续需要执行的操作。

KotlinSuspendTest.kt_72Ja3KaA8b.png

上面这段用 Retrofit 发起网络请求的代码的执行时序图如下。

BlockingCoroutine 挂起与恢复_Mo0H_0U_BM.png

上面这段代码中用 runBlocking() 执行请求,runBlocking 使用的协程分发器是 BlockingEvent当 BlockingEventLoop运行 DispatchedContinuation 时,DispatchedContinuation 会执行代码块 SuspendLambda 的代码,协程代码块中如果有调用 suspendCanellableCoroutine() 的话,上面这段代码是通过 Retrofit 发起请求的,这就会间接调用到 CanecllableContinuationImplgetResult() 方法挂起协程,等待耗时操作执行完成后返回结果。

当OkHttp 返回响应后,就会在 onResponse() 回调中调用 CancellableContinuationImplresume() 方法把执行结果作为自己的状态。

CancellableContinuationImplresume() 方法会把自己作为任务分发到协程分发器中,然后协程分发器就会调用到 DispatchedTaskrun() 方法,DispatchedTask 会获取子类的状态,也就是耗时操作的执行结果,拿到这个状态后,就会再次调用 SuspendLambda 的 invokeSuspend() 方法,也就是 SuspendLambda 的 invokeSuspend() 方法被调用了两次,第一次返回的是挂起标志 COROUTINE_SUSPEND ,返回这个标志就意味着这次任务已经完成,等下一个任务启动后再继续执行。

第二次调用 invokeSuspend() 返回的是耗时操作执行结果,这时会调用 BlockingCoroutineresumeWith() ,并把结果值作为 BlockingCoroutine 的状态值 。

6. 通过状态机决定恢复操作

SuspendLambda 内部状态机_Bepcd4kNTK.png

协程挂起的实现是通过状态机让代码分段执行,协程代码块中的代码执行到挂起点后就不会继续往下执行,直到被恢复(resume) 。对于执行协程的线程来说,当协程执行到挂起点后,就认为这个任务已经执行完成了,直到耗时操作的结果回来,再恢复(resume)新的任务的执行,这时由于代码块对应的匿名内部类内部的状态(label)已经迁移到下一个状态了,所以协程恢复执行的时候会执行下一段代码。

如上图所示,协程代码块对应的匿名内部类的 invokeSuspend() 方法会根据不同的 label 值执行不同的操作,labelSuspendLambda成员变量,默认值为 0

假如协程代码块中执行了两个任务(调用了两次 suspendCancellableCoroutine) ,当 label 为 0 时,就会执行任务 1,假如任务 1 返回了挂起标志 COROUTINE_SUSPENDED,那 SuspendLambda 就不会继续往下执行,也就是不会执行任务 2

当耗时操作结束后(如 OkHttp 响应回来),会调用 CancellableContinuationImplresume() 方法,resume() 方法会再次触发 SuspendLambdainvokeSuspend() 方法,这时由于 label1 ,那么就不会执行任务 1 ,而是执行任务 2 ,然后会通过 ResultthrowOnFailure() 方法检查任务执行结果是否为异常,如果是异常就会抛出异常

除了 suspendCancellableCoroutine() 函数中会调用 CancellableContinuationImplgetResult() 方法会把协程挂起以外,withContext() 方法也会调用 DispatchedCoroutinegetResult() 方法把父协程挂起。

7. 小结

执行 Kotlin 协程代码块的相关对象_YKJx1MK60H.png

以上图中左侧的代码为例,这段代码中使用了 viewModelviewModelScope 启动了一个用 Retrofit 进行网络请求的协程,在这个协程代码块中,调用了 Service 接口的挂起函数 body()

对于 body() 的调用,Retrofit 会通过 suspendCancellableCoroutine() 创建一个 CancellalbeContinuationImpl ,对于 launch() 代码块,Kotlin 编译器会把它转换为 SuspendLambda,然后协程分发器会把 SuspendLambda 封装为 DispatchedContinuation ,CancellableContinuationImpl 和 DispatcheContinuation 都是 Task 的子类,也就是它们两个都是任务

DispatchedContinuation 持有了 SuspendLambda ,而 CancellableContinuationImpl 则持有了另外一个 DispatchedContinuation 实例,该实例也持有了协程代码块对应的 SuspendLambda 。DispatchedContinuation 和 CancellableContinuationImpl 在 resume 的时候都会调用 SuspendLambda 的 invokeSuspend() 方法执行协程代码块中的代码。

假如这个 ViewModelActivity 的,那么在 Activity 退出的时候,ViewModel 的 clear() 方法就会被调用,clear() 会调用 CloseableCoroutineContextcancel() 扩展函数,cancel() 会通过协程上下文获取到该上下文中的协程,然后会调用协程StandaloneCoroutinecancel() 方法取消协程的执行。StandaloneCoroutine 的父类 JobSupport 中有一个状态机,这个状态机的其中一个状态为多结点状态,这时状态的类型为 NodeList ,也就是一个工作结点列表,通过这个工作结点列表,协程就可以把在协程代码块中启动的任务给取消掉。

10. CoroutineScope#launch 原理

看完了 runBlocking()suspendCancellableCoroutine() 的实现后,下面我们来看下更常用的 CoroutineScopelaunch() 扩展函数的实现。

这一大节讲的主要是 CoroutineScope 的 launch() 扩展函数的实现以及默认协程分发器 DefaultScheduler 的实现

1. CoroutineScope#launch 原理概述

以下面这段代码为例。

2. CoroutineScope

CoroutineScope 实现类_b9NQgsahXe.png

CoroutineScope 接口中声明了一个 coroutineContext 常量,也就是实现该接口要提供一个协程上下文。CoroutineScope 有一个 cancel() 扩展函数,Lifecycle 会在生命周期状态为 DESTROYED 时调用 cancel() 函数取消掉该作用域启动的协程,ViewModel 则会在 clear() 方法中调用 cancel() 函数,对于 ActivityFragmentViewModelclear() 方法也是在 destroy 回调里触发的。

CoroutineScope.kt_Gb02Y_Gy1v.png

CoroutineScope 的实现类主要有 GlobalScopeMainScopeAbstractCoroutine 、 TestScopeImpl 、ProducerScope 、ContextScope 、CloseableCoroutineScope 以及 LifecycleCoroutineScopeImpl ,下面来看下这些协程作用域的作用。

GlobalScope是一个全局协程作用域 ,是一个单例对象,一般情况下在 Android 中不会使用这个作用域来启动协程,因为这么做存在潜在的内存泄漏风险,比如 Activity 退出后,匿名内部类持有 Acitivty 的引用,导致 Activity 无法释放,也就是发生了内存泄漏。

GlobalScope 不支持通过 cancel() 扩展函数取消使用它启动的协程,因为它的 coroutineContext 只能获取,不能设值(调用 plus 方法)。

CoroutineScope.kt (1)_BYdsh35Duw.png

MainScope 是主线程协程作用域,这个作用域的协程分发器为 Dispatchers.Main ,使用方法如下。

MyActivity.kt (2)_ehXiAfJV6B.png

AbstractCoroutine 是 BlockingCoroutine 等协程类的父类,它也实现了 CoroutineScope 接口,也就是它要对外提供协程上下文。

TestScopeImpl 是测试协程作用域,当我们在 runTest() 函数的代码块中执行包含协程的单元测试代码时,这个函数就会创建一个 TestScopeImpl 作为协程的作用域。

ProducerScope 是一个继承了 CoroutineScope 的接口,生产者协程ProducerCoroutine 就实现了该接口,Kotlin Flow 接口的 zip() 扩展函数就会创建生产者协程用来组合两个流的值。

ContextScope 用于直接用协程上下文创建协程作用域,比如 MainScope 就是把 SupervisorJobDispatchers.Main 结合后传给 ContextScope 的方式来创建协程作用域,此外 CoroutineScope 函数也是通过 ContextScope 来创建协程作用域的。

CoroutineScope.kt (4)_UeC-yv1G6Y.png

CloseableCoroutineScopeViewModelviewModelScope 使用的协程作用域,这个作用域实现了 Closeable 接口,当 ViewModel 的 clear() 方法被触发时,就会调用这个作用域的 close() 方法取消使用该作用域启动的协程。

ViewModel.kt_Q7o675fr49.png

LifecycleCoroutineScopeImplLifecycleOwner 的 lifecycleScope 所使用的协程作用域,它的父类 LifecycleCoroutineScope 中声明了 launchWhenCreated() 、launchWhenStarted() 和 launchWhenResumed() 这三个函数,通过这些函数我们可以让代码在 create 、start 和 resume 这三个生命周期回调发生的时候再执行,比如下面这样。

MyActivity.kt (3)_SThg-XsTZV.png

3. CoroutineScope#launch

Builders.common.kt_2O9_hZAA0R.png

CoroutineScopelaunch() 扩展函数中首先会调用 newCoroutineContext() 方法创建一个协程上下文 CoroutineContext,这里的创建协程上下文主要是看下是否为 DEBUG 环境,是的话则需要把 CoroutineId 放到默认协程分发器(Dispatchers.Default)的上下文中。

然后如果 start 参数的类型为 lazy ,则创建延迟执行的独立协程 LazyStandaloneCoroutine ,否则创建 StandaloneCoroutine ,然后调用协程的 start() 方法启动协程,最后返回该协程。

4. StandaloneCoroutine

Builders.common.kt (1)_gFix2l8ot-.png

StandaloneCoroutine(独立协程)继承了 AbstractCoroutine 类,并重写了父类的 handleJobException() 方法,改为调用自 CoroutineExceptionHandler 中的 handleCoroutineException() 方法。

StandaloneCoroutine继承关系_u8E6UX6PdD.png

如上图所示 ,LazyStandaloneCoroutineStandaloneCoroutine 的子类,StandaloneCoroutine 继承了 AbstractCoroutine ,AbstractCoroutine 又继承了 JobSupport 类。

5. DefaultScheduler

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    // ...
}

Dispatchers.Default 对应的协程分发器为 DefaultScheduler ,DefaultScheduler 是 SchedulerCoroutineDispatcher 的子类,SchedulerCoroutineDispatcher 是 ExecutorCoroutineDispatcher 的子类。

6. SchedulerCoroutineDispatcher

Dispatcher.kt (1)_rcei0xUywS.png

SchedulerCoroutineDispatcher 的核心线程池大小为 CPU 核数,最大线程池大小为 2097150(2M),空闲线程存活时间为60 秒。

SchedulerCoroutineDispatcher 中重写了 dispatch 方法,具体实现为 CoroutineScheduler 的 dispatch() 。

7. CoroutineScheduler#dispatch

CoroutineScheduler.kt_xXxNW4Jjoe.png

CoroutineScheduler 的 dispatch() 方法中,会调用 createTask() 方法初始化 任务(如 DispatchedContinuation) 的提交时间 submissionTime 和任务上下文 taskContext

然后会尝试把任务添加到局部任务队列 localQueue,如果添加失败,就尝试把任务添加到全局任务队列globalQueue,如果还是添加失败则抛出拒绝执行异常。

然后会通过 currentWorker 获取当前工作者线程,如果工作者线程为空的话,就调用 addToGlobalQueue() 方法把任务添加到全局任务队列,

如果添加成功,则判断是否需要跳过唤醒操作,当 tailDispatch 为默认值 false 时,则不需要跳过。

当任务的模式为非阻塞时,如果不需要跳过唤醒操作,则调用 signalCpuWork() 唤醒负责计算密集型任务的工作者线程。

如果任务的工作模式为阻塞,则调用 signalBlockingWork() 唤醒负责阻塞任务的工作者线程。

任务的工作模式有两种:

  1. TASK_NON_BLOKCING :非阻塞模式
  2. TASK_PROBABLY_BLOCKING :可能阻塞模式,用 IO 线程池执行的任务就是该模式

8. CoroutineScheduler#addToGlobalQueue

private fun addToGlobalQueue(task: Task): Boolean {
    return if (task.isBlocking) {
        globalBlockingQueue.addLast(task)
    } else {
        globalCpuQueue.addLast(task)
    }
}

addToGlobalQueue() 的实现比较简单,就是根据任务的 isBlocking 字段的值把任务添加到全局阻塞任务队列或全局 CPU 任务队列(计算密集型任务队列)。

9. CoroutineScheduler#signalCpuWork

CoroutineScheduler.kt (2)_CZY9URHyk2.png

在 singalCpuWork() 方法中,会调用 tryUnpark() 方法唤醒已存在的工作者线程,如果没有的话,则调用 tryCreateWorker() 方法创建新的工作者线程。

10. CoroutineScheduler#tryCreateWorker

CoroutineScheduler.kt_K8T5MSCeXm.png

在 tryCreateWorker() 方法中,CPU 工作者线程的数量就是已存在的工作者线程的数量减去阻塞任务工作者线程的数量,如果 cpuWorkers 小于核心线程池大小,则创建一个新的 CpuWorker ,如果创建完后发现只有 1 个 CpuWorker ,就再创建一个。

11. CoroutineScheduler#Worker

CoroutineScheduler.kt (1)_GSLMBiwpxn.png

CoroutineScheduler 的工作者线程的具体实现类为它的内部类 Worker ,Worker 是 Thread 的子类,有一个局部任务队列 WorkQueue ,默认状态为 DORMATN(休眠)。

Woker 中定义了一个 WorkerState 枚举类,这个枚举类中包含了 Worker 的状态,一共有下面 5 种状态:

  1. CPU_ACQUIRED:正在执行或正在查找非阻塞任务
  2. BLOCKING:正在执行阻塞任务
  3. PARKING :等待状态
  4. DORMANT :休眠状态
  5. TERMINATED :终止状态

在 CoroutineScheduler 的 createNewWorker() 方法中,创建了一个新的 Worker 后,就会调用它的 run() 方法。

12. Worker#run

CoroutineScheduler.kt (2)_kSM2TiZGWS.png

Workerrun() 方法会执行runWorker() 方法,这个方法会调用 findTask() 方法找出需要执行的任务,找到后就调用 executeTask() 方法执行任务,如果找不到任务的话,就调用 tryPark() 方法让 Worker 进入等待状态,直到有新的任务后,Worker 被唤醒。如果 Worker 处于终止状态,则调用 tryReleaseCpu() 方法释放对 CPU 许可证(permit)的占用。

13. Worker#findTask

CoroutineScheduler.kt (3)_7vT6JRr-e4.png

findTask() 方法首先会调用 tryAcquireCpuPermit() 方法获取 CPU 占用的许可证,获取到许可证后就会调用 findAnyTask() 方法返回任务。

如果获取不到 CPU 占用许可证,就会看下是否需要扫描 localQueue ,如果要扫描的话就从 localQueue 中找出阻塞任务来执行,如果不需要扫描 localQueue 的话,就从全局阻塞任务队列中拿出任务执行。

如果没有阻塞任务的话,就调用 trySteal() 方法把其他 Worker 还没执行的阻塞任务拿过来执行。

14. Worker#tryAcquireCpuPermit

CoroutineScheduler.kt (4)_xgWF4Zf9PA.png

在 Worker 的 tryAcquireCpuPermit() 方法中,会调用 CoroutineScheduler 的 tryAcquireCpuPermit() 方法获取可用的许可证,这个值默认为核心线程池大小。

如果没有可用的许可证,也就是不能再开启线程,则返回 false ,否则更新 CoroutineScheduler 的 controlState 的值。

15. Worker#findAnyTask()

CoroutineScheduler.kt_Kbu84oj0ED.png

在 Worker 的 findAnyTask() 方法中,会通过 scanLocalQueue 参数判断局部任务队列是否有任务,如果有的话,就看下是否优先从全局任务队列中获取任务,不是的话就从局部任务队列中获取任务并返回。

如果局部任务队列为空,则从全局任务队列中获取任务并返回。

如果局部任务队列和全局任务队列都为空,则从其他 Worker 中拿非阻塞任务并返回。

16. Worker#executeTask

CoroutineScheduler.kt (5)_bJdVtiGcGz.png

executeTask() 方法中最主要的就是通过 runSafely() 方法调用任务的 run() 方法,如果遇到异常的话就通过线程的 uncaughtExceptionHandler 抛出。

这里的任务就是 DispatchedContinuation ,DispatchedContinuation 的 run() 方法会通过 SuspendLambda 的父类 BaseContinuationImpl 的 resumeWith() 方法调用协程代码块对应的匿名内部类的 invokeSuspend() 方法,到这里,关于 launch() 的实现就讲完了。

以上结合源码说了些东西,后续回顾不必看这么细

17. CoroutineScope#launch 原理小结

launch() 协程构建器函数的实现与 runBlocking() 实现的主要的差异就是协程类型和协程分发器的类型,launch() 在不修改 context 参数的情况下,默认用的协程分发器是 DefaultScheduler ,在不修改 start 参数的情况下,默认用的是 StandaloneCoroutine

StandaloneCoroutine 只是简单地重写了 JobSupport 的 handleJobException() 方法,而 BlockingCoroutine 则重写了afterCompletion() 方法,并在这个方法中唤醒被阻塞的线程。而且 BlockingCoroutine 需要执行的任务要通过 joinBlocking() 方法让事件循环来执行,而 StandaloneCoroutine 的任务则是由 DefaultScheduler 启动 Worker 后,由 Worker 来执行的。

11. withContext() 原理

我们在 Android 中使用协程的时候,一般都要用 withContext() 把任务执行的分发器从主线程分发器切到工作线程分发器(如 Dispatchers.IO),或者是从工作线程分发器切回主线程分发器 Dispatchers.Main ,下面我们就来看下 withContext() 是怎么实现的。

1. withContext()

Builders.common.kt (2)_N-42UzzZYd.png

withContext() 的用于在指定的协程上下文执行代码块中的代码,也就是指定用于分发协程的分发器。

withContext() 中的代码主要是在 suspendCoroutineUninterceptedOrReturn() 代码块中执行的,这个代码块中可以获取一个 Continuation 实例 uCont,这个 Continuation 实例就是 withContext() 代码块对应的 SuspendLambda 匿名内部类。

在这个代码块中,首先会把旧的上下文与新的上下文进行结合,所谓的结合,主要就是替换掉上下文中 KeyContinuationInterceptor 的元素,比如这个元素原来是默认协程分发器,把它换成 IO 协程分发器。

如果新的上下文和旧的上下文没有区别的话,就创建一个作用域协程 ScopeCoroutine ,并调用它的 startUndipsatchedOrReturn() 扩展函数启动协程。

如果新的协程上下文和旧的上下文的区别只是 ContinuationInterceptor 不同的话,那就创建一个 UndispatchdCoroutine ,并调用它的 startUndispatchedOrReturn() 扩展函数启动协程。

如果新的协程上下文和旧的协程上下文除了 ContinuationInterceptor ,还有其他元素也不同的话,那就创建一个 DispatchedCoroutine ,并调用它的 startCoroutineCancellable() 扩展函数,最后与 suspendCanellableCoroutine() 函数一样,调用这个协程的 getResult() 方法获取执行结果,这个方法会把父协程挂起,也就是父协程代码块中的代码执行到 withContext() 后就不会继续执行,直到 withContext() 方法返回执行结果后,BaseContinuationImpl 中把值传给 completion ,completion 就是 DispatchedCoroutine ,然后 DispatchedCoroutine 就会把状态切换为 RESUMED 恢复执行。

ScopeCoroutine 子类png_3IpNXsizNx.png

如上图所示,DispatchedCoroutineUndispatchedCoroutine 都是 ScopeCoroutine 的子类。

2. ScopeCoroutine

Scopes.kt_ArZCltj7fM.png

ScopeCooutine 继承了 AbstractCoroutine ,重写了 JobSupportisScopedCoroutine 常量,重写了 JobSupportafterCompletion()AbstractCoroutineafterResume() 方法。

ScopeCoroutine 的 afterCompletion() 方法中,在当前协程的工作完成或取消后,就会调用 uContintercepted() 方法,让协程分发器把它封装为 DispatchedContinuation ,并把结果传给这个 DispatchedContinuation ,uCont 就是 withContext() 代码块对应的 SuspendLambda

ScopeCoroutineafterResume() 方法被调用时,它会调用 uCont 成员的 resumeWith() 方法,这里调用的 resumeWith()afterCompletion() 中调用的resumeCancellableWith() 方法的区别,就是 resumeCancellableWith() 会把 DispatchedContinuation 的 resumeMode 改为 MODE_CANCELLABLE,而 resumeWith() 则会把 resumeMode 改为 MODE_ATOMIC

3. 分发模式 resumeMode

resumeModeDispatchedContinuation 的父类 DispatchedTask 的成员变量,它表示分发模式,它的取值有下面 5 个。

  1. MODE_ATMOIC(0) 无法取消的分发模式,使用 Unconfined 协程分发器时,任务的分发模式就是 MODE_ATOMIC
  2. MODE_CANCELLABLE(1) 可取消的分发模式,通过 suspendCancellableCoroutine() 挂起协程时,创建的 CancellableContinuationImpl 的分发模式就是 MODE_CANCELLABLE
  3. MODE_CANCELLABLE_REUSABLE(2) 可取消且可重用的分发模式,用于 suspendCancellableCoroutineReusable() 内联函数,ReceiveChannel 发送广播用的 broadcast() 扩展函数用的就是这个内联函数
  4. MODE_UNDISPATCHED(4) 不进行分发的分发模式,delay() 函数用的就是 MODE_UNDISPATCHED
  5. MODE_UNINITIALIZED(-1) 未初始化,DispatchedContinuation 的默认分发模式

4. JobSupport#isScopedCoroutine

JobSupport.kt (1)_reaDwZ0Jku.png

JobSupportisScopedCorotuine 成员变量主要在 cancelParent() 方法中使用,这个方法会在子协程被取消的时候被调用,在这个方法中,会判断当前协程是否为有作用域的协程 scopedCoroutine,如果是的话,则不调用父协程的 childCancelled() 方法。

5. ScopeCoroutine#startUndispatchedOrReturn

Undispatched.kt_ahuVZCV0F-.png

ScopeCoroutinestartUndispatchedOrReturn() 方法中,会在 undispatchedResult() 代码块中调用 block 参数的 startCoroutineUninterceptedOrReturn() 方法。

6. ScopeCoroutine#undispatchedResult

Undispatched.kt_PB5j3dUjbe.png

undispatchedResult() 函数中,会通过 startBlock() 的执行结果来决定下一步的操作,如果执行结果为挂起标志 COROUTINE_SUSPENDED ,则返回 COROUTINE_SUSPENDED 挂起协程。

如果执行结果 result 不是挂起标志,则调用 makeCompletingOnce() 方法获取协程状态。

7. DispatchedCoroutine

Builders.common.kt_TAF75z5Lq7.png

DispatchedCoroutine 有一个和 DispatchedContinuation 一样简单的决策状态机,这个状态机有 UNDECIDEDSUSPENDEDRESUMED 三种状态。

DispatchedCoroutine 重写了 AbstractCoroutineafterResume() 方法,这个方法会在 resumeWith() 方法被调用的时候调用,协程的 resumeWith() 方法一般是在 SuspendLambda(代码块)执行完后调用的。

DispatchedCoroutineafterResume() 方法首先会尝试把状态迁移到 RESUMED ,如果迁移失败的话,说明协程处于挂起状态,这时就要通过协程分发器再次把 SuspendLambda 封装为任务并进行分发。

DispatchedCoroutine 的 getResult() 方法中,会尝试把状态迁移到 SUSPENDED ,迁移成功则返回挂起标志,如果迁移失败的话,说明处于 RESUMED 状态,也就是已经获取到执行结果了,这时就不用再挂起了,直接把状态拆箱并返回。

unboxState() 扩展函数只是简单地判断了一下当前状态是否为未完成状态 IncompleteStateBox ,如果是的话,则返回它的 state 成员,否则返回当前状态。

8. recoverResult()

引用

  • https://juejin.cn/post/7137905800504148004
  • https://juejin.cn/post/7103011782591004680
  • https://juejin.cn/post/7137226816976060424

Leave a Reply

Your email address will not be published. Required fields are marked *

lWoHvYe 无悔,专一