跳到主要内容

消息发布

QWen Plus 中英对照 Message Publishing

(面向切面编程) AOP 消息发布功能让你在方法调用的副产品中构建和发送消息。例如,想象一下你有一个组件,每当这个组件的状态发生变化时,你都希望通过消息得到通知。发送这种通知最简单的方法是向专用通道发送消息,但你该如何将改变对象状态的方法调用与消息发送过程连接起来,以及通知消息应该如何构建?AOP 消息发布功能通过配置驱动的方法来处理这些职责。

消息发布配置

Spring Integration 提供了两种方法:XML 配置和注解驱动(Java)配置。

基于注解的配置与 @Publisher 注解

注解驱动的方法允许你使用 @Publisher 注解来标注任何方法,以指定一个 'channel' 属性。从 5.1 版本开始,要开启此功能,你必须在某个 @Configuration 类上使用 @EnablePublisher 注解。更多信息,请参阅 配置和 @EnableIntegration。消息由方法调用的返回值构建,并发送到由 'channel' 属性指定的通道。为了进一步管理消息结构,你还可以同时使用 @Payload@Header 注解。

在内部,Spring Integration 的这个消息发布功能通过定义 PublisherAnnotationAdvisor 使用了 Spring AOP 和 Spring 表达式语言(SpEL),为您提供了相当大的灵活性和对它发布的 Message 结构的控制。

PublisherAnnotationAdvisor 定义并绑定以下变量:

  • #return:绑定到返回值,使您可以引用它或其属性(例如,#return.something,其中 'something' 是绑定到 #return 的对象的属性)

  • #exception:如果方法调用抛出异常,则绑定到该异常

  • #args:绑定到方法参数,使您可以按名称提取单个参数(例如,#args.fname

考虑以下示例:

@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}
java

在前面的例子中,消息的构建结构如下:

  • 消息的有效载荷是方法的返回类型和值。这是默认行为。

  • 新构造的消息被发送到一个默认的发布者通道,该通道通过注解后处理器进行配置(本节稍后会介绍)。

以下示例与前面的示例相同,只是它没有使用默认发布渠道:

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}
java

而不是使用默认的发布通道,我们通过设置 @Publisher 注解的 'channel' 属性来指定发布通道。我们还添加了一个 @Header 注解,这会导致消息头名为 'last' 的值与 'lname' 方法参数的值相同。该头部会被添加到新构造的消息中。

以下示例几乎与前面的示例相同:

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}
java

唯一的区别是我们使用 @Payload 注解在方法上来明确指定方法的返回值应该用作消息的有效载荷。

以下示例在之前的配置基础上进行了扩展,通过在 @Payload 注解中使用 Spring 表达式语言来进一步指示框架如何构造消息:

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}
java

在前面的例子中,消息是方法调用的返回值和 'lname' 输入参数的连接。名为 'x' 的 Message 标头的值由 'num' 输入参数确定。该标头被添加到新构造的消息中。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}
java

在前面的例子中,您看到了 @Payload 注解的另一种用法。在这里,我们标注了一个方法参数,该参数成为新构造的消息的有效负载。

与 Spring 中大多数其他注解驱动的功能一样,你需要注册一个后处理器(PublisherAnnotationBeanPostProcessor)。以下示例展示了如何进行注册:

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
xml

对于更简洁的配置,您可以改用命名空间支持,如下例所示:

<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
xml

对于 Java 配置,你必须使用 @EnablePublisher 注解,如下例所示:

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}
java

从 5.1.3 版本开始,<int:enable-publisher> 组件以及 @EnablePublisher 注解具有 proxy-target-classorder 属性,用于调整 ProxyFactory 配置。

类似于其他 Spring 注解(@Component@Scheduled 等),你也可以将 @Publisher 用作元注解。这意味着你可以定义自己的注解,这些注解的处理方式与 @Publisher 相同。以下示例展示了如何做到这一点:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
java

在前面的例子中,我们定义了 @Audit 注解,它本身被 @Publisher 注解。还请注意,你可以在元注解上定义一个 channel 属性,以封装消息发送的位置在这个注解内部。现在你可以使用 @Audit 注解标注任何方法,如下例所示:

@Audit
public String test() {
return "Hello";
}
java

在前面的例子中,test() 方法的每次调用都会生成一条消息,其有效负载来自该方法的返回值。每条消息都会发送到名为 auditChannel 的通道。这种技术的一个好处是,你可以避免在多个注解中重复相同的通道名称。你还可以在你自己潜在的领域特定注解和框架提供的注解之间提供一层间接性。

你也可以注解这个类,这让你可以将此注解的属性应用到该类的每个公共方法上,如下例所示:

@Audit
static class BankingOperationsImpl implements BankingOperations {

public String debit(String amount) {
. . .

}

public String credit(String amount) {
. . .
}

}
java

基于 XML 的方法使用 <publishing-interceptor> 元素

