• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kotlin Flow

武飞扬头像
灵剑山真人
帮助1

Kotlin Flow

一:Flow的概念

Flow流的概念感觉类似于Java的响应式编程,下面看两段代码:

  1.  
    // flow的上游
  2.  
    override suspend fun getCompanyListings(
  3.  
    fetchFromRemote: Boolean,
  4.  
    query: String
  5.  
    ): Flow<Resource<List<CompanyListing>>> {
  6.  
    return flow {
  7.  
    emit(Resource.Loading(true))
  8.  
    val localListings = dao.searchCompanyListing(query)
  9.  
    emit(Resource.Success(
  10.  
    data = localListings.map { it.toCompanyListing() }
  11.  
    ))
  12.  
     
  13.  
    val isDbEmpty = localListings.isEmpty() && query.isBlank()
  14.  
    val shouldJustLoadFromCache = !isDbEmpty && !fetchFromRemote
  15.  
    if(shouldJustLoadFromCache) {
  16.  
    emit(Resource.Loading(false))
  17.  
    return@flow
  18.  
    }
  19.  
    val remoteListings = try {
  20.  
    val response = api.getListings()
  21.  
    companyListingsParser.parse(response.byteStream())
  22.  
    } catch(e: IOException) {
  23.  
    e.printStackTrace()
  24.  
    emit(Resource.Error("Couldn't load data"))
  25.  
    null
  26.  
    } catch (e: HttpException) {
  27.  
    e.printStackTrace()
  28.  
    emit(Resource.Error("Couldn't load data"))
  29.  
    null
  30.  
    }
  31.  
     
  32.  
    remoteListings?.let { listings ->
  33.  
    dao.clearCompanyListings()
  34.  
    dao.insertCompanyListings(
  35.  
    listings.map { it.toCompanyListingEntity() }
  36.  
    )
  37.  
    emit(Resource.Success(
  38.  
    data = dao
  39.  
    .searchCompanyListing("")
  40.  
    .map { it.toCompanyListing() }
  41.  
    ))
  42.  
    emit(Resource.Loading(false))
  43.  
    }
  44.  
    }
  45.  
    }
学新通
  1.  
    // flow下游收集
  2.  
    viewModelScope.launch {
  3.  
    repository
  4.  
    .getCompanyListings(fetchFromRemote, query)
  5.  
    .collect { result ->
  6.  
    when(result) {
  7.  
    is Resource.Success -> {
  8.  
    result.data?.let { listings ->
  9.  
    state = state.copy(
  10.  
    companies = listings
  11.  
    )
  12.  
    }
  13.  
    }
  14.  
    is Resource.Error -> Unit
  15.  
    is Resource.Loading -> {
  16.  
    state = state.copy(isLoading = result.isLoading)
  17.  
    }
  18.  
    }
  19.  
    }
  20.  
    }
