|
对于最新的稳定版本,请使用 Spring Framework 7.0.6! |
使用R2DBC进行数据访问
R2DBC ("Reactive Relational Database Connectivity") 是一个由社区驱动的规范项目,旨在使用响应式模式来标准化对SQL数据库的访问。
包层次结构
Spring Framework 的 R2DBC 抽象框架由两个不同的包组成:
-
core:org.springframework.r2dbc.core包包含DatabaseClient类以及多种相关类。请参阅 使用 R2DBC 核心类来控制基本的 R2DBC 处理和错误处理。 -
connection:org.springframework.r2dbc.connection包包含一个用于轻松ConnectionFactory访问和各种简单ConnectionFactory实现的工具类, 您可以用于测试和运行未修改的 R2DBC。参见 控制数据库连接。
使用 R2DBC 核心类控制基本 R2DBC 处理与错误处理
本节介绍如何使用 R2DBC 核心类来控制基本的 R2DBC 处理,包括错误处理。它包含以下主题:
使用 DatabaseClient
DatabaseClient 是 R2DBC 核心包中的核心类。它负责资源的创建和释放,有助于避免常见的错误,例如忘记关闭连接。它执行 R2DBC 核心工作流的基本任务(如语句的创建和执行),让应用程序代码提供 SQL 并提取结果。DatabaseClient 类:
-
运行 SQL 查询
-
更新语句和存储过程调用
-
对
Result个实例进行迭代 -
捕获R2DBC异常,并将其转换为在
org.springframework.dao包中定义的通用且更丰富的异常层次结构。(参见一致的异常层次结构。)
客户端使用反应类型具有功能性的流畅API,用于声明式组合。
当您在代码中使用 DatabaseClient 时,只需实现
java.util.function 接口,并为它们定义明确的契约。
给定由 DatabaseClient 类提供的 Connection,一个 Function
回调会创建一个 Publisher。映射函数提取 Row 结果的情况也是如此。
您可以通过直接实例化一个 DatabaseClient 引用,在 DAO 实现中使用 DatabaseClient,或者将其配置在 Spring IoC 容器中,并作为 bean 引用提供给 DAO。
创建 DatabaseClient 对象的最简单方法是通过一个静态工厂方法,如下所示:
-
Java
-
Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
The ConnectionFactory 应始终在 Spring IoC 容器中配置为一个 bean。 |
前面的方法使用默认设置创建一个 DatabaseClient。
您也可以从 DatabaseClient.builder() 获取一个 Builder 实例。
您可以调用以下方法来自定义客户端:
-
….bindMarkers(…): 提供一个特定的BindMarkersFactory来配置命名参数到数据库绑定标记的转换。 -
….executeFunction(…): 设置ExecuteFunction以指定Statement对象如何运行。 -
….namedParameters(false): 禁用命名参数扩展。默认已启用。
方言通过 BindMarkersFactoryResolver 从 ConnectionFactory 解析,通常是通过检查 ConnectionFactoryMetadata。
您可以通过通过 META-INF/spring.factories 注册一个实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的类,让 Spring 自动发现您的 BindMarkersFactory。
BindMarkersFactoryResolver 通过 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供者实现。
|
当前支持的数据库有:
-
H2
-
MariaDB
-
微软 SQL Server
-
MySQL
-
Postgres
此类发出的所有SQL在对应于客户端实例的完全限定类名的类别下以DEBUG级别记录(通常为DefaultDatabaseClient)。此外,每次执行都会在反应序列中注册一个检查点以帮助调试。
以下部分提供了一些 DatabaseClient 用法的示例。这些示例并不是 DatabaseClient 所有功能的完整列表。
有关详细信息,请参阅相应的 javadoc。
执行语句
DatabaseClient 提供了运行语句的基本功能。
以下示例显示了创建新表所需包含的最小但完整的
代码:
-
Java
-
Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient 旨在实现方便、流畅的使用。
它在执行规范的每个阶段都公开了中间方法、继续方法和终端方法。上面的示例使用 then() 返回一个完成 Publisher,该完成会在查询(或如果 SQL 查询包含多个语句,则为所有查询)完成后立即完成。
execute(…) 接受 SQL 查询字符串或查询 Supplier<String>
以推迟实际查询的创建直到执行。 |
查询(SELECT)
SQL 查询可以通过 Row 对象返回值,或返回受影响的行数。
DatabaseClient 可以根据所执行的查询返回更新的行数或行本身。
以下查询从表中获取 id 和 name 列:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查询使用了绑定变量:
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
您可能已经注意到上面示例中使用了 fetch()。fetch() 是一个继续操作符,它允许您指定要消耗的数据量。
调用 first() 会从结果中返回第一行并丢弃其余行。
您可以使用以下运算符来消费数据:
-
first()返回整个结果的第一行。其 Kotlin 协程版本名为awaitSingle(),用于非空返回值,而awaitSingleOrNull()用于可选值的情况。 -
one()返回一个结果,如果结果包含多行则会失败。 使用 Kotlin 协程,awaitOne()表示恰好一个值,awaitOneOrNull()表示值可能是null。 -
all()返回结果的所有行。在使用 Kotlin 协程时,请使用flow()。 -
rowsUpdated()返回受影响的行数(INSERT/UPDATE/DELETE计数)。其 Kotlin 协程版本名为awaitRowsUpdated()。
未指定进一步的映射细节时,查询将返回以Map形式呈现的表格结果,其键为不区分大小写的列名,对应其列值。
您可以通过提供一个Function<Row, T>来控制结果映射,该组件会被每个Row调用,从而可以返回任意值(单一值、集合与映射、对象)。
以下示例提取 name 列并发出其值:
-
Java
-
Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
更新(INSERT、UPDATE 和 DELETE)与 DatabaseClient
修改语句的唯一区别是,这些语句通常不返回表格数据,因此你使用 rowsUpdated() 来消耗结果。
以下示例显示了一个返回更新行数的 UPDATE 语句:
-
Java
-
Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
将值绑定到查询
一个典型的应用程序需要参数化的SQL语句,以便根据某些输入选择或更新行。这些通常是SELECT语句,受WHERE子句或INSERT和UPDATE语句的约束,这些语句接受输入参数。如果参数未正确转义,参数化语句可能会带来SQL注入的风险。DatabaseClient利用R2DBC的bind API来消除查询参数的SQL注入风险。您可以使用execute(…)运算符提供参数化的SQL语句,并将参数绑定到实际的Statement。然后您的R2DBC驱动程序会通过使用预编译语句和参数替换来运行该语句。
参数绑定支持两种绑定策略:
-
通过索引,使用基于零的参数索引。
-
按名称,使用占位符名称。
以下示例显示了查询的参数绑定:
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
查询预处理器将命名的 Collection 参数展开为一系列绑定标记,以避免根据参数数量动态创建查询的需要。
嵌套的对象数组会被展开,以允许使用(例如)选择列表。
考虑以下查询:
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
前述查询可以进行参数化处理,并可以按如下方式运行:
-
Java
-
Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
| 使用选择列表取决于提供商。 |
以下示例显示了一个使用 IN 谓词的更简单变体:
-
Java
-
Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("tuples", arrayOf(35, 50))
R2DBC本身不支持类似集合的值。然而,
在上面的例子中,展开给定的List对于Spring的R2DBC支持中的命名参数有效,
例如,如上所示用于IN子句。
但是,插入或更新数组类型的列(例如在Postgres中)
需要底层R2DBC驱动程序支持的数组类型:
通常是Java数组,例如String[]来更新text[]列。
不要将Collection<String>或类似内容作为数组参数传递。 |
声明过滤器
有时您需要在实际的 Statement 运行前对其选项进行微调。为此,请通过 DatabaseClient 注册一个 Statement 过滤器(StatementFilterFunction)来拦截并修改执行中的语句,如以下示例所示:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient 还提供了一个简化的 filter(…) 重载方法,它接受一个 Function<Statement, Statement>:
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction 实现允许对
Statement 进行筛选,并对 Result 对象进行筛选。
DatabaseClient 最佳实践
DatabaseClient 类的实例在配置后是线程安全的。这一点很重要,因为这意味着您可以配置一个 DatabaseClient 的实例,然后安全地将此共享引用注入到多个 DAO(或仓库)中。
DatabaseClient 是有状态的,因为它维护对 ConnectionFactory 的引用,但此状态不是会话状态。
使用 DatabaseClient 类的常见做法是,在 Spring 配置文件中配置一个 ConnectionFactory
,然后将该共享的 ConnectionFactory bean 依赖注入到您的 DAO 类中。 DatabaseClient 是在 ConnectionFactory 的 setter 中创建的。这使得 DAO 类看起来如下:
-
Java
-
Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
显式配置的替代方法是使用组件扫描和注解支持的依赖注入。在这种情况下,您可以将类标注为<code>0</code>> (这使其成为组件扫描的候选),并将<code>1</code>设置方法标注为<code>2</code>。下面的示例展示了如何操作:
-
Java
-
Kotlin
@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
@Autowired (2)
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); (3)
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
| 1 | 使用 @Component 注解该类。 |
| 2 | 使用 @Autowired 注解 ConnectionFactory 的设置方法。 |
| 3 | 使用 ConnectionFactory 创建一个新的 DatabaseClient。 |
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)
private val databaseClient = DatabaseClient(connectionFactory) (3)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
| 1 | 使用 @Component 注解该类。 |
| 2 | 构造函数注入的 ConnectionFactory。 |
| 3 | 使用 ConnectionFactory 创建一个新的 DatabaseClient。 |
无论您选择以上哪种模板初始化样式(或不使用),通常不需要每次运行SQL时都创建一个新的 DatabaseClient 类的实例。
配置完成后,DatabaseClient 实例是线程安全的。
如果您的应用程序访问多个数据库,可能需要多个 DatabaseClient 实例,这需要多个 ConnectionFactory,并且随后需要多个配置不同的 DatabaseClient 实例。
检索自动生成的键
INSERT 语句在将行插入到定义了自增或标识列的表中时可能会生成键。要完全控制要生成的列名,只需注册一个 StatementFilterFunction,该语句请求所需列的生成键。
-
Java
-
Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// generatedId emits the generated key once the INSERT statement has finished
控制数据库连接
本节内容包括:
使用 ConnectionFactory
Spring 通过 ConnectionFactory 从数据库获取 R2DBC 连接。
ConnectionFactory 是 R2DBC 规范的一部分,是驱动程序的常见入口点。
它可以让容器或框架将连接池和事务管理问题隐藏在应用程序代码之外。作为开发人员,
您无需了解如何连接到数据库的细节。这是设置 ConnectionFactory 的管理员的责任。在开发和测试代码时,
您很可能同时担任这两个角色,但您不一定需要知道生产数据源的配置方式。
当您使用Spring的R2DBC层时,可以使用第三方提供的连接池实现自行配置。流行的实现方案是R2DBC Pool (r2dbc-pool)。Spring发行版中的实现仅供测试使用,不提供连接池功能。
要配置一个 ConnectionFactory:
-
使用
ConnectionFactory获取连接,就像您通常获取 R2DBCConnectionFactory一样。 -
提供一个 R2DBC URL (请参阅您的驱动程序的文档以获取正确的值)。
以下示例展示了如何配置一个ConnectionFactory:
-
Java
-
Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用 ConnectionFactoryUtils
ConnectionFactoryUtils 类是一个方便且功能强大的帮助类,
它提供了 static 方法,用于从 ConnectionFactory 获取连接
并关闭连接(如有必要)。
它支持订阅者 Context-bound 连接,例如
R2dbcTransactionManager。
使用 SingleConnectionFactory
The SingleConnectionFactory class is an implementation of DelegatingConnectionFactory
interface that wraps a single Connection that is not closed after each use.
如果任何客户端代码在假设使用连接池连接的情况下调用 close(例如使用持久化工具时),您应将 suppressClose 属性设置为 true。此设置会返回一个关闭抑制代理,该代理包装了物理连接。请注意,您不能再将其强制转换为原生的 Connection 或类似对象。
SingleConnectionFactory 主要是一个测试类,如果您的 R2DBC 驱动程序允许,可以用于特定的需求,例如流水线处理。
与池化的 ConnectionFactory 相比,它会一直重复使用同一连接,从而避免过多地创建物理连接。
使用 TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy 是对目标 ConnectionFactory 的代理。
代理会包装该目标 ConnectionFactory 以增加对 Spring 管理事务的了解。
如果使用未与其他方式集成到 Spring 的 R2DBC 支持中的 R2DBC 客户端,则必须使用此类。
在这种情况下,您仍然可以使用此客户端,并且同时让此客户端参与 Spring 管理的事务。通常最好将 R2DBC 客户端正确集成以访问 ConnectionFactoryUtils 进行资源管理。 |
查看 TransactionAwareConnectionFactoryProxy
javadoc 以了解更多信息。
使用 R2dbcTransactionManager
R2dbcTransactionManager 类是 ReactiveTransactionManager 的一个实现,用于
单个 R2DBC ConnectionFactory。它将指定的
ConnectionFactory 中的 R2DBC Connection 绑定到订阅者 Context,可能允许每个 ConnectionFactory 对应一个订阅者
Connection。
应用代码需要通过Connection和ConnectionFactoryUtils.getConnection(ConnectionFactory)来获取R2DBC Connection,而不是R2DBC的标准ConnectionFactory.create()。所有框架类(如DatabaseClient)都隐式地使用此策略。如果没有与事务管理器一起使用,查找策略的行为将与ConnectionFactory.create()完全相同,因此可以在任何情况下使用。