Skip to content

Flow 变换与常用操作符

Flow 的操作符分为两大类:

  1. 中间操作符 (Intermediate): 返回一个新的 Flow,不执行逻辑(冷)。
  2. 终端操作符 (Terminal): 触发流的执行,通常返回一个结果或启动收集。

变换三剑客

操作符作用适用场景
map1对1 转换将数据 A 转为 B (如 JSON -> Object)
filter过滤剔除不符合条件的数据
transform1对N 复杂转换需要发射多个值或不发射值时
kotlin
// transform 实战:部分发射
flowOf(1, 2, 3).transform { value ->
    if (value % 2 == 0) {
        emit("Even: $value")
        emit("Is Logic OK? Yes")
    }
}.collect { println(it) }

生命周期钩子

不改变流的内容,仅用于副作用(日志、埋点)。

  • onStart: 流被收集前 (在订阅线程)
  • onCompletion: 流结束时 (无论成功/失败/取消)
  • onEach: 每一个元素发射时
kotlin
flowOn
    .onStart { showLoading() }
    .onEach { log("Item: $it") }
    .onCompletion { hideLoading() }
    .collect()

异常捕获

捕获范围

catch 操作符只能捕获它上游抛出的异常。如果异常发生在 catch 下游(包括 collect 块内部),它无法捕获。

kotlin
flow {
    emit(1)
    throw RuntimeException("Upstream Error")
}.catch { e ->
    emit(-1) // 发生错误时发射兜底值
}.collect { 
    println(it) 
}

错误重试 (Retry)

移动端网络环境不稳定,请求失败后进行重试是刚需。

1. 基础重试:retry

kotlin
flow {
    emit(api.fetchData()) 
}
.retry(3) { e -> 
    // 仅当异常为 IOException 时重试,最多 3 次
    (e is IOException).also { if (it) log("Retrying...") }
}
.catch { e -> 
    // 重试 3 次后依然失败,才会走到这里
    emit(EmptyData)
}
.collect()

2. 指数退避:retryWhen

如果需要“越传越慢”的重试策略(Exponential Backoff),使用 retryWhen

kotlin
flow { ... }
.retryWhen { cause, attempt ->
    if (cause !is IOException || attempt >= 3) return@retryWhen false
    
    // 指数退避:1s, 2s, 4s...
    val delayTime = 1000L * (1L shl attempt.toInt())
    delay(delayTime)
    true // 返回 true 表示继续重试
}
.collect()