学新通
  1.  
    // Rxjava的上游
  2.  
    var resultList = mutableListOf<WifiSafeCheckItem>()
  3.  
    Observable.create<Int> {
  4.  
    try {
  5.  
    for (item in itemList) {
  6.  
    when(item.itemId) {
  7.  
    0 ->{
  8.  
    // 检测虚假wifi
  9.  
    try {
  10.  
    Thread.sleep(1000)
  11.  
    } catch (e: Exception) {
  12.  
    }
  13.  
    }
  14.  
    1 ->{
  15.  
    // 检测DNS是否正常
  16.  
    if (!WifiUtil.isDnsSafe(context)){
  17.  
    // 不正常就添加
  18.  
    resultList.add(itemList[1])
  19.  
    }
  20.  
    try {
  21.  
    Thread.sleep(1500)
  22.  
    } catch (e: Exception) {
  23.  
    }
  24.  
    }
  25.  
    2 ->{
  26.  
    // 检查是否能上网
  27.  
    if (!(WifiUtil.isNetworkConnected(context) && WifiUtil.isNetworkOnline())) {
  28.  
    // 不正常就添加
  29.  
    resultList.add(itemList[2])
  30.  
    }
  31.  
    try {
  32.  
    Thread.sleep(1500)
  33.  
    } catch (e: Exception) {
  34.  
    }
  35.  
    }
  36.  
    3 ->{
  37.  
    // 是否连接wifi
  38.  
    if (!WifiUtil.isWifiConnected(context)) {
  39.  
    resultList.add(itemList[3])
  40.  
    }
  41.  
    try {
  42.  
    Thread.sleep(1000)
  43.  
    } catch (e: Exception) {
  44.  
    }
  45.  
     
  46.  
    }
  47.  
    4 ->{
  48.  
    // 检测wifi是否加密
  49.  
    if (!WifiUtil.isHaveEncrypt(context)) {
  50.  
    resultList.add(itemList[4])
  51.  
    }
  52.  
    try {
  53.  
    Thread.sleep(1500)
  54.  
    } catch (e: Exception) {
  55.  
    }
  56.  
    }
  57.  
    }
  58.  
    it.onNext(item.itemId)
  59.  
    }
  60.  
    it.onComplete()
  61.  
    } catch (e: Exception) {
  62.  
    e.printStackTrace()
  63.  
    if (!isDestroyed)
  64.  
    it.onError(e)
  65.  
    }
  66.  
    }.compose(RxUtil.ioAndMainObservable()).subscribe(object : Observer<Int> {
  67.  
    // Rxjava的下游
  68.  
    override fun onSubscribe(d: Disposable) {
  69.  
    mDisposableList.add(d)
  70.  
    }
  71.  
     
  72.  
    override fun onNext(t: Int) {
  73.  
    if (isDestroyed) return
  74.  
    itemList[t].isLoading = false
  75.  
    adapter.notifyItemChanged(t)
  76.  
    }
  77.  
     
  78.  
    override fun onError(e: Throwable) {
  79.  
    if (isDestroyed) return
  80.  
    scanOver(resultList)
  81.  
    }
  82.  
     
  83.  
    override fun onComplete() {
  84.  
    if (isDestroyed) return
  85.  
    scanOver(resultList)
  86.  
    }
  87.  
     
  88.  
    })
学新通

他们两个是不是很像?

  1. Flow用emit来发送,collect来收集
  2. Rxjava用onNext来发送,在subscribe收集

二:Flow的语法

1:collect vs collectlatest

先来了解collect:
  1.  
    suspend fun main() {
  2.  
    val flow = flow<Int> {
  3.  
    var currentValue = 10
  4.  
    println("before send$currentValue")
  5.  
    emit(currentValue)
  6.  
    println("after send$currentValue")
  7.  
    while (currentValue > 0) {
  8.  
    delay(5000)
  9.  
    currentValue--
  10.  
    println("before send$currentValue")
  11.  
    emit(currentValue)
  12.  
    println("after send$currentValue")
  13.  
    }
  14.  
    }.collect {
  15.  
    println("collect开始$it")
  16.  
    println(it)
  17.  
    println("collect结束$it")
  18.  
    }
  19.  
    }
学新通

它的输出是:

before send10
collect开始10
10
collect结束10
after send10

before send9
collect开始9
9
collect结束9
after send9

emit是一个挂起函数,当调用了emit之后,会跳转到collect去执行,当collect执行完之后,再从emit处恢复(resume),所以如果在collect中增加一个delay(5000)函数,那么计数器的时间将会延长一倍。

再来了解collectLatest
  1.  
    suspend fun main() {
  2.  
    val flow = flow<Int> {
  3.  
    var currentValue = 10
  4.  
    println("before send$currentValue")
  5.  
    emit(currentValue)
  6.  
    println("after send$currentValue")
  7.  
    while (currentValue > 0) {
  8.  
    delay(5000)
  9.  
    currentValue--
  10.  
    println("before send$currentValue")
  11.  
    emit(currentValue)
  12.  
    println("after send$currentValue")
  13.  
    }
  14.  
    }.collectLatest {
  15.  
    delay(1000)
  16.  
    println("collect开始$it")
  17.  
    println(it)
  18.  
    println("collect结束$it")
  19.  
    }
  20.  
    }
学新通

输出结果为:
before send10
after send10
collect开始10
10
collect结束10

before send9
after send9
collect开始9
9
collect结束9

