跳到主要内容

Kotlin 支持

QWen Plus 中英对照 Kotlin Support

该框架还改进了对 Kotlin lambda 的支持,因此现在你可以使用 Kotlin 语言和 Spring Integration 流定义的组合:

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
kotlin

Kotlin 协程

从 6.0 版本开始,Spring Integration 提供了对 Kotlin 协程的支持。现在可以使用 suspend 函数和 kotlinx.coroutines.Deferred & kotlinx.coroutines.flow.Flow 返回类型用于服务方法:

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
kotlin

框架将它们视为反应式流交互,并使用 ReactiveAdapterRegistry 将其转换为相应的 MonoFlux 反应器类型。这样的函数回复会在回复通道中被处理,如果它是一个 ReactiveStreamsSubscribableChannel,或者作为 CompletableFuture 的结果在相应的回调中处理。

备注

具有 Flow 结果的函数在 @ServiceActivator 上默认不是 async,因此 Flow 实例作为回复消息的有效负载生成。处理此对象作为协程或将其转换为 Flux 是目标应用程序的责任。

@MessagingGateway 接口方法在用 Kotlin 声明时也可以标记为 suspend 修饰符。框架内部使用 Mono 来执行请求 - 回复操作,使用下游流。这样的 Mono 结果由 MonoKt.awaitSingleOrNull() API 内部处理,以满足被调用的网关 suspend 函数的 kotlin.coroutines.Continuation 参数:

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

suspend fun suspendGateway(payload: String): String

}
kotlin

根据 Kotlin 语言要求,此方法必须作为协程调用:

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}
kotlin