跳到主要内容
版本:7.0.3

使用R2DBC进行数据访问

Hunyuan 7b 中英对照 Data Access with R2DBC

R2DBC(“Reactive Relational Database Connectivity”)是一项由社区推动的规范项目,旨在通过响应式模式(reactive patterns)来标准化对SQL数据库的访问。

包层次结构

Spring框架的R2DBC抽象框架由两个不同的包组成:

使用R2DBC核心类来控制基本的R2DBC处理和错误处理

本节介绍了如何使用R2DBC核心类来控制基本的R2DBC处理,包括错误处理。内容包括以下主题:

使用DatabaseClient

DatabaseClient 是 R2DBC 核心包中的核心类。它负责资源的创建和释放,有助于避免常见的错误,例如忘记关闭连接。它执行 R2DBC 核心工作流程的基本任务(如语句的创建和执行),将 SQL 语句的编写和结果提取的工作留给应用程序代码来完成。DatabaseClient 类:

  • 运行SQL查询
  • 执行更新语句和存储过程调用
  • 遍历Result实例
  • 捕获R2DBC异常,并将其转换为org.springframework.dao包中定义的通用、更具信息量的异常层次结构。(参见一致的异常层次结构。)

客户端拥有一个功能强大、使用流畅的API,该API采用反应式类型(reactive types)进行声明式组合(declarative composition)。

当你在代码中使用DatabaseClient时,你只需要实现java.util.function接口,为它们提供一个明确定义的契约即可。给定DatabaseClient类提供的Connection,一个Function回调会创建一个Publisher。对于提取Row结果的映射函数也是如此。

你可以在DAO实现中使用DatabaseClient,通过直接使用ConnectionFactory引用进行实例化;或者,你也可以在Spring IoC容器中配置它,然后将其作为bean引用提供给DAOs。

创建 DatabaseClient 对象的最简单方法是通过一个静态工厂方法,如下所示:

DatabaseClient client = DatabaseClient.create(connectionFactory);
备注

ConnectionFactory 应始终在 Spring IoC 容器中被配置为一个 Bean。

前述方法使用默认设置创建了一个DatabaseClient

您也可以通过 DatabaseClient.builder() 获取一个 Builder 实例。通过调用以下方法,您可以自定义客户端:

  • ….bindMarkers(…): 提供一个特定的 BindMarkersFactory 用于配置命名参数到数据库绑定标记的转换。

  • ….executeFunction(…): 设置 ExecuteFunction 方法,以确定 Statement 对象的执行方式。

  • ….namedParameters(false): 禁用命名参数的展开功能。默认情况下是启用的。

提示

方言(dialects)是由 BindMarkersFactoryResolverConnectionFactory 中解析的,通常是通过检查 ConnectionFactoryMetadata 来实现的。
你可以通过在 META-INF/spring.factories 中注册一个实现 org.springframework.r2dbc.core.binding.BindmarkersFactoryResolver$BindMarkerFactoryProvider 的类,让 Spring 自动发现你的 BindMarkersFactoryBindMarkersFactoryResolver 会使用 Spring 的 SpringFactoriesLoader 从类路径中查找绑定标记提供者(bind marker provider)的实现。

当前支持的数据库有:

  • H2
  • MariaDB
  • Microsoft SQL Server
  • MySQL
  • Postgres

此类发出的所有SQL语句都会以DEBUG级别进行日志记录,日志所属的类别与客户端实例的全限定类名相对应(通常为DefaultDatabaseClient)。此外,每次执行还会在反应式序列中注册一个检查点,以便于调试。

以下部分提供了一些DatabaseClient使用的示例。这些示例并不是DatabaseClient所提供所有功能的详尽列表。有关所有功能,请参阅相应的javadoc

执行语句

DatabaseClient 提供了执行语句的基本功能。以下示例展示了创建新表所需的最低限度但功能完备的代码:

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();

