流式支持
在许多情况下,应用程序数据是从流中获取的。不建议将流的引用作为消息负载发送给消费者。相反,消息是从输入流读取的数据创建的,并且消息负载会逐一写入输出流。
此依赖项为项目所需:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>7.0.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:7.0.2"
从流中读取
Spring Integration 提供了两种用于流的适配器。ByteStreamReadingMessageSource 和 CharacterStreamReadingMessageSource 都实现了 MessageSource。通过在通道适配器元素中配置其中之一,可以配置轮询周期,并且消息总线可以自动检测并调度它们。字节流版本需要一个 InputStream 作为唯一的构造函数参数,而字符流版本则需要一个 Reader。ByteStreamReadingMessageSource 还接受 bytesPerMessage 属性,用于确定每次尝试读取到每个 Message 中的字节数。默认值为 1024。以下示例创建了一个输入流,该流生成的消息每条包含 2048 个字节:
<bean class="org.springframework.integration.stream.inbound.ByteStreamReadingMessageSource">
<constructor-arg ref="someInputStream"/>
<property name="bytesPerMessage" value="2048"/>
</bean>
<bean class="org.springframework.integration.stream.inbound.CharacterStreamReadingMessageSource">
<constructor-arg ref="someReader"/>
</bean>
CharacterStreamReadingMessageSource 将读取器包装在 BufferedReader 中(如果它本身还不是一个 BufferedReader)。你可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。从版本 5.0 开始,第三个构造函数参数 (blockToDetectEOF) 控制着 CharacterStreamReadingMessageSource 的行为。当设置为 false(默认值)时,receive() 方法会检查读取器是否 ready(),如果不是则返回 null。在这种情况下,不会检测 EOF(文件结束符)。当设置为 true 时,receive() 方法会阻塞,直到有数据可用或在底层流上检测到 EOF。当检测到 EOF 时,会发布一个 StreamClosedEvent(应用程序事件)。你可以通过一个实现了 ApplicationListener<StreamClosedEvent> 的 Bean 来消费此事件。
为了便于检测 EOF,轮询线程会阻塞在 receive() 方法中,直到数据到达或检测到 EOF。
一旦检测到EOF,轮询器会在每次轮询时继续发布事件。应用程序监听器可以停止适配器来防止这种情况。事件在轮询器线程上发布。停止适配器会导致线程被中断。如果您打算在停止适配器后执行一些可中断的任务,您必须在不同的线程上执行stop(),或者为这些下游活动使用不同的线程。请注意,发送到QueueChannel是可中断的,因此,如果您希望从监听器发送消息,请在停止适配器之前进行。
这便于通过“管道”或重定向将数据传递到 stdin,如下两个示例所示:
cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt
这种方法允许应用程序在管道关闭时停止运行。
提供了四种便捷的工厂方法:
public static final CharacterStreamReadingMessageSource stdin() { ... }
public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }
public static final CharacterStreamReadingMessageSource stdinPipe() { ... }
public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
写入流
对于目标流,您可以使用以下两种实现之一:ByteStreamWritingMessageHandler 或 CharacterStreamWritingMessageHandler。每个实现都需要一个构造函数参数(字节流使用 OutputStream,字符流使用 Writer),并且每个实现都提供了第二个构造函数,用于添加可选的 'bufferSize' 参数。由于这两种实现最终都实现了 MessageHandler 接口,您可以在 channel-adapter 配置中引用它们,如通道适配器中所述。
<bean class="org.springframework.integration.stream.outbound.ByteStreamWritingMessageHandler">
<constructor-arg ref="someOutputStream"/>
<constructor-arg value="1024"/>
</bean>
<bean class="org.springframework.integration.stream.outbound.CharacterStreamWritingMessageHandler">
<constructor-arg ref="someWriter"/>
</bean>
流命名空间支持
Spring Integration 定义了一个命名空间,以减少与流相关的通道适配器所需的配置。使用该命名空间需要以下模式位置:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/stream
https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
以下代码片段展示了支持配置入站通道适配器的不同配置选项:
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>
<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
从 5.0 版本开始,你可以设置 detect-eof 属性,该属性会设置 blockToDetectEOF 属性。更多信息请参阅从流中读取。
要配置出站通道适配器,你也可以使用命名空间支持。以下示例展示了出站通道适配器的不同配置:
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
channel="testChannel"/>
<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
channel="testChannel"/>
<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>
<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
channel="testChannel"/>