Kotlin 支持
该框架还进行了改进,以支持 Kotlin lambda 函数,因此现在你可以结合使用 Kotlin 语言和 Spring Integration 流定义:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin 协程
自 6.0 版本起,Spring Integration 开始支持 Kotlin 协程。现在,服务方法可以使用 suspend 函数以及 kotlinx.coroutines.Deferred 和 kotlinx.coroutines.flow.Flow 返回类型:
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
该框架将它们视为Reactive Streams交互,并使用 ReactiveAdapterRegistry 转换为相应的 Mono 和 Flux 反应器类型。如果回复通道是 ReactiveStreamsSubscribableChannel,则此类函数回复会在回复通道中处理,或者作为 CompletableFuture 在相应回调中的结果进行处理。
备注
默认情况下,带有 Flow 结果的函数在 @ServiceActivator 上不是 async 的,因此 Flow 实例会作为回复消息的有效负载生成。目标应用程序有责任将此对象作为协程处理或相应地将其转换为 Flux。
当在 Kotlin 中声明时,@MessagingGateway 接口方法也可标记 suspend 修饰符。框架内部利用 Mono 通过下游流执行请求-回复。这样的 Mono 结果由 MonoKt.awaitSingleOrNull() API 内部处理,以满足网关被调用的 suspend 函数的 kotlin.coroutines.Continuation 参数:
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根据 Kotlin 语言要求,此方法必须作为协程调用:
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}