基于 XML 的方法允许你配置与基于命名空间的 MessagePublishingInterceptor 配置相同的 AOP 基消息发布功能。它肯定比注解驱动的方法有一些优势,因为它允许你使用 AOP 切点表达式,从而可能一次性拦截多个方法或拦截并发布你没有源代码的方法。

要使用 XML 配置消息发布,你只需要做以下两件事:

  • 使用 <publishing-interceptor> XML 元素为 MessagePublishingInterceptor 提供配置。

  • 提供 AOP 配置以将 MessagePublishingInterceptor 应用到托管对象。

以下示例展示了如何配置 publishing-interceptor 元素:

<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
xml

<publishing-interceptor> 配置看起来与基于注解的方法非常相似,它也使用了 Spring 表达式语言的功能。

在前面的例子中,testBeanecho 方法的执行会呈现一个具有以下结构的 Message

  • Message 的有效负载是类型为 String 的以下内容:Echoing: [value],其中 value 是执行方法返回的值。

  • Message 包含一个名为 things 且值为 something 的头。

  • Message 被发送到 echoChannel

第二种方法与第一种非常相似。在这里,每个以 'repl' 开头的方法都会渲染一个具有以下结构的 Message

  • Message 负载与前面的示例相同。

  • Message 有一个名为 things 的头,其值是 SpEL 表达式 'something'.toUpperCase() 的结果。

  • Message 被发送到 echoChannel

第二种方法,映射任何以 echoDef 开头的方法的执行,生成一个具有以下结构的 Message

  • Message 负载是执行方法返回的值。

  • 由于未提供 channel 属性,Message 将发送到由 publisher 定义的 defaultChannel

对于简单的映射规则,你可以依赖 publisher 的默认值,如下例所示:

<publishing-interceptor id="anotherInterceptor"/>
xml

前面的例子将与切入点表达式匹配的每个方法的返回值映射到一个负载,并发送到 default-channel。如果你没有指定 defaultChannel(如前面的例子中没有指定),消息将被发送到全局的 nullChannel(相当于 /dev/null)。

异步发布

发布发生在与组件执行相同的线程中。因此,默认情况下,它是同步的。这意味着整个消息流必须等待发布者的流程完成。然而,开发人员经常希望得到完全相反的效果:使用此消息发布功能来启动异步流程。例如,您可能托管一个服务(HTTP、WS 等),它接收远程请求。您可能希望将此请求内部发送到一个可能需要一些时间才能完成的流程。但是,您也可能希望立即回复用户。因此,与其将传入请求发送到输出通道进行处理(这是传统方式),您可以使用 output-channelreplyChannel 头来向调用者发送一个简单的类似确认的回复,同时使用消息发布者功能来启动一个复杂的流程。

以下示例中的服务接收一个复杂的有效负载(需要进一步发送以进行处理),但它也需要回复调用者一个简单的确认信息:

public String echo(Object complexPayload) {
return "ACK";
}
java

所以,与其将复杂的流程连接到输出通道,我们反而使用消息发布功能。我们配置它以创建一条新消息,通过使用服务方法的输入参数(如前面的例子所示),并将其发送到 'localProcessChannel'。为了确保此流程是异步的,我们只需要将其发送到任何类型的异步通道(下一个例子中的 ExecutorChannel)。以下示例展示了如何创建一个异步的 publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>
xml

处理这种场景的另一种方法是使用一个 线路监听 。参见 线路监听

基于计划触发器生成和发布消息

在前面的部分中,我们研究了消息发布功能,该功能将消息构建并发布为方法调用的副产品。然而,在这些情况下,您仍然需要负责调用该方法。Spring Integration 2.0 增加了对计划消息生产者和发布者的支持,'inbound-channel-adapter' 元素上新增了 expression 属性。您可以基于几种触发器之一进行调度,这些触发器中的任何一个都可以配置在 'poller' 元素上。目前,我们支持 cronfixed-ratefixed-delay 和任何由您实现并由 'trigger' 属性值引用的自定义触发器。

如前所述,通过 <inbound-channel-adapter> XML 元素提供对计划生产者和发布者的支持。考虑以下示例:

<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
xml

前面的例子创建了一个入站通道适配器,它构建一个 Message,其有效负载是 expression 属性中定义的表达式的结果。每当 fixed-delay 属性指定的延迟发生时,就会创建并发送这样的消息。

以下示例与前面的示例类似,不同之处在于它使用了 fixed-rate 属性:

<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
xml

fixed-rate 属性让你可以以固定速率发送消息(从每个任务的开始时间进行测量)。

以下示例显示了如何应用带有在 cron 属性中指定值的 Cron 触发器:

<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
xml

以下示例显示如何将附加标题插入消息中:

<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
xml

附加的消息头可以接受标量值或评估Spring表达式的结果。

如果你需要实现自己的自定义触发器,可以使用 trigger 属性来提供对实现了 org.springframework.scheduling.Trigger 接口的任何 spring 配置 bean 的引用。以下示例展示了如何实现:

<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>
xml