跳到主要内容
版本:7.0.3

协程

Hunyuan 7b 中英对照 Coroutines

Kotlin的协程(Coroutines)是Kotlin中的轻量级线程,能够以命令式的方式编写非阻塞代码。在语言层面,暂停函数(suspending functions)为异步操作提供了抽象;而在库层面,kotlinx.coroutines提供了诸如async 这样的函数,以及Flow这样的类型。

Spring Framework在以下作用域下支持协程(Coroutines):

  • DeferredFlow 在 Spring MVC 和 WebFlux 中的返回值支持,这些功能被标注在 @Controller
  • 在 Spring MVC 和 WebFlux 中支持暂停函数(suspending functions),这些功能也标注在 @Controller
  • 为 WebFlux 提供了 clientserver 功能 API 的扩展
  • WebFlux 的 coRouter { } DSL
  • WebFlux 的 CoWebFilter
  • 在 RSocket 的 @MessageMapping 标注方法中支持暂停函数和 Flow
  • RSocketRequester 提供了扩展
  • Spring AOP

依赖项

当类路径中包含 kotlinx-coroutines-corekotlinx-coroutines-reactor 依赖时,就会启用协程支持:

build.gradle.kts

dependencies {

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

支持版本为1.4.0及以上。

“Reactive”如何转化为“Coroutines”?

对于返回值,从Reactive API到Coroutines API的转换如下:

  • fun handler(): Mono<Void> 变为 suspend fun handler()
  • fun handler(): Mono<T> 变为 suspend fun handler(): Tsuspend fun handler(): T?,这取决于 Mono 是否可以为空(这样写更具静态类型优势)
  • fun handler(): Flux<T> 变为 fun handler(): Flow<T>

对于输入参数:

  • 如果不需要惰性,fun handler(mono: Mono<T>) 就变成了 fun handler(value: T),因为可以调用悬停函数来获取值参数。

  • 如果需要惰性,fun handler(mono: Mono<T>) 就变成了 fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)

Flow 在 Kotlin 协程(Coroutines)领域中相当于“Flux”,适用于热流(hot stream)或冷流(cold stream)、有限流(finite stream)或无限流(infinite stream),其主要区别如下:

  • Flow 是基于推送(push-based)的,而 Flux 是基于推拉(push-pull)混合模式的。
  • 通过暂停函数(suspending functions)来实现背压(backpressure)控制。
  • Flow 只有一个 单一的暂停收集方法,操作符是通过 扩展(extensions) 来实现的。
  • 多亏了协程(Coroutines),操作符实现起来非常简单
  • 扩展(extensions)允许向 Flow 添加自定义操作符。
  • 收集操作(collect operations)都是暂停函数(suspending functions)。
  • map 操作符 支持异步操作(无需 flatMap),因为它接受一个暂停函数作为参数。

阅读这篇关于利用Spring、协程和Kotlin Flow实现响应式编程的博客文章,了解更多细节,包括如何使用协程并发运行代码。

控制器

这是一个使用协程(Coroutines)的@RestController示例。

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}

@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}

@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}

@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}

@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}

@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}

@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}

}

也支持使用@Controller进行视图渲染。

@Controller
class CoroutinesViewController(banner: Banner) {

@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}

WebFlux.fn

以下是一个通过 coRouter { } DSL 定义的协程路由器(Coroutine router)及相关处理器的示例。

@Configuration
class RouterConfiguration {

@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
class UserHandler(builder: WebClient.Builder) {

private val client = builder.baseUrl("...").build()

suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))

suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}

交易

通过Reactive事务管理的编程化变体,可以支持协程上的事务处理。

对于挂起函数(suspending functions),提供了一个名为 TransactionalOperator.executeAndAwait 的扩展功能。

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}

private suspend fun insertPerson1() {
// INSERT SQL statement
}

private suspend fun insertPerson2() {
// INSERT SQL statement
}
}

对于 Kotlin 的 Flow,提供了一个名为 Flow<T>.transactional 的扩展函数。

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}

private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}

上下文传播

Spring应用程序通过Micrometer进行监控以便支持可观测性。在跟踪支持方面,当前的观测数据会通过ThreadLocal在阻塞代码中传递,或者在Reactor的Context中在反应式管道中传递。但是,当前的观测数据也需要在挂起的函数的执行上下文中可用。如果没有这一点,“traceId”将不会自动附加到来自协程的日志语句中。

PropagationContextElement操作符通常可以确保Micrometer Context Propagation库能够与Kotlin协程(Kotlin Coroutines)一起正常使用。

它需要依赖 io.micrometer:context-propagation,并且还可以选择性地依赖 org.jetbrains.kotlinx:kotlinx-coroutines-reactor。通过调用 Hooks.enableAutomaticContextPropagation(),可以启用通过 CoroutinesUtils#invokeSuspendingFunction 实现的自动上下文传播功能(Spring 使用该功能将协程适配到 Reactor 的 FluxMono 中)。

应用程序也可以显式使用 PropagationContextElement 来通过上下文传播机制增强 CoroutineContext

fun main() {
runBlocking(Dispatchers.IO + PropagationContextElement()) {
waitAndLog()
}
}

suspend fun waitAndLog() {
delay(10)
logger.info("Suspending function with traceId")
}

在这里,假设已经配置了Micrometer Tracing,那么生成的日志记录将会显示当前的“traceId”,从而为您的应用程序提供更好的可观测性。