DatabaseClient 的设计旨在提供便捷、流畅的使用体验。它在执行规范的每个阶段都提供了中间方法(intermediate methods)、继续执行方法(continuation methods)以及终止方法(terminal methods)。上述示例中,使用了 then() 方法来返回一个 Publisher 对象,该对象会在查询(如果 SQL 查询包含多个语句,则为所有查询)完成时立即完成执行。

备注

execute(…) 可以接受 SQL 查询字符串,也可以接受一个 Query Supplier<String> 对象,将实际的查询创建操作推迟到执行时再进行。

查询(SELECT

SQL 查询可以通过 Row 对象或受影响的行数来返回值。DatabaseClient 可以根据执行的查询返回更新的行数,或者返回这些行本身。

以下查询从一个表中获取idname列:

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();

以下查询使用了一个绑定变量:

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();

你可能已经注意到上面示例中使用了fetch()fetch()是一个延续操作符(continuation operator),它允许你指定想要获取多少数据。

调用first()会返回结果中的第一行,并丢弃剩余的行。您可以使用以下运算符来获取数据:

  • first() 返回整个结果集中的第一行。在 Kotlin 协程中,如果返回值非空,则其对应的方法为 awaitSingle();如果返回值可以是 null,则对应的方法为 awaitSingleOrNull()

  • one() 仅返回一个结果;如果结果集中包含多行,则会失败。在使用 Kotlin 协程时,相应的方法为 awaitOne()(用于确保只返回一个结果),或者 awaitOneOrNull()(用于允许返回 null 值)。

  • all() 返回结果集中的所有行。在使用 Kotlin 协程时,应使用 flow() 方法。

  • rowsUpdated() 返回受影响的行数(即 INSERTUPDATEDELETE 操作所涉及的行数)。在 Kotlin 协程中,其对应的方法为 awaitRowsUpdated()

如果不进一步指定映射细节,查询将返回以Map形式的表格结果,其中键是不区分大小写的列名,这些列名对应于各自的列值。

你可以通过提供一个 Function<Row, T> 来控制结果映射,该函数会对每个 Row 被调用,从而它可以返回任意类型的值(单个值、集合和映射,以及对象)。

以下示例提取name列并输出其值:

Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();

或者,还有一个快捷方法可以映射到单个值:

Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();

或者你可以将其映射到一个包含bean属性或记录组件的结果对象中:

// assuming a name property on Person
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();

null值呢?

关系型数据库的结果可能包含null值。Reactive Streams规范禁止发射null值。这一要求强制在提取器函数中必须正确处理null值。虽然你可以从Row中获取null值,但绝不能发射null值。你必须将任何null值封装在一个对象中(例如,对于单个值使用Optional),以确保你的提取器函数永远不会直接返回null值。

使用 DatabaseClient 进行更新(INSERTUPDATEDELETE

修改语句的唯一区别在于,这些语句通常不会返回表格数据,因此你需要使用rowsUpdated()来获取结果。

以下示例展示了一个UPDATE语句,该语句会返回更新后的行数:

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();

将值绑定到查询中

一个典型的应用程序需要参数化的SQL语句来根据某些输入选择或更新行。这些通常是受WHERE子句限制的SELECT语句,或者是接受输入参数的INSERTUPDATE语句。如果参数没有正确地进行转义,参数化语句就存在SQL注入的风险。DatabaseClient利用R2DBC的bind API来消除查询参数的SQL注入风险。你可以使用execute(…)操作符提供一个参数化的SQL语句,并将参数绑定到实际的Statement上。然后,你的R2DBC驱动程序通过使用预编译语句和参数替换来运行该语句。

参数绑定支持两种绑定策略:

  • 按索引排序,使用从零开始的参数索引。

  • 按名称排序,使用占位符名称。

以下示例展示了查询的参数绑定:

db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);

或者,您也可以传入一个名称和值的映射:

Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);

或者,您可以传递一个包含bean属性或记录组件的参数对象:

// assuming id, name, age properties on Person
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);

或者,您可以使用位置参数将值绑定到语句中。索引是从0开始的。

db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind(0, "joe")
.bind(1, "Joe")
.bind(2, 34);

