Pulsar 客户端
目录
当你使用 Pulsar Spring Boot Starter 时,你将获得自动配置的 PulsarClient
。
默认情况下,应用程序会尝试连接到本地 Pulsar 实例,地址为 pulsar://localhost:6650
。可以通过将 spring.pulsar.client.service-url
属性设置为不同的值来调整此设置。
该值必须是一个有效的 Pulsar 协议 URL
你可以通过指定任何 spring.pulsar.client.* 应用程序属性来进一步配置客户端。
如果你没有使用 starter,你需要自己配置并注册 PulsarClient
。有一个 DefaultPulsarClientFactory
,它接受一个 builder customizer,可以用来帮助你完成这个任务。
1. TLS 加密 (SSL)
默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。以下部分描述了如何配置 Pulsar 客户端以使用 TLS 加密(SSL)。前提条件是 Broker 也已配置为使用 TLS 加密。
Spring Boot 自动配置目前不支持任何 TLS/SSL 配置属性。你可以改为提供一个 PulsarClientBuilderCustomizer
,在 Pulsar 客户端构建器上设置必要的属性。Pulsar 支持 Privacy Enhanced Mail (PEM) 和 Java KeyStore (JKS) 证书格式。
按照以下步骤配置 TLS:
-
将 Pulsar 客户端服务 URL 调整为使用
pulsar+ssl://
协议和 TLS 端口(通常为6651
)。 -
将管理客户端服务 URL 调整为使用
https://
协议和 TLS Web 端口(通常为8443
)。 -
提供客户端构建器自定义器,用于在构建器上设置相关属性。
您可以在官方 Pulsar TLS 加密 文档中找到更多关于上述内容的信息。
2. 认证
要连接到需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及该插件所需的任何参数。当使用 Spring Boot 自动配置时,您可以通过配置属性(在大多数情况下)来设置插件及其参数。
你需要确保在 spring.pulsar.client.authentication.param.*
下定义的名称与你的认证插件所期望的名称完全匹配(通常为驼峰命名法)。Spring Boot 不会对这些条目尝试任何形式的宽松绑定。
例如,如果你想为 AuthenticationOAuth2
认证插件配置 issuer url,你必须使用 spring.pulsar.client.authentication.param.issuerUrl
。如果你使用其他形式,例如 issuerurl
或 issuer-url
,该设置将不会应用到插件中。
使用环境变量来设置认证参数通常会有问题,因为在转换过程中会丢失大小写敏感性。例如,考虑以下通过环境变量设置的 issuerUrl
认证参数:
SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com
当 Spring Boot 加载这个属性时,它会使用 issuerurl
(小写)而不是预期的 issuerUrl
(驼峰式)。你可以通过在 application.yml
中使用环境变量的值作为相关认证属性的值来绕过这个限制。继续上面的例子:
spring:
pulsar:
client:
authentication:
param:
issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}
当不使用 Spring Boot 自动配置时,你可以使用 org.apache.pulsar.client.api.AuthenticationFactory
来创建认证,然后在你提供给客户端工厂的客户端自定义器中直接将其设置到 Pulsar 客户端构建器上。
以下列表展示了如何配置每种受支持的身份验证机制。
点击此处查看 Athenz
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
param:
tenantDomain: ...
tenantService: ...
providerDomain: ...
privateKey: ...
keyId: ...
这也需要 TLS 加密。
点击此处获取 Token
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: some-token-goes-here
点击此处查看基础内容
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
param:
userId: ...
password: ...
点击此处查看 OAuth2
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: ...
privateKey: ...
audience: ...
scope: ...
点击此处查看 Sasl
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
param:
saslJaasClientSectionName: ...
serverType: ...
点击此处获取 mTLS (PEM)
由于此选项需要 TLS 加密,而 TLS 加密已经要求你提供客户端构建器自定义器,因此建议直接在你提供的 TLS 自定义器中的客户端构建器上添加身份验证。你可以使用 org.apache.pulsar.client.api.AuthenticationFactory
来帮助创建身份验证对象,如下所示:
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");
请参阅官方 Pulsar 文档中的 mTLS (PEM) 部分。
点击此处查看 mTLS (JKS)
由于此选项需要 TLS 加密,而 TLS 加密已经要求您提供客户端构建器自定义器,因此建议直接在您提供的 TLS 自定义器中的客户端构建器上添加身份验证。您可以使用 org.apache.pulsar.client.api.AuthenticationFactory
来帮助创建身份验证对象,如下所示:
Authentication auth = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));
请参阅官方 Pulsar 文档中关于 mTLS (JKS) 的部分。
你可以在官方的 Pulsar 安全 文档中找到更多关于每个支持插件及其所需属性的信息。
3. 自动集群级故障转移
Pulsar Spring Boot Starter 还会自动配置 PulsarClient
,以实现集群级别的自动故障转移。
你可以使用 spring.pulsar.client.failover.* 应用程序属性来配置集群级别的故障转移。
以下示例配置了一个主集群和两个备份集群的客户端。
spring:
pulsar:
client:
service-url: "pulsar://my.primary.server:6650"
failover:
delay: 30s
switch-back-delay: 15s
check-interval: 1s
backup-clusters:
- service-url: "pulsar://my.second.server:6650"
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: "my-token"
- service-url: "pulsar://my.third.server:6650"
除了客户端配置外,为了使用此功能,broker 还必须满足一些前提条件。
在不使用 Spring Boot 自动配置的情况下,你可以提供一个客户端定制器,用于配置客户端以实现集群级别的故障转移。