跳到主要内容

Pulsar 客户端

DeepSeek V3 中英对照 Pulsar Client

目录

当你使用 Pulsar Spring Boot Starter 时,你将获得自动配置的 PulsarClient

默认情况下,应用程序会尝试连接到本地 Pulsar 实例,地址为 pulsar://localhost:6650。可以通过将 spring.pulsar.client.service-url 属性设置为不同的值来调整此设置。

提示

该值必须是一个有效的 Pulsar 协议 URL

你可以通过指定任何 spring.pulsar.client.* 应用程序属性来进一步配置客户端。

备注

如果你没有使用 starter,你需要自己配置并注册 PulsarClient。有一个 DefaultPulsarClientFactory,它接受一个 builder customizer,可以用来帮助你完成这个任务。

1. TLS 加密 (SSL)

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。以下部分描述了如何配置 Pulsar 客户端以使用 TLS 加密(SSL)。前提条件是 Broker 也已配置为使用 TLS 加密。

Spring Boot 自动配置目前不支持任何 TLS/SSL 配置属性。你可以改为提供一个 PulsarClientBuilderCustomizer,在 Pulsar 客户端构建器上设置必要的属性。Pulsar 支持 Privacy Enhanced Mail (PEM) 和 Java KeyStore (JKS) 证书格式。

按照以下步骤配置 TLS:

  1. 将 Pulsar 客户端服务 URL 调整为使用 pulsar+ssl:// 协议和 TLS 端口(通常为 6651)。

  2. 将管理客户端服务 URL 调整为使用 https:// 协议和 TLS Web 端口(通常为 8443)。

  3. 提供客户端构建器自定义器,用于在构建器上设置相关属性。

您可以在官方 Pulsar TLS 加密 文档中找到更多关于上述内容的信息。

2. 认证

要连接到需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及该插件所需的任何参数。当使用 Spring Boot 自动配置时,您可以通过配置属性(在大多数情况下)来设置插件及其参数。

备注

你需要确保在 spring.pulsar.client.authentication.param.* 下定义的名称与你的认证插件所期望的名称完全匹配(通常为驼峰命名法)。Spring Boot 不会对这些条目尝试任何形式的宽松绑定。

例如,如果你想为 AuthenticationOAuth2 认证插件配置 issuer url,你必须使用 spring.pulsar.client.authentication.param.issuerUrl。如果你使用其他形式,例如 issuerurlissuer-url,该设置将不会应用到插件中。

提示

使用环境变量来设置认证参数通常会有问题,因为在转换过程中会丢失大小写敏感性。例如,考虑以下通过环境变量设置的 issuerUrl 认证参数:

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com
properties

当 Spring Boot 加载这个属性时,它会使用 issuerurl(小写)而不是预期的 issuerUrl(驼峰式)。你可以通过在 application.yml 中使用环境变量的值作为相关认证属性的值来绕过这个限制。继续上面的例子:

spring:
pulsar:
client:
authentication:
param:
issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}
yaml

不使用 Spring Boot 自动配置时,你可以使用 org.apache.pulsar.client.api.AuthenticationFactory 来创建认证,然后在你提供给客户端工厂的客户端自定义器中直接将其设置到 Pulsar 客户端构建器上。

以下列表展示了如何配置每种受支持的身份验证机制。

点击此处查看 Athenz

spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
param:
tenantDomain: ...
tenantService: ...
providerDomain: ...
privateKey: ...
keyId: ...
备注

这也需要 TLS 加密

点击此处获取 Token

spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: some-token-goes-here

点击此处查看基础内容

spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
param:
userId: ...
password: ...

点击此处查看 OAuth2

spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: ...
privateKey: ...
audience: ...
scope: ...

点击此处查看 Sasl

spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
param:
saslJaasClientSectionName: ...
serverType: ...

点击此处获取 mTLS (PEM)

备注

由于此选项需要 TLS 加密,而 TLS 加密已经要求你提供客户端构建器自定义器,因此建议直接在你提供的 TLS 自定义器中的客户端构建器上添加身份验证。你可以使用 org.apache.pulsar.client.api.AuthenticationFactory 来帮助创建身份验证对象,如下所示:

Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");
java

请参阅官方 Pulsar 文档中的 mTLS (PEM) 部分。

点击此处查看 mTLS (JKS)

备注

由于此选项需要 TLS 加密,而 TLS 加密已经要求您提供客户端构建器自定义器,因此建议直接在您提供的 TLS 自定义器中的客户端构建器上添加身份验证。您可以使用 org.apache.pulsar.client.api.AuthenticationFactory 来帮助创建身份验证对象,如下所示:

Authentication auth = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));
java

请参阅官方 Pulsar 文档中关于 mTLS (JKS) 的部分。

你可以在官方的 Pulsar 安全 文档中找到更多关于每个支持插件及其所需属性的信息。

3. 自动集群级故障转移

Pulsar Spring Boot Starter 还会自动配置 PulsarClient,以实现集群级别的自动故障转移

你可以使用 spring.pulsar.client.failover.* 应用程序属性来配置集群级别的故障转移。

以下示例配置了一个主集群和两个备份集群的客户端。

spring:
pulsar:
client:
service-url: "pulsar://my.primary.server:6650"
failover:
delay: 30s
switch-back-delay: 15s
check-interval: 1s
backup-clusters:
- service-url: "pulsar://my.second.server:6650"
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: "my-token"
- service-url: "pulsar://my.third.server:6650"
yaml
important

除了客户端配置外,为了使用此功能,broker 还必须满足一些前提条件

在不使用 Spring Boot 自动配置的情况下,你可以提供一个客户端定制器,用于配置客户端以实现集群级别的故障转移。