自定义流:callbackFlow
callbackFlow 是处理传统回调 API(如位置更新、蓝牙扫描、UI 事件)的标准方案。它可以将多次回调转换为数据流。
核心契约:awaitClose
这是必选项!callbackFlow 底层不仅仅是一个 Channel,它还是一个挂起函数。如果不调用 awaitClose,协程会立即结束,导致 Channel 关闭,下游收不到任何数据。
kotlin
fun getLocationFlow(): Flow<Location> = callbackFlow {
val listener = object : LocationListener {
override fun onLocationChanged(loc: Location) {
// ⭐️ 尝试发送。非阻塞,非常适合回调
trySend(loc).onFailure { e -> log(e) }
}
}
locationManager.requestUpdates(listener)
// ⭐️ 挂起当前协程,直到流被关闭或取消
// 在这里编写“反注册”逻辑,防止内存泄漏
awaitClose {
locationManager.removeUpdates(listener)
}
}数据发送:send vs trySend
| 方法 | 特性 | 适用场景 |
|---|---|---|
trySend(e) | 非阻塞,立即返回结果 | 普通回调接口 (最常用) |
send(e) | 挂起,直到缓冲区有空位 | 在 suspend 上下文中 |
缓冲区溢出
callbackFlow 默认缓冲区大小为 64。如果回调触发极快而下游处理慢,trySend 可能会失败。建议配合 buffer(Channel.UNLIMITED) 使用或处理 onFailure。
进阶:channelFlow
callbackFlow 其实是 channelFlow 的特殊子类。
- callbackFlow: 强制要求
awaitClose,用于强调“资源生命周期”。 - channelFlow: 通用性更强,允许在内部启动多个协程并发发射数据。
kotlin
// 并发合并两个数据源
fun getAllData() = channelFlow {
launch { send(ensureDataA()) }
launch { send(ensureDataB()) }
}