如果你的应用程序需要绑定许多参数,那么通过一次调用同样可以实现这一目标:

List<?> values = List.of("joe", "Joe", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(values);

R2DBC原生绑定标记

R2DBC使用依赖于实际数据库供应商的数据库原生绑定标记。例如,Postgres使用索引标记,如 $1$2$n。另一个例子是SQL Server,它使用以 @ 为前缀的命名绑定标记。

这与JDBC不同,JDBC要求使用 ? 作为绑定标记。在JDBC中,实际的驱动程序会在执行语句时将 ? 绑定标记转换为数据库原生标记。

Spring Framework的R2DBC支持允许你使用原生绑定标记或带有 :name 语法的命名绑定标记。

命名参数支持利用 BindMarkersFactory 实例在查询执行时将命名参数扩展为原生绑定标记,从而让你在不同数据库供应商之间实现一定程度的查询可移植性。

查询预处理器会将名为“Collection”的参数展开为一系列绑定标记,从而无需根据参数数量来动态创建查询。嵌套的对象数组会被展开,以便可以使用(例如)选择列表。

考虑以下查询:

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

上述查询可以参数化,并按以下方式运行:

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
备注

选择列表(select lists)的用法因浏览器或应用程序的供应商而异。

以下示例展示了一个使用IN谓词的更简单的变体:

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
备注

R2DBC本身不支持类似集合(Collection-like)的数据类型。不过,在上面的示例中,对List的扩展在Spring的R2DBC支持中是可行的,例如可以用于IN子句中,如上所示。然而,插入或更新数组类型的列(例如在Postgres中)需要使用底层R2DBC驱动程序支持的数组类型:通常是一个Java数组,比如使用String[]来更新text[]类型的列。不要将Collection<String>或类似的数据作为数组参数传递。

语句过滤器

有时你需要在Statement实际执行之前对其进行微调。为此,需要向DatabaseClient注册一个Statement过滤器(StatementFilterFunction),以便在语句执行过程中拦截并修改它们,如下例所示:

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name",)
.bind("state",);

DatabaseClient 还提供了一个简化的 filter(…) 重载版本,该版本接受一个 Function<Statement, Statement> 类型的参数:

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));

StatementFilterFunction 的实现允许对 Statement 对象和 Result 对象进行过滤。

DatabaseClient 最佳实践

一旦配置完成,DatabaseClient 类的实例就是线程安全的。这一点很重要,因为这意味着你可以配置一个 DatabaseClient 的单一实例,然后安全地将这个共享引用注入到多个 DAO(或仓库)中。DatabaseClient 是有状态的,因为它维护着一个对 ConnectionFactory 的引用,但这种状态并不是会话状态(conversational state)。

在使用DatabaseClient类时,一种常见的做法是在Spring配置文件中配置一个ConnectionFactory,然后通过依赖注入将该共享的ConnectionFactory bean注入到DAO类中。DatabaseClient是在ConnectionFactory的setter方法中被创建的。这样产生的DAO类如下所示:

public class R2dbcCorporateEventDao implements CorporateEventDao {

private DatabaseClient databaseClient;

public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}

// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

另一种替代显式配置的方法是使用组件扫描和注解支持来进行依赖注入。在这种情况下,你可以在类上添加@Component注解(这会使该类成为组件扫描的候选对象),并在ConnectionFactory的setter方法上添加@Autowired注解。以下示例展示了如何实现这一点:

@Component 1
public class R2dbcCorporateEventDao implements CorporateEventDao {

private DatabaseClient databaseClient;

@Autowired 2
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); 3
}

// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
  • 使用 @Component 注解标记该类。

  • 使用 @Autowired 注解标注 ConnectionFactory 的 setter 方法。

  • 使用 ConnectionFactory 创建一个新的 DatabaseClient

无论您选择使用上述哪种模板初始化样式(或不使用任何样式),每次运行SQL时都创建一个新的DatabaseClient实例的情况都是很少见的。一旦配置完成,DatabaseClient实例就是线程安全的。如果您的应用程序需要访问多个数据库,那么您可能需要多个DatabaseClient实例,这就需要多个ConnectionFactory,进而也需要多个配置不同的DatabaseClient实例。

