Skip to content

自定义流:callbackFlow

源:从回调构建 Flow 指南

callbackFlow 是处理传统回调 API(如位置更新、蓝牙扫描、UI 事件)的标准方案。它可以将多次回调转换为数据流。

核心契约:awaitClose

这是必选项!callbackFlow 底层不仅仅是一个 Channel,它还是一个挂起函数。如果不调用 awaitClose,协程会立即结束,导致 Channel 关闭,下游收不到任何数据。

kotlin
fun getLocationFlow(): Flow<Location> = callbackFlow {
    val listener = object : LocationListener {
        override fun onLocationChanged(loc: Location) {
            // ⭐️ 尝试发送。非阻塞,非常适合回调
            trySend(loc).onFailure { e -> log(e) } 
        }
    }
    
    locationManager.requestUpdates(listener)
    
    // ⭐️ 挂起当前协程,直到流被关闭或取消
    // 在这里编写“反注册”逻辑,防止内存泄漏
    awaitClose {
        locationManager.removeUpdates(listener)
    }
}

数据发送:send vs trySend

方法特性适用场景
trySend(e)非阻塞,立即返回结果普通回调接口 (最常用)
send(e)挂起,直到缓冲区有空位suspend 上下文中

缓冲区溢出

callbackFlow 默认缓冲区大小为 64。如果回调触发极快而下游处理慢,trySend 可能会失败。建议配合 buffer(Channel.UNLIMITED) 使用或处理 onFailure

进阶:channelFlow

callbackFlow 其实是 channelFlow 的特殊子类。

  • callbackFlow: 强制要求 awaitClose,用于强调“资源生命周期”。
  • channelFlow: 通用性更强,允许在内部启动多个协程并发发射数据。
kotlin
// 并发合并两个数据源
fun getAllData() = channelFlow {
    launch { send(ensureDataA()) }
    launch { send(ensureDataB()) }
}