引言

Kotlin,作为一个语言,标准库中通过提供最小的底层API来给其他库更好的利用协程。不像其他语言,asyncawait 并不是Kotlin的关键字,甚至都不是标准库的一部分。此外,Kotlin的中断函数suspending function)概念比 futurepromise 提供了更安全,更不容易出错的异步操作。

协程基础

第一个协程

fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

运行结果

Hello,
World!

本质上,协程是轻量级的线程,由launch协程builder构造。

这里将launch(CommonPool){...} 替换为thread{...}delay{...}替换为Thread.sleep(...),可以获得类似的结果。

如果只是替换 lauch(CommonPool){...} ,会获得下面的错误:

Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function

这是因为delay是特殊的suspending函数。它并不会阻塞线程,但是会暂停协程并且只能在协程里被调用。

连接阻塞和非阻塞世界

第一个例子在main函数里混合了非阻塞的delay(...)和阻塞的Thread.sleep(...),这很容易让人迷糊。这里用runBlocking简单的分隔开阻塞和非阻塞世界:

fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues while child is delayed
    delay(2000L) // non-blocking delay for 2 seconds to keep JVM alive
}

runBlocking { ... } 作为一个适配器,启动main函数的协程。 runBlocking 外部的代码会一直阻塞,直至runBlocking内部协程运行完毕。

这个方法也可以用来作为单元测试:

class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // here we can use suspending functions using any assertion style that we like
    }
}

等待Job

当另一个协程在运行的时候推迟一段时间(来等待其完成)并不是一个好的方法。看如何等待(非阻塞环境)后台已经启动的Job完成。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}

达到了预期目标,并且main协程不再与job的执行时间绑定。

函数的抽取

我们将 launch(CommonPool) { ... }内部的代码抽取成多个函数,当你使用IDE自带的"Extract function",会获得一个新的带着suspend修饰符的函数。

suspend函数能如同普通函数一样在协程中自由使用。他们额外的功能是能在函数内部使用其他的suspending函数,比如delay

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { doWorld() }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

协程是轻量级的

执行下面的代码

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}

启动了100k的协程来打印.,你可以试试使用线程;)

协程类似守护线程

下面的代码启动了一个协程在后台打印。

fun main(args: Array<String>) = runBlocking<Unit> {
    launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // just quit after delay
}

执行之后得到结果,程序退出。

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

活跃的协程并不会想线程一样保证程序活跃,这一点像守护线程。

取消和超时

取消协程执行

在小型程序中,可以通过main来中断所有协程,但是在大型,长期执行的程序中,你可能需要细粒度的控制协程。launch函数返回了一个Job,可以用来取消正在运行中的协程:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

程序执行

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

当main执行了job.cancel之后,我们将不会看到其他协程的输出。

取消是协商式的

协程取消是协商式的,一个协程必须协商实现可取消。kotlinx.coroutines里的所有函数都是可取消的。它们检查cancellation状态,并且抛出CancellationException 。然而,如果一个协程在运算中并且不检查cancellation,那么,它就不会被取消。

fun main(args: Array<String>) = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch(CommonPool) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 10) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}

使代码可取消

有两个方式使运算代码可取消,第一个是定期调用中断函数,比如yield函数。另一个方式是明确检查cancellation状态。

fun main(args: Array<String>) = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch(CommonPool) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}

可以将上面的isActive换成i < 10来检查一下效果。

可以看到,现在的循环能被取消。在协程里可以访问isActive,这个属性来自于CoroutineScope对象。

使用finally关闭资源

当取消时会抛出CancellationException,可以用常规的方式惊醒handle,比如try {...} finally {...} 或者 Kotlin 的use

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

不可取消代码块

之前的例子中,在finally块中调用suspending函数会导致CancellationException,因为代码所在的协程已经被取消了。通常,这并不是一个问题,好的代码或者习惯通常是非阻塞的,也不会调用其他suspending函数。然而,如果你非要这么做,也是可以的。那就是将代码用run(NonCancellable) {...}包起来。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

超时

很多时候取消一个协程的理由是执行时间太长,这里不需要你手动去检查job对应的协程所执行的时间。withTimeout函数会帮你操办这一些。

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

