跳到主要内容

使用 R2DBC 进行数据访问

ChatGPT-4o-mini 中英对照 Data Access with R2DBC

R2DBC ("反应式关系数据库连接") 是一个由社区驱动的规范工作,旨在标准化使用反应式模式访问 SQL 数据库。

包层次结构

Spring Framework 的 R2DBC 抽象框架由两个不同的包组成:

  • core: org.springframework.r2dbc.core 包含 DatabaseClient 类以及各种相关类。请参阅 使用 R2DBC 核心类控制基本 R2DBC 处理和错误处理

  • connection: org.springframework.r2dbc.connection 包含一个实用程序类,以便轻松访问 ConnectionFactory 以及各种简单的 ConnectionFactory 实现,您可以使用它们进行测试和运行未修改的 R2DBC。请参阅 控制数据库连接

使用 R2DBC 核心类控制基本 R2DBC 处理和错误处理

本节涵盖如何使用 R2DBC 核心类来控制基本的 R2DBC 处理,包括错误处理。它包括以下主题:

使用 DatabaseClient

DatabaseClient 是 R2DBC 核心包中的核心类。它处理资源的创建和释放,这有助于避免常见错误,例如忘记关闭连接。它执行核心 R2DBC 工作流的基本任务(例如语句创建和执行),将应用程序代码留给提供 SQL 和提取结果。DatabaseClient 类:

  • 执行 SQL 查询

  • 更新语句和存储过程调用

  • Result 实例进行迭代

  • 捕获 R2DBC 异常并将其转换为 org.springframework.dao 包中定义的通用、更具信息性的异常层次结构。 (参见 一致的异常层次结构.)

该客户端拥有一个功能齐全、流畅的 API,使用响应式类型进行声明式组合。

当你在代码中使用 DatabaseClient 时,你只需要实现 java.util.function 接口,为它们提供一个明确定义的契约。给定一个由 DatabaseClient 类提供的 Connection,一个 Function 回调会创建一个 Publisher。对于提取 Row 结果的映射函数也是如此。

您可以通过直接实例化 DatabaseClient 并使用 ConnectionFactory 引用在 DAO 实现中使用它,或者您可以在 Spring IoC 容器中配置它,并将其作为 bean 引用提供给 DAO。

创建 DatabaseClient 对象的最简单方法是通过静态工厂方法,如下所示:

DatabaseClient client = DatabaseClient.create(connectionFactory);
java
备注

ConnectionFactory 应始终作为一个 bean 配置在 Spring IoC 容器中。

前面的这个方法创建了一个具有默认设置的 DatabaseClient

您还可以从 DatabaseClient.builder() 获取一个 Builder 实例。您可以通过调用以下方法来定制客户端:

  • ….bindMarkers(…): 提供一个特定的 BindMarkersFactory 来配置命名参数与数据库绑定标记的转换。

  • ….executeFunction(…): 设置 ExecuteFunction,定义 Statement 对象的执行方式。

  • ….namedParameters(false): 禁用命名参数扩展。默认启用。

提示

方言由 BindMarkersFactoryResolverConnectionFactory 解析,通常通过检查 ConnectionFactoryMetadata
您可以通过通过 META-INF/spring.factories 注册一个实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的类,让 Spring 自动发现您的 BindMarkersFactoryBindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供者实现。

当前支持的数据库有:

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

此类发出的所有 SQL 都以 DEBUG 级别记录在与客户端实例的完全限定类名(通常是 DefaultDatabaseClient)对应的类别下。此外,每次执行都会在反应序列中注册一个检查点,以帮助调试。

以下部分提供了一些 DatabaseClient 使用的示例。这些示例并不是 DatabaseClient 所提供的所有功能的详尽列表。有关详细信息,请参见相关的 javadoc

执行语句

DatabaseClient 提供了运行语句的基本功能。以下示例展示了创建新表所需的最小但功能完整的代码:

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
java

