Kotlin Flow
Kotlin Flow
一:Flow的概念
Flow流的概念感觉类似于Java的响应式编程,下面看两段代码:
-
// flow的上游
-
override suspend fun getCompanyListings(
-
fetchFromRemote: Boolean,
-
query: String
-
): Flow<Resource<List<CompanyListing>>> {
-
return flow {
-
emit(Resource.Loading(true))
-
val localListings = dao.searchCompanyListing(query)
-
emit(Resource.Success(
-
data = localListings.map { it.toCompanyListing() }
-
))
-
-
val isDbEmpty = localListings.isEmpty() && query.isBlank()
-
val shouldJustLoadFromCache = !isDbEmpty && !fetchFromRemote
-
if(shouldJustLoadFromCache) {
-
emit(Resource.Loading(false))
-
return@flow
-
}
-
val remoteListings = try {
-
val response = api.getListings()
-
companyListingsParser.parse(response.byteStream())
-
} catch(e: IOException) {
-
e.printStackTrace()
-
emit(Resource.Error("Couldn't load data"))
-
null
-
} catch (e: HttpException) {
-
e.printStackTrace()
-
emit(Resource.Error("Couldn't load data"))
-
null
-
}
-
-
remoteListings?.let { listings ->
-
dao.clearCompanyListings()
-
dao.insertCompanyListings(
-
listings.map { it.toCompanyListingEntity() }
-
)
-
emit(Resource.Success(
-
data = dao
-
.searchCompanyListing("")
-
.map { it.toCompanyListing() }
-
))
-
emit(Resource.Loading(false))
-
}
-
}
-
}
-
// flow下游收集
-
viewModelScope.launch {
-
repository
-
.getCompanyListings(fetchFromRemote, query)
-
.collect { result ->
-
when(result) {
-
is Resource.Success -> {
-
result.data?.let { listings ->
-
state = state.copy(
-
companies = listings
-
)
-
}
-
}
-
is Resource.Error -> Unit
-
is Resource.Loading -> {
-
state = state.copy(isLoading = result.isLoading)
-
}
-
}
-
}
-
}
-
// Rxjava的上游
-
var resultList = mutableListOf<WifiSafeCheckItem>()
-
Observable.create<Int> {
-
try {
-
for (item in itemList) {
-
when(item.itemId) {
-
0 ->{
-
// 检测虚假wifi
-
try {
-
Thread.sleep(1000)
-
} catch (e: Exception) {
-
}
-
}
-
1 ->{
-
// 检测DNS是否正常
-
if (!WifiUtil.isDnsSafe(context)){
-
// 不正常就添加
-
resultList.add(itemList[1])
-
}
-
try {
-
Thread.sleep(1500)
-
} catch (e: Exception) {
-
}
-
}
-
2 ->{
-
// 检查是否能上网
-
if (!(WifiUtil.isNetworkConnected(context) && WifiUtil.isNetworkOnline())) {
-
// 不正常就添加
-
resultList.add(itemList[2])
-
}
-
try {
-
Thread.sleep(1500)
-
} catch (e: Exception) {
-
}
-
}
-
3 ->{
-
// 是否连接wifi
-
if (!WifiUtil.isWifiConnected(context)) {
-
resultList.add(itemList[3])
-
}
-
try {
-
Thread.sleep(1000)
-
} catch (e: Exception) {
-
}
-
-
}
-
4 ->{
-
// 检测wifi是否加密
-
if (!WifiUtil.isHaveEncrypt(context)) {
-
resultList.add(itemList[4])
-
}
-
try {
-
Thread.sleep(1500)
-
} catch (e: Exception) {
-
}
-
}
-
}
-
it.onNext(item.itemId)
-
}
-
it.onComplete()
-
} catch (e: Exception) {
-
e.printStackTrace()
-
if (!isDestroyed)
-
it.onError(e)
-
}
-
}.compose(RxUtil.ioAndMainObservable()).subscribe(object : Observer<Int> {
-
// Rxjava的下游
-
override fun onSubscribe(d: Disposable) {
-
mDisposableList.add(d)
-
}
-
-
override fun onNext(t: Int) {
-
if (isDestroyed) return
-
itemList[t].isLoading = false
-
adapter.notifyItemChanged(t)
-
}
-
-
override fun onError(e: Throwable) {
-
if (isDestroyed) return
-
scanOver(resultList)
-
}
-
-
override fun onComplete() {
-
if (isDestroyed) return
-
scanOver(resultList)
-
}
-
-
})
他们两个是不是很像?
- Flow用emit来发送,collect来收集
- Rxjava用onNext来发送,在subscribe收集
二:Flow的语法
1:collect vs collectlatest
先来了解collect:
-
suspend fun main() {
-
val flow = flow<Int> {
-
var currentValue = 10
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
while (currentValue > 0) {
-
delay(5000)
-
currentValue--
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
}
-
}.collect {
-
println("collect开始$it")
-
println(it)
-
println("collect结束$it")
-
}
-
}
它的输出是:
before send10
collect开始10
10
collect结束10
after send10
before send9
collect开始9
9
collect结束9
after send9
emit是一个挂起函数,当调用了emit之后,会跳转到collect去执行,当collect执行完之后,再从emit处恢复(resume),所以如果在collect中增加一个delay(5000)函数,那么计数器的时间将会延长一倍。
再来了解collectLatest
-
suspend fun main() {
-
val flow = flow<Int> {
-
var currentValue = 10
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
while (currentValue > 0) {
-
delay(5000)
-
currentValue--
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
}
-
}.collectLatest {
-
delay(1000)
-
println("collect开始$it")
-
println(it)
-
println("collect结束$it")
-
}
-
}
输出结果为:
before send10
after send10
collect开始10
10
collect结束10
before send9
after send9
collect开始9
9
collect结束9
区别1:当emit执行之后,collect会执行,但上游并没有挂起,而是继续在emit之后执行,在这段代码中,因为collect中有delay函数,所以after send就先于 collect开始 打印了出来。
-
suspend fun main() {
-
val flow = flow<Int> {
-
var currentValue = 10
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
while (currentValue > 0) {
-
delay(1000)// 延迟1000
-
currentValue--
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
}
-
}.collectLatest {// 使用collectLatest
-
println("collect开始$it")
-
delay(2000)// 延迟2000
-
println(it)
-
println("collect结束$it")
-
}
-
}
输出结果为:
before send10
collect开始10
after send10
before send9
collect开始9
after send9
...
before send0
collect开始0
after send0
0
collect结束0
当 collect开始 之后,延迟了2000,还没来得及打印计数,上游又执行了emit,结果下游的块(block)直接被取消了。区别2:当有新的值被emit,下游collectLatest没有被执行完会被cancel取消,所以最后只有0这个计数可以被打印出来。其实区别1和区别2是相互联系的,因为如果没有1就没有2
2:Flow Operator
Flow和Rxjava类似,都有很多转换符。
中游
筛选
- filter
映射
- map
额外操作
- onEach
缓冲
buffer
buffer是一个很有意思的操作符,看一个例子:
-
// 模拟餐厅上菜
-
flow<String> {
-
println("上菜——鸡肉")
-
emit("鸡肉")
-
delay(1000)
-
println("上菜——鱼肉")
-
emit("鱼肉")
-
delay(1000)
-
println("上菜——西瓜")
-
emit("西瓜")
-
}.onEach {
-
println("运送$it")
-
}.collect {
-
println("客人收到$it")
-
delay(2000)
-
println("客人吃完$it")
-
}
输出结果如下:
上菜——鸡肉
运送鸡肉
客人收到鸡肉
客人吃完鸡肉
上菜——鱼肉
运送鱼肉
客人收到鱼肉
客人吃完鱼肉
上菜——西瓜
运送西瓜
客人收到西瓜
客人吃完西瓜因为emit会挂起等collect执行完再resume,所以下一个菜要等客人吃完才上,那可不可以等客人一边吃就一边上菜呢?即要实现:collect不会令emit挂起,并保证emit的值按顺序到达,collect也对应的不取消(collectLatest就会取消),也按顺序对应执行。
用buffer可以解决
-
.buffer().collect {// 增加buffer
-
println("客人收到$it")
-
delay(2000)
-
println("客人吃完$it")
-
}
输出结果如下:
上菜——鸡肉
运送鸡肉
客人收到鸡肉
上菜——鱼肉
运送鱼肉
客人吃完鸡肉
客人收到鱼肉
上菜——西瓜
运送西瓜
客人吃完鱼肉
客人收到西瓜
客人吃完西瓜
conflate
conflate和buffer类似,但功能有些许不同,还是上面那个例子,把buffer改成conflate:
-
conflate().collect {
-
println("${Thread.currentThread().name}客人收到$it")
-
delay(3000)// 为了让效果更明显,延迟改为3000
-
println("${Thread.currentThread().name}客人吃完$it")
-
}
输出结果如下:
上菜——鸡肉
运送鸡肉
客人收到鸡肉
上菜——鱼肉
运送鱼肉
上菜——西瓜
运送西瓜
客人吃完鸡肉
客人收到西瓜
客人吃完西瓜
吃完鸡肉之后,客人阻塞了3000,然后鱼肉和西瓜都被运送过来,当鸡肉的collect执行完之后,在客人面前有鱼肉和西瓜两道菜,这个时候鱼肉被丢弃了,相当于取一个最新值。要注意和collectLatest的区别,collectLatest会取消collect块,但conflate不会影响collect执行,但是缓冲区有多个值的时候只会把最新的那个给collect。
下游(终结符)
- count 计数
-
val countResult = flow<Int> {
-
var currentValue = 10
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
while (currentValue > 0) {
-
delay(1000)
-
currentValue--
-
println("before send$currentValue")
-
emit(currentValue)
-
println("after send$currentValue")
-
}
-
}.count {
-
it % 2 == 0
-
}
-
println("$countResult")// 共有6个偶数
- reduce累加迭代
从第一个元素开始累加值,并将操作应用于当前累加器值和每个元素。如果流为空,则抛出 NoSuchElementException。 - fold带初始值的累加迭代
三:StateFlow vs ShareFlow vs Flow
StateFlow和ShareFlow是热流,Flow是冷流。区别在于:Hot flow有无collector都会保持活跃,而Cold flow没有collector的话就是死的。
StateFlow
StateFlow just hold one value,like LiveData。Google对于LiveData和StateFlow的差别
先来看一个使用实例:
-
// ViewModel
-
class FlowViewModel : ViewModel() {
-
-
private val _countStateFlow = MutableStateFlow(0)
-
val countStateFlow = _countStateFlow.asStateFlow()
-
-
// 持续增加count值
-
fun increaseCountNum() {
-
viewModelScope.launch {
-
while (true) {
-
delay(1000)
-
_countStateFlow.value
-
}
-
}
-
}
-
}
-
// Activity
-
override fun onCreate(savedInstanceState: Bundle?) {
-
super.onCreate(savedInstanceState)
-
setContentView(R.layout.acty_flow)
-
-
mCountBtn = findViewById<Button?>(R.id.flowBtn).apply {
-
setOnClickListener {
-
mViewModel.increaseCountNum()
-
}
-
}
-
-
lifecycleScope.launch {
-
// repeatOnLifecycle(Lifecycle.State.STARTED) {// 正确
-
// mViewModel.countStateFlow.collectLatest {
-
// log("count $it")
-
// if (it == 5)
-
// startActivity(Intent(this@FlowActivity, MainActivity::class.java))
-
// }
-
// }
-
mViewModel.countStateFlow.collectLatest {// 错误
-
log("count $it")
-
if (it == 5)
-
startActivity(Intent(this@FlowActivity, MainActivity::class.java))
-
}
-
}
-
}
这是StateFlow的一个使用实例,一开始不明白为什么需要repeatxxxxxcycle,运行了实例之后才醒悟,collectLatest是一个suspend function,永远suspend(如果有代码在collectLatest块下,永远不会执行 ),那么lifecycleScope的销毁是在destroy之后,也就是说除非activity destroy,不然就一直collect,相当于livedata一直观察着数据,即使activity不可见。
运行代码,发现跳转到另外一个activity(原activity处于stop状态),依然打印着日志。为了达到与livedata同样的生命周期效果,需要采用注释的那段代码。repeatOnLifecycle(Lifecycle.State.STARTED{}
里的协程作用域会检测到处于start状态就启动,检测到stop状态就取消。
再这里提一下LiveData:
-
private val xx = MutableLiveData("")
-
xx.observe(this) {
-
-
}
因为activity就是一个LifecycleOwner,再observe的注释中有这么一段话:The observer will only receive events if the owner is in Lifecycle.State.STARTED or Lifecycle.State.RESUMED state (active).
If the owner moves to the Lifecycle.State.DESTROYED state, the observer will automatically be removed.所以说在repeatOnLifecycle(Lifecycle.State.STARTED{}
里collect就相当于LiveData的默认效果。(是不是感觉flow麻烦hh)
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgfkehc
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13