代码会抛出一个TimeoutException,这是CancellationException的一个子类,我们不会看到它的调用栈,因为取消协程是一个很正常的操作。

组成中断函数

常规序列

假定我们有两个中断函数(假装是从远端获取数据好了)。

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

如果我们想顺序的执行它们,先执行one然后two,然后相加他们的结果。(继续假装我们需要通过one的结果来判断是否需要two)

我们只需要用常规的顺序进行调用,因为在协程里的代码和普通代码也没有区别。默认是顺序的。

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

执行会获得下面的结果

The answer is 42
Completed in 2017 ms

并发和协程

如果one和two之间没有依赖,我们只想快一点得到两个函数的返回值,并发的执行他们?async会帮组我们。

在概念上,aync和launch相识。启动一个单独的协程,和其他协程异步执行。区别在于launch返回Job,并不携带任何返回值。但是aync返回一个Deferred——一个轻量级的非阻塞futrue,相当于对于未来返回值的promise,可以通过调用.await() 来获取返回值,Deferred也是一个Job,所以你可以取消它。

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doSomethingUsefulOne() }
        val two = async(CommonPool) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

执行会获得下面的结果

The answer is 42
Completed in 1017 ms

异步懒启动

async有一个懒启动选项,可以使用 CoroutineStart.LAZY 参数。这将让协程只有它的返回值被需要(调用.await())或者手动启动(.start())才会启动。

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

执行会获得下面的结果

The answer is 42
Completed in 2017 ms

当然,这并不是设想中的使用场景。它被用来替换标准的lazy用在suspending函数中。

异步风格函数

我们可以定义异步风格的函数使用async协程builder,建议使用async或者Async作为前缀。

// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool)  {
    doSomethingUsefulTwo()
}

注意,这些asyncXXX并不是中断函数,可以用在任何位置。

// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

协程上下文和调度

我们在上面使用了launch(CommonPool) {...}async(CommonPool) {...}run(NonCancellable) {...}。CommonPool和NonCancellable都是协程上下文。这一章将有更多选择。

调度和线程

协程上下文包含协程调度器(coroutine dispatchers),用来确定对应协程执行的线程。协程调度器能将协程封闭在某个特定的线程里,调度到线程池里或者让其自由运行,试一下下面的代码:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

运行会获得下面的结果

      'Unconfined': I'm working in thread main
      'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
          'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main

coroutineContext和Unconfined的区别会在下面介绍。

####Unconfined vs confined 调度器

Unconfined协程调度器启动协程在调用者所在的线程,但是只能保证到第一个中断点。中断过后,在哪个线程恢复运行,由是调用的中断函数决定。Unconfined协程调度器非常适合用于既不消耗CPU也不会修改共享数据的场景。

在另一方面,coroutineContext属性只有在协程block里可以使用,其来自于CoroutineScope接口,是当前协程所在上下文的引用。这样上下文可以被继承。runBlocking的默认上下文就是调用其所在的线程。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

运行会获得下面的结果

      'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
      'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main

所以,main里的协程继承了runBlocking{…}的上下文,Unconfined的协程恢复在delay函数所使用的DefaultExecutor里。

协程和线程Debug

当使用Unconfined调度器或者多线程的调度器(比如CommonPool),协程能在一个线程里中断,然后唤醒在另外一个线程。即使使用单线程的调度器也是很难弄清楚协程在什么时候在哪个线程干了什么。多线程debug常用的解决方案是打印出所在线程的名字。这个功能被各种log框架支持。当时用协程时,只有线程的名字并不能提供太多的信息,所以kotlinx.coroutines 提供了一个简单的方法让debug更简单。

使用 -Dkotlinx.coroutines.debug 参数运行下面代码:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(coroutineContext) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(coroutineContext) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}

这里有三个协程,分别为main协程#1,即runBlocking。以及a和b分别为#2和#3,他们执行在runBlocking内部。也被调度器调度到main协程所在线程执行。输出如下

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

