Kotlin 协程超简版笔记
[TOC]
基础
启动第一个协程
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
在CoroutineScope
中使用coroutine builder
启动协程。
连接阻塞与非阻塞
协程里的一些操作,必须要在非阻塞的环境里执行。可以使用runBlocking
,阻塞住当前线程,直至runBlocking
内部协程完成。
使用Job
启动一个协程后会返回Job
,可以用job.join()
以非阻塞的方式等待工作完成。
结构化并发
如果使用GlobalScope.launch
来启动一堆协程的话,这些协程都属于top-level
,即使协程非常轻量,数量多了之后还是会有一些问题,想办法持有所有协程的引用是个非常麻烦而且容易出错的事情。
我们可以使用在特定的coroutineScrop
来启动这些协程,在内部启动的协程全部结束之前,Scope
是不会结束的。
suspend 函数
使用suspend
修饰符,可以使用其他的suspend
函数,具有传染性。
协程与线程
协程是非常轻量级的,哪怕启动10K个,也不会有太大的压力。表现上和守护线程类似,不会阻止程序退出。
取消协程
协商取消
和线程类似,调用job.cancel()
即可。也是协商式的。有两个地方可以接收到取消协程的指令
- 某些
suspend
函数会检查取消状态,比如yield
和delay
- 手动显式检查取消状态,
isActive
就是一个很方便的CoroutineScope
拓展方法
取消协程,会抛出CancellationException
,很显然,我们是可以在内部使用try…catch…finally来进行一些额外处理的。
如果在finally块内使用suspend
函数,会再次抛出CancellationException
,毕竟协程已经被取消了,通常这不是一个问题,一般关闭资源类操作都不会受到影响。如果你真的有这种需求,可以使用
withContext(NonCancellable) {
delay(1000L)
//do somethin
}
至于withContext
,后面会再次提到。
超时
Kotlin总是有一些语法糖,比如常用的超时。
fun main() = runBlocking {
//sampleStart
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
//sampleEnd
}
当然,本质还是取消,还是协商式。抛出的错误则是TimeoutCancellationException
,是CancellationException
的子类,单纯是为了区分取消的原因不一样。
组合suspend函数
执行顺序
其实不管是suspend
函数,还是普通函数,在kotlin里面,都是顺序执行的。
异步执行
你需要显式的去启动一个协程来达到异步的效果,kotlin不缺少的就是语法糖。
async{
someSuspendFunction()
}
实际上就是类似于launch
,启动了一个协程,并把内部块的运算结果用Deferred
包装返回,类似于future
。区别在于他的await()
不会阻塞线程。
当然你也可以选择使用惰性启动(Lazy),只有显式调用start()
才会开始启动这个协程。
协程上下文和调度
当然,我说的是Context
和Dispatcher
协程总是在上下文中执行,包含大量的CoroutineContext
,包含之前提到的Job
,以及即将提到的Dispatcher
线程调度
调度器决定了协程在哪一个或者哪些线程上执行,可以是绑定到某个线程或者线程池,或者是随意运行。
所有的coroutine builder
都支持可选参数CoroutineContext
来声明其调度器。
- 不传参数,继承Scope的调度器
Dispatchers.Default
使用后台共享的线程池,比如GlobalScope.launch { ... }
Dispatchers.Unconfined
,在调用(caller)线程执行,直到下一次中断点,再次恢复时,所在的线程由中断点的suspend fuction
所在线程执行- 自定义线程池,顾名思义
Dispatchers.Unconfined
不适用于常规场景。
上下文切换
可以使用withContext
在不同的上下文之间切换,实际上就是不同线程间切换。
import kotlinx.coroutines.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
我们需要使用 -Dkotlinx.coroutines.debug
作为JVM参数,来debug协程信息。
也可以使用CoroutineName
给协程命名,正如对thread命名一样。
协程的Job
是上下文的一部分,可以通过coroutineContext[Job]
来获取,isActive
只是coroutineContext[Job]?.isActive == true
的快捷方式。
子协程
当协程在某个CoroutineScope
启动的时候,它就继承了CoroutineScope.coroutineContext
,新协程的Job也是父协程Job的儿子。当父协程取消,子协程也会被取消。
但是使用GlobalScope
来启动协程的时候,就不会与父Scope扯上关系了。
在所有的子协程没有完成之前,父协程不会完成,所以无需显式的调用join来等待所有子协程。
上下文元素组合
我们谈到了很多上下文元素,比如调度器,命名等,我们可以简单用+
把他们给连接起来传给builder
。
显式Job
有时候我们需要管理的生命周期对象并不是一个协程,但是它可能会启动一堆协程,比如Android activity。我们可以通过创建一个Job实例,将启动的协程都绑定到这个job上,起到统一管理的作用。
class Activity : CoroutineScope {
lateinit var job: Job
fun create() {
job = Job()
}
fun destroy() {
job.cancel()
}
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + job
fun doSomething() {
// launch ten coroutines for a demo, each working for a different time
repeat(10) { i ->
launch {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
}
}
doSomething
中启动的协程,都继承了父CoroutineScope
中的元素。就是Dispatchers.Default + job
ThreadLocal
在多线程中,ThreadLocal
是非常方便的一个用法。然而在协程中,情况就不太一样了。协程并不全被绑定到某个特定的线程中。
Kotlin为ThreadLocal准备了一个asContextElement
的拓展函数,使用了一个额外的上下文元素来保存ThreadLocal
中的值,并且在每次切换上下文的时候进行恢复。
import kotlinx.coroutines.*
val threadLocal = ThreadLocal<String?>() // declare thread-local variable
fun main() = runBlocking<Unit> {
threadLocal.set("main")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
yield()
println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
job.join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
这里有一个关键的限制,当ThreadLocal
值被修改的时候,新的值不会通知到协程调用者。下一次中断后,修改值会丢失。
当然,对于可变对象,还是封装更合适。
错误处理
错误传递
Coroutine Builder
对于错误传递有两种表现:
- 自动传递Exception (
launch
和actor
) - 将其暴露给用户 (
async
和produce
)
前者将未处理的异常,交给某个handler
统一处理,类似于Java的Thread.uncaughtExceptionHandler
。
后者会将错误交给用户最终处理。
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = GlobalScope.launch {
println("Throwing exception from launch")
throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
}
job.join()
println("Joined failed job")
val deferred = GlobalScope.async {
println("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}
CoroutineExceptionHandler
我们可以使用CoroutineExceptionHandler
作为一个上下文元素,传递给builder。
在JVM平台上,我们也可以通过ServiceLoader,设置一个全局的CoroutineExceptionHandler
。
当然,CoroutineExceptionHandler
只会在Coroutine Builder
选择不将错误交给用户处理的时候才生效。具体如下
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
val job = GlobalScope.launch(handler) {
throw AssertionError()
}
val deferred = GlobalScope.async(handler) {
throw AssertionError() // Nothing will be printed
}
joinAll(job, deferred)
}
取消和异常
取消和异常紧密相关,协程内部使用CancellationException
来作为取消的实现。这个异常被所有的handler
忽略,即使可以通过catch块来获取,只应该被用来debug。当一个协程被Job.cancel
取消,实际上是抛出了一个不包含原因 的 CancellationException
,但不会影响到父协程。
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
val child = launch {
try {
delay(Long.MAX_VALUE)
} finally {
println("Child is cancelled")
}
}
yield()
println("Cancelling child")
child.cancel()
child.join()
yield()
println("Parent is not cancelled")
}
job.join()
}
但是,如果子协程被其他错误取消的话,父协程也会收到这个错误的影响。会先取消掉其他子协程,最后再处理错误。
这个行为无法被重写,也不受到CoroutineExceptionHandler
的影响,这个设计为结构化并发提供了稳定的层次结构。
错误聚合
如果多个子协程同时抛出错误,先抛先赢。第一个被抛出的错误会被handler捕获,这样会造成错误信息丢失。
Supervision Job
之前提到的异常传播,都是水平和垂直双向的,有时候我们并不希望子协程的异常影响到父进程。
SupervisorJob
就可以起到这个效果,异常带来的取消只会向下传播。
import kotlinx.coroutines.*
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// launch the first child -- its exception is ignored for this example (don't do this in practice!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("First child is failing")
throw AssertionError("First child is cancelled")
}
// launch the second child
val secondChild = launch {
firstChild.join()
// Cancellation of the first child is not propagated to the second child
println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// But cancellation of the supervisor is propagated
println("Second child is cancelled because supervisor is cancelled")
}
}
// wait until the first child fails & completes
firstChild.join()
println("Cancelling supervisor")
supervisor.cancel()
secondChild.join()
}
}
Supervision scope
结构化并发也可以使用SupervisionScope
import kotlinx.coroutines.*
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
supervisorScope {
val child = launch(handler) {
delay(1000L)
println("Child throws an exception")
throw AssertionError()
}
val child2 = launch(handler) {
child.join()
println("Child2 still run")
}
println("Scope is completing")
}
println("Scope is completed")
}
和普通的CoroutineScope
的一个重要区别就是错误处理,由于子协程的异常不会传递给父协程,所以每个协程需要独立处理自己的异常。
Channels(experimental)
协程之间相互传单个值使用Deferred
会非常方便,Kotlin也提供了一个传递流的方法。
Channel基础
和BlockingQueue
非常类似。不同的是,channel
是非阻塞的,而且可以关闭表示不会有新数据流入,所以可以直接对其进行for
循环。
关闭类似于发送了一个特殊的close token到channel
中,在没有收到这个token之前,是不会关闭的,这也保证了在关闭之前发送的值都会被处理。
Channel 生产者
使用协程来产生数据流是非常常见的,和生产消费者模式类似。kotlin提供了抽象的生成器,不同于函数返回单个值,而是直接返回一个channel。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
//sampleStart
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
//sampleEnd
}
管道Pipeline
某个协程生产数据流,也许是无限生产,后面紧跟着另外的协程去处理或者消费这些数据流,产生另外的数据,这就组成了一个管道。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // 产生1,2,3,4,5……
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
fun main() = runBlocking {
var cur = numbersFrom(2)
for (i in 1..10) {
val prime = cur.receive() //取到prime
println(prime)
cur = filter(cur, prime)//对cur过滤,过滤掉所有能整除prime的数
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
这里可能需要解释下
cur = numbersFrom(2)
get a prime //2
cur -> cur.filter(prime) //过滤掉cur中所有能整除2的数值
get a prime //3
cur -> cur.filter(prime) //再次过滤掉cur中能整除3的数值
get a prime //5
……
这里可以提一下的是,整个过程启动了很多协程,但我们不需要手动去一个个关闭(也做不到)。这提现了结构化的好处。
Fan-out & Fan-in
对channel做一些并发的写和读,都是没有问题的。
Buffered channels
到目前为止,channel都是不带buffer的,生产者和消费者之间面对面(没有中间商赚差价),生产者没有准备好之前,receive会被暂停(suspend),消费者没有准备好之前,send也会被暂停。
创建channel的时候,可以使用可选参数capacity
来指定buffer的长度。在buffer被填充满之前,send不会被暂停,buffer空之前,receive也不会被暂停。
公平channels
不管是发送还是接受操作,对于channel来说,都是FIFO,先被调用(invoke
)的先执行(operate
)。
滴答channels/Ticker channels
Kotlin总是这么贴心的准备一大堆有用的没用的糖果。Ticker channel 会定时的产生Unit,但并不会累积。
共享可变状态和并发
协程在大多数情况下都是并发操作的,多线程下会遇到的问题,协程一样也会遇到。
问题
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking<Unit> {
GlobalScope.massiveRun {
counter++
}
println("Counter = $counter")
}
常见的并发操作,一般情况下,是很容易出现counter不是我们预期值得情况。
Volatiles没有帮助
在这个问题中,给counter加上volatile修饰符并不会解决问题,因为这个只能解决可见性。只能保证每个协程获取counter的时候,是最新的,但是随后的+1,写入,并不是原子性的。
原子操作类型
最简单最快捷的一个办法就是使用AtomicInteger
,这个问题就被妥善解决了,但是在其他复杂情况下,并没有这么容易,因为没有这么多开箱可用的原子操作类。
线程细粒度控制
val counterContext = newSingleThreadContext("CounterContext")
fun main() = runBlocking<Unit> {
GlobalScope.massiveRun { // run each coroutine with DefaultDispathcer
withContext(counterContext) { //confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
虽然每次启动的协程都是DefaultDispathcer
,是跑在共享的线程池上的,但是一旦到了读写数据的阶段,切换成单线程的上下文。这样能解决问题,但是会导致速度很慢。时间都耗在上下文切换上了。
线程粗粒度控制
val counterContext = newSingleThreadContext("CounterContext")
fun main() = runBlocking<Unit> {
// run each coroutine in the single-threaded context
CoroutineScope(counterContext).massiveRun {
counter++
}
println("Counter = $counter")
}
这个更粗暴了,直接让启动的所有协程都在同一个单线程上。
互斥锁Mutex
和多线程里的锁类似,作用就是让关键操作不并发执行。这个和 synchronized
和 ReentrantLock
的区别主要在于是暂停,而不是阻塞线程。
Kotlin也准备了方便的糖
mutex.withLock{
//do something
}
等同于
mutex.lock()
try{
//do something
}finally{
mutex.unlock()
}
所以方案如下
val mutex = Mutex()
fun main() = runBlocking<Unit> {
GlobalScope.massiveRun {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
Actors
一个Actor
是由协程,被封装到协程中的状态,和其他携程沟通的channel组成的实体。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
使用actor builder
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
最后,程序被改为
fun main() = runBlocking<Unit> {
val counter = counterActor() // create the actor
GlobalScope.massiveRun {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
这样自增的操作与调用counter.send(IncCounter)
的协程无关,它被封在actor
里面,由一个独立的协程执行。协程内部是顺序执行的,这样可以解决共享可变状态的问题。
显而易见,在高负载的情况下,actor
的效能很高很多,独立的协程会一直有事可做,而且也避免了上下文切换和获取锁。
Select 表达式(experimental)
select
表达式可以同时等待多个暂停函数,并选择第一个可用的暂停函数执行。
Selecting to receive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
fun main() = runBlocking<Unit> {
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(12) { // print first eight results
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
输出结果
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
Channel 'a' is closed
这说明
- select准备好的suspend函数。
- 准备好的clause的先执行。
- onReceiveOrNull遇到close会直接一直返回null
这里需要说明的是2。
这一块有点复杂……
Selecting to send
这一块没啥好说的,基本和receive一致。
Selecting deferred values
也没啥好说的。