Skip to content

溢出策略:onBufferOverflow

当生产者的速度 > 消费者的速度,就会发生“背压 (Backpressure)”。如果不处理,流可能会挂起生产者,导致性能瓶颈。

默认行为:SUSPEND

Flow 默认是无缓冲的。 如果使用了 buffer 但没指定策略,默认是 BufferOverflow.SUSPEND。即:缓冲区满了,暂停上游 emit,直到下游处理完腾出空位。

丢弃策略 (Dropping)

在实时数据(股票、传感器)场景下,我们通常不希望挂起,而是丢弃旧数据。

策略别名操作符行为场景
DROP_OLDEST无 (需配置 buffer)删除队列头部最老的数据,腾出位置给新数据视频直播、实时日志
DROP_LATEST丢弃当前试图发送的这个新数据拒绝服务、限流
SUSPENDbuffer()挂起生产者关键业务数据 (不能丢)

极速操作符:conflate

conflate (合并) 是 buffer(onBufferOverflow = DROP_OLDEST) 的快捷方式。 逻辑:下游来不及处理?那就跳过中间的所有值,永远只给它最新的那一个

kotlin
flow {
    for (i in 1..100) {
        emit(i)
        delay(10) // 生产很快
    }
}
.conflate() 
.collect {
    delay(100) // 消费很慢
    println(it) // 输出可能跳跃:1, 10, 25, ... 100
}

collectLatest

这也是处理背压的一种方式,但它是取消下游而不是丢弃上游。 当新值到来时,如果下游还在处理旧值,立即取消旧值的处理逻辑,开始处理新值。

kotlin
flow.collectLatest { value ->
    // 如果有新值来了,这一行及其后的代码会被立即取消
    heavyProcessing(value) 
}