跳到主要内容
版本:7.0.2

入站通道适配器

DeepSeek V3 中英对照 Inbound Channel Adapter

入站通道适配器用于通过 JPA QL 在数据库上执行查询并返回结果。消息负载可以是单个实体或实体的 List。以下 XML 配置了一个 inbound-channel-adapter

<int-jpa:inbound-channel-adapter channel="inboundChannelAdapterOne"  // <1>
entity-manager="em" // <2>
auto-startup="true" // <3>
query="select s from Student s" // <4>
expect-single-result="true" // <5>
max-results="" // <6>
max-results-expression="" // <7>
delete-after-poll="true" // <8>
flush-after-delete="true"> // <9>
<int:poller fixed-rate="2000" >
<int:transactional propagation="REQUIRED" transaction-manager="transactionManager"/>
</int:poller>
</int-jpa:inbound-channel-adapter>
  • inbound-channel-adapter 执行 query 属性中的 JPA QL 后,将消息(包含负载)放入的通道。

  • 用于执行所需 JPA 操作的 EntityManager 实例。

  • 指示组件是否应在应用程序上下文启动时自动启动的属性。默认值为 true

  • 其结果作为消息负载发送出去的 JPA QL。

  • 此属性指示 JPQL 查询在结果中是返回单个实体还是一个实体 List。如果值设置为 true,则单个实体将作为消息负载发送。但是,如果将此值设置为 true 后返回了多个结果,则会抛出 MessagingException。默认值为 false

  • 这个非零、非负的整数值告诉适配器在执行选择操作时不要选择超过给定数量的行。默认情况下,如果未设置此属性,查询将选择所有可能的记录。此属性与 max-results-expression 互斥。可选。

  • 一个表达式,用于计算以查找结果集中的最大结果数。与 max-results 互斥。可选。

  • 如果希望在执行查询后删除接收到的行,请将此值设置为 true。必须确保组件作为事务的一部分运行。否则,可能会遇到诸如 java.lang.IllegalArgumentException: Removing a detached instance …​ 的异常。

  • 如果希望在删除接收到的实体后立即刷新持久化上下文,并且不希望依赖 EntityManagerflushMode,请将此值设置为 true。默认值为 false

配置参数参考

以下列表展示了可以为 inbound-channel-adapter 设置的所有值:

<int-jpa:inbound-channel-adapter
auto-startup="true" // <1>
channel="" // <2>
delete-after-poll="false" // <3>
delete-per-row="false" // <4>
entity-class="" // <5>
entity-manager="" // <6>
entity-manager-factory="" // <7>
expect-single-result="false" // <8>
id=""
jpa-operations="" // <9>
jpa-query="" // <10>
named-query="" // <11>
native-query="" // <12>
parameter-source="" // <13>
send-timeout=""> // <14>
<int:poller ref="myPoller"/>
</int-jpa:inbound-channel-adapter>
  • 此生命周期属性指示该组件是否应在应用程序上下文启动时自动启动。该属性默认为 true。可选。

  • 适配器将执行所需 JPA 操作后的负载消息发送到的通道。

  • 一个布尔标志,指示在适配器轮询所选记录后是否删除它们。默认情况下,值为 false(即不删除记录)。您必须确保该组件作为事务的一部分运行。否则,您可能会遇到异常,例如:java.lang.IllegalArgumentException: Removing a detached instance …​。可选。

  • 一个布尔标志,指示记录是否可以批量删除,还是必须一次删除一条记录。默认情况下,值为 false(即可以批量删除记录)。可选。

  • 要从数据库查询的实体类的完全限定名。适配器会根据实体类名自动构建 JPA 查询。可选。

  • 用于执行 JPA 操作的 jakarta.persistence.EntityManager 实例。可选。

  • 用于获取执行 JPA 操作的 jakarta.persistence.EntityManager 实例的 jakarta.persistence.EntityManagerFactory 实例。可选。

  • 一个布尔标志,指示选择操作预期返回单个结果还是结果的 List。如果此标志设置为 true,则选定的单个实体将作为消息负载发送。如果返回多个实体,则会抛出异常。如果为 false,则实体的 List 将作为消息负载发送。该值默认为 false。可选。

  • 用于执行 JPA 操作的 org.springframework.integration.jpa.core.JpaOperations 实现。我们建议不要提供您自己的实现,而是使用默认的 org.springframework.integration.jpa.core.DefaultJpaOperations 实现。您可以使用 entity-managerentity-manager-factoryjpa-operations 属性中的任何一个。可选。

  • 此适配器要执行的 JPA QL。可选。

  • 此适配器需要执行的命名查询。可选。

  • 此适配器执行的原生查询。您可以使用 jpa-querynamed-queryentity-classnative-query 属性中的任何一个。可选。

  • 用于解析查询中参数值的 o.s.i.jpa.support.parametersource.ParameterSource 实现。如果 entity-class 属性有值,则忽略此属性。可选。

  • 向通道发送消息时的最大等待时间(以毫秒为单位)。可选。

使用 Java 配置进行配置

以下Spring Boot应用程序展示了如何使用Java配置入站适配器的示例:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setJpaQuery("from Student");
return executor;
}

@Bean
@InboundChannelAdapter(channel = "jpaInputChannel",
poller = @Poller(fixedDelay = "${poller.interval}"))
public MessageSource<?> jpaInbound() {
return new JpaPollingChannelAdapter(jpaExecutor());
}

@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}

}

使用 Java DSL 进行配置

以下Spring Boot应用程序展示了如何使用Java DSL配置入站适配器的示例:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public IntegrationFlow pollingAdapterFlow() {
return IntegrationFlow
.from(Jpa.inboundAdapter(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
.channel(c -> c.queue("pollingResults"))
.get();
}

}