获取自动生成的密钥

当向定义了自动递增或标识列的表中插入行时,INSERT 语句可能会生成键值对。若要完全控制所生成的列名,只需注册一个 StatementFilterFunction,该函数可以请求所需列的生成键值对即可。

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();

// generatedId emits the generated key once the INSERT statement has finished

控制数据库连接

本节涵盖:

使用 ConnectionFactory

Spring通过一个ConnectionFactory来获取到数据库的R2DBC连接。ConnectionFactory是R2DBC规范的一部分,也是驱动程序常见的入口点。它允许容器或框架将连接池管理和事务管理的问题从应用程序代码中隐藏起来。作为开发者,你不需要了解如何连接到数据库的细节。这是管理员的职责,他们负责设置ConnectionFactory。在开发和测试代码的过程中,你很可能同时承担着这两个角色,但你不一定需要知道生产环境的数据源是如何配置的。

当你使用Spring的R2DBC层时,你可以使用第三方提供的连接池实现来配置自己的连接池。一个流行的实现是R2DBC Pool(r2dbc-pool)。Spring发行版中的实现仅用于测试目的,并不提供连接池功能。

要配置一个 ConnectionFactory

  1. 通过 ConnectionFactory 建立连接,就像通常获取 R2DBC ConnectionFactory 一样。

  2. 提供一个 R2DBC URL(请参阅驱动程序的文档以获取正确的值)。

以下示例展示了如何配置一个 ConnectionFactory

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

使用 ConnectionFactoryUtils

ConnectionFactoryUtils 类是一个方便且强大的辅助类,它提供了 static 方法来从 ConnectionFactory 获取连接,并在必要时关闭连接。

它支持与R2dbcTransactionManager等组件绑定的订阅者Context连接。

使用 SingleConnectionFactory

SingleConnectionFactory 类是 DelegatingConnectionFactory 接口的实现,它包装了一个在每次使用后不会被关闭的单一 Connection 对象。

如果任何客户端代码在假设使用的是池化连接的情况下调用close方法(例如在使用持久化工具时),你应该将suppressClose属性设置为true。此设置会返回一个抑制关闭操作的代理对象,该代理对象会包裹住实际的连接。请注意,你不能再将这个代理对象强制转换为原始的Connection对象或类似的对象了。

SingleConnectionFactory主要是一个测试类,如果您的R2DBC驱动程序允许的话,它可以用于一些特定的需求,例如流水线处理。与共享式ConnectionFactory不同,它始终重用同一个连接,从而避免了过多地创建物理连接。

使用 TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy 是一个针对目标 ConnectionFactory 的代理。该代理包装了那个目标 ConnectionFactory,以增加对 Spring 管理的事务的支持。

备注

如果您使用的R2DBC客户端没有与Spring的R2DBC支持集成,那么就需要使用这个类。在这种情况下,您仍然可以使用这个客户端,并且让这个客户端参与Spring管理的事务。通常,最好将R2DBC客户端与能够正确访问ConnectionFactoryUtils的组件集成起来,以便进行资源管理。

有关更多详细信息,请参阅TransactionAwareConnectionFactoryProxy的Javadoc。

使用 R2dbcTransactionManager

R2dbcTransactionManager 类是针对单个 R2DBC ConnectionFactoryReactiveTransactionManager 实现。它将来自指定 ConnectionFactory 的 R2DBC Connection 绑定到订阅者 Context 上,每个 ConnectionFactory 可能允许有一个订阅者 Connection

需要应用程序代码通过 ConnectionFactoryUtils.getConnection(ConnectionFactory) 来获取 R2DBC 的 Connection,而不是使用 R2DBC 的标准方法 ConnectionFactory.create()。所有框架类(如 DatabaseClient)都隐式地采用了这种策略。如果不与事务管理器一起使用,这种查找策略的行为与 ConnectionFactory.create() 完全相同,因此在任何情况下都可以使用。