UDP 适配器
本节介绍如何配置和使用 UDP 适配器。
外发 UDP 适配器 (XML 配置)
以下示例配置了一个 UDP 外发通道适配器:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
socket-customizer="udpCustomizer"
channel="exampleChannel"/>
当将 multicast
设置为 true
时,你还应在主机属性中提供组播地址。
UDP 是一个高效但不可靠的协议。Spring Integration 添加了两个属性以提高可靠性:check-length
和 acknowledge
。当 check-length
设置为 true
时,适配器会在消息数据前添加一个长度字段(网络字节顺序的四个字节)。这使得接收端可以验证接收到的数据包的长度。如果接收系统使用的缓冲区太短而无法容纳数据包,则数据包可能会被截断。length
头提供了一种检测此问题的机制。
从 4.3 版本开始,您可以将 port
设置为 0
,在这种情况下,操作系统会选择端口。可以通过在适配器启动后调用 getPort()
并且 isListening()
返回 true
来发现所选择的端口。
从 5.3.3 版本开始,您可以添加一个 SocketCustomizer
bean,在创建 DatagramSocket
后对其进行修改(例如,调用 setTrafficClass(0x10)
)。
以下示例显示了一个出站通道适配器,它为数据报包添加了长度检查:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
channel="exampleChannel"/>
必须配置数据包的接收者以期望在实际数据之前有一个长度。对于 Spring Integration UDP 入站通道适配器,设置其 check-length
属性。
第二种可靠性改进允许使用应用程序级别的确认协议。接收者必须在指定的时间内向发送者发送确认。
以下示例展示了一个出站通道适配器,它为数据报包添加了长度检查并等待确认:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
acknowledge="true"
ack-host="thishost"
ack-port="22222"
ack-timeout="10000"
channel="exampleChannel"/>
将 acknowledge
设置为 true
表示接收数据包的一方可以解释添加到包含确认数据(主机和端口)的数据包中的报头。很可能,接收者是一个 Spring Integration 入站通道适配器。
当启用多播时,一个额外的属性 (min-acks-for-success
) 指定了在 ack-timeout
内必须收到的确认数。
从 4.3 版本开始,您可以将 ackPort
设置为 0
,在这种情况下,操作系统会选择端口。
外发 UDP 适配器(Java 配置)
以下示例展示了如何用 Java 配置一个 outbound UDP 适配器:
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
return new UnicastSendingMessageHandler("localhost", 11111);
}
(或 MulticastSendingChannelAdapter
用于组播)。
外发 UDP 适配器(Java DSL 配置)
以下示例展示了如何使用 Java DSL 配置 outbound UDP 适配器:
@Bean
public IntegrationFlow udpOutFlow() {
return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
.configureSocket(socket -> socket.setTrafficClass(0x10)))
.get();
}
入站 UDP 适配器 (XML 配置)
以下示例展示了如何配置基本的单播入站 udp 通道适配器。
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="false"
socket-customizer="udpCustomizer"
check-length="true"/>
以下示例展示了如何配置一个基本的组播入站 udp 通道适配器:
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="true"
multicast-address="225.6.7.8"
check-length="true"/>
默认情况下,不会对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。为了将 IP 地址转换为主机名以用于消息标头,默认行为可以通过将 lookup-host
属性设置为 true
来覆盖。
从 5.3.3 版本开始,你可以添加一个 SocketCustomizer
bean,在创建后修改 DatagramSocket
。它会被接收套接字以及为发送确认而创建的任何套接字调用。
入站 UDP 适配器(Java 配置)
以下示例展示了如何用 Java 配置一个入站 UDP 适配器:
@Bean
public UnicastReceivingChannelAdapter udpIn() {
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
adapter.setOutputChannelName("udpChannel");
return adapter;
}
以下示例展示了如何使用 Java DSL 配置入站 UDP 适配器:
入站 UDP 适配器(Java DSL 配置)
@Bean
public IntegrationFlow udpIn() {
return IntegrationFlow.from(Udp.inboundAdapter(11111))
.channel("udpChannel")
.get();
}
服务器监听事件
从 5.0.2 版本开始,当一个入站适配器启动并开始监听时,会发出 UdpServerListeningEvent
。这在适配器配置为监听端口 0
时非常有用,这意味着操作系统会选择端口。如果需要在启动其他将连接到套接字的进程之前等待,也可以用它来代替轮询 isListening()
。
高级出站配置
<int-ip:udp-outbound-channel-adapter>
(UnicastSendingMessageHandler
) 具有 destination-expression
和 socket-expression
选项。
你可以使用 destination-expression
作为运行时替代硬编码的 host
-port
对来确定传出数据报包的目的地址,这是针对 requestMessage
(带有评估上下文的根对象)。表达式必须计算为一个 URI
、URI 样式的 String
(参见 RFC-2396),或 SocketAddress
。你也可以为此表达式使用入站 IpHeaders.PACKET_ADDRESS
标头。在框架中,当我们在 UnicastReceivingChannelAdapter
中接收到数据报并将其转换为消息时,DatagramPacketMessageMapper
会填充此标头。标头值正是传入数据报的 DatagramPacket.getSocketAddress()
的结果。
通过 socket-expression
,出站通道适配器可以使用(例如)入站通道适配器套接字通过相同的端口发送数据报,而这些数据报就是从该端口接收的。在我们的应用程序作为 UDP 服务器运行且客户端在网络地址转换 (NAT) 后面操作的情况下,这非常有用。此表达式必须计算为一个 DatagramSocket
。requestMessage
用作评估上下文的根对象。您不能将 socket-expression
参数与 multicast
和 acknowledge
参数一起使用。以下示例展示了如何配置带有转换器的 UDP 入站通道适配器,该转换器转换为大写并使用套接字:
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />
<int:channel id="in" />
<int:transformer expression="new String(payload).toUpperCase()"
input-channel="in" output-channel="out"/>
<int:channel id="out" />
<int-ip:udp-outbound-channel-adapter id="outbound"
socket-expression="@inbound.socket"
destination-expression="headers['ip_packetAddress']"
channel="out" />
以下示例显示了使用 Java DSL 的等效配置:
@Bean
public IntegrationFlow udpEchoUpcaseServer() {
return IntegrationFlow.from(Udp.inboundAdapter(11111).id("udpIn"))
.<byte[], String>transform(p -> new String(p).toUpperCase())
.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
.socketExpression("@udpIn.socket"))
.get();
}