跳到主要内容

R2DBC 支持

QWen Plus 中英对照 R2DBC Support

Spring Integration 提供了通道适配器,用于通过 R2DBC 驱动程序以响应式访问数据库来接收和发送消息。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-r2dbc</artifactId>
<version>6.4.2</version>
</dependency>
xml

R2DBC 入站通道适配器

R2dbcMessageSource 是基于 R2dbcEntityOperations 的可轮询 MessageSource 实现,并根据 expectSingleResult 选项生成带有从数据库获取的数据作为有效负载的 FluxMono 消息。SELECT 查询可以静态提供,也可以基于在每次 receive() 调用时评估的 SpEL 表达式。R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,以允许使用 StatementMapper.SelectSpec 流畅 API。默认情况下,此通道适配器将查询结果映射到 LinkedCaseInsensitiveMap 实例。可以通过提供 payloadType 选项来自定义它,该选项由基于 this.r2dbcEntityOperations.getConverter()EntityRowMapper 使用。updateSql 是可选的,用于在数据库中标记已读取的记录,以便从后续轮询中跳过。UPDATE 操作可以使用 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> 提供,以根据 SELECT 结果中的记录将值绑定到 UPDATE 中。

这个通道适配器的典型配置可能如下所示:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
java

使用 Java DSL 为此通道适配器配置如下:

// 假设这里是Java DSL代码示例
java

请注意,具体的配置代码需要根据实际情况编写。

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.get();
}
java

R2DBC 输出通道适配器

R2dbcMessageHandlerReactiveMessageHandler 的一个实现,用于使用提供的 R2dbcEntityOperations 在数据库中执行 INSERT(默认)、UPDATEDELETE 查询。R2dbcMessageHandler.Type 可以通过请求消息的 SpEL 表达式静态配置或动态配置。要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName)则将整个消息有效负载视为一个 org.springframework.data.relational.core.mapping.Table 实体以执行 SQL 操作。包 org.springframework.data.relational.core.query 已注册为 SpEL 评估上下文中的导入,以便直接访问用于 UPDATEDELETE 查询的 Criteria 流畅 API。valuesExpression 用于 INSERTUPDATE,并且必须被评估为列值对的 Map,以便根据请求消息在目标表中进行更改。

此通道适配器的典型配置可能如下所示:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
java

使用 Java DSL 为此通道适配器配置如下:

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))
java