区别1:当emit执行之后,collect会执行,但上游并没有挂起,而是继续在emit之后执行,在这段代码中,因为collect中有delay函数,所以after send就先于 collect开始 打印了出来。

  1.  
    suspend fun main() {
  2.  
    val flow = flow<Int> {
  3.  
    var currentValue = 10
  4.  
    println("before send$currentValue")
  5.  
    emit(currentValue)
  6.  
    println("after send$currentValue")
  7.  
    while (currentValue > 0) {
  8.  
    delay(1000)// 延迟1000
  9.  
    currentValue--
  10.  
    println("before send$currentValue")
  11.  
    emit(currentValue)
  12.  
    println("after send$currentValue")
  13.  
    }
  14.  
    }.collectLatest {// 使用collectLatest
  15.  
    println("collect开始$it")
  16.  
    delay(2000)// 延迟2000
  17.  
    println(it)
  18.  
    println("collect结束$it")
  19.  
    }
  20.  
    }
学新通

输出结果为:
before send10
collect开始10
after send10

before send9
collect开始9
after send9
...
before send0
collect开始0
after send0
0
collect结束0

当 collect开始 之后,延迟了2000,还没来得及打印计数,上游又执行了emit,结果下游的块(block)直接被取消了。区别2:当有新的值被emit,下游collectLatest没有被执行完会被cancel取消,所以最后只有0这个计数可以被打印出来。其实区别1和区别2是相互联系的,因为如果没有1就没有2

2:Flow Operator

Flow和Rxjava类似,都有很多转换符。

中游
筛选
  1. filter
映射
  1. map
额外操作
  1. onEach
缓冲
buffer

buffer是一个很有意思的操作符,看一个例子:

  1.  
    // 模拟餐厅上菜
  2.  
    flow<String> {
  3.  
    println("上菜——鸡肉")
  4.  
    emit("鸡肉")
  5.  
    delay(1000)
  6.  
    println("上菜——鱼肉")
  7.  
    emit("鱼肉")
  8.  
    delay(1000)
  9.  
    println("上菜——西瓜")
  10.  
    emit("西瓜")
  11.  
    }.onEach {
  12.  
    println("运送$it")
  13.  
    }.collect {
  14.  
    println("客人收到$it")
  15.  
    delay(2000)
  16.  
    println("客人吃完$it")
  17.  
    }
学新通

输出结果如下:
上菜——鸡肉
运送鸡肉
客人收到鸡肉
客人吃完鸡肉
上菜——鱼肉
运送鱼肉
客人收到鱼肉
客人吃完鱼肉
上菜——西瓜
运送西瓜
客人收到西瓜
客人吃完西瓜因为emit会挂起等collect执行完再resume,所以下一个菜要等客人吃完才上,那可不可以等客人一边吃就一边上菜呢?即要实现:collect不会令emit挂起,并保证emit的值按顺序到达,collect也对应的不取消(collectLatest就会取消),也按顺序对应执行。

用buffer可以解决

  1.  
    .buffer().collect {// 增加buffer
  2.  
    println("客人收到$it")
  3.  
    delay(2000)
  4.  
    println("客人吃完$it")
  5.  
    }

输出结果如下:

上菜——鸡肉
运送鸡肉
客人收到鸡肉
上菜——鱼肉
运送鱼肉
客人吃完鸡肉
客人收到鱼肉
上菜——西瓜
运送西瓜
客人吃完鱼肉
客人收到西瓜
客人吃完西瓜

conflate

conflate和buffer类似,但功能有些许不同,还是上面那个例子,把buffer改成conflate:

  1.  
    conflate().collect {
  2.  
    println("${Thread.currentThread().name}客人收到$it")
  3.  
    delay(3000)// 为了让效果更明显,延迟改为3000
  4.  
    println("${Thread.currentThread().name}客人吃完$it")
  5.  
    }

输出结果如下:
上菜——鸡肉
运送鸡肉
客人收到鸡肉
上菜——鱼肉
运送鱼肉
上菜——西瓜
运送西瓜
客人吃完鸡肉
客人收到西瓜
客人吃完西瓜

吃完鸡肉之后,客人阻塞了3000,然后鱼肉和西瓜都被运送过来,当鸡肉的collect执行完之后,在客人面前有鱼肉和西瓜两道菜,这个时候鱼肉被丢弃了,相当于取一个最新值。要注意和collectLatest的区别,collectLatest会取消collect块,但conflate不会影响collect执行,但是缓冲区有多个值的时候只会把最新的那个给collect

