跳到主要内容

协程

ChatGPT-4o 中英对照 Coroutines

Kotlin 协程 是 Kotlin 的轻量级线程,允许以命令式的方式编写非阻塞代码。在语言层面,挂起函数为异步操作提供了抽象,而在库层面,kotlinx.coroutines 提供了像 async { } 这样的函数和像 Flow 这样的类型。

Spring Framework 在以下范围内提供对协程的支持:

  • 在 Spring MVC 和 WebFlux 注解的 @Controller 中支持 DeferredFlow 返回值

  • 在 Spring MVC 和 WebFlux 注解的 @Controller 中支持挂起函数

  • WebFlux 客户端服务器 函数式 API 的扩展

  • WebFlux.fn coRouter { } DSL

  • WebFlux CoWebFilter

  • 在 RSocket @MessageMapping 注解的方法中支持挂起函数和 Flow

  • RSocketRequester 的扩展

  • Spring AOP

依赖项

kotlinx-coroutines-corekotlinx-coroutines-reactor 依赖项在类路径中时,协程支持被启用:

build.gradle.kts

dependencies {

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
kotlin

支持版本 1.4.0 及以上。

响应式如何转换为协程?

对于返回值,从 Reactive 到 Coroutines API 的转换如下:

  • fun handler(): Mono<Void> 变为 suspend fun handler()

  • fun handler(): Mono<T> 变为 suspend fun handler(): Tsuspend fun handler(): T?,具体取决于 Mono 是否可以为空(这样做的好处是类型更加静态)

  • fun handler(): Flux<T> 变为 fun handler(): Flow<T>

对于输入参数:

  • 如果不需要惰性,fun handler(mono: Mono<T>) 可以变为 fun handler(value: T),因为可以调用挂起函数来获取值参数。

  • 如果需要惰性,fun handler(mono: Mono<T>) 可以变为 fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)

Flow 是协程世界中的 Flux 等价物,适用于热流或冷流、有限或无限流,主要区别如下:

  • Flow 是基于推送的,而 Flux 是推拉混合的

  • 背压通过挂起函数实现

  • Flow 只有一个单一的挂起收集方法,操作符作为扩展实现

  • 由于协程的存在,操作符易于实现

  • 扩展允许为 Flow 添加自定义操作符

  • 收集操作是挂起函数

  • map 操作符支持异步操作(不需要 flatMap),因为它接受一个挂起函数参数

阅读这篇关于使用 Spring、协程和 Kotlin Flow 实现响应式编程的博客文章,以获取更多详细信息,包括如何使用协程并发运行代码。

控制器

这是一个协程 @RestController 的示例。

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}

@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}

@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}

@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}

@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}

@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}

@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}

}
kotlin

使用 @Controller 进行视图渲染也是支持的。

@Controller
class CoroutinesViewController(banner: Banner) {

@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}
kotlin

WebFlux.fn

下面是一个通过 coRouter { } DSL 定义的协程路由及相关处理程序的示例。

@Configuration
class RouterConfiguration {

@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
kotlin
class UserHandler(builder: WebClient.Builder) {

private val client = builder.baseUrl("...").build()

suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))

suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}
kotlin

交易

通过反应式事务管理的编程变体支持对协程的事务处理。

对于挂起函数,提供了一个 TransactionalOperator.executeAndAwait 扩展。

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}

private suspend fun insertPerson1() {
// INSERT SQL statement
}

private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
kotlin

对于 Kotlin Flow,提供了一个 Flow<T>.transactional 扩展。

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}

private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}
kotlin