可以看到,log函数将线程名字放在方括号里,都是main线程。但是后面加上了协程的识别符。协程创建时会被分配一个连续的识别符。当debug模式打开时。

可以从 newCoroutineContext 函数文档里获取更多debug相关资料。

线程之间切换

开启debug模式之后运行下面代码

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    val ctx1 = newSingleThreadContext("Ctx1")
    val ctx2 = newSingleThreadContext("Ctx2")
    runBlocking(ctx1) {
        log("Started in ctx1")
        run(ctx2) {
            log("Working in ctx2")
        }
        log("Back to ctx1")
    }
}

这里示范了两个新的用法,一个是调用runBlocking时指定上下文,另一个是使用run函数来切换协程所在上下文,但依旧保持为同一个协程,运行在不同线程和上下文。可以看下面的输出

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

上下文Job

协程的Job是其上下文的一部分,协程能从自己上下文获取自己的Job。通过coroutineContext[Job]

fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}

执行效果如下

My job is BlockingCoroutine{Active}@65ae6ba4

所以,CoroutineScope 里的 isActive 实际上只是coroutineContext[Job]!!.isActive的快捷方式。

子协程

当一个协程的coroutineContext被用来启动其他的协程,那么,新的协程的Job会成为该协程的子Job,当父协程被取消时,所有的子协程也会被递归的取消。

fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(CommonPool) {
        // it spawns two other jobs, one with its separate context
        val job1 = launch(CommonPool) {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(coroutineContext) {
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

代码执行结果如下

job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

结合上下文

协程上下文可以通过+操作符结合在一起,右边的上下文会代替左边的上下文。举个例子,当替换掉调度器的时候,父协程的Job可以被继承。

fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(coroutineContext) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!! 
        val job = launch(coroutineContext + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

执行结果如下

job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?

协程命名

自动分配标识符在大多数情况下都是可行的。然而,当协程被绑定到执行特定的后台任务或者请求的时候,名字更方便debug,CoroutineName 起到了给协程起名的作用。当debug模式开启的时候,将会显示具体的协程名。

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
        log("Computing v1")
        delay(500)
        252
    }
    val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
        log("Computing v2")
        delay(1000)
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

开启-Dkotlinx.coroutines.debug ,执行结果如下

[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

取消明确任务

将我们对上下文,子协程,Job的了解放在一起,假定我们应用有一个对象有生命周期,但是这个对象并不是一个协程,比如,我们开发一个Andorid应用,在activity中启动了多个协程来执行异步任务,比如获取数据,动画等待。当activity销毁的时候,所有的协程都必须被取消,以防内存泄露。

我们能管理多个协程的生命周期,通过构造一个Job的实例,然后将其绑到activity的生命周期中。使用Job()工厂函数来构造Job实例,如下面代码所示,只要确保所有的协程都通过Job启动,然后调用Job.cancel 就能中断所有的协程了。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
            delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling job!")
    job.cancel() // cancel our job.. !!!
    delay(1000L) // delay for more to see if our coroutines are still working
}

执行结果如下

Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Cancelling job!

如上所示,只有前三个协程打印出了消息,其他的协程都被取消了。所以我们只需要创建一个父Job,将子协程绑定到其中,当job取消的时候,所有子协程都会被取消。

Channels

Deferred提供了简单的从conroutines获取返回对象的方法。Channels则提供了传递对象流的简单方法。

Channel基本概念

Channel在概念上非常类似于BlockingQueue。一个关键的不同点在于使用中断方法send取代阻塞方法put,用中断方法receive代替阻塞方法take

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

代码输出如下

1
4
9
16
25
Done!

Channels的关闭与遍历

和队列不一样,Channel能够被关闭,表明没有新的元素放入。在接收端可以很方便的使用for来进行迭代循环接受channel的所有元素。

在概念上,关闭就像是发送一个特殊的关闭token到channel中,当迭代到这个特殊的token,就会自动停止。所以,在关闭之前发送的元素都是能保证被接收者接收的。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

绑定channel生产者

启动一个协程来充当生产者是很常见的一个设计模式。这是生成-消费模型的一部分。你可以抽象一个生产者的函数来接受一个channel来作为参数,但是这与常规的做法有点不一样,一般都是函数返回结果。

这里有一个非常方便的协程builder——produce,使得生产者一端非常简单。还有一个拓展函数conconsumeEach,可以代替for用在消费者一端。

fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

管道

一个协程生产数据流(可能是无限的)的模式,称之为管道。

fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

其他协程去消费这个数据流,做一些处理,产生一些结果。下面的例子就是简单的平方。

fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}

启动协程,并把他们连在一起。

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}

上面的例子里,其实并不需要去取消这两个协程。因为协程和守护线程一样。但是在大一些的应用里,如果不再需要,我们还是需要关闭管道。当然,我们也能以子协程执行管道。

质数管道

这里来尝试一下极限一点的例子,使用管道来生成质数。

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

下面的管道过滤数据流,移除所有能被之前确定的质数整除的数字。

fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}

