Kotlin 协程超简版笔记

[TOC]

基础

启动第一个协程

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // launch new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

CoroutineScope中使用coroutine builder启动协程。

连接阻塞与非阻塞

协程里的一些操作,必须要在非阻塞的环境里执行。可以使用runBlocking,阻塞住当前线程,直至runBlocking内部协程完成。

使用Job

启动一个协程后会返回Job,可以用job.join()以非阻塞的方式等待工作完成。

结构化并发

如果使用GlobalScope.launch来启动一堆协程的话,这些协程都属于top-level,即使协程非常轻量,数量多了之后还是会有一些问题,想办法持有所有协程的引用是个非常麻烦而且容易出错的事情。

我们可以使用在特定的coroutineScrop来启动这些协程,在内部启动的协程全部结束之前,Scope是不会结束的。

suspend 函数

使用suspend修饰符,可以使用其他的suspend函数,具有传染性。

协程与线程

协程是非常轻量级的,哪怕启动10K个,也不会有太大的压力。表现上和守护线程类似,不会阻止程序退出。

取消协程

协商取消

和线程类似,调用job.cancel()即可。也是协商式的。有两个地方可以接收到取消协程的指令

  • 某些suspend函数会检查取消状态,比如yielddelay
  • 手动显式检查取消状态,isActive就是一个很方便的CoroutineScope拓展方法

取消协程,会抛出CancellationException,很显然,我们是可以在内部使用try…catch…finally来进行一些额外处理的。

如果在finally块内使用suspend函数,会再次抛出CancellationException,毕竟协程已经被取消了,通常这不是一个问题,一般关闭资源类操作都不会受到影响。如果你真的有这种需求,可以使用

withContext(NonCancellable) {
    delay(1000L)
    //do somethin
}

至于withContext,后面会再次提到。

超时

Kotlin总是有一些语法糖,比如常用的超时。

fun main() = runBlocking {
//sampleStart
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
//sampleEnd
}

当然,本质还是取消,还是协商式。抛出的错误则是TimeoutCancellationException,是CancellationException的子类,单纯是为了区分取消的原因不一样。

组合suspend函数

执行顺序

其实不管是suspend函数,还是普通函数,在kotlin里面,都是顺序执行的。

异步执行

你需要显式的去启动一个协程来达到异步的效果,kotlin不缺少的就是语法糖。

async{
    someSuspendFunction()
}

实际上就是类似于launch,启动了一个协程,并把内部块的运算结果用Deferred包装返回,类似于future。区别在于他的await()不会阻塞线程。

当然你也可以选择使用惰性启动(Lazy),只有显式调用start()才会开始启动这个协程。

协程上下文和调度

当然,我说的是ContextDispatcher

协程总是在上下文中执行,包含大量的CoroutineContext,包含之前提到的Job,以及即将提到的Dispatcher

线程调度

调度器决定了协程在哪一个或者哪些线程上执行,可以是绑定到某个线程或者线程池,或者是随意运行。

所有的coroutine builder都支持可选参数CoroutineContext来声明其调度器。

  • 不传参数,继承Scope的调度器
  • Dispatchers.Default 使用后台共享的线程池,比如 GlobalScope.launch { ... }
  • Dispatchers.Unconfined,在调用(caller)线程执行,直到下一次中断点,再次恢复时,所在的线程由中断点的suspend fuction所在线程执行
  • 自定义线程池,顾名思义

Dispatchers.Unconfined不适用于常规场景。

上下文切换

可以使用withContext在不同的上下文之间切换,实际上就是不同线程间切换。

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

我们需要使用 -Dkotlinx.coroutines.debug作为JVM参数,来debug协程信息。

也可以使用CoroutineName给协程命名,正如对thread命名一样。

协程的Job是上下文的一部分,可以通过coroutineContext[Job]来获取,isActive只是coroutineContext[Job]?.isActive == true的快捷方式。

子协程

当协程在某个CoroutineScope启动的时候,它就继承了CoroutineScope.coroutineContext,新协程的Job也是父协程Job的儿子。当父协程取消,子协程也会被取消。

但是使用GlobalScope来启动协程的时候,就不会与父Scope扯上关系了。

在所有的子协程没有完成之前,父协程不会完成,所以无需显式的调用join来等待所有子协程。

上下文元素组合

我们谈到了很多上下文元素,比如调度器,命名等,我们可以简单用+把他们给连接起来传给builder

显式Job

有时候我们需要管理的生命周期对象并不是一个协程,但是它可能会启动一堆协程,比如Android activity。我们可以通过创建一个Job实例,将启动的协程都绑定到这个job上,起到统一管理的作用。

class Activity : CoroutineScope {
    lateinit var job: Job

    fun create() {
        job = Job()
    }

