跳到主要内容
版本:7.0.2

Debezium 支持

DeepSeek V3 中英对照 Debezium Support

Debezium Engine,变更数据捕获(CDC)入站通道适配器。DebeziumMessageProducer 允许捕获数据库变更事件,将其转换为消息,并随后流式传输到出站通道。

您需要在项目中引入 Spring Integration Debezium 依赖:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>7.0.2</version>
</dependency>

你还需要为你的输入数据库包含一个 debezium 连接器 依赖。例如,要在 PostgreSQL 中使用 Debezium,你将需要 postgres 的 debezium 连接器:

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
备注

请将 debezium-version 替换为与正在使用的 spring-integration-debezium 版本兼容的版本。

Inbound Debezium 通道适配器

Debezium 适配器期望一个预先配置好的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 实例。

提示

debezium-supplier 提供了一个开箱即用的 DebeziumEngine.Builder Spring Boot 自动配置,并附带了一个便捷的 DebeziumProperties 配置抽象。

提示

Debezium Java DSL 可以从提供的 DebeziumEngine.Builder 创建 DebeziumMessageProducer 实例,也可以从普通的 Debezium 配置(例如 java.util.Properties)创建。后者对于具有特定配置和序列化格式的常见用例非常方便。

此外,DebeziumMessageProducer 可通过以下配置属性进行调整:

  • contentType - 允许处理 JSON(默认)、AVROPROTOBUF 消息内容。contentType 必须 与为提供的 DebeziumEngine.Builder 配置的 SerializationFormat 保持一致。

  • enableBatch - 当设置为 false(默认)时,debezium 适配器会为从源数据库接收到的每一个 ChangeEvent 数据变更事件发送一条新的 Message。如果设置为 true,则适配器会为从 Debezium 引擎接收到的每一批 ChangeEvent 向下游发送一条 Message。这样的负载不可序列化,需要自定义序列化/反序列化实现。

  • enableEmptyPayload - 启用对墓碑(即删除)消息的支持。在数据库行删除时,Debezium 可以发送一个墓碑变更事件,该事件具有与被删除行相同的键,并且值为 Optional.empty。默认为 false

  • headerMapper - 自定义的 HeaderMapper 实现,允许选择和转换 ChangeEvent 头部为 Message 头部。默认的 DefaultDebeziumHeaderMapper 实现提供了一个 setHeaderNamesToMap 的 setter 方法。默认情况下,所有头部都会被映射。

  • taskExecutor - 为 Debezium 引擎设置一个自定义的 TaskExecutor

以下代码片段展示了此通道适配器的多种配置方式:

使用 Java 配置进行配置

以下Spring Boot应用程序展示了如何通过Java配置来配置入站适配器的示例:

@SpringBootApplication
public class DebeziumJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}

@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}

@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {

DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {

Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); 1

String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); 2

String payload = new String((byte[]) message.getPayload()); 3

System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}

}
  • 事件预期发送到的逻辑目标名称。通常目标由 topic.prefix 配置选项、数据库名称和表名称组成。例如:my-topic.inventory.orders

  • 包含变更表键的模式以及变更行的实际键。键模式及其对应的键负载都包含一个字段,该字段对应连接器创建事件时变更表的 PRIMARY KEY(或唯一约束)中的每一列。

  • 与键类似,负载包含模式部分和负载值部分。模式部分包含描述负载值部分信封结构(包括其嵌套字段)的模式。针对创建、更新或删除数据操作的所有变更事件都具有采用信封结构的负载值。

提示

key.converter.schemas.enable=false 和/或 value.converter.schemas.enable=false 允许分别禁用键或有效载荷的消息内模式内容。

同样地,我们可以配置 DebeziumMessageProducer 以批量处理传入的变更事件:

@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {

DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}

Debezium Java DSL 支持

spring-integration-debezium 通过 Debezium 工厂和 DebeziumMessageProducerSpec 实现,提供了一个便捷的 Java DSL 流畅 API。

Debezium Java DSL 的入站通道适配器为:

DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或者从原生的 Debezium 配置属性创建 DebeziumMessageProducerSpec 实例,并默认使用 JSON 序列化格式。

Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))

以下Spring Boot应用程序展示了如何使用Java DSL配置入站适配器的示例:

@SpringBootApplication
public class DebeziumJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}

}