进阶功能和概念
操作符组合
为了让代码简洁,增加复用性,Reactor提供了一些模式来达到这些目的。操作符组合就是其中一种。
transform
就是将一部分操作链作为一个函数嵌到另一个publisher的操作中。 transform的函数在组装的时候就被调用,确定好操作链。 对于不同的订阅者来说,是同一个操作链。
compose
达到的效果和transform差不多,区别在于,compose的函数会在每次订阅的时候都被调用,实际上可以产生不同的操作链。
热源与冷源
- 冷源:如果没有被订阅,不会有操作发生(不会触发产生数据的操作)
- 热源:不管是否有订阅,都会立马开始有操作产生(产生数据) 在热源的某些情况下,后面订阅的订阅者只能收到它订阅之后产生的新元素。 just: 在组装的时候,直接获取值,并且重放给所有的订阅者 defer:可以将just转换成一个冷源,每次订阅都会触发其包装的请求
ConnectableFlux:广播到多个订阅者
将数据生成的操作推迟到订阅者的订阅时,可能不是我们需要的,我们更希望多个订阅者集合,然后触发订阅和数据生成。
- publish,动态的将下游的请求转发到上游(回压),如果有一个下游(订阅者)的需求为0,那么就会停止对上游的请求。
- replay,从第一个订阅者开始缓冲数据,可以配置时间、数量 ConnectableFlux提供了额外的函数来管理订阅者对上游源头的请求。
- connect,手动的触发对上游源头的订阅
- autoConnect(n),订阅者达到n之后,自动向上游订阅
- refCount(n),当没有足够的订阅者的时候,会断开和上游的请求。当达到足够的订阅者数量的时候,会重新产生一个新的订阅到上游源头
- refCount(int,Duration),对断开操作加上一个平缓的时间
批量操作
将一个流分成一批一批,有三个解决方案
- grouping
- windowing
- buffering
使用Flux<GroupedFlux<T»分组
将一个流Flux<T>分割成多个批次,每一个批次(组)就是一个GroupedFlux<T>,可以使用key()来获取用来分割特定的key。 每一组都不是连续,因为只有groupBy函数获取到上游的值才能知道将它分到哪一组。
-
每个元素都必定有一个组
-
组里的元素可以来自上游不同的位置
-
不可能为空(为空的话,压根不会产生
一些限制:
- 分组的数量不能太大
- 每一组的消费要足够快
窗口滑动Flux<Flux<T»
可以从数量,时间,边界条件等因素来进行窗口滑动。 和groupBy的一个区别就是,窗口滑动是连续的,也是可以重叠的,上游的元素也可能被抛弃。windowWhile是一个比较奇特的滑动,支持传入一个predicate,
- 返回true,则开启一个window。
- 返回false,
- 如果当前存在window。则关闭这个window。
- 如果当前没有开启的window,那么生成一个空的window。
使用Flux<List<T»缓冲
和windowing类似,区别主要有两点
- 缓冲到Collection里,默认是List
- 不会产生空的容器(window会产生空的flux)