    fun destroy() {
        job.cancel()
    }
    
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    
    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} 

doSomething中启动的协程,都继承了父CoroutineScope中的元素。就是Dispatchers.Default + job

ThreadLocal

在多线程中,ThreadLocal是非常方便的一个用法。然而在协程中,情况就不太一样了。协程并不全被绑定到某个特定的线程中。

Kotlin为ThreadLocal准备了一个asContextElement的拓展函数,使用了一个额外的上下文元素来保存ThreadLocal中的值,并且在每次切换上下文的时候进行恢复。

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // declare thread-local variable

fun main() = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
       println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}

这里有一个关键的限制,当ThreadLocal值被修改的时候,新的值不会通知到协程调用者。下一次中断后,修改值会丢失。

当然,对于可变对象,还是封装更合适。

错误处理

错误传递

Coroutine Builder对于错误传递有两种表现:

  • 自动传递Exception (launchactor)
  • 将其暴露给用户 (asyncproduce)

前者将未处理的异常,交给某个handler统一处理,类似于Java的Thread.uncaughtExceptionHandler

后者会将错误交给用户最终处理。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = GlobalScope.launch {
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

CoroutineExceptionHandler

我们可以使用CoroutineExceptionHandler作为一个上下文元素,传递给builder。

在JVM平台上,我们也可以通过ServiceLoader,设置一个全局的CoroutineExceptionHandler

当然,CoroutineExceptionHandler只会在Coroutine Builder选择不将错误交给用户处理的时候才生效。具体如下

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val handler = CoroutineExceptionHandler { _, exception -> 
        println("Caught $exception") 
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) {
        throw AssertionError() // Nothing will be printed
    }
    joinAll(job, deferred)
}

取消和异常

取消和异常紧密相关,协程内部使用CancellationException来作为取消的实现。这个异常被所有的handler忽略,即使可以通过catch块来获取,只应该被用来debug。当一个协程被Job.cancel取消,实际上是抛出了一个不包含原因 的 CancellationException,但不会影响到父协程。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("Child is cancelled")
            }
        }
        yield()
        println("Cancelling child")
        child.cancel()
        child.join()
        yield()
        println("Parent is not cancelled")
    }
    job.join()   
}

但是,如果子协程被其他错误取消的话,父协程也会收到这个错误的影响。会先取消掉其他子协程,最后再处理错误。

这个行为无法被重写,也不受到CoroutineExceptionHandler的影响,这个设计为结构化并发提供了稳定的层次结构。

错误聚合

如果多个子协程同时抛出错误,先抛先赢。第一个被抛出的错误会被handler捕获,这样会造成错误信息丢失。

Supervision Job

之前提到的异常传播,都是水平和垂直双向的,有时候我们并不希望子协程的异常影响到父进程。

SupervisorJob就可以起到这个效果,异常带来的取消只会向下传播。

import kotlinx.coroutines.*

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("First child is failing")
            throw AssertionError("First child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                println("Second child is cancelled because supervisor is cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        println("Cancelling supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

Supervision scope

结构化并发也可以使用SupervisionScope

import kotlinx.coroutines.*

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    supervisorScope {
        val child = launch(handler) {
            delay(1000L)
            println("Child throws an exception")
            throw AssertionError()
        }
        val child2 = launch(handler) {
            child.join()
            println("Child2 still run")
        }
        println("Scope is completing")
    }
    println("Scope is completed")
}

和普通的CoroutineScope的一个重要区别就是错误处理,由于子协程的异常不会传递给父协程,所以每个协程需要独立处理自己的异常。

Channels(experimental)

协程之间相互传单个值使用Deferred会非常方便,Kotlin也提供了一个传递流的方法。

Channel基础

BlockingQueue非常类似。不同的是,channel是非阻塞的,而且可以关闭表示不会有新数据流入,所以可以直接对其进行for循环。

关闭类似于发送了一个特殊的close token到channel中,在没有收到这个token之前,是不会关闭的,这也保证了在关闭之前发送的值都会被处理。

Channel 生产者

使用协程来产生数据流是非常常见的,和生产消费者模式类似。kotlin提供了抽象的生成器,不同于函数返回单个值,而是直接返回一个channel。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}

管道Pipeline