下游(终结符)
  1. count 计数
  1.  
    val countResult = flow<Int> {
  2.  
    var currentValue = 10
  3.  
    println("before send$currentValue")
  4.  
    emit(currentValue)
  5.  
    println("after send$currentValue")
  6.  
    while (currentValue > 0) {
  7.  
    delay(1000)
  8.  
    currentValue--
  9.  
    println("before send$currentValue")
  10.  
    emit(currentValue)
  11.  
    println("after send$currentValue")
  12.  
    }
  13.  
    }.count {
  14.  
    it % 2 == 0
  15.  
    }
  16.  
    println("$countResult")// 共有6个偶数
学新通
  1. reduce累加迭代
    从第一个元素开始累加值,并将操作应用于当前累加器值和每个元素。如果流为空,则抛出 NoSuchElementException。
  2. fold带初始值的累加迭代

三:StateFlow vs ShareFlow vs Flow

StateFlow和ShareFlow是热流,Flow是冷流。区别在于:Hot flow有无collector都会保持活跃,而Cold flow没有collector的话就是死的。

StateFlow

StateFlow just hold one value,like LiveData。Google对于LiveData和StateFlow的差别

先来看一个使用实例:

  1.  
    // ViewModel
  2.  
    class FlowViewModel : ViewModel() {
  3.  
     
  4.  
    private val _countStateFlow = MutableStateFlow(0)
  5.  
    val countStateFlow = _countStateFlow.asStateFlow()
  6.  
     
  7.  
    // 持续增加count值
  8.  
    fun increaseCountNum() {
  9.  
    viewModelScope.launch {
  10.  
    while (true) {
  11.  
    delay(1000)
  12.  
    _countStateFlow.value
  13.  
    }
  14.  
    }
  15.  
    }
  16.  
    }
学新通
  1.  
    // Activity
  2.  
    override fun onCreate(savedInstanceState: Bundle?) {
  3.  
    super.onCreate(savedInstanceState)
  4.  
    setContentView(R.layout.acty_flow)
  5.  
     
  6.  
    mCountBtn = findViewById<Button?>(R.id.flowBtn).apply {
  7.  
    setOnClickListener {
  8.  
    mViewModel.increaseCountNum()
  9.  
    }
  10.  
    }
  11.  
     
  12.  
    lifecycleScope.launch {
  13.  
    // repeatOnLifecycle(Lifecycle.State.STARTED) {// 正确
  14.  
    // mViewModel.countStateFlow.collectLatest {
  15.  
    // log("count $it")
  16.  
    // if (it == 5)
  17.  
    // startActivity(Intent(this@FlowActivity, MainActivity::class.java))
  18.  
    // }
  19.  
    // }
  20.  
    mViewModel.countStateFlow.collectLatest {// 错误
  21.  
    log("count $it")
  22.  
    if (it == 5)
  23.  
    startActivity(Intent(this@FlowActivity, MainActivity::class.java))
  24.  
    }
  25.  
    }
  26.  
    }
学新通

这是StateFlow的一个使用实例,一开始不明白为什么需要repeatxxxxxcycle,运行了实例之后才醒悟,collectLatest是一个suspend function,永远suspend(如果有代码在collectLatest块下,永远不会执行 ),那么lifecycleScope的销毁是在destroy之后,也就是说除非activity destroy,不然就一直collect,相当于livedata一直观察着数据,即使activity不可见

运行代码,发现跳转到另外一个activity(原activity处于stop状态),依然打印着日志。为了达到与livedata同样的生命周期效果,需要采用注释的那段代码。repeatOnLifecycle(Lifecycle.State.STARTED{}里的协程作用域会检测到处于start状态就启动,检测到stop状态就取消。

再这里提一下LiveData:

  1.  
    private val xx = MutableLiveData("")
  2.  
    xx.observe(this) {
  3.  
     
  4.  
    }

因为activity就是一个LifecycleOwner,再observe的注释中有这么一段话:The observer will only receive events if the owner is in Lifecycle.State.STARTED or Lifecycle.State.RESUMED state (active).
If the owner moves to the Lifecycle.State.DESTROYED state, the observer will automatically be removed.所以说在repeatOnLifecycle(Lifecycle.State.STARTED{}里collect就相当于LiveData的默认效果。(是不是感觉flow麻烦hh)

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfkehc
系列文章
更多 icon
同类精品
更多 icon
继续加载