DatabaseClient 旨在提供方便、流畅的使用体验。它在执行规范的每个阶段都暴露了中间、继续和终端方法。上面的示例使用 then() 返回一个完成的 Publisher,该 Publisher 会在查询(如果 SQL 查询包含多个语句,则为多个查询)完成后立即完成。

备注

execute(…) 接受 SQL 查询字符串或查询 Supplier<String>,以便在执行时延迟实际查询的创建。

查询 (SELECT)

SQL 查询可以通过 Row 对象返回值或受影响的行数。DatabaseClient 可以根据发出的查询返回更新的行数或行本身。

以下查询从一个表中获取 idname 列:

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
java

以下查询使用了绑定变量:

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
java

您可能已经注意到在上面的示例中使用了 fetch()fetch() 是一个续接操作符,允许您指定想要消耗多少数据。

调用 first() 会返回结果中的第一行,并丢弃其余行。您可以使用以下操作符来消费数据:

  • first() 返回整个结果的第一行。它的 Kotlin 协程变体名为 awaitSingle(),用于非空返回值,如果值是可选的,则使用 awaitSingleOrNull()

  • one() 返回恰好一个结果,如果结果包含多行则会失败。使用 Kotlin 协程时,awaitOne() 用于恰好一个值,或者如果值可能为 null 则使用 awaitOneOrNull()

  • all() 返回结果的所有行。当使用 Kotlin 协程时,使用 flow()

  • rowsUpdated() 返回受影响的行数(INSERT/UPDATE/DELETE 计数)。它的 Kotlin 协程变体名为 awaitRowsUpdated()

在未指定进一步映射细节的情况下,查询返回的表格结果为 Map,其键为不区分大小写的列名,映射到它们的列值。

您可以通过提供一个 Function<Row, T> 来控制结果映射,该函数会对每个 Row 进行调用,以便返回任意值(单个值、集合和映射,以及对象)。

以下示例提取 name 列并发出其值:

Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
java

另外,还有一种快捷方式可以映射到单个值:

Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();
java

或者你可以映射到一个具有 bean 属性或记录组件的结果对象:

// assuming a name property on Person
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();
java

null 怎么办?

关系数据库的结果可以包含 null 值。Reactive Streams 规范禁止发出 null 值。这个要求强制在提取函数中正确处理 null。虽然你可以从 Row 中获取 null 值,但你必须不发出 null 值。你必须将任何 null 值包装在一个对象中(例如,对于单个值使用 Optional),以确保你的提取函数永远不会直接返回 null 值。

使用 DatabaseClient 进行更新(INSERTUPDATEDELETE

修改语句的唯一区别在于,这些语句通常不返回表格数据,因此您使用 rowsUpdated() 来处理结果。

以下示例显示了一个 UPDATE 语句,该语句返回更新的行数:

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
java

将值绑定到查询

一个典型的应用程序需要参数化的 SQL 语句来根据某些输入选择或更新行。这些通常是受 WHERE 子句约束的 SELECT 语句,或接受输入参数的 INSERTUPDATE 语句。如果参数没有正确转义,参数化语句会面临 SQL 注入的风险。DatabaseClient 利用 R2DBC 的 bind API 来消除查询参数的 SQL 注入风险。您可以通过 execute(…) 操作符提供一个参数化的 SQL 语句,并将参数绑定到实际的 Statement。然后,您的 R2DBC 驱动程序通过使用预编译语句和参数替换来运行该语句。

参数绑定支持两种绑定策略:

  • 按索引,使用零基参数索引。

  • 按名称,使用占位符名称。

以下示例展示了查询的参数绑定:

db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
java

或者,您可以传入一个名称和值的映射:

Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);
java

或者您可以传入一个包含 bean 属性或记录组件的参数对象:

// assuming id, name, age properties on Person
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);
java

或者,您可以使用位置参数将值绑定到语句。索引是从零开始的。

db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind(0, "joe")
.bind(1, "Joe")
.bind(2, 34);
java

如果您的应用程序绑定了许多参数,可以通过一次调用实现相同的效果:

