跳到主要内容

Pulsar 函数

DeepSeek V3 中英对照 Pulsar Functions

目录

Spring for Apache Pulsar 提供了对 Pulsar IO(连接器)和 Pulsar Functions 的基本支持,允许用户定义由 sourcesprocessorssinks 组成的流处理管道。sourcessinksPulsar IO(连接器) 建模,而 processors 则由 Pulsar Functions 表示。

备注

因为连接器只是特殊的函数,为了简化起见,我们将源、接收器和函数统称为“Pulsar Functions”。

先决条件

熟悉度 - 观众应稍微熟悉 Pulsar IOPulsar Functions。如果不是这种情况,查看他们的入门指南可能会有所帮助。

功能启用 - 要使用这些功能,必须启用并配置 Apache Pulsar 中的函数支持(默认情况下是禁用的)。内置连接器可能还需要安装在 Pulsar 集群上。

有关更多详细信息,请参阅 Pulsar IOPulsar Functions 文档。

1. Pulsar 函数管理

该框架提供了 PulsarFunctionAdministration 组件来管理 Pulsar 函数。当你使用 Pulsar Spring Boot 启动器时,PulsarFunctionAdministration 会自动配置。

默认情况下,应用程序会尝试连接到位于 [localhost:8080](http://localhost:8080) 的本地 Pulsar 实例。然而,由于它利用了已经配置好的 PulsarAdministration,请参阅 Pulsar Admin Client 以了解可用的客户端选项(包括认证)。更多配置选项可通过 spring.pulsar.function.* 应用程序属性进行配置。

2. 自动函数管理

在应用程序启动时,框架会在应用上下文中找到所有的 PulsarFunctionPulsarSinkPulsarSource bean。对于每个 bean,相应的 Pulsar 函数会被创建或更新。根据函数类型、函数配置以及函数是否已经存在,框架会调用适当的 API。

备注

PulsarFunctionPulsarSinkPulsarSource bean 分别是 Apache Pulsar 配置对象 FunctionConfigSinkConfigSourceConfig 的简单封装。由于支持的连接器数量众多(且它们的配置各异),框架并未尝试创建一个配置属性层次结构来映射各种 Apache Pulsar 连接器。相反,用户需要提供完整的配置对象,然后框架会使用提供的配置来处理管理(创建/更新)工作。

在应用程序关闭时,所有在应用程序启动期间处理的函数都会强制执行其停止策略,这些函数可能会被保留、停止或从 Pulsar 服务器中删除。

3. 限制

3.1. 没有神奇的 Pulsar 函数

Pulsar 函数和自定义连接器由自定义应用程序代码表示(例如 java.util.Function)。目前没有自动注册自定义代码的神奇支持。虽然这将会非常棒,但它存在一些技术挑战,尚未实现。因此,用户需要确保函数(或自定义连接器)在函数配置中指定的位置可用。例如,如果函数配置中的 jar 值为 ./some/path/MyFunction.jar,那么函数 jar 文件必须存在于指定的路径中。

3.2. 名称标识符

函数配置中的 name 属性被用作标识符,用于确定函数是否已经存在,以便决定是执行更新还是创建操作。因此,如果需要更新函数,则不应修改 name

4. 配置

4.1. Pulsar 函数归档

每个 Pulsar 函数都由一个实际的存档文件(例如 jar 文件)表示。源和接收器的存档路径通过 archive 属性指定,而函数的路径则通过 jar 属性指定。

以下规则决定了路径的“类型”:

  • 当路径以 (file|http|https|function|sink|source):// 开头时,路径是一个 URL

  • 当路径以 builtin:// 开头时,路径是 内置的(指向提供的开箱即用的连接器之一)。

  • 否则,路径是 本地的

在创建/更新操作期间发生的动作取决于路径 "type",如下所示:

  • 当路径是 URL 时,内容由服务器下载

  • 当路径是 built-in 时,内容已经在服务器上可用

  • 当路径是 local 时,内容会上传到服务器

4.2. 内置的 Source 和 Sink

Apache Pulsar 提供了许多开箱即用的源连接器和接收器连接器,即内置连接器。要使用内置连接器,只需将 archive 设置为 builtin://<connector-type>(例如 builtin://rabbit)。

5. 自定义函数

有关如何开发和打包自定义函数的详细信息可以在 Pulsar 文档 中找到。然而,从高层次来看,要求如下:

  • 代码使用 Java 8

  • 代码实现了 java.util.Functionorg.apache.pulsar.functions.api.Function

  • 打包为 uber jar

一旦函数被构建并打包,有几种方式可以使其可用于函数注册。

5.1. file://

可以将 jar 文件上传到服务器,然后在函数配置的 jar 属性中通过 file:// 引用它。

5.2. local

jar 文件可以保留在本地,然后通过函数配置中的 jar 属性的本地路径进行引用。

5.3. http://

可以通过 HTTP 服务器提供 jar 文件,然后在函数配置的 jar 属性中通过 http(s):// 进行引用。

5.4. function://

该 jar 文件可以上传到 Pulsar 包管理器,然后在函数配置的 jar 属性中通过 function:// 引用。

6. 示例

以下是一些示例,展示了如何配置一个 PulsarSource bean,这将导致 PulsarFunctionAdministration 自动创建底层的 Pulsar source connector。

@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
java

接下来的这个示例与前一个相同,只是它使用了 Spring Boot 自动配置的 RabbitProperties 来减轻配置负担。当然,这要求应用程序使用 Spring Boot 并启用了 Rabbit 自动配置。

@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
java
提示

有关更详细的示例,请参阅带有 Pulsar Functions 的示例流管道示例应用程序