出站网关
JPA 入站通道适配器允许您轮询数据库以检索一个或多个 JPA 实体。检索到的数据随后用于启动一个 Spring Integration 流,该流使用检索到的数据作为消息有效负载。
此外,你可以在流程的末端使用 JPA 外发通道适配器来持久化数据,实际上是在持久化操作结束时停止流程。
然而,如何在流程中间执行 JPA 持久化操作呢?例如,你可能有业务数据需要在 Spring Integration 消息流中进行处理,并且你希望将这些数据持久化,但仍然需要使用下游的其他组件。或者,与其使用轮询器轮询数据库,你需要执行 JPQL 查询并主动检索数据,然后在流程中的后续组件中处理这些数据。
这就是 JPA 外发网关发挥作用的地方。它们使您能够持久化数据以及检索数据。为了便于这些用途,Spring Integration 提供了两种类型的 JPA 外发网关:
-
更新出站网关
-
检索出站网关
每当使用出站网关执行保存、更新或仅删除数据库中某些记录的操作时,您需要使用更新出站网关。例如,如果您使用 entity
来持久化它,则返回的结果是一个合并且持久化的实体。在其他情况下,则返回受影响的记录数(更新或删除的)。
当从数据库中检索(选择)数据时,我们使用检索 outbound 网关。通过检索 outbound 网关,我们可以使用 JPQL、命名查询(本地或基于 JPQL)或原生查询(SQL)来选择数据并检索结果。
更新的出站网关在功能上类似于出站通道适配器,不同之处在于更新的出站网关在执行 JPA 操作后会向网关的回复通道发送一个结果。
检索型 outbound 网关类似于 inbound 通道适配器。
我们建议您首先阅读本章前面的 Outbound Channel Adapter 部分和 Inbound Channel Adapter 部分,因为大多数常见概念都在那里进行了说明。
这种相似性是使用中央 JpaExecutor
类尽可能统一通用功能的主要因素。
所有 JPA 外发网关都具有与 outbound-channel-adapter
类似的功能,我们可以用它来执行各种 JPA 操作:
-
实体类
-
JPA 查询语言 (JPQL)
-
本地查询
-
命名查询
对于配置示例,请参见 JPA 外发网关示例。
常见配置参数
JPA 输出网关始终可以访问 Spring Integration 的 Message
作为输入。因此,以下参数是可用的:
parameter-source-factory
o.s.i.jpa.support.parametersource.ParameterSourceFactory
的一个实例,用于获取 o.s.i.jpa.support.parametersource.ParameterSource
的实例。ParameterSource
用于解析查询中提供的参数值。如果您使用 JPA 实体执行操作,则会忽略 parameter-source-factory
属性。parameter
子元素与 parameter-source-factory
互斥,并且必须在提供的 ParameterSourceFactory
上进行配置。可选。
use-payload-as-parameter-source
如果设置为 true
,则 Message
的有效负载将用作参数的来源。如果设置为 false
,则整个 Message
可用作参数的来源。如果没有传递 JPA 参数,此属性默认为 true
。这意味着,如果您使用默认的 BeanPropertyParameterSourceFactory
,有效负载的 bean 属性将用作 JPA 查询参数值的来源。但是,如果传递了 JPA 参数,则此属性默认评估为 false
。原因是 JPA 参数允许您提供 SpEL 表达式。因此,能够访问整个 Message
(包括标题)是非常有益的。可选。
更新 Outbound Gateway
以下列表显示了可以在 updating-outbound-gateway 上设置的所有属性,并描述了关键属性:
<int-jpa:updating-outbound-gateway request-channel="" // <1>
auto-startup="true"
entity-class=""
entity-manager=""
entity-manager-factory=""
id=""
jpa-operations=""
jpa-query=""
named-query=""
native-query=""
order=""
parameter-source-factory=""
persist-mode="MERGE"
reply-channel="" // <2>
reply-timeout="" // <3>
use-payload-as-parameter-source="true">
<int:poller/>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:updating-outbound-gateway>
外发网关接收执行所需操作的消息的通道。此属性类似于
outbound-channel-adapter
的channel
属性。可选。网关在执行所需的 JPA 操作后发送响应的通道。如果未定义此属性,则请求消息必须具有
replyChannel
标头。可选。指定网关等待将结果发送到回复通道的时间。仅当回复通道本身可能会阻塞发送操作时应用(例如,当前已满的有界
QueueChannel
)。值以毫秒为单位指定。可选。
使用Java配置进行配置
下面的 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器的示例:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@MessagingGateway
interface JpaGateway {
@Gateway(requestChannel = "jpaUpdateChannel")
@Transactional
void updateStudent(StudentDomain payload);
}
@Bean
@ServiceActivator(channel = "jpaUpdateChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter =
new JpaOutboundGateway(new JpaExecutor(this.entityManagerFactory));
adapter.setOutputChannelName("updateResults");
return adapter;
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置 outbound 适配器的示例:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("updateResults"));
}
}
检索 outbound gateway
以下示例演示了如何配置检索 outbound gateway:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
}
@Bean
fun retrievingGatewayFlow() =
integrationFlow {
handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
channel { queue("retrieveResults") }
}
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setJpaQuery("from Student s where s.id = :id");
executor.setJpaParameters(Collections.singletonList(new JpaParameter("id", null, "payload")));
jpaExecutor.setExpectSingleResult(true);
return executor;
}
@Bean
@ServiceActivator(channel = "jpaRetrievingChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
adapter.setOutputChannelName("retrieveResults");
adapter.setGatewayType(OutboundGatewayType.RETRIEVING);
return adapter;
}
}
<int-jpa:retrieving-outbound-gateway request-channel=""
auto-startup="true"
delete-after-poll="false"
delete-in-batch="false"
entity-class=""
id-expression="" // <1>
entity-manager=""
entity-manager-factory=""
expect-single-result="false" // <2>
id=""
jpa-operations=""
jpa-query=""
max-results="" // <3>
max-results-expression="" // <4>
first-result="" // <5>
first-result-expression="" // <6>
named-query=""
native-query=""
order=""
parameter-source-factory=""
reply-channel=""
reply-timeout=""
use-payload-as-parameter-source="true">
<int:poller></int:poller>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:retrieving-outbound-gateway>
(自 Spring Integration 4.0 起) 确定
EntityManager.find(Class entityClass, Object primaryKey)
方法的primaryKey
值的 SpEL 表达式,以requestMessage
作为评估上下文的根对象。如果存在,则从entity-class
属性确定entityClass
参数;否则,从payload
类中确定。如果使用id-expression
,则不允许使用所有其他属性。可选。指示选择操作是否预期返回单个结果或
List
结果的布尔标志。如果此标志设置为true
,则单个实体将作为消息的有效负载发送。如果返回多个实体,则会抛出异常。如果设置为false
,则将实体List
作为消息的有效负载发送。默认值为false
。可选。此非零、非负整数值告诉适配器在执行选择操作时不要选择超过指定数量的行。默认情况下,如果没有设置此属性,则由给定查询选择所有可能的记录。此属性与
max-results-expression
互斥。可选。可用于查找结果集中最大结果数的表达式。它与
max-results
互斥。可选。此非零、非负整数值告诉适配器从哪个第一条记录开始检索结果。此属性与
first-result-expression
互斥。此属性是在版本 3.0 中引入的。可选。该表达式针对消息进行求值,以查找结果集中第一条记录的位置。此属性与
first-result
互斥。此属性是在版本 3.0 中引入的。可选。
当你选择在检索时删除实体,并且你已经检索到一个实体集合时,默认情况下,实体是按每个实体逐一删除的。这可能会导致性能问题。
或者,你可以将属性 deleteInBatch
设置为 true
,以执行批量删除。但是,这样做有一个限制,即不支持级联删除。
JSR 317:Java™ 持久化 2.0 在第 4.10 章“批量更新和删除操作”中指出:
“删除操作仅适用于指定类及其子类的实体。它不会级联到相关实体。”
有关更多信息,请参阅 JSR 317:Java™ 持久化 2.0
从 6.0 版本开始,Jpa.retrievingGateway()
在查询未返回任何实体时将返回一个空列表结果。以前会根据 requiresReply
返回 null
结束流程或抛出异常。或者,要恢复到以前的行为,在网关之后添加一个 filter
来过滤掉空列表。这需要在空列表处理是下游逻辑一部分的应用程序中进行额外的配置。有关可能的空列表处理选项,请参阅 拆分器丢弃通道。
JPA 外向网关示例
本节包含使用更新 outbound 网关和检索 outbound 网关的各种示例:
使用实体类进行更新
在以下示例中,通过使用 org.springframework.integration.jpa.test.entity.Student
实体类作为 JPA 定义参数来持久化更新的出站网关:
<int-jpa:updating-outbound-gateway request-channel="entityRequestChannel" // <1>
reply-channel="entityResponseChannel" // <2>
entity-class="org.springframework.integration.jpa.test.entity.Student"
entity-manager="em"/>
这是 outbound 网关的请求通道。它类似于
outbound-channel-adapter
的channel
属性。这里是网关与 outbound 适配器不同的地方。这是接收 JPA 操作回复的通道。但是,如果您不关心收到的回复,而只想执行操作,则使用 JPA
outbound-channel-adapter
是合适的选择。在本例中,我们使用了一个实体类,回复是由于 JPA 操作而创建或合并的实体对象。
使用 JPQL 更新
以下示例通过使用 Java 持久化查询语言 (JPQL) 更新实体,这需要使用更新 outbound gateway:
<int-jpa:updating-outbound-gateway request-channel="jpaqlRequestChannel"
reply-channel="jpaqlResponseChannel"
jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber" // <1>
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>
网关执行的 JPQL 查询。由于我们使用了更新型的输出网关,只有
update
和delete
JPQL 查询是合理的选择。
当你发送一个带有 String
负载的消息时,该消息还包含一个名为 rollNumber
的头,其值为 long
类型,指定学号的学生的姓氏将被更新为消息负载中的值。在使用更新网关时,返回值始终是一个整数值,表示执行 JPA QL 影响的记录数。
使用 JPQL 检索实体
以下示例使用检索 outbound gateway 和 JPQL 从数据库中检索(选择)一个或多个实体:
<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
jpa-query="select s from Student s where s.firstName = :firstName and s.lastName = :lastName"
entity-manager="em">
<int-jpa:parameter name="firstName" expression="payload"/>
<int-jpa:parameter name="lastName" expression="headers['lastName']"/>
</int-jpa:outbound-gateway>
使用 id-expression
检索实体
以下示例使用带有 id-expression
的检索 outbound 网关从数据库中检索(查找)唯一的一个实体:primaryKey
是 id-expression
评估的结果。entityClass
是消息 payload
的类。
<int-jpa:retrieving-outbound-gateway
request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
id-expression="payload.id"
entity-manager="em"/>
使用命名查询进行更新
使用命名查询基本上与直接使用 JPQL 查询相同。不同的是,使用了 named-query
属性,如下例所示:
<int-jpa:updating-outbound-gateway request-channel="namedQueryRequestChannel"
reply-channel="namedQueryResponseChannel"
named-query="updateStudentByRollNumber"
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:outbound-gateway>
你可以在 这里 找到一个使用 Spring Integration 的 JPA 适配器的完整示例应用程序。