跳到主要内容
版本:7.0.2

消息发布

DeepSeek V3 中英对照 Message Publishing

(Aspect-oriented Programming) AOP 消息发布功能允许您将消息的构建和发送作为方法调用的副产品。例如,假设您有一个组件,并且每次该组件的状态发生变化时,您都希望通过消息收到通知。发送此类通知的最简单方法是向专用通道发送消息,但是如何将更改对象状态的方法调用与消息发送过程连接起来,以及通知消息应如何构建?AOP 消息发布功能通过配置驱动的方法来处理这些职责。

消息发布配置

Spring Integration 提供了两种配置方式:XML 配置和基于注解(Java)的配置。

使用 @Publisher 注解进行注解驱动配置

注解驱动的方法允许您使用 @Publisher 注解来标注任何方法,并指定一个 'channel' 属性。从版本 5.1 开始,要启用此功能,您必须在某个 @Configuration 类上使用 @EnablePublisher 注解。更多信息请参见配置与 @EnableIntegration。消息由方法调用的返回值构建,并发送到由 'channel' 属性指定的通道。为了进一步管理消息结构,您还可以结合使用 @Payload@Header 注解。

在内部,Spring Integration 的消息发布功能通过定义 PublisherAnnotationAdvisor 来使用 Spring AOP,并结合 Spring 表达式语言(SpEL),为您提供了对发布 Message 结构的极大灵活性和控制能力。

PublisherAnnotationAdvisor 定义并绑定了以下变量:

  • #return: 绑定到返回值,允许你引用它或其属性(例如 #return.something,其中 'something' 是绑定到 #return 的对象的属性)

  • #exception: 绑定到方法调用时抛出的异常(如果存在)

  • #args: 绑定到方法参数,以便你可以按名称提取单个参数(例如 #args.fname

考虑以下示例:

@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}

在前面的例子中,消息的构建结构如下:

  • 消息负载是方法的返回类型和值。这是默认行为。

  • 新构建的消息会被发送到一个默认的发布者通道,该通道配置了注解后处理器(本节稍后将介绍)。

以下示例与前述示例相同,只是它未使用默认发布通道:

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}

我们通过设置 @Publisher 注解的 'channel' 属性来指定发布通道,而非使用默认发布通道。同时,我们还添加了一个 @Header 注解,这会使名为 'last' 的消息头具有与方法参数 'lname' 相同的值。该消息头会被添加到新构建的消息中。

以下示例与前面的示例几乎完全相同:

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}

唯一的区别在于,我们在方法上使用了 @Payload 注解,以明确指定方法的返回值应作为消息的有效载荷。

以下示例在前一个配置的基础上,通过在 @Payload 注解中使用 Spring 表达式语言,进一步指导框架如何构建消息:

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}

在前面的示例中,消息是方法调用的返回值和输入参数 lname 的拼接结果。名为 x 的消息头其值由输入参数 num 决定,该消息头被添加到新构建的消息中。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}

在前面的示例中,您看到了 @Payload 注解的另一种用法。这里,我们注解了一个方法参数,该参数将成为新构建消息的有效载荷。

与Spring中大多数其他基于注解的功能一样,你需要注册一个后处理器(PublisherAnnotationBeanPostProcessor)。以下示例展示了如何操作:

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

为了更简洁的配置,您可以使用命名空间支持,如下例所示:

<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

对于Java配置,你必须使用 @EnablePublisher 注解,如下例所示:

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}

从 5.1.3 版本开始,<int:enable-publisher> 组件以及 @EnablePublisher 注解都提供了 proxy-target-classorder 属性,用于调整 ProxyFactory 的配置。

与其他Spring注解(如@Component@Scheduled等)类似,您也可以将@Publisher用作元注解。这意味着您可以定义自己的注解,这些注解的处理方式与@Publisher本身相同。以下示例展示了如何实现:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在前面的示例中,我们定义了 @Audit 注解,该注解本身被 @Publisher 注解所标注。同时请注意,您可以在元注解上定义一个 channel 属性,以将消息发送位置封装在此注解内部。现在,您可以使用 @Audit 注解来标注任何方法,如下例所示:

@Audit
public String test() {
return "Hello";
}

在前面的示例中,每次调用 test() 方法都会生成一条消息,其有效载荷来自方法的返回值。每条消息都会被发送到名为 auditChannel 的通道。这种技术的好处之一是,可以避免在多个注解中重复使用相同的通道名称。同时,它还能在您自己(可能特定于领域)的注解与框架提供的注解之间提供一定程度的间接性。

你也可以对类进行注解,这样可以将该注解的属性应用到该类的每个公共方法上,如下例所示:

@Audit
static class BankingOperationsImpl implements BankingOperations {

public String debit(String amount) {
. . .

}

public String credit(String amount) {
. . .
}

}