某个协程生产数据流,也许是无限生产,后面紧跟着另外的协程去处理或者消费这些数据流,产生另外的数据,这就组成了一个管道。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // 产生1,2,3,4,5……
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
fun main() = runBlocking {
    var cur = numbersFrom(2) 
    for (i in 1..10) {
        val prime = cur.receive() //取到prime
        println(prime)
        cur = filter(cur, prime)//对cur过滤,过滤掉所有能整除prime的数
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish 
}

这里可能需要解释下

cur = numbersFrom(2)
get a prime //2
cur -> cur.filter(prime) //过滤掉cur中所有能整除2的数值
get a prime //3
cur -> cur.filter(prime) //再次过滤掉cur中能整除3的数值
get a prime //5
……

这里可以提一下的是,整个过程启动了很多协程,但我们不需要手动去一个个关闭(也做不到)。这提现了结构化的好处。

Fan-out & Fan-in

对channel做一些并发的写和读,都是没有问题的。

Buffered channels

到目前为止,channel都是不带buffer的,生产者和消费者之间面对面(没有中间商赚差价),生产者没有准备好之前,receive会被暂停(suspend),消费者没有准备好之前,send也会被暂停。

创建channel的时候,可以使用可选参数capacity来指定buffer的长度。在buffer被填充满之前,send不会被暂停,buffer空之前,receive也不会被暂停。

公平channels

不管是发送还是接受操作,对于channel来说,都是FIFO,先被调用(invoke)的先执行(operate)。

滴答channels/Ticker channels

Kotlin总是这么贴心的准备一大堆有用的没用的糖果。Ticker channel 会定时的产生Unit,但并不会累积。

共享可变状态和并发

协程在大多数情况下都是并发操作的,多线程下会遇到的问题,协程一样也会遇到。

问题

import kotlinx.coroutines.*
import kotlin.system.*    

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

var counter = 0

fun main() = runBlocking<Unit> {
    GlobalScope.massiveRun {
        counter++
    }
    println("Counter = $counter") 
}

常见的并发操作,一般情况下,是很容易出现counter不是我们预期值得情况。

Volatiles没有帮助

在这个问题中,给counter加上volatile修饰符并不会解决问题,因为这个只能解决可见性。只能保证每个协程获取counter的时候,是最新的,但是随后的+1,写入,并不是原子性的。

原子操作类型

最简单最快捷的一个办法就是使用AtomicInteger,这个问题就被妥善解决了,但是在其他复杂情况下,并没有这么容易,因为没有这么多开箱可用的原子操作类。

线程细粒度控制

val counterContext = newSingleThreadContext("CounterContext")

fun main() = runBlocking<Unit> {
    GlobalScope.massiveRun { // run each coroutine with DefaultDispathcer
        withContext(counterContext) { //confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}

虽然每次启动的协程都是DefaultDispathcer,是跑在共享的线程池上的,但是一旦到了读写数据的阶段,切换成单线程的上下文。这样能解决问题,但是会导致速度很慢。时间都耗在上下文切换上了。

线程粗粒度控制

val counterContext = newSingleThreadContext("CounterContext")

fun main() = runBlocking<Unit> {
    // run each coroutine in the single-threaded context
    CoroutineScope(counterContext).massiveRun { 
        counter++
    }
    println("Counter = $counter")  
}

这个更粗暴了,直接让启动的所有协程都在同一个单线程上。

互斥锁Mutex

和多线程里的锁类似,作用就是让关键操作不并发执行。这个和 synchronizedReentrantLock 的区别主要在于是暂停,而不是阻塞线程。

Kotlin也准备了方便的糖

mutex.withLock{
    //do something
}

等同于

mutex.lock()
try{
    //do something
}finally{
    mutex.unlock()
}

所以方案如下

val mutex = Mutex()

fun main() = runBlocking<Unit> {
    GlobalScope.massiveRun {
        mutex.withLock {
            counter++        
        }
    }
    println("Counter = $counter")
}

Actors

一个Actor是由协程,被封装到协程中的状态,和其他携程沟通的channel组成的实体。

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

使用actor builder

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

最后,程序被改为

fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    GlobalScope.massiveRun {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor  
}

这样自增的操作与调用counter.send(IncCounter)的协程无关,它被封在actor里面,由一个独立的协程执行。协程内部是顺序执行的,这样可以解决共享可变状态的问题。

显而易见,在高负载的情况下,actor的效能很高很多,独立的协程会一直有事可做,而且也避免了上下文切换和获取锁。

Select 表达式(experimental)

select表达式可以同时等待多个暂停函数,并选择第一个可用的暂停函数执行。

Selecting to receive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'a' is closed" 
            else 
                "a -> '$value'"
        }
        b.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'b' is closed"
            else    
                "b -> '$value'"
        }
    }
    
fun main() = runBlocking<Unit> {
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(12) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()     
}

输出结果

a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed

这说明

  1. select准备好的suspend函数。
  2. 准备好的clause的先执行。
  3. onReceiveOrNull遇到close会直接一直返回null

这里需要说明的是2。

这一块有点复杂……

Selecting to send

这一块没啥好说的,基本和receive一致。

Selecting deferred values

也没啥好说的。