|
对于最新稳定版本,请使用 Spring Framework 7.0.6! |
使用 R2DBC 进行数据访问
R2DBC(“响应式关系型数据库连接”)是一项由社区推动的规范工作,旨在使用响应式模式标准化对 SQL 数据库的访问。
包层次结构
Spring Framework 的 R2DBC 抽象框架由两个不同的包组成:
-
core:org.springframework.r2dbc.core包包含DatabaseClient类以及各种相关类。参见 使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理。 -
connection:org.springframework.r2dbc.connection包包含一个工具类, 用于便捷地访问ConnectionFactory,以及各种简单的ConnectionFactory实现, 可用于测试和运行未经修改的 R2DBC。参见 控制数据库连接。
使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理
本节介绍如何使用 R2DBC 核心类来控制基本的 R2DBC 处理,包括错误处理。内容涵盖以下主题:
使用DatabaseClient
DatabaseClient 是 R2DBC 核心包中的中心类。它负责资源的创建和释放,有助于避免常见错误,例如忘记关闭连接。它执行核心 R2DBC 工作流的基本任务(例如语句的创建与执行),而将提供 SQL 和提取结果的工作留给应用程序代码。DatabaseClient 类:
-
运行SQL查询
-
更新语句和存储过程调用
-
对
Result实例执行迭代 -
捕获 R2DBC 异常,并将其转换为
org.springframework.dao包中定义的通用且更具信息量的异常层次结构。 (参见一致的异常层次结构。)
客户端提供了一个函数式、流畅的 API,使用响应式类型进行声明式组合。
当你在代码中使用 DatabaseClient 时,只需实现 java.util.function 接口,这些接口具有明确的契约。
给定由 Connection 类提供的 DatabaseClient,一个 Function 回调会创建一个 Publisher。
用于从 Row 结果中提取数据的映射函数也同样如此。
你可以在 DAO 实现中通过直接实例化并传入一个 DatabaseClient 引用来使用 ConnectionFactory,也可以在 Spring IoC 容器中对其进行配置,并将其作为 bean 引用提供给 DAO。
创建 DatabaseClient 对象最简单的方式是通过静态工厂方法,如下所示:
-
Java
-
Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 应始终在 Spring IoC 容器中配置为一个 bean。 |
上述方法使用默认设置创建了一个 DatabaseClient。
您也可以通过 Builder 获取一个 DatabaseClient.builder() 实例。
您可以通过调用以下方法来自定义客户端:
-
….bindMarkers(…):提供一个特定的BindMarkersFactory,用于配置命名参数到数据库绑定标记的转换。 -
….executeFunction(…):设置ExecuteFunction,用于指定Statement对象的执行方式。 -
….namedParameters(false):禁用命名参数展开。默认启用。
方言通过以下方式解析:BindMarkersFactoryResolver来自一个ConnectionFactory,通常通过检查ConnectionFactoryMetadata.
您可以让 Spring 自动发现您的 BindMarkersFactory通过注册一个
实现了
的类org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider通过META-INF/spring.factories.
BindMarkersFactoryResolver使用 Spring 的从类路径中发现绑定标记提供者实现SpringFactoriesLoader.
|
当前支持的数据库包括:
-
H2
-
MariaDB
-
Microsoft SQL Server
-
MySQL
-
Postgres
此类发出的所有 SQL 语句都会以 DEBUG 级别记录,日志类别对应于客户端实例的全限定类名(通常为 DefaultDatabaseClient)。此外,每次执行都会在响应式序列中注册一个检查点,以辅助调试。
以下各节提供了一些 DatabaseClient 的使用示例。这些示例并未涵盖 DatabaseClient 所提供的全部功能。
有关完整功能,请参阅相应的javadoc。
执行语句
DatabaseClient 提供了执行语句的基本功能。
以下示例展示了创建一个新表所需的最简但功能完整的代码:
-
Java
-
Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient 旨在提供便捷、流畅的使用体验。
它在执行规范的每个阶段都暴露了中间方法、延续方法和终止方法。上面的示例使用 then() 返回一个完成型的
Publisher,该发布者会在查询(或多个查询,如果 SQL 查询包含多条语句)完成后立即完成。
execute(…) 接受 SQL 查询字符串或一个查询 Supplier<String>,
以将实际查询的创建推迟到执行时进行。 |
查询(SELECT)
SQL 查询可以通过 Row 对象返回值,也可以返回受影响的行数。
DatabaseClient 可以返回更新的行数或实际的行数据,
具体取决于所执行的查询。
以下查询从表中获取 id 和 name 列:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查询使用了一个绑定变量:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
你可能已经注意到上面示例中使用了 fetch()。fetch() 是一个延续操作符,用于指定你想要消费多少数据。
调用 first() 返回结果中的第一行,并丢弃其余行。
您可以使用以下操作符来消费数据:
-
first()返回整个结果的第一行。其 Kotlin 协程变体对于非空返回值命名为awaitSingle(),如果值是可选的(nullable),则命名为awaitSingleOrNull()。 -
one()返回恰好一个结果,如果结果包含多行则会失败。 使用 Kotlin 协程时,可使用awaitOne()获取恰好一个值,或者使用awaitOneOrNull()(当该值可能为null时)。 -
all()返回结果的所有行。使用 Kotlin 协程时,请使用flow()。 -
rowsUpdated()返回受影响的行数(INSERT/UPDATE/DELETE的计数)。其 Kotlin 协程变体名为awaitRowsUpdated()。
如果不指定进一步的映射细节,查询将返回表格形式的结果,
以Map的形式表示,其键为不区分大小写的列名,并映射到对应的列值。
你可以通过提供一个 Function<Row, T> 来控制结果映射,该函数会对每个 Row 进行调用,从而返回任意类型的值(包括单个值、集合与映射,以及对象)。
以下示例提取 name 列并输出其值:
-
Java
-
Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
正在更新(INSERT, UPDATE,和DELETE)与DatabaseClient
修改语句的唯一区别在于,这些语句通常不会返回表格数据,因此你需要使用 rowsUpdated() 来处理结果。
以下示例展示了一个返回已更新行数的 UPDATE 语句:
-
Java
-
Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
将值绑定到查询
典型的应用程序需要使用参数化的 SQL 语句,根据某些输入来查询或更新数据行。这些语句通常是带有 SELECT 子句限制的 WHERE 语句,或者是接受输入参数的 INSERT 和 UPDATE 语句。如果参数未被正确转义,参数化语句就存在 SQL 注入的风险。DatabaseClient 利用 R2DBC 的 bind API 来消除查询参数导致 SQL 注入的风险。您可以使用 execute(…) 操作符提供一个参数化的 SQL 语句,并将参数绑定到实际的 Statement 上。随后,您的 R2DBC 驱动程序会通过预编译语句和参数替换来执行该语句。
参数绑定支持两种绑定策略:
-
按索引,使用从零开始的参数索引。
-
按名称,使用占位符名称。
以下示例展示了查询的参数绑定:
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
查询预处理器会将命名的 Collection 参数展开为一系列绑定标记,从而避免根据参数数量动态生成查询语句。
嵌套的对象数组也会被展开,以支持(例如)在 select 列表中使用。
考虑以下查询:
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
上述查询可以参数化并按如下方式执行:
-
Java
-
Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
| 选择列表的使用依赖于具体的数据库厂商。 |
以下示例展示了一个使用 IN 谓词的更简单变体:
-
Java
-
Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("tuples", arrayOf(35, 50))
R2DBC 本身并不支持类似集合(Collection)的值。然而,在 Spring 的 R2DBC 支持中,如上例所示,对给定的 List 进行展开在命名参数中是可行的,例如可用于上述 IN 子句中。
不过,插入或更新数组类型的列(例如在 PostgreSQL 中)需要使用底层 R2DBC 驱动所支持的数组类型:
通常是 Java 数组,例如使用 String[] 来更新 text[] 类型的列。
请勿将 Collection<String> 或类似类型作为数组参数传递。 |
语句过滤器
有时,你需要在实际执行 Statement 之前对其选项进行微调。为此,可以向 Statement 注册一个 StatementFilterFunction 过滤器(DatabaseClient),以在语句执行过程中拦截并修改它们,如下例所示:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient 还提供了一个简化的 filter(…) 重载方法,该方法接受一个 Function<Statement, Statement>:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction 的实现允许对 Statement 以及 Result 对象进行过滤。
DatabaseClient最佳实践
DatabaseClient 类的实例在配置完成后是线程安全的。这一点非常重要,因为这意味着你可以配置一个单一的 DatabaseClient 实例,然后安全地将这个共享引用注入到多个 DAO(或仓库)中。
DatabaseClient 是有状态的,因为它持有一个对 ConnectionFactory 的引用,但这种状态并非会话状态(conversational state)。
在使用 DatabaseClient 类时,一种常见的做法是在 Spring 配置文件中配置一个 ConnectionFactory,然后通过依赖注入将该共享的 ConnectionFactory Bean 注入到你的 DAO 类中。DatabaseClient 是在 ConnectionFactory 的 setter 方法中创建的。这样得到的 DAO 类类似于以下形式:
-
Java
-
Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
显式配置的一种替代方式是使用组件扫描(component-scanning)和注解支持进行依赖注入。在这种情况下,您可以使用 @Component 注解该类(使其成为组件扫描的候选对象),并使用 ConnectionFactory 注解 @Autowired 的 setter 方法。以下示例展示了如何实现这一点:
-
Java
-
Kotlin
@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
@Autowired (2)
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); (3)
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
| 1 | 使用 @Component 注解该类。 |
| 2 | 使用 ConnectionFactory 注解标注 @Autowired 的 setter 方法。 |
| 3 | 使用 DatabaseClient 创建一个新的 ConnectionFactory。 |
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)
private val databaseClient = DatabaseClient(connectionFactory) (3)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
| 1 | 使用 @Component 注解该类。 |
| 2 | ConnectionFactory 的构造函数注入。 |
| 3 | 使用 DatabaseClient 创建一个新的 ConnectionFactory。 |
无论您选择使用上述哪种模板初始化方式(或不使用),通常都无需在每次执行 SQL 时都创建一个新的 DatabaseClient 类实例。一旦完成配置,DatabaseClient 实例就是线程安全的。如果您的应用程序需要访问多个数据库,则可能需要多个 DatabaseClient 实例,这就要求使用多个 ConnectionFactory,并相应地配置多个不同的 DatabaseClient 实例。
自动生成键的检索
INSERT 语句在向定义了自增列或标识列的表中插入行时,可能会生成键值。若要完全控制所生成键值对应的列名,只需注册一个 StatementFilterFunction,该函数会请求为指定列生成键值。
-
Java
-
Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// generatedId emits the generated key once the INSERT statement has finished
控制数据库连接
这一部分涵盖了:
使用ConnectionFactory
Spring 通过 ConnectionFactory 获取到数据库的 R2DBC 连接。
ConnectionFactory 是 R2DBC 规范的一部分,也是驱动程序的通用入口点。
它允许容器或框架将连接池和事务管理等细节从应用程序代码中隐藏起来。
作为开发人员,你无需了解如何连接数据库的具体细节。这是负责配置 ConnectionFactory 的管理员的职责。
在开发和测试代码时,你很可能同时扮演这两个角色,但你并不一定需要知道生产环境数据源是如何配置的。
当你使用 Spring 的 R2DBC 层时,可以使用第三方提供的连接池实现来配置自己的连接池。一个流行的实现是 R2DBC Pool(r2dbc-pool)。Spring 发行版中包含的实现仅用于测试目的,不提供连接池功能。
要配置一个ConnectionFactory:
-
通过
ConnectionFactory获取连接,方式与通常获取 R2DBCConnectionFactory的方式相同。 -
提供一个 R2DBC URL (请参阅您所用驱动程序的文档以获取正确的值)。
以下示例展示了如何配置一个 ConnectionFactory:
-
Java
-
Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用ConnectionFactoryUtils
ConnectionFactoryUtils 类是一个便捷而强大的辅助类,
它提供了 static 方法,用于从 ConnectionFactory
获取连接并(在必要时)关闭连接。
它支持与订阅者 Context 绑定的连接,例如使用 R2dbcTransactionManager。
使用SingleConnectionFactory
SingleConnectionFactory 类是 DelegatingConnectionFactory 接口的一个实现,它包装了一个单一的 Connection,该连接在每次使用后不会被关闭。
如果任何客户端代码基于连接池的假设调用 close 方法(例如使用持久化工具时),您应将 suppressClose 属性设置为 true。此设置会返回一个抑制关闭操作的代理,该代理包装了物理连接。请注意,此后您将无法再将其转换为原生的 Connection 或类似对象。
SingleConnectionFactory 主要是一个测试类,如果您的 R2DBC 驱动程序允许此类用法,也可用于特定需求,例如流水线操作。
与池化的 ConnectionFactory 不同,它始终复用同一个连接,避免了频繁创建物理连接。
使用TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy 是目标 ConnectionFactory 的一个代理。
该代理包装了目标 ConnectionFactory,以增加对 Spring 管理的事务的感知能力。
如果使用了一个未以其他方式与 Spring 的 R2DBC 支持集成的 R2DBC 客户端,则需要使用此类。在这种情况下,您仍然可以使用该客户端,同时让该客户端参与 Spring 管理的事务。通常更推荐将 R2DBC 客户端与 ConnectionFactoryUtils 正确集成,以便进行资源管理。 |
请参阅 TransactionAwareConnectionFactoryProxy
javadoc 以获取更多详情。
使用R2dbcTransactionManager
R2dbcTransactionManager 类是针对单个 R2DBC ReactiveTransactionManager 的 ConnectionFactory 实现。它将来自指定 Connection 的 R2DBC ConnectionFactory 绑定到订阅者(subscriber)的 Context 中,从而可能为每个 Connection 提供一个独立的订阅者 ConnectionFactory。
应用程序代码需要通过 Connection 来获取 R2DBC 的 ConnectionFactoryUtils.getConnection(ConnectionFactory),而不是使用 R2DBC 标准的 ConnectionFactory.create() 方法。所有框架类(例如 DatabaseClient)都隐式地使用了这一策略。如果不与事务管理器一起使用,该查找策略的行为与 ConnectionFactory.create() 完全相同,因此在任何情况下都可以使用。