现在,我们构建一个从2开始的无限数据流,从里取出第一个质数(2是质数),然后对数据流进行过滤,如果不能被这个数整除,说明他也可能是质数,生成一个新的数据流,再取出第一个数,再进行过滤……

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 

代码如下

fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(coroutineContext, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(coroutineContext, cur, prime)
    }
}

运行代码结果如下

2
3
5
7
11
13
17
19
23
29

当然,你可以用迭代器之类的来实现这个流程,这个的优点可以充分利用CPU,如果你使用 CommonPool。

这是一种非常不现实的实现方式。但是展示了一些优点,可中断,异步。

显而易见,越往后面,速度越慢,比正常人写出的代码慢次方级别。

Fan-out

多个协程可能从同一个channel里获取数据,然后进行分布式的工作。我们先创建一个简单的生产者协程。

fun produceNumbers() = produce<Int>(CommonPool) {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

然后再创建一些消费者协程,简单的打印出他们获取的数值。

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
    channel.consumeEach {
        println("Processor #$id received $it")
    }    
}

我们启动这些协程,让他们工作一会儿,看发生了什么。

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

结果可能像下面这样

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

注意,取消producer协程会关闭channel,处理的迭代也会终止。

Fan-in

多个生产者可以发送数据到同一个channel。

Buffered channels

到现在为止,使用的channel都是没有缓冲的,消费者直接面对生产者(没有中间商赚差价),当其中有一个暂停了,另外一个也会被暂停。

Channel()函数也接受一个可选参数capacity,可以自定义缓冲的长度,缓冲区允许发送者发送多个数据到缓冲区直至满了之后中断。有点类似BlockingQueue,当缓冲区满了,就会阻塞。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    launch(coroutineContext) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
}

会打印"sending"五次

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

公平Channel

发送和接收操作在channel里是公平的,取决于多个协程调用的次序。

下面的例子就是两个协程"ping"和"pong",在"table"接收和发送"ball"。

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(coroutineContext) { player("ping", table) }
    launch(coroutineContext) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    table.receive() // game over, grab the ball
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

ping会首先启动,所以它先拿到ball,虽然ping立马又进入了循环,但是ball还是会被pong拿到,因为他已经处于等待状态了。

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
ping Ball(hits=5)

共享可变状态和并发

能够使用多线程的调度器(比如CommonPool)来启动多个协程。这也带来了通常的并发问题。最大的问题在于如何同步的访问共享的可变状态。有些解决方案和多线程一样,有些则是比较特别。

问题

我们创建1000个协程,重复1000次,启动了100万个协程。我们也会测量这些操作消耗的时间。

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")    
}

我们使其做一个非常简单的操作,就是自增一个共享变量。

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

这会产生什么呢?很大的可能是不会打印"Counter = 1000000",因为1000个协程从多个线程同时去自增counter,并没有任何同步机制。

Volatiles并无作用

通常有一个误解,给一个变量加上volatile就能解决并发问题,我们先试试。

@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

代码运行比之前会更慢,但是我们仍然得不到"Counter = 1000000",那是因为volatile变量保证原子操作(比如读和写),但是并不保证复合操作(自增)。

线程安全的数据结构

对于多线程和协程带来的并发问题,可以使用常用的线程安全数据结构。

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}

