使用R2DBC进行数据访问
R2DBC(“Reactive Relational Database Connectivity”)是一项由社区推动的规范项目,旨在通过响应式模式(reactive patterns)来标准化对SQL数据库的访问。
包层次结构
Spring框架的R2DBC抽象框架由两个不同的包组成:
-
core:org.springframework.r2dbc.core包包含DatabaseClient类以及多种相关类。请参阅 使用 R2DBC 核心类来控制基本的 R2DBC 处理和错误处理。 -
connection:org.springframework.r2dbc.connection包包含一个用于便捷访问ConnectionFactory的工具类,以及多种简单的ConnectionFactory实现,您可以使用这些实现来进行测试和运行未修改的 R2DBC。请参阅 控制数据库连接。
使用R2DBC核心类来控制基本的R2DBC处理和错误处理
本节介绍了如何使用R2DBC核心类来控制基本的R2DBC处理,包括错误处理。内容包括以下主题:
使用DatabaseClient
DatabaseClient 是 R2DBC 核心包中的核心类。它负责资源的创建和释放,有助于避免常见的错误,例如忘记关闭连接。它执行 R2DBC 核心工作流程的基本任务(如语句的创建和执行),将 SQL 语句的编写和结果提取的工作留给应用程序代码来完成。DatabaseClient 类:
- 运行SQL查询
- 执行更新语句和存储过程调用
- 遍历
Result实例 - 捕获R2DBC异常,并将其转换为
org.springframework.dao包中定义的通用、更具信息量的异常层次结构。(参见一致的异常层次结构。)
客户端拥有一个功能强大、使用流畅的API,该API采用反应式类型(reactive types)进行声明式组合(declarative composition)。
当你在代码中使用DatabaseClient时,你只需要实现java.util.function接口,为它们提供一个明确定义的契约即可。给定DatabaseClient类提供的Connection,一个Function回调会创建一个Publisher。对于提取Row结果的映射函数也是如此。
你可以在DAO实现中使用DatabaseClient,通过直接使用ConnectionFactory引用进行实例化;或者,你也可以在Spring IoC容器中配置它,然后将其作为bean引用提供给DAOs。
创建 DatabaseClient 对象的最简单方法是通过一个静态工厂方法,如下所示:
- Java
- Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 应始终在 Spring IoC 容器中被配置为一个 Bean。
前述方法使用默认设置创建了一个DatabaseClient。
您也可以通过 DatabaseClient.builder() 获取一个 Builder 实例。通过调用以下方法,您可以自定义客户端:
-
….bindMarkers(…): 提供一个特定的BindMarkersFactory用于配置命名参数到数据库绑定标记的转换。 -
….executeFunction(…): 设置ExecuteFunction方法,以确定Statement对象的执行方式。 -
….namedParameters(false): 禁用命名参数的展开功能。默认情况下是启用的。
方言(dialects)是由 BindMarkersFactoryResolver 从 ConnectionFactory 中解析的,通常是通过检查 ConnectionFactoryMetadata 来实现的。
你可以通过在 META-INF/spring.factories 中注册一个实现 org.springframework.r2dbc.core.binding.BindmarkersFactoryResolver$BindMarkerFactoryProvider 的类,让 Spring 自动发现你的 BindMarkersFactory。BindMarkersFactoryResolver 会使用 Spring 的 SpringFactoriesLoader 从类路径中查找绑定标记提供者(bind marker provider)的实现。
当前支持的数据库有:
- H2
- MariaDB
- Microsoft SQL Server
- MySQL
- Postgres
此类发出的所有SQL语句都会以DEBUG级别进行日志记录,日志所属的类别与客户端实例的全限定类名相对应(通常为DefaultDatabaseClient)。此外,每次执行还会在反应式序列中注册一个检查点,以便于调试。
以下部分提供了一些DatabaseClient使用的示例。这些示例并不是DatabaseClient所提供所有功能的详尽列表。有关所有功能,请参阅相应的javadoc。
执行语句
DatabaseClient 提供了执行语句的基本功能。以下示例展示了创建新表所需的最低限度但功能完备的代码:
- Java
- Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient 的设计旨在提供便捷、流畅的使用体验。它在执行规范的每个阶段都提供了中间方法(intermediate methods)、继续执行方法(continuation methods)以及终止方法(terminal methods)。上述示例中,使用了 then() 方法来返回一个 Publisher 对象,该对象会在查询(如果 SQL 查询包含多个语句,则为所有查询)完成时立即完成执行。
execute(…) 可以接受 SQL 查询字符串,也可以接受一个 Query Supplier<String> 对象,将实际的查询创建操作推迟到执行时再进行。
查询(SELECT)
SQL 查询可以通过 Row 对象或受影响的行数来返回值。DatabaseClient 可以根据执行的查询返回更新的行数,或者返回这些行本身。
以下查询从一个表中获取id和name列:
- Java
- Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查询使用了一个绑定变量:
- Java
- Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
你可能已经注意到上面示例中使用了fetch()。fetch()是一个延续操作符(continuation operator),它允许你指定想要获取多少数据。
调用first()会返回结果中的第一行,并丢弃剩余的行。您可以使用以下运算符来获取数据:
-
first()返回整个结果集中的第一行。在 Kotlin 协程中,如果返回值非空,则其对应的方法为awaitSingle();如果返回值可以是null,则对应的方法为awaitSingleOrNull()。 -
one()仅返回一个结果;如果结果集中包含多行,则会失败。在使用 Kotlin 协程时,相应的方法为awaitOne()(用于确保只返回一个结果),或者awaitOneOrNull()(用于允许返回null值)。 -
all()返回结果集中的所有行。在使用 Kotlin 协程时,应使用flow()方法。 -
rowsUpdated()返回受影响的行数(即INSERT、UPDATE、DELETE操作所涉及的行数)。在 Kotlin 协程中,其对应的方法为awaitRowsUpdated()。
如果不进一步指定映射细节,查询将返回以Map形式的表格结果,其中键是不区分大小写的列名,这些列名对应于各自的列值。
你可以通过提供一个 Function<Row, T> 来控制结果映射,该函数会对每个 Row 被调用,从而它可以返回任意类型的值(单个值、集合和映射,以及对象)。
以下示例提取name列并输出其值:
- Java
- Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
或者,还有一个快捷方法可以映射到单个值:
Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();
或者你可以将其映射到一个包含bean属性或记录组件的结果对象中:
// assuming a name property on Person
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();
使用 DatabaseClient 进行更新(INSERT、UPDATE 和 DELETE)
修改语句的唯一区别在于,这些语句通常不会返回表格数据,因此你需要使用rowsUpdated()来获取结果。
以下示例展示了一个UPDATE语句,该语句会返回更新后的行数:
- Java
- Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
将值绑定到查询中
一个典型的应用程序需要参数化的SQL语句来根据某些输入选择或更新行。这些通常是受WHERE子句限制的SELECT语句,或者是接受输入参数的INSERT和UPDATE语句。如果参数没有正确地进行转义,参数化语句就存在SQL注入的风险。DatabaseClient利用R2DBC的bind API来消除查询参数的SQL注入风险。你可以使用execute(…)操作符提供一个参数化的SQL语句,并将参数绑定到实际的Statement上。然后,你的R2DBC驱动程序通过使用预编译语句和参数替换来运行该语句。
参数绑定支持两种绑定策略:
-
按索引排序,使用从零开始的参数索引。
-
按名称排序,使用占位符名称。
以下示例展示了查询的参数绑定:
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
或者,您也可以传入一个名称和值的映射:
Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);
或者,您可以传递一个包含bean属性或记录组件的参数对象:
// assuming id, name, age properties on Person
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);
或者,您可以使用位置参数将值绑定到语句中。索引是从0开始的。
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind(0, "joe")
.bind(1, "Joe")
.bind(2, 34);
如果你的应用程序需要绑定许多参数,那么通过一次调用同样可以实现这一目标:
List<?> values = List.of("joe", "Joe", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(values);
查询预处理器会将名为“Collection”的参数展开为一系列绑定标记,从而无需根据参数数量来动态创建查询。嵌套的对象数组会被展开,以便可以使用(例如)选择列表。
考虑以下查询:
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
上述查询可以参数化,并按以下方式运行:
- Java
- Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
选择列表(select lists)的用法因浏览器或应用程序的供应商而异。
以下示例展示了一个使用IN谓词的更简单的变体:
- Java
- Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", arrayOf(35, 50))
R2DBC本身不支持类似集合(Collection-like)的数据类型。不过,在上面的示例中,对List的扩展在Spring的R2DBC支持中是可行的,例如可以用于IN子句中,如上所示。然而,插入或更新数组类型的列(例如在Postgres中)需要使用底层R2DBC驱动程序支持的数组类型:通常是一个Java数组,比如使用String[]来更新text[]类型的列。不要将Collection<String>或类似的数据作为数组参数传递。
语句过滤器
有时你需要在Statement实际执行之前对其进行微调。为此,需要向DatabaseClient注册一个Statement过滤器(StatementFilterFunction),以便在语句执行过程中拦截并修改它们,如下例所示:
- Java
- Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient 还提供了一个简化的 filter(…) 重载版本,该版本接受一个 Function<Statement, Statement> 类型的参数:
- Java
- Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction 的实现允许对 Statement 对象和 Result 对象进行过滤。
DatabaseClient 最佳实践
一旦配置完成,DatabaseClient 类的实例就是线程安全的。这一点很重要,因为这意味着你可以配置一个 DatabaseClient 的单一实例,然后安全地将这个共享引用注入到多个 DAO(或仓库)中。DatabaseClient 是有状态的,因为它维护着一个对 ConnectionFactory 的引用,但这种状态并不是会话状态(conversational state)。
在使用DatabaseClient类时,一种常见的做法是在Spring配置文件中配置一个ConnectionFactory,然后通过依赖注入将该共享的ConnectionFactory bean注入到DAO类中。DatabaseClient是在ConnectionFactory的setter方法中被创建的。这样产生的DAO类如下所示:
- Java
- Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
另一种替代显式配置的方法是使用组件扫描和注解支持来进行依赖注入。在这种情况下,你可以在类上添加@Component注解(这会使该类成为组件扫描的候选对象),并在ConnectionFactory的setter方法上添加@Autowired注解。以下示例展示了如何实现这一点:
- Java
- Kotlin
@Component 1
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
@Autowired 2
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); 3
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
使用
@Component注解标记该类。使用
@Autowired注解标注ConnectionFactory的 setter 方法。使用
ConnectionFactory创建一个新的DatabaseClient。
@Component 1
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { 2
private val databaseClient = DatabaseClient(connectionFactory) 3
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
使用
@Component注解标记该类。通过构造函数注入
ConnectionFactory。- \ [#3] 使用
ConnectionFactory创建一个新的DatabaseClient。
无论您选择使用上述哪种模板初始化样式(或不使用任何样式),每次运行SQL时都创建一个新的DatabaseClient实例的情况都是很少见的。一旦配置完成,DatabaseClient实例就是线程安全的。如果您的应用程序需要访问多个数据库,那么您可能需要多个DatabaseClient实例,这就需要多个ConnectionFactory,进而也需要多个配置不同的DatabaseClient实例。
获取自动生成的密钥
当向定义了自动递增或标识列的表中插入行时,INSERT 语句可能会生成键值对。若要完全控制所生成的列名,只需注册一个 StatementFilterFunction,该函数可以请求所需列的生成键值对即可。
- Java
- Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// generatedId emits the generated key once the INSERT statement has finished
控制数据库连接
本节涵盖:
使用 ConnectionFactory
Spring通过一个ConnectionFactory来获取到数据库的R2DBC连接。ConnectionFactory是R2DBC规范的一部分,也是驱动程序常见的入口点。它允许容器或框架将连接池管理和事务管理的问题从应用程序代码中隐藏起来。作为开发者,你不需要了解如何连接到数据库的细节。这是管理员的职责,他们负责设置ConnectionFactory。在开发和测试代码的过程中,你很可能同时承担着这两个角色,但你不一定需要知道生产环境的数据源是如何配置的。
当你使用Spring的R2DBC层时,你可以使用第三方提供的连接池实现来配置自己的连接池。一个流行的实现是R2DBC Pool(r2dbc-pool)。Spring发行版中的实现仅用于测试目的,并不提供连接池功能。
要配置一个 ConnectionFactory:
-
通过
ConnectionFactory建立连接,就像通常获取 R2DBCConnectionFactory一样。 -
提供一个 R2DBC URL(请参阅驱动程序的文档以获取正确的值)。
以下示例展示了如何配置一个 ConnectionFactory:
- Java
- Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用 ConnectionFactoryUtils
ConnectionFactoryUtils 类是一个方便且强大的辅助类,它提供了 static 方法来从 ConnectionFactory 获取连接,并在必要时关闭连接。
它支持与R2dbcTransactionManager等组件绑定的订阅者Context连接。
使用 SingleConnectionFactory
SingleConnectionFactory 类是 DelegatingConnectionFactory 接口的实现,它包装了一个在每次使用后不会被关闭的单一 Connection 对象。
如果任何客户端代码在假设使用的是池化连接的情况下调用close方法(例如在使用持久化工具时),你应该将suppressClose属性设置为true。此设置会返回一个抑制关闭操作的代理对象,该代理对象会包裹住实际的连接。请注意,你不能再将这个代理对象强制转换为原始的Connection对象或类似的对象了。
SingleConnectionFactory主要是一个测试类,如果您的R2DBC驱动程序允许的话,它可以用于一些特定的需求,例如流水线处理。与共享式ConnectionFactory不同,它始终重用同一个连接,从而避免了过多地创建物理连接。
使用 TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy 是一个针对目标 ConnectionFactory 的代理。该代理包装了那个目标 ConnectionFactory,以增加对 Spring 管理的事务的支持。
如果您使用的R2DBC客户端没有与Spring的R2DBC支持集成,那么就需要使用这个类。在这种情况下,您仍然可以使用这个客户端,并且让这个客户端参与Spring管理的事务。通常,最好将R2DBC客户端与能够正确访问ConnectionFactoryUtils的组件集成起来,以便进行资源管理。
有关更多详细信息,请参阅TransactionAwareConnectionFactoryProxy的Javadoc。
使用 R2dbcTransactionManager
R2dbcTransactionManager 类是针对单个 R2DBC ConnectionFactory 的 ReactiveTransactionManager 实现。它将来自指定 ConnectionFactory 的 R2DBC Connection 绑定到订阅者 Context 上,每个 ConnectionFactory 可能允许有一个订阅者 Connection。
需要应用程序代码通过 ConnectionFactoryUtils.getConnection(ConnectionFactory) 来获取 R2DBC 的 Connection,而不是使用 R2DBC 的标准方法 ConnectionFactory.create()。所有框架类(如 DatabaseClient)都隐式地采用了这种策略。如果不与事务管理器一起使用,这种查找策略的行为与 ConnectionFactory.create() 完全相同,因此在任何情况下都可以使用。