进阶功能和概念

操作符组合

为了让代码简洁,增加复用性,Reactor提供了一些模式来达到这些目的。操作符组合就是其中一种。

transform

就是将一部分操作链作为一个函数嵌到另一个publisher的操作中。 transform的函数在组装的时候就被调用,确定好操作链。 对于不同的订阅者来说,是同一个操作链。

compose

达到的效果和transform差不多,区别在于,compose的函数会在每次订阅的时候都被调用,实际上可以产生不同的操作链。

热源与冷源

  • 冷源:如果没有被订阅,不会有操作发生(不会触发产生数据的操作)
  • 热源:不管是否有订阅,都会立马开始有操作产生(产生数据) 在热源的某些情况下,后面订阅的订阅者只能收到它订阅之后产生的新元素。 just: 在组装的时候,直接获取值,并且重放给所有的订阅者 defer:可以将just转换成一个冷源,每次订阅都会触发其包装的请求

ConnectableFlux:广播到多个订阅者

将数据生成的操作推迟到订阅者的订阅时,可能不是我们需要的,我们更希望多个订阅者集合,然后触发订阅和数据生成。

  • publish,动态的将下游的请求转发到上游(回压),如果有一个下游(订阅者)的需求为0,那么就会停止对上游的请求。
  • replay,从第一个订阅者开始缓冲数据,可以配置时间、数量 ConnectableFlux提供了额外的函数来管理订阅者对上游源头的请求。
  • connect,手动的触发对上游源头的订阅
  • autoConnect(n),订阅者达到n之后,自动向上游订阅
  • refCount(n),当没有足够的订阅者的时候,会断开和上游的请求。当达到足够的订阅者数量的时候,会重新产生一个新的订阅到上游源头
  • refCount(int,Duration),对断开操作加上一个平缓的时间

批量操作

将一个流分成一批一批,有三个解决方案

  • grouping
  • windowing
  • buffering

使用Flux<GroupedFlux<T»分组

将一个流Flux<T>分割成多个批次,每一个批次(组)就是一个GroupedFlux<T>,可以使用key()来获取用来分割特定的key。 每一组都不是连续,因为只有groupBy函数获取到上游的值才能知道将它分到哪一组。

  1. 每个元素都必定有一个组

  2. 组里的元素可以来自上游不同的位置

  3. 不可能为空(为空的话,压根不会产生

一些限制:

  • 分组的数量不能太大
  • 每一组的消费要足够快

窗口滑动Flux<Flux<T»

可以从数量,时间,边界条件等因素来进行窗口滑动。 和groupBy的一个区别就是,窗口滑动是连续的,也是可以重叠的,上游的元素也可能被抛弃。windowWhile是一个比较奇特的滑动,支持传入一个predicate,

  • 返回true,则开启一个window。
  • 返回false,
    • 如果当前存在window。则关闭这个window。
    • 如果当前没有开启的window,那么生成一个空的window。

使用Flux<List<T»缓冲

和windowing类似,区别主要有两点

  • 缓冲到Collection里,默认是List
  • 不会产生空的容器(window会产生空的flux)