Pulsar 函数
目录
Spring for Apache Pulsar 提供了对 Pulsar IO(连接器)和 Pulsar Functions 的基本支持,允许用户定义由 sources
、processors
和 sinks
组成的流处理管道。sources
和 sinks
由 Pulsar IO(连接器) 建模,而 processors
则由 Pulsar Functions 表示。
因为连接器只是特殊的函数,为了简化起见,我们将源、接收器和函数统称为“Pulsar Functions”。
1. Pulsar 函数管理
该框架提供了 PulsarFunctionAdministration
组件来管理 Pulsar 函数。当你使用 Pulsar Spring Boot 启动器时,PulsarFunctionAdministration
会自动配置。
默认情况下,应用程序会尝试连接到位于 [localhost:8080](http://localhost:8080)
的本地 Pulsar 实例。然而,由于它利用了已经配置好的 PulsarAdministration
,请参阅 Pulsar Admin Client 以了解可用的客户端选项(包括认证)。更多配置选项可通过 spring.pulsar.function.* 应用程序属性进行配置。
2. 自动函数管理
在应用程序启动时,框架会在应用上下文中找到所有的 PulsarFunction
、PulsarSink
和 PulsarSource
bean。对于每个 bean,相应的 Pulsar 函数会被创建或更新。根据函数类型、函数配置以及函数是否已经存在,框架会调用适当的 API。
PulsarFunction
、PulsarSink
和 PulsarSource
bean 分别是 Apache Pulsar 配置对象 FunctionConfig
、SinkConfig
和 SourceConfig
的简单封装。由于支持的连接器数量众多(且它们的配置各异),框架并未尝试创建一个配置属性层次结构来映射各种 Apache Pulsar 连接器。相反,用户需要提供完整的配置对象,然后框架会使用提供的配置来处理管理(创建/更新)工作。
在应用程序关闭时,所有在应用程序启动期间处理的函数都会强制执行其停止策略,这些函数可能会被保留、停止或从 Pulsar 服务器中删除。
3. 限制
3.1. 没有神奇的 Pulsar 函数
Pulsar 函数和自定义连接器由自定义应用程序代码表示(例如 java.util.Function
)。目前没有自动注册自定义代码的神奇支持。虽然这将会非常棒,但它存在一些技术挑战,尚未实现。因此,用户需要确保函数(或自定义连接器)在函数配置中指定的位置可用。例如,如果函数配置中的 jar
值为 ./some/path/MyFunction.jar
,那么函数 jar 文件必须存在于指定的路径中。
3.2. 名称标识符
函数配置中的 name
属性被用作标识符,用于确定函数是否已经存在,以便决定是执行更新还是创建操作。因此,如果需要更新函数,则不应修改 name
。
4. 配置
4.1. Pulsar 函数归档
每个 Pulsar 函数都由一个实际的存档文件(例如 jar 文件)表示。源和接收器的存档路径通过 archive
属性指定,而函数的路径则通过 jar
属性指定。
以下规则决定了路径的“类型”:
-
当路径以
(file|http|https|function|sink|source)://
开头时,路径是一个 URL。 -
当路径以
builtin://
开头时,路径是 内置的(指向提供的开箱即用的连接器之一)。 -
否则,路径是 本地的。
在创建/更新操作期间发生的动作取决于路径 "type",如下所示:
-
当路径是 URL 时,内容由服务器下载
-
当路径是 built-in 时,内容已经在服务器上可用
-
当路径是 local 时,内容会上传到服务器
4.2. 内置的 Source 和 Sink
Apache Pulsar 提供了许多开箱即用的源连接器和接收器连接器,即内置连接器。要使用内置连接器,只需将 archive
设置为 builtin://<connector-type>
(例如 builtin://rabbit
)。
5. 自定义函数
有关如何开发和打包自定义函数的详细信息可以在 Pulsar 文档 中找到。然而,从高层次来看,要求如下:
-
代码使用 Java 8
-
代码实现了
java.util.Function
或org.apache.pulsar.functions.api.Function
-
打包为 uber jar
一旦函数被构建并打包,有几种方式可以使其可用于函数注册。
5.1. file://
可以将 jar 文件上传到服务器,然后在函数配置的 jar
属性中通过 file://
引用它。
5.2. local
jar 文件可以保留在本地,然后通过函数配置中的 jar
属性的本地路径进行引用。
5.3. http://
可以通过 HTTP 服务器提供 jar 文件,然后在函数配置的 jar
属性中通过 http(s)://
进行引用。
5.4. function://
该 jar 文件可以上传到 Pulsar 包管理器,然后在函数配置的 jar
属性中通过 function://
引用。
6. 示例
以下是一些示例,展示了如何配置一个 PulsarSource
bean,这将导致 PulsarFunctionAdministration
自动创建底层的 Pulsar source connector。
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
接下来的这个示例与前一个相同,只是它使用了 Spring Boot 自动配置的 RabbitProperties
来减轻配置负担。当然,这要求应用程序使用 Spring Boot 并启用了 Rabbit 自动配置。
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅带有 Pulsar Functions 的示例流管道示例应用程序