List<?> values = List.of("joe", "Joe", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(values);
java

R2DBC 原生绑定标记

R2DBC 使用依赖于实际数据库供应商的数据库原生绑定标记。例如,Postgres 使用索引标记,如 $1$2$n。另一个例子是 SQL Server,它使用以 @ 为前缀的命名绑定标记。

这与 JDBC 不同,后者要求使用 ? 作为绑定标记。在 JDBC 中,实际的驱动程序将 ? 绑定标记转换为数据库原生标记,作为其语句执行的一部分。

Spring Framework 的 R2DBC 支持让您可以使用原生绑定标记或使用 :name 语法的命名绑定标记。

命名参数支持利用 BindMarkersFactory 实例在查询执行时将命名参数扩展为原生绑定标记,这使您在不同数据库供应商之间具有一定程度的查询可移植性。

查询预处理器将命名为 Collection 的参数展开为一系列绑定标记,以消除根据参数数量动态创建查询的需要。嵌套对象数组被展开,以允许使用(例如)选择列表。

考虑以下查询:

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
sql

前面的查询可以参数化并运行如下:

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
java
备注

选择列表的使用依赖于供应商。

以下示例展示了一个使用 IN 谓词的更简单变体:

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
java
备注

R2DBC 本身不支持类似集合的值。然而,在上面的示例中扩展给定的 List 对于 Spring 的 R2DBC 支持中的命名参数是有效的,例如,在上面所示的 IN 子句中使用。然而,插入或更新数组类型的列(例如,在 Postgres 中)需要一个由底层 R2DBC 驱动程序支持的数组类型:通常是一个 Java 数组,例如,String[] 用于更新 text[] 列。请勿将 Collection<String> 或类似的内容作为数组参数传递。

语句过滤器

有时您需要在实际的 Statement 运行之前微调选项。为此,您可以向 DatabaseClient 注册一个 Statement 过滤器(StatementFilterFunction),以拦截并修改语句的执行,以下示例演示了这一点:

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name",)
.bind("state",);
java

DatabaseClient 还提供了一个简化的 filter(…) 重载,它接受一个 Function<Statement, Statement>

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
java

StatementFilterFunction 实现允许对 Statement 进行过滤以及对 Result 对象进行过滤。

DatabaseClient 最佳实践

DatabaseClient 类的实例在配置后是线程安全的。这一点很重要,因为这意味着您可以配置一个 DatabaseClient 的单一实例,然后安全地将这个共享引用注入多个 DAO(或仓库)。DatabaseClient 是有状态的,因为它保持对 ConnectionFactory 的引用,但这个状态不是会话状态。

在使用 DatabaseClient 类时,一个常见的做法是在你的 Spring 配置文件中配置一个 ConnectionFactory,然后将这个共享的 ConnectionFactory bean 依赖注入到你的 DAO 类中。DatabaseClientConnectionFactory 的 setter 中创建。这导致 DAO 类看起来如下所示:

public class R2dbcCorporateEventDao implements CorporateEventDao {

private DatabaseClient databaseClient;

public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}

// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
java

显式配置的替代方案是使用组件扫描和注解支持进行依赖注入。在这种情况下,您可以使用 @Component 注解类(这使其成为组件扫描的候选者),并使用 @Autowired 注解 ConnectionFactory 的 setter 方法。以下示例演示了如何做到这一点:

@Component 1
public class R2dbcCorporateEventDao implements CorporateEventDao {

private DatabaseClient databaseClient;

@Autowired 2
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); 3
}

// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
java
  • @Component 注解类。

  • @Autowired 注解 ConnectionFactory 的 setter 方法。

  • 使用 ConnectionFactory 创建一个新的 DatabaseClient

无论您选择使用(或不使用)上述哪种模板初始化样式,通常不需要每次运行 SQL 时都创建一个新的 DatabaseClient 类实例。配置完成后,DatabaseClient 实例是线程安全的。如果您的应用程序访问多个数据库,您可能需要多个 DatabaseClient 实例,这就需要多个 ConnectionFactory,并随后需要多个配置不同的 DatabaseClient 实例。

