出站网关
JPA入站通道适配器允许您轮询数据库以检索一个或多个JPA实体。检索到的数据随后将用于启动Spring集成流,并将检索到的数据作为消息负载使用。
此外,您可以在流程结束时使用JPA出站通道适配器来持久化数据,这本质上会在持久化操作结束时停止流程。
然而,如何在流程中执行JPA持久化操作呢?例如,您可能在Spring Integration消息流中处理业务数据,并希望将其持久化,同时仍需要在后续下游使用其他组件。或者,您需要执行JPQL查询并主动检索数据,而不是使用轮询器轮询数据库,然后在流程的后续组件中处理这些数据。
这正是JPA出站网关发挥作用的地方。它们使您能够持久化数据以及检索数据。为支持这些用途,Spring Integration提供了两种类型的JPA出站网关:
-
更新出站网关
-
检索出站网关
每当使用出站网关执行保存、更新或仅删除数据库中某些记录的操作时,都需要使用更新型出站网关。例如,如果使用 entity 进行持久化,结果将返回一个合并并持久化的实体。在其他情况下,将返回受影响(更新或删除)的记录数量。
从数据库中检索(选择)数据时,我们使用检索出站网关。通过检索出站网关,我们可以使用 JPQL、命名查询(原生或基于 JPQL)或原生查询(SQL)来选取数据并获取结果。
更新型出站网关在功能上类似于出站通道适配器,不同之处在于更新型出站网关在执行JPA操作后会将结果发送到网关的回复通道。
检索出站网关类似于入站通道适配器。
这种相似性是使用核心 JpaExecutor 类来尽可能统一通用功能的主要因素。
对于所有 JPA 出站网关,与 outbound-channel-adapter 类似,我们可以使用它来执行各种 JPA 操作:
-
实体类
-
JPA 查询语言 (JPQL)
-
原生查询
-
命名查询
有关配置示例,请参阅 JPA 出站网关示例。
通用配置参数
JPA Outbound Gateways 始终可以访问 Spring Integration 的 Message 作为输入。因此,以下参数可用:
parameter-source-factory
o.s.i.jpa.support.parametersource.ParameterSourceFactory 的一个实例,用于获取 o.s.i.jpa.support.parametersource.ParameterSource 的实例。ParameterSource 用于解析查询中提供的参数值。如果使用 JPA 实体执行操作,则忽略 parameter-source-factory 属性。parameter 子元素与 parameter-source-factory 互斥,并且必须在提供的 ParameterSourceFactory 上进行配置。可选。
use-payload-as-parameter-source
如果设置为 true,则 Message 的负载(payload)将用作参数的来源。如果设置为 false,则整个 Message 都可用作参数的来源。如果未传入 JPA 参数,此属性默认为 true。这意味着,如果你使用默认的 BeanPropertyParameterSourceFactory,负载的 bean 属性将用作 JPA 查询参数值的来源。然而,如果传入了 JPA 参数,此属性默认评估为 false。原因是 JPA 参数允许你提供 SpEL 表达式。因此,能够访问整个 Message(包括消息头)是非常有益的。可选属性。
更新出站网关
以下列表展示了您可以在更新出站网关上设置的所有属性,并描述了关键属性:
<int-jpa:updating-outbound-gateway request-channel="" // <1>
auto-startup="true"
entity-class=""
entity-manager=""
entity-manager-factory=""
id=""
jpa-operations=""
jpa-query=""
named-query=""
native-query=""
order=""
parameter-source-factory=""
persist-mode="MERGE"
reply-channel="" // <2>
reply-timeout="" // <3>
use-payload-as-parameter-source="true">
<int:poller/>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:updating-outbound-gateway>
出站网关接收消息以执行所需操作的通道。此属性类似于
outbound-channel-adapter的channel属性。可选。网关在执行所需的 JPA 操作后发送响应的通道。如果未定义此属性,则请求消息必须包含
replyChannel头信息。可选。指定网关等待将结果发送到回复通道的时间。仅当回复通道本身可能阻塞发送操作时适用(例如,当前已满的有界
QueueChannel)。该值以毫秒为单位指定。可选。
使用 Java 配置进行配置
以下Spring Boot应用程序展示了如何使用Java配置出站适配器的示例:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@MessagingGateway
interface JpaGateway {
@Gateway(requestChannel = "jpaUpdateChannel")
@Transactional
void updateStudent(StudentDomain payload);
}
@Bean
@ServiceActivator(channel = "jpaUpdateChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter =
new JpaOutboundGateway(new JpaExecutor(this.entityManagerFactory));
adapter.setOutputChannelName("updateResults");
return adapter;
}
}
使用 Java DSL 进行配置
以下Spring Boot应用程序展示了如何使用Java DSL配置出站适配器的示例:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("updateResults"));
}
}
检索出站网关
以下示例演示了如何配置检索出站网关:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
}
@Bean
fun retrievingGatewayFlow() =
integrationFlow {
handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
channel { queue("retrieveResults") }
}
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setJpaQuery("from Student s where s.id = :id");
executor.setJpaParameters(Collections.singletonList(new JpaParameter("id", null, "payload")));
jpaExecutor.setExpectSingleResult(true);
return executor;
}
@Bean
@ServiceActivator(channel = "jpaRetrievingChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
adapter.setOutputChannelName("retrieveResults");
adapter.setGatewayType(OutboundGatewayType.RETRIEVING);
return adapter;
}
}
<int-jpa:retrieving-outbound-gateway request-channel=""
auto-startup="true"
delete-after-poll="false"
delete-in-batch="false"
entity-class=""
id-expression="" // <1>
entity-manager=""
entity-manager-factory=""
expect-single-result="false" // <2>
id=""
jpa-operations=""
jpa-query=""
max-results="" // <3>
max-results-expression="" // <4>
first-result="" // <5>
first-result-expression="" // <6>
named-query=""
native-query=""
order=""
parameter-source-factory=""
reply-channel=""
reply-timeout=""
use-payload-as-parameter-source="true">
<int:poller></int:poller>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:retrieving-outbound-gateway>
(自 Spring Integration 4.0 起) 该 SpEL 表达式用于根据
requestMessage(作为评估上下文的根对象)确定EntityManager.find(Class entityClass, Object primaryKey)方法中的primaryKey值。entityClass参数由entity-class属性(如果存在)确定,否则由payload的类确定。如果使用id-expression,则不允许使用所有其他属性。可选。一个布尔标志,指示选择操作预期返回单个结果还是结果的
List。如果此标志设置为true,则单个实体将作为消息的有效负载发送。如果返回多个实体,则会抛出异常。如果为false,则实体的List将作为消息的有效负载发送。默认值为false。可选。此非零、非负整数值告知适配器在执行选择操作时不要选择超过指定数量的行。默认情况下,如果未设置此属性,则给定查询将选择所有可能的记录。此属性与
max-results-expression互斥。可选。可用于查找结果集中最大结果数的表达式。它与
max-results互斥。可选。此非零、非负整数值告知适配器从哪条记录开始检索结果。此属性与
first-result-expression互斥。此属性在版本 3.0 中引入。可选。此表达式针对消息进行评估,以查找结果集中第一条记录的位置。此属性与
first-result互斥。此属性在版本 3.0 中引入。可选。
当你选择在检索时删除实体,并且已经检索到一个实体集合时,默认情况下,实体是按逐个删除的。这可能会导致性能问题。
或者,你可以将属性 deleteInBatch 设置为 true,以执行批量删除。然而,这样做的限制是不支持级联删除。
JSR 317: Java™ Persistence 2.0 在第 4.10 章“批量更新和删除操作”中说明:
“删除操作仅适用于指定类及其子类的实体。它不会级联到相关实体。”
更多信息,请参阅 JSR 317: Java™ Persistence 2.0
从版本 6.0 开始,当查询未返回任何实体时,Jpa.retrievingGateway() 会返回一个空列表结果。在此之前,根据 requiresReply 的设置,会返回 null 并结束流程,或抛出异常。或者,要恢复之前的行为,可以在网关后添加一个 filter 来过滤掉空列表。在那些将空列表处理作为下游逻辑一部分的应用中,这需要额外的配置。有关可能的空列表处理选项,请参阅 Splitter Discard Channel。
JPA 出站网关示例
本节包含使用更新出站网关和检索出站网关的各种示例:
通过实体类进行更新
在以下示例中,通过使用 org.springframework.integration.jpa.test.entity.Student 实体类作为 JPA 定义参数,持久化了一个更新型出站网关:
<int-jpa:updating-outbound-gateway request-channel="entityRequestChannel" // <1>
reply-channel="entityResponseChannel" // <2>
entity-class="org.springframework.integration.jpa.test.entity.Student"
entity-manager="em"/>
这是出站网关的请求通道。它类似于
outbound-channel-adapter的channel属性。这是网关与出站适配器的区别所在。这是接收 JPA 操作回复的通道。然而,如果您对接收到的回复不感兴趣,只想执行操作,那么使用 JPA
outbound-channel-adapter是合适的选择。在本示例中,我们使用实体类,回复是 JPA 操作创建或合并的实体对象。
使用 JPQL 更新
以下示例使用Java持久化查询语言(JPQL)更新实体,这要求使用更新型出站网关:
<int-jpa:updating-outbound-gateway request-channel="jpaqlRequestChannel"
reply-channel="jpaqlResponseChannel"
jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber" // <1>
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>
网关执行的 JPQL 查询。由于我们使用了更新出站网关,只有
update和delete类型的 JPQL 查询才是合理的选择。
当你发送一条包含 String 类型有效负载的消息,并且该消息还带有一个名为 rollNumber、值为 long 类型的头部时,具有指定学号的学生姓氏将被更新为消息有效负载中的值。在使用更新网关时,返回值始终是一个整数值,表示受 JPA QL 执行影响的记录数。
使用 JPQL 检索实体
以下示例使用检索出站网关和JPQL从数据库中检索(选择)一个或多个实体:
<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
jpa-query="select s from Student s where s.firstName = :firstName and s.lastName = :lastName"
entity-manager="em">
<int-jpa:parameter name="firstName" expression="payload"/>
<int-jpa:parameter name="lastName" expression="headers['lastName']"/>
</int-jpa:outbound-gateway>
通过 id-expression 检索实体
以下示例使用带有 id-expression 的检索出站网关,从数据库中检索(查找)一个且仅一个实体:primaryKey 是 id-expression 表达式求值的结果。entityClass 是消息 payload 的类。
<int-jpa:retrieving-outbound-gateway
request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
id-expression="payload.id"
entity-manager="em"/>
使用命名查询进行更新
使用命名查询与直接使用JPQL查询基本相同。区别在于需要使用 named-query 属性,如下例所示:
<int-jpa:updating-outbound-gateway request-channel="namedQueryRequestChannel"
reply-channel="namedQueryResponseChannel"
named-query="updateStudentByRollNumber"
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:outbound-gateway>
你可以在这里找到一个使用 Spring Integration JPA 适配器的完整示例应用程序:此处。