多路复用:Select 表达式 Experimental
select 表达式是协程中处理多路复用的核心工具。它允许你同时等待多个挂起函数,并只处理最先完成的那一个。这与 Go 语言中的 select 关键字非常相似。
实验性 API
select API 目前在 Kotlin 中仍标记为 Experimental,但在并发编程中非常重要。
核心场景:谁快用谁
假设我们要从主服务器和备用服务器同时获取数据,通过 select 可以实现“最快响应优先”。
kotlin
suspend fun fetchMixedData(userDeferred: Deferred<User>, newsChannel: ReceiveChannel<News>) {
// 监听多个异步源,只处理第一个到达的
select<Unit> {
// 分支 1: Deferred 完成
userDeferred.onAwait { user ->
println("User loaded first: $user")
}
// 分支 2: Channel 收到消息
newsChannel.onReceive { news ->
println("News arrived first: $news")
}
// 分支 3: 超时处理 (配合 onTimeout)
onTimeout(1000) {
println("Timed out!")
}
}
}Select 子句 (Clauses)
并不是所有的挂起函数都能放入 select 中。必须是支持 SelectClause 的函数才能使用。
| 类型 | 支持的 Clause | 描述 |
|---|---|---|
Job | onJoin | 等待 Job 完成 |
Deferred | onAwait | 等待结果返回 |
SendChannel | onSend | 等待发送能力(缓冲区有空位) |
ReceiveChannel | onReceive | 等待接收消息 |
ReceiveChannel | onReceiveCatching | 等待接收结果(含关闭状态) |
两个 Channel 互斥读取
一个经典的生产者-消费者变体:一个消费者轮询两个管道,谁有数据就处理谁。
kotlin
suspend fun consumeBoth(channel1: ReceiveChannel<Int>, channel2: ReceiveChannel<Int>) {
while (isActive) { // 循环监听
select<Unit> {
channel1.onReceive { value ->
println("From Ch1: $value")
}
channel2.onReceive { value ->
println("From Ch2: $value")
}
}
}
}局限性
select 目前不支持 Flow。对于 Flow 的多路复用,请使用 merge 操作符将它们合并为一个流,或者使用 shardedFlow。