UDP 适配器
本节介绍如何配置和使用 UDP 适配器。
出站 UDP 适配器(XML 配置)
以下示例配置了一个UDP出站通道适配器:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
socket-customizer="udpCustomizer"
channel="exampleChannel"/>
当设置 multicast 为 true 时,您也应在 host 属性中提供组播地址。
UDP是一种高效但不可靠的协议。Spring Integration增加了两个属性来提升可靠性:check-length和acknowledge。当check-length设置为true时,适配器会在消息数据前添加一个长度字段(四个字节,按网络字节顺序排列)。这使得接收端能够验证接收到的数据包长度。如果接收系统使用的缓冲区过短而无法容纳数据包,数据包可能会被截断。length头部提供了一种检测这种情况的机制。
从 4.3 版本开始,你可以将 port 设置为 0,此时操作系统会自动选择一个端口。在适配器启动后,当 isListening() 返回 true 时,可以通过调用 getPort() 来获取被选中的端口。
从 5.3.3 版本开始,你可以添加一个 SocketCustomizer bean 来在 DatagramSocket 创建后对其进行修改(例如,调用 setTrafficClass(0x10))。
以下示例展示了一个出站通道适配器,该适配器为数据报数据包添加了长度检查功能:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
channel="exampleChannel"/>
数据包的接收方也必须配置为期望在实际数据之前有一个长度字段。对于 Spring Integration UDP 入站通道适配器,请设置其 check-length 属性。
第二个可靠性改进允许使用应用层确认协议。接收方必须在指定时间内向发送方发送确认。
以下示例展示了一个出站通道适配器,该适配器为数据报数据包添加长度检查并等待确认:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
acknowledge="true"
ack-host="thishost"
ack-port="22222"
ack-timeout="10000"
channel="exampleChannel"/>
将 acknowledge 设置为 true 意味着数据包的接收方能够解析添加到数据包中的包含确认数据(主机和端口)的头部。通常情况下,接收方是一个 Spring Integration 入站通道适配器。
当 multicast 设置为 true 时,一个额外的属性 (min-acks-for-success) 用于指定在 ack-timeout 时间内必须接收到多少确认信息。
从 4.3 版本开始,你可以将 ackPort 设置为 0,此时操作系统将自动选择端口。
出站 UDP 适配器(Java 配置)
以下示例展示了如何使用 Java 配置出站 UDP 适配器:
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
return new UnicastSendingMessageHandler("localhost", 11111);
}
(或用于多播的 MulticastSendingChannelAdapter)。
出站 UDP 适配器(Java DSL 配置)
以下示例展示了如何使用 Java DSL 配置出站 UDP 适配器:
@Bean
public IntegrationFlow udpOutFlow() {
return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
.configureSocket(socket -> socket.setTrafficClass(0x10)))
.get();
}
入站 UDP 适配器(XML 配置)
以下示例展示了如何配置一个基本的单播入站UDP通道适配器。
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="false"
socket-customizer="udpCustomizer"
check-length="true"/>
以下示例展示了如何配置一个基本的组播入站 UDP 通道适配器:
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="true"
multicast-address="225.6.7.8"
check-length="true"/>
默认情况下,不会对入站数据包执行反向DNS查询:在未配置DNS的环境中(例如Docker容器),这可能导致连接延迟。若要将IP地址转换为主机名以用于消息头,可通过将 lookup-host 属性设置为 true 来覆盖默认行为。
从 5.3.3 版本开始,你可以添加一个 SocketCustomizer bean 来在 DatagramSocket 创建后对其进行修改。该自定义器会被调用以配置接收套接字以及为发送确认而创建的任何套接字。
入站 UDP 适配器(Java 配置)
以下示例展示了如何使用 Java 配置入站 UDP 适配器:
@Bean
public UnicastReceivingChannelAdapter udpIn() {
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
adapter.setOutputChannelName("udpChannel");
return adapter;
}
以下示例展示了如何使用 Java DSL 配置入站 UDP 适配器:
入站 UDP 适配器(Java DSL 配置)
@Bean
public IntegrationFlow udpIn() {
return IntegrationFlow.from(Udp.inboundAdapter(11111))
.channel("udpChannel")
.get();
}
服务器监听事件
从 5.0.2 版本开始,当入站适配器启动并开始监听时,会发出 UdpServerListeningEvent。这在适配器配置为监听端口 0(意味着由操作系统选择端口)时非常有用。如果你需要在启动其他将连接到套接字的过程之前等待,也可以用它来代替轮询 isListening()。
高级出站配置
<int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler) 提供了 destination-expression 和 socket-expression 选项。
您可以使用 destination-expression 作为运行时替代硬编码的 host-port 对,以根据 requestMessage(以根对象作为求值上下文)确定传出数据报包的目标地址。该表达式必须求值为一个 URI、一个 URI 风格的 String(参见 RFC-2396)或一个 SocketAddress。您也可以在此表达式中使用入站 IpHeaders.PACKET_ADDRESS 标头。在框架中,当我们在 UnicastReceivingChannelAdapter 中接收数据报并将其转换为消息时,DatagramPacketMessageMapper 会填充此标头。该标头值正是传入数据报 DatagramPacket.getSocketAddress() 的结果。
通过 socket-expression,出站通道适配器可以使用(例如)入站通道适配器的套接字,通过接收数据报的同一端口发送数据报。这在我们的应用程序作为 UDP 服务器运行且客户端位于网络地址转换(NAT)后的场景中非常有用。该表达式必须解析为 DatagramSocket 类型。requestMessage 用作评估上下文的根对象。不能将 socket-expression 参数与 multicast 和 acknowledge 参数一起使用。以下示例展示了如何配置一个 UDP 入站通道适配器,该适配器包含一个转换为大写的转换器并使用套接字:
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />
<int:channel id="in" />
<int:transformer expression="new String(payload).toUpperCase()"
input-channel="in" output-channel="out"/>
<int:channel id="out" />
<int-ip:udp-outbound-channel-adapter id="outbound"
socket-expression="@inbound.socket"
destination-expression="headers['ip_packetAddress']"
channel="out" />
以下示例展示了使用 Java DSL 的等效配置:
@Bean
public IntegrationFlow udpEchoUpcaseServer() {
return IntegrationFlow.from(Udp.inboundAdapter(11111).id("udpIn"))
.<byte[], String>transform(p -> new String(p).toUpperCase())
.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
.socketExpression("@udpIn.socket"))
.get();
}