检索自动生成的键

INSERT 语句在向定义了自增或身份列的表中插入行时可能会生成键。要完全控制要生成的列名,只需注册一个 StatementFilterFunction,该函数请求所需列的生成键。

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();

// generatedId emits the generated key once the INSERT statement has finished
java

控制数据库连接

本节涵盖:

使用 ConnectionFactory

Spring 通过 ConnectionFactory 获取与数据库的 R2DBC 连接。ConnectionFactory 是 R2DBC 规范的一部分,是驱动程序的一个常见入口点。它允许容器或框架将连接池和事务管理问题隐藏在应用程序代码之外。作为开发者,您不需要了解如何连接到数据库的细节。这是设置 ConnectionFactory 的管理员的责任。在您开发和测试代码时,您很可能同时扮演这两个角色,但您不一定需要知道生产数据源是如何配置的。

当您使用 Spring 的 R2DBC 层时,您可以配置自己的连接池实现,使用第三方提供的实现。一个流行的实现是 R2DBC Pool (r2dbc-pool)。Spring 分发中的实现仅用于测试目的,并不提供连接池功能。

要配置一个 ConnectionFactory

  1. 使用 ConnectionFactory 获取连接,就像您通常获取 R2DBC ConnectionFactory 一样。

  2. 提供一个 R2DBC URL(请参阅您驱动程序的文档以获取正确的值)。

以下示例展示了如何配置 ConnectionFactory

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
java

使用 ConnectionFactoryUtils

ConnectionFactoryUtils 类是一个方便且强大的助手类,提供 static 方法以从 ConnectionFactory 获取连接并在必要时关闭连接。

它支持以订阅者 Context 绑定的连接,例如 R2dbcTransactionManager

使用 SingleConnectionFactory

SingleConnectionFactory 类是 DelegatingConnectionFactory 接口的一个实现,它包装了一个在每次使用后不会关闭的单一 Connection

如果任何客户端代码在假设使用池化连接的情况下调用 close(例如在使用持久性工具时),您应该将 suppressClose 属性设置为 true。此设置返回一个关闭抑制代理,包装物理连接。请注意,您将无法再将其转换为原生 Connection 或类似对象。

SingleConnectionFactory 主要是一个测试类,可以用于特定需求,例如如果你的 R2DBC 驱动程序允许这种使用,则可以进行管道化。与池化的 ConnectionFactory 相比,它始终重用相同的连接,从而避免了过多创建物理连接。

使用 TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy 是一个目标 ConnectionFactory 的代理。该代理包装了目标 ConnectionFactory,以增加对 Spring 管理的事务的感知。

备注

如果您使用的 R2DBC 客户端没有与 Spring 的 R2DBC 支持集成,则需要使用此类。在这种情况下,您仍然可以使用此客户端,同时使该客户端参与 Spring 管理的事务。通常,最好将 R2DBC 客户端与适当访问 ConnectionFactoryUtils 的资源管理集成。

请参阅 TransactionAwareConnectionFactoryProxy 的 javadoc 以获取更多详细信息。

使用 R2dbcTransactionManager

R2dbcTransactionManager 类是一个针对单个 R2DBC ConnectionFactoryReactiveTransactionManager 实现。它将来自指定 ConnectionFactory 的 R2DBC Connection 绑定到订阅者的 Context,这可能允许每个 ConnectionFactory 有一个订阅者 Connection

应用程序代码需要通过 ConnectionFactoryUtils.getConnection(ConnectionFactory) 来检索 R2DBC Connection,而不是使用 R2DBC 的标准 ConnectionFactory.create()。所有框架类(例如 DatabaseClient)都隐式使用此策略。如果不与事务管理器一起使用,则查找策略的行为与 ConnectionFactory.create() 完全相同,因此可以在任何情况下使用。