这是解决这个问题最简单快捷的方法,适用于常见的计数器,集合,队列等标准的数据结构以及对其标准操作。然而,很难通用到对一些复杂对象和复杂操作上去。因为他们并没有开箱可用的线程安全实现。

细粒度线程控制

线程限制是将多线程去访问一个共享对象时限制为单个线程。一般用于UI一类的程序,将UI状态绑定在单线程的事件分发中。我们也能很方便使用单线程的调度器启动多个协程。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        run(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter") 
}

代码运行的非常缓慢,这是因为多线程调度器启动的协程在自增操作时都得通过run切换到单线程里去。

粗粒度线程控制

通常情况下,线程限制都是对大块代码而言的,比如一大坨更新状态的业务逻辑。下面的例子就不是那样了,让每一个协程都跑在同一个协程中。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}

互斥锁

互斥是保护共享状态的修改部分不会被并发执行的方案。在阻塞世界里你可能已经用过synchronized 或者ReentrantLock 。在协程里,称之为Mutex ,拥有lock和unlock函数来划定关键操作代码范围。核心的区别在于Metux.lock 是中断函数,并不会阻塞线程。

val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.lock()
        try { counter++ }
        finally { mutex.unlock() }
    }
    println("Counter = $counter")
}

例子里的锁是细粒度的,所以会付出一些代价。然而,当你必须修改某些共享对象的状态,而又没有一个线程(前面例子里的 newSingleThreadContext("CounterContext"))来绑定在一起的适合,使用互斥锁是一个比较好的选择。

Actors

一个Actor是协程的组合,它的状态被封装在协程中,通过channel与其他协程通信。一个简单的actor能够协程函数,但是一个复杂的actor最好还是使用类来完成。

这儿又一个actor协程构建器,他有一个mailbox channel去接受请求

There is an actor coroutine builder that conveniently combines actor’s mailbox channel into its scope to receive messages from and combines the send channel into the resulting job object, so that a single reference to the actor can be carried around as its handle.

第一步是先定义一个类用来封装actor处理的消息,kotlin的sealed classes非常适合做这个。我们定义了CounterMsg作为消息类,子类IncCounter和GetCounter去自增和获取。返回一个 CompletableDeferred 作为异步返回值。

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

然后我们定义一个函数启动一个actor

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg>(CommonPool) {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

主要的代码如下

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}

无论actor在哪个调度器的上下文里执行都是不影响的,一个actor是一个协程,协程会被顺序执行。所以,把协程和状态结合在一起是一个共享可变状态的选择。

Actor在负载下非常高效,因为在这种情况下它总是工作在自己的上下文里,根本不需要切换。

注意,actor协程构建器是双重的produce构建器。actor与接受消息的channel相关联,生产者与发送详细哦的channel相关联。

Select

select表达式允许在等待多个中断函数的时候,获取到最早恢复的那一个。

从channels里select

我们有两个producers:fizzbuzz ,一个每300ms产生一个"Fizz",一个每500ms产生一个"Buzz!"

fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

fun fizz(context: CoroutineContext) = produce<String>(context) {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}

使用receive中断函数,我们能够从每一个channel里获取,但是select表达式允许我们同时使用OnReceive来接收

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result 
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

我们运行几次

fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz(coroutineContext)
    val buzz = buzz(coroutineContext)
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
}

结果如下

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'

处理关闭情况

当channel关闭时,OnReceive会导致select失败并抛出异常。我们能够使用OnReceiveOrNull来执行一些特殊操作,当channel关闭时。

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'a' is closed" 
            else 
                "a -> '$value'"
        }
        b.onReceiveOrNull { value -> 
            if (value == null) 
                "Channel 'b' is closed"
            else    
                "b -> '$value'"
        }
    }

加上一点测试代码

fun main(args: Array<String>) = runBlocking<Unit> {
    // we are using the context of the main thread in this example for predictability ... 
    val a = produce<String>(coroutineContext) {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String>(coroutineContext) {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
}

结果非常有意思,我们会分析下原因

a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed

通过观察

首先,当多个中断函数可以恢复的时候,select会优先恢复第一个。