flow函数和collect函数源码分析

flow方法:


public fun <T> flow(@BuilderInference block:suspend FlowCollector<T>.() -> Unit):
Flow<T> = SafeFlow(block)
public interface FlowCollector<in T{
    public suspend fun emit(value: T)
}

会将flow传入的方法封装成一个FlowCollector的「扩展函数」,因此在flow代码块中使用emit是自然地。

第二个代码块中观察到FlowCollector的泛型是通过emit来推导的,这也就是为什么emit方法传入不同的类型flow所构造的FlowCollector的类型也不同。

总结:flow方法只是将传入的方法扩展成了一个FlowCollector的扩展函数,并可共享已有的emit方法。

collect方法:

public suspend inline fun <T> Flow<T>.collect(crossinline action: 
                suspend (valueT) -> Unit)
Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

可以看到调用Flow的扩展函数collect时会手动构建一个FlowCollector,并重写emit方法的逻辑为执行collect中的代码块。

接下来看Flow的collect成员方法接收到FlowCollector对象后做什么处理。

public interface Flow<out T{
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

收集collect的具体行为默认是通过具体的flow构建时构造出来的。如默认上文构造出来的是「SafeFlow」,collect收集行为被封装在「AbstractFlow」中。

先看下AbstractFlow:

 abstract class AbstractFlow<T> : Flow<T{

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)

可以看到构建了一个SafeCollector调用collectSafely将改参数传进去。

SafeCollector会保存协程上下文(为了之后防止再次创建续体导致的浪费)和collect方法传进来的FlowCollector。

重写collectSafely方法的类也就是最上面的SafeFlow做的事情:

SafeFlow:

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

可以看到最后是用SafeCollector调用的flow里面的代码块

注:这个「collector就是collect中创建的SafeCollector」而block方法则是「之前flow中创建的扩展函数(也就是flow代码块)」

所以这就是「为什么是冷流」的原因,只有调用collect才会构建这个SafeCollector对象并调用flow传进来的方法(flow代码块会添加到FlowCollector的扩展函数中,为了之后SafaCollector调用block)

到此flow代码块开始运行了,flow中的调用者this即为collect中创建的SafeCollector对象

SafeCollector中的emit方法

接下来看看SafeCollector中的emit方法:

 override suspend fun emit(value: T) {
 //该方法中会获取当前协程的续体,当执行不需要挂起时(不返回SUSPEND关键字),
 //会直接运行resumeWith并且不会执行拦截器,这也是为什么不能够在
 //flow中切换上下文的原因,不会执行intercept切换线程
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //调用第二个emit方法
                emit(uCont, value)
            } catch (e: Throwable) {
                // Save the fact that exception from emit (or even check context) has been thrown
                lastEmissionContext = DownstreamExceptionElement(e)
                //emit
                throw e
            }
        }
    }
     
//带续体参数的emit方法:
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // This check is triggered once per flow on happy path.
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //最终会调用到emitfun中。 
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    

再来看下「emitfun」

读者先记下这三个变量具体含义:

「collect为FlowCollector对象(该对象的emit方法实现是执行collect代码块),value为emit的参数,最后一个代表续体」

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

这里科普下Kotlin中的::语法 和 「表达式」

Class::函数名/属性名  代表获取这个class的「函数引用」和属性引用。「属性引用」还好理解可以针对这个属性进行指定赋值/取值操作(静态属性不需要具体对象,成员属性需要具体的对象)。「函数引用」代表持有了这个方法的引用可以调用这个函数,由于是函数不区分对象。

=表达式 对应于上面的表达式则为emitfun返回了一个经强转后的方法。也就是emitfun返回的是一个方法,当调用表达式时该方法的invoke会自动执行(后面就会看到了)

可以看到将这个FlowCollector<Any?>的emit方法强转为了Function3<FlowCollector<Any?>, Any?, Continuation, Any?>方法,最直观的感受就是将原来只接受一个value的emit方法强转成了一个FlowCollector,value,和续体的方法。

「看下反编译代码」

   private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
      // $FF: synthetic method
      // $FF: bridge method
      public Object invoke(Object var1, Object var2, Object var3) {
         return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
      }

      @Nullable
      public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
         InlineMarker.mark(0);
         Object var10000 = p1.emit(p2, continuation);
         InlineMarker.mark(2);
         InlineMarker.mark(1);
         return var10000;
      }

      public final KDeclarationContainer getOwner() {
         return Reflection.getOrCreateKotlinClass(FlowCollector.class);
      }

      public final String getName() {
         return "emit";
      }

      public final String getSignature() {
         return "emit(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;";
      }
   }, 3);

讲到这里读者应该已经猜到了

参数讲解:

1.p1:对应于调用collect方法所构建的FlowCollector对象,重写emit方法执行collect代码块内容

2.p2:emit方法的参数,对应于发送的值

3.continuation:当前协程的续体

流程讲解:

1.当调用emifun表达式时,表达式所构建的Function3方法的invoke方法将会被调用

2.会调用到emit方法,该方法最终会调用collect代码块的内容也就是action方法,并把emit的参数传入。

至此,emit每调用一次,都会执行一次collect方法。

总结

flow方法的主要作用是将传入的方法参数变成FlowCollector的扩展函数。

collect方法首先会创建一个FlowCollector对象并重写其emit方法的逻辑用于执行传入的方法参数;

接着还会创建一个SafaCollector对象保存刚创建的重写emit方法的FlowCollector对象;

由于flow中的代码块其实是FlowCollector的扩展函数,所以会利用SafaCollector去调用扩展函数,从这里面可以得出一个结论flow中的this即为SafaCollector对象。

emit方法在SafaCollector中最终会执行collect函数


原文始发于微信公众号(北洋洋洋):flow函数和collect函数源码分析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/76130.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!