基于 XML 的 <publishing-interceptor> 元素方法

基于XML的方法允许您配置与基于命名空间的MessagePublishingInterceptor配置相同的基于AOP的消息发布功能。相比注解驱动的方法,它确实具有一些优势,因为您可以使用AOP切点表达式,从而可以一次性拦截多个方法,或者拦截并发布您没有源代码的方法。

要使用 XML 配置消息发布,您只需完成以下两件事:

  • 通过使用 <publishing-interceptor> XML 元素为 MessagePublishingInterceptor 提供配置。

  • 提供 AOP 配置,将 MessagePublishingInterceptor 应用于托管对象。

以下示例展示了如何配置 publishing-interceptor 元素:

<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor> 的配置看起来与基于注解的方法非常相似,并且它也利用了 Spring 表达式语言的能力。

在前面的例子中,执行 testBeanecho 方法会生成一个具有以下结构的 Message

  • Message 的有效载荷为 String 类型,内容如下:Echoing: [value],其中 value 是所执行方法返回的值。

  • Message 包含一个名为 things、值为 something 的头部信息。

  • Message 被发送到 echoChannel

第二种方法与第一种非常相似。这里,每个以 'repl' 开头的方法都会渲染一个具有以下结构的 Message

  • Message 负载与前述示例相同。

  • Message 包含一个名为 things 的头部,其值为 SpEL 表达式 'something'.toUpperCase() 的结果。

  • Message 被发送到 echoChannel

第二种方法,映射任何以 echoDef 开头的方法的执行,会产生一个具有以下结构的 Message

  • Message 负载是执行方法返回的值。

  • 由于未提供 channel 属性,Message 被发送到由 publisher 定义的 defaultChannel

对于简单的映射规则,你可以依赖 publisher 的默认设置,如下例所示:

<publishing-interceptor id="anotherInterceptor"/>

前面的示例将匹配切点表达式的方法的返回值映射为有效载荷,并发送到 default-channel。如果未指定 defaultChannel(如前面的示例所示),消息将被发送到全局的 nullChannel(相当于 /dev/null)。

异步发布

发布操作与组件执行位于同一线程中,因此默认情况下是同步的。这意味着整个消息流必须等待发布者的流程完成。然而,开发者通常期望完全相反的效果:利用此消息发布功能来启动异步流程。例如,您可能托管一个服务(HTTP、WS等)来接收远程请求。您可能希望将此请求内部发送到一个可能需要较长时间处理的流程中,但同时希望立即回复用户。因此,您无需将入站请求发送到输出通道进行处理(传统方式),而是可以使用 output-channelreplyChannel 标头向调用者发送简单的确认式回复,同时利用消息发布功能启动复杂流程。

在以下示例中,服务接收一个复杂的负载(需要进一步发送以进行处理),但它也需要向调用者回复一个简单的确认:

public String echo(Object complexPayload) {
return "ACK";
}

因此,我们不再将复杂流程连接到输出通道,而是改用消息发布功能。我们将其配置为使用服务方法的输入参数(如前述示例所示)创建新消息,并将其发送至 localProcessChannel。为确保此流程的异步性,我们只需将其发送至任意类型的异步通道(如下例中的 ExecutorChannel)。以下示例展示了如何配置异步的 publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

处理此类场景的另一种方法是使用线路监听。请参阅线路监听

基于定时触发器生产和发布消息

在前面的章节中,我们探讨了消息发布功能,该功能将消息的构建与发布作为方法调用的副产品。然而,在这些情况下,您仍需负责调用方法。Spring Integration 2.0 通过为 inbound-channel-adapter 元素新增 expression 属性,增加了对定时消息生产者和发布者的支持。您可以根据多种触发器进行调度,其中任何一种都可以在 poller 元素上配置。目前,我们支持 cronfixed-ratefixed-delay 以及由您实现并通过 trigger 属性值引用的任何自定义触发器。

如前所述,对计划生产者和发布者的支持是通过 <inbound-channel-adapter> XML 元素提供的。请看以下示例:

<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

上述示例创建了一个入站通道适配器,该适配器会构建一个 Message,其有效载荷为 expression 属性中定义的表达式的结果。每当 fixed-delay 属性指定的延迟时间到达时,就会创建并发送这样的消息。

以下示例与前面的示例类似,只是它使用了 fixed-rate 属性:

<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate 属性允许您以固定速率发送消息(从每个任务的开始时间开始计算)。

以下示例展示了如何应用 Cron 触发器,其值在 cron 属性中指定:

<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下示例展示了如何在消息中插入额外的头部信息:

<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

额外的消息头可以接受标量值或Spring表达式求值的结果。

如果你需要实现自定义触发器,可以通过 trigger 属性引用任何实现了 org.springframework.scheduling.Trigger 接口的 Spring 配置 Bean。以下示例展示了具体实现方式:

<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>