Apache Pulsar 支持
通过为 Spring for Apache Pulsar 项目提供自动配置,支持 Apache Pulsar。
当 org.springframework.pulsar:spring-pulsar 在 classpath 中时,Spring Boot 将自动配置并注册 Spring for Apache Pulsar 的组件。
有 spring-boot-starter-pulsar starter 可以方便地收集所需依赖以供使用。
连接到 Pulsar
当你使用 Pulsar starter 时,Spring Boot 将自动配置并注册一个 PulsarClient bean。
默认情况下,应用程序会尝试连接到位于 pulsar://localhost:6650 的本地 Pulsar 实例。这可以通过将 spring.pulsar.client.service-url 属性设置为其他值来进行调整。
该值必须是一个有效的 Pulsar Protocol URL
你可以通过指定任意以 spring.pulsar.client.* 为前缀的应用程序属性来配置客户端。
如果你需要对配置进行更精细的控制,可以考虑注册一个或多个 PulsarClientBuilderCustomizer Bean。
认证
要连接到需要身份验证的 Pulsar 集群,你需要通过设置 pluginClassName 来指定要使用的身份验证插件,以及该插件所需的任何参数。你可以将这些参数设置为一个从参数名到参数值的映射。以下示例展示了如何配置 AuthenticationOAuth2 插件。
- Properties
- YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
你需要确保在 spring.pulsar.client.authentication.param.* 下定义的名称与你的认证插件所期望的名称完全一致(通常为驼峰命名法)。Spring Boot 不会对这些条目尝试任何形式的宽松绑定(relaxed binding)。
例如,如果你想为 AuthenticationOAuth2 认证插件配置 issuer URL,必须使用 spring.pulsar.client.authentication.param.issuerUrl。如果你使用其他形式,例如 issuerurl 或 issuer-url,该设置将不会被应用到插件中。
这种缺乏宽松绑定的特性也使得使用环境变量来配置认证参数变得有问题,因为在转换过程中会丢失大小写敏感性。如果你使用环境变量来设置这些参数,则需要按照 Apache Pulsar 的 Spring 参考文档中的 这些步骤 进行操作,以确保其正常工作。
SSL
默认情况下,Pulsar 客户端以明文形式与 Pulsar 服务通信。你可以按照 Apache Pulsar 的 Spring 参考文档中的这些步骤来启用 TLS 加密。
有关客户端和认证的完整详细信息,请参阅 Apache Pulsar 的 Spring 参考文档。
连接到 Pulsar 管理
Spring for Apache Pulsar 的 PulsarAdministration 客户端也会被自动配置。
默认情况下,应用程序会尝试连接到位于 http://localhost:8080 的本地 Pulsar 实例。可以通过将 spring.pulsar.admin.service-url 属性设置为不同值来调整该地址,其格式为 (http|https)://<host>:<port>。
如果你需要对配置进行更精细的控制,可以考虑注册一个或多个 PulsarAdminBuilderCustomizer Bean。
Authentication
在访问需要身份验证的 Pulsar 集群时,admin 客户端需要与常规 Pulsar 客户端相同的安全部分配置。你可以使用前述的 authentication configuration,只需将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication 即可。
要在启动时创建一个 topic,请添加一个类型为 PulsarTopic 的 bean。如果该 topic 已存在,则该 bean 会被忽略。
发送消息
Spring 的 PulsarTemplate 会自动配置,你可以使用它来发送消息,如下例所示:
- Java
- Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层的 Pulsar producer。Spring Boot 自动配置也会提供该 producer factory,默认情况下,它会缓存所创建的 producers。你可以通过指定任意以 spring.pulsar.producer.* 和 spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置 producer factory 及其缓存设置。
如果你需要对 Producer 工厂的配置进行更精细的控制,可以考虑注册一个或多个 ProducerBuilderCustomizer Bean。这些定制器会应用于所有创建的 Producer。你也可以在发送消息时传入一个 ProducerBuilderCustomizer,以仅影响当前的 Producer。
如果你需要对发送的消息进行更精细的控制,可以在发送消息时传入一个 TypedMessageBuilderCustomizer。
接收消息
当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarListener 注解来创建一个监听器端点。以下组件在 someTopic 主题上创建了一个监听器端点:
- Java
- Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用于构建底层 Pulsar 消费者的消费者工厂。你可以通过指定任意以 spring.pulsar.listener.* 和 spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。
如果你需要对消费者工厂(consumer factory)的配置进行更精细的控制,可以考虑注册一个或多个 ConsumerBuilderCustomizer Bean。这些定制器(customizer)将应用于该工厂创建的所有消费者,因此也适用于所有的 @PulsarListener 实例。你也可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。
如果你需要对实际的容器工厂配置进行更精细的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。
读取消息
Pulsar Reader 接口允许应用程序手动管理游标。当使用 Reader 连接到一个 topic 时,需要指定 Reader 在连接到该 topic 时从哪条消息开始读取。
当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarReader 注解来通过 Reader 消费消息。以下组件创建了一个 Reader 端点,该端点从 someTopic 主题的开头开始读取消息:
- Java
- Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar reader。Spring Boot 自动配置提供了该 reader factory,可以通过设置任意以 spring.pulsar.reader.* 为前缀的应用程序属性对其进行自定义。
如果你需要对 reader factory 的配置进行更精细的控制,可以考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些定制器会应用到该工厂创建的所有 reader,因此也会影响到所有的 @PulsarReader 实例。你也可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个监听器。
如果你需要对实际的容器工厂配置进行更精细的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。
有关上述任意组件的更多详细信息,以及探索其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。
事务支持
Spring for Apache Pulsar 在使用 PulsarTemplate 和 @PulsarListener 时支持事务。
将 spring.pulsar.transaction.enabled 属性设置为 true 将会:
-
配置一个 PulsarTransactionManager bean
-
为 PulsarTemplate 启用事务支持
-
为 @PulsarListener 方法启用事务支持
@PulsarListener 的 transactional 属性可用于微调监听器何时应使用事务。
若要更精细地控制 Spring for Apache Pulsar 的事务特性,你应该自行定义 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果默认自动配置的 PulsarTransactionManager 不符合你的需求,你也可以定义一个 PulsarAwareTransactionManager Bean。
Additional Pulsar Properties
自动配置所支持的属性在附录的 Integration Properties 一节中列出。请注意,这些属性(使用连字符或驼峰命名法)在大多数情况下直接映射到 Apache Pulsar 的配置属性。详情请参阅 Apache Pulsar 官方文档。
只有 Pulsar 支持的属性的一个子集可以通过 PulsarProperties 类直接使用。如果你希望通过额外的、未被直接支持的属性来调整自动配置的组件,可以使用上述每个组件所支持的自定义器(customizer)。