• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Project Reactor源码阅读-flatMap

武飞扬头像
juejin
帮助152

功能介绍

public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)

image.png

将外部Publisher发射出来的元素转换成内部Publisher,然后将这些内部的Publisher合并到一个Flux中,允许元素交错

  1. mapper:转换函数,将外部Publisher发射出来的元素转换成一个新的Publisher
  2. concurrency:针对外部Publisher的最大请求数量,同时也是scalarQueue队列大小。可通过reactor.bufferSize.x属性配置,默认32
  3. prefetch:针对内部Publisher的最大请求数量,同时也是innerQueue队列大小。可通过reactor.bufferSize.small属性配置,默认256

image.png

代码示例

public Flux<Integer> flat(int delayMillis, int i) {
    return delayPublishFlux(delayMillis, i * 10, i * 10   5);
}

@Test
public void test() {
    delayPublishFlux(100, 1, 6)
            .doOnRequest(r -> logLong(r, "main-request"))
            .flatMap((i) -> flat(1000, i).doOnRequest(r -> logLong(r, "inner-request"))
                    .subscribeOn(Schedulers.newElastic("inner")), 3, 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

image.png

image.png

可以看到mian-request最大是3,inner-request最大是2。证明了concurrencyprefetch的作用。

源码分析

首先看一下flatMap()操作符在装配阶段做了什么。

Flux#flatMap()

image.png

创建FluxFlatMap对象,它既是Subscriber,又是Subscription

另外还创建了2个队列Supplier,第一个是以concurrency为大小创建的mainQueueSupplier(创建scalarQueue),另一个是以prefetch为大小的创建的innerQueueSupplier。接下来查看订阅阶段发生了什么。

Flux#subscribe()

image.png

因为FluxFlatMap实现了OptimizableOperator接口,实际的Subscriber是通过调用subscribeOrReturn()返回的。

FlatMapMain#onSubscribe()

image.png

这里最关键的是调用request()向上游请求数据,请求数量是maxConcurrency。这正是flatMap()方法传入的concurrency。当数据下发时,肯定会调用onNext()

FlatMapMain#onNext()

image.png

  1. 将当前元素转换成一个Publisher
  2. 如果转换后的PublisherCallable,则直接获取元素调用tryEmitScalar()下发。
  3. 否则创建FlatMapInner对象,用它来订阅Publisher

在前面的方法定义中提到过:flatMap()支持并发处理,允许元素交错

看到这里,我们能得出一个推论:flatMap()是否真的会并发处理,取决转换后的Publisher是否支持异步订阅,即p.subscribe(innner)是否异步执行。

代码验证

首先回顾一下前面讲过的publishOn()subScribeOn()工作机制。

publishOn():在onNext()onComplete()onError()方法进行线程切换,publishOn()使得它下游的消费阶段异步执行。 subScribeOn():在subscribe的时候进行线程切换,subscribeOn()使得它上游的订阅阶段以及整个消费阶段异步执行。

同步执行(不会发生交错)

@Test
public void testSync() {
    delayPublishFlux(100, 1, 6)
            .flatMap((i) -> flat(1000, i), 3, 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

重点是去掉了flat()subscribeOn()调用。

image.png

同步执行,元素没有发生交错。

subscribeOn()异步执行

@Test
public void testSubscribeOn() {
    delayPublishFlux(100, 1, 6)
            .flatMap((i) -> flat(1000, i).subscribeOn(Schedulers.newElastic("inner")), 3, 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

image.png

异步执行,并且此时最大并发是3。

publishOn()异步执行

@Test
public void testPublishOn() {
    delayPublishFlux(100, 1, 6)
            .flatMap((i) -> flat(10, i)
                    .publishOn(Schedulers.newElastic("inner"))
                    // 故意让下游执行慢一点
                    .doOnNext(x -> sleep(1000)), 3, 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

image.png

异步执行,并且此时最大并发是3。

通过以上代码可以得知,如果内部Publisher生产数据慢,推荐使用subscribeOn()。如果只是内部Publisher消费速度慢,推荐使用publishOn()。如果生产都消费都慢的话,两个操作符一起使用

在调用内部Publisher的subscribe()方法之后,后续肯定会执行FlatMapInner#onSubscribe()

FlatMapInner#onSubscribe()

image.png

publishOn类似,FlatMapInner也支持同步队列融合、异步队列融合以及非融合三种处理方式。

如果上游的SubscriptionQueueSubscription类型,则会进行队列融合。具体采用同步还是异步,取决于该QueueSubscription#requestFusion()实现。

  1. 同步队列融合:复用当前队列,然后直接调用FlatMapMain#drain()排空队列。
  2. 异步队列融合:复用当前队列,然后调用上游s.request()请求数据,请求数量是prefetch
  3. 非融合:直接调用上游s.request()请求数据,请求数量也是prefetch

非融合

以下代码会非融合方式执行。(和SubscribeOn()异步执行逻辑是一样的)

@Test
public void testNoFused() {
    delayPublishFlux(100, 1, 6)
            .flatMap((i) -> flat(100, i)
                    .subscribeOn(Schedulers.newElastic("inner")), 3, 2)
            .subscribe(i -> {
                // 消费慢一点,innerQueue更容易有数据积压
                sleep(1000);
                logInt(i, "消费");
            });
    sleep(10000);
}

FlatMapInner#onNext()

image.png

此时onNext()方法入参就是内部Publisher实际下发的元素,继续调用FlatMapMain#tryEmit()下发。

FlatMapMain#tryEmit()

image.png

上面代码逻辑其实主要由两种情况组成:(后面两大块逻辑是差不多的)

  1. 可能直接调用下游Subscriber#onNext()继续下发元素。
  2. 创建队列,然后将元素加入队列,视情况调用drainLoop()排空队列。

具体取决于数据生产以及消费的速度: 1、如果消费速度大于生产速度,没有数据积压,则直接调用下游Subscriber#onNext()进行下发。 2、如果消费速度跟不上生产速度,元素会直接先保存到innerQueue中,然后在wip==0时调用drainLoop()排空队列。

FlatMapMain#tryEmitScalar()

前面提到过,如果转换后的Publisher是Callable,会执行tryEmitScalar()方法。该方法做的事情跟tryEmit()处理逻辑基本一致,主要差别就是处理的元素和使用的队列不同。

tryEmitScalar()处理的元素是内部Publisher直接调用call()获取的,而tryEmit()是内部Publisher向下游发送的。tryEmitScalar()使用scalarQueue缓存元素,而tryEmit()使用innerQueue

FlatMapMain#drainLoop()

drainLoop()逻辑非常多,截取部分关键代码:

image.png

image.png

image.png

  1. 排空innerQueue中的元素,下发给下游。
  2. 请求内部Publisher下发元素,请求数量就是本次innerQueue排出数量。
  3. 请求外部Publisher下发元素,请求数量是可补充数量,不会超过concurrency

另外还有一些完成和取消控制。

同步队列融合

以下代码会以同步队列融合方式执行。

@Test
public void testSyncFused() {
    delayPublishFlux(100, 1, 6)
            .flatMap((i) -> flatRange(i), 3, 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

复用当前队列,然后直接调用FlatMapMain.drain()排空队列。

FlatMapMain#drain()

image.png

依然是调用drainLoop()排空队列中的元素。注意,同步队列融合没有request()过程,直接在onSubscribe()阶段进行元素下发。

异步队列融合

以下代码会以异步队列融合方式执行。

@Test
public void testAsyncFused() {
    delayPublishFlux(100, 1, 6)
            .flatMap((i) -> flatRange(i).publishOn(Schedulers.newElastic("inner")), 3, 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

复用当前队列,然后调用上游s.request()请求数据,请求数量是prefetch。后续肯定会调用FlatMapInner#onNext()

FlatMapInner#onNext()

image.png

此时入参为null,然后调用了FlatMapMain.drain(),排空队列元素。异步队列融合会复用队列,上游实际发送是null,可以将其理解成一个信号,告知下游排空队列中的元素。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanegba
系列文章
更多 icon
同类精品
更多 icon
继续加载