应用

5. 来源

5.1. CDC 来源

更改数据捕获 (CDC)source从各种数据库捕获和流式传输更改事件。 目前,它支持MySQL,PostgreSQL,MongoDB,OracleSQL Server数据库。spring-doc.cadn.net.cn

基于 Debezium 嵌入式连接器构建,CDC Source允许通过不同的消息绑定器(如 Apache Kafka、RabbitMQ 和所有 Spring Cloud Stream 支持代理)捕获和流式传输数据库更改。spring-doc.cadn.net.cn

它支持所有 Debezium 配置属性。只需将cdc.config.前缀添加到现有的 Debezium 属性。例如,要将 Debezium 的connector.class属性使用cdc.config.connector.classsource 属性。spring-doc.cadn.net.cn

我们为最常用的 Debezium 属性提供了方便的快捷方式。例如,代替长cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnectorDebezium 属性,您可以使用我们的cdc.connector=mysql捷径。下表列出了所有可用的快捷方式以及它们所代表的 Debezium 属性。 Debezium 属性(例如cdc.config.XXX) 始终优先于快捷方式!spring-doc.cadn.net.cn

CDC Source 引入了新的默认值BackingOffsetStore配置,基于 MetadataStore 服务。Later 提供了各种微服务友好的方式来存储偏移元数据。spring-doc.cadn.net.cn

5.1.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

疾病预防控制中心
配置

用于 debezium 配置属性的 Spring pass-trough 包装器。所有带有“cdc.config.”前缀的属性都是原生 Debezium 属性。删除前缀,将它们转换为 Debezium io.debezium.config.Configuration。(Map<String, String>,默认:<none>)spring-doc.cadn.net.cn

连接器

cdc.config.connector.class 属性的快捷方式。只要它们不相互矛盾,就可以使用其中任何一个。(ConnectorType,默认:<none>,可能的值:mysql,postgres,mongodb,oracle,sqlserver)spring-doc.cadn.net.cn

名称

此 sourceConnector 实例的唯一名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

图式

将架构作为出站消息的一部分包含在内。(布尔值,默认:false)spring-doc.cadn.net.cn

cdc.扁平化
添加字段

要添加到扁平化邮件的元数据字段的逗号分隔列表。这些字段将以“__”或“__[<]struct]__”为前缀,具体取决于结构的规范。(字符串,默认:<none>)spring-doc.cadn.net.cn

添加标头

逗号分隔列表指定要添加到扁平邮件标头的元数据字段列表。这些字段将以“__”或“__[struct]__”为前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

删除处理模式

处理已删除记录的选项:(1) 无 - 传递记录,(2) 删除 - 删除记录和 (3) 重写 - 向记录添加“__deleted”字段。(DeleteHandlingMode,默认:<none>,可能的值:drop,rewrite,none)spring-doc.cadn.net.cn

掉落墓碑

默认情况下,Debezium 会生成逻辑删除记录以对已删除的记录启用 Kafka 压缩。dropTombstones 可以禁止显示墓碑记录。(布尔值,默认:true)spring-doc.cadn.net.cn

启用

启用平展源记录事件 (https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认:true)spring-doc.cadn.net.cn

cdc.偏移
提交超时

在取消进程并恢复将来尝试提交的偏移数据之前,等待刷新记录并将要提交到偏移存储的偏移数据的最大毫秒数。(持续时间,默认:5000ms)spring-doc.cadn.net.cn

冲洗间隔

尝试提交偏移量的时间间隔。默认值为 1 分钟。(持续时间,默认:60000ms)spring-doc.cadn.net.cn

政策

偏移存储提交策略。(OffsetPolicy,默认:<none>)spring-doc.cadn.net.cn

存储

Kafka 连接器跟踪已处理的记录数,并定期将计数(作为“偏移量”)存储在预配置的元数据存储中。重新启动时,连接器会从上次记录的源偏移量恢复读取。(OffsetStorageType,默认:<none>,可能的值:memory,file,kafka,metadata)spring-doc.cadn.net.cn

cdc.stream.header
转换连接标头

当 true 时,{@link org.apache.kafka.connect.header.Header} 将转换为消息头,名称为 {@link org.apache.kafka.connect.header.Header#key()},{@link org.apache.kafka.connect.header.Header#value()}。(布尔值,默认:true)spring-doc.cadn.net.cn

抵消

将源记录的偏移量元数据序列化到 cdc.offset 下的出站邮件标头中。(布尔值,默认:false)spring-doc.cadn.net.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

创建重试

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

读取容量

表上的读取容量。(长,默认:1)spring-doc.cadn.net.cn

桌子

元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

生存时间

表条目的 TTL。(整数,默认:<none>)spring-doc.cadn.net.cn

写入容量

表上的写入容量。(长,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
地区

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.jdbc
地区

在此存储中保留的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

表前缀

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis
钥匙

元数据的 Redis 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:<none>,可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

元数据.store.zookeeper
连接字符串

Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:127.0.0.1:2181)spring-doc.cadn.net.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

重试间隔

Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:1000)spring-doc.cadn.net.cn

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

Debezium 属性快捷方式映射

下表列出了所有可用的快捷方式以及它们所代表的 Debezium 属性。spring-doc.cadn.net.cn

表 1.表格快捷方式属性映射
捷径 源语言 描述

cdc.connectorspring-doc.cadn.net.cn

cdc.config.connector.classspring-doc.cadn.net.cn

mysql:MySql连接器,postgres:PostgresConnector,mongodb:MongodbSourceConnector,oracle:OracleConnector,sqlserver:SqlServerConnectorspring-doc.cadn.net.cn

cdc.namespring-doc.cadn.net.cn

cdc.config.namespring-doc.cadn.net.cn

cdc.offset.flush-intervalspring-doc.cadn.net.cn

cdc.config.offset.flush.interval.msspring-doc.cadn.net.cn

cdc.offset.commit-timeoutspring-doc.cadn.net.cn

cdc.config.offset.flush.timeout.msspring-doc.cadn.net.cn

cdc.offset.policyspring-doc.cadn.net.cn

cdc.config.offset.commit.policyspring-doc.cadn.net.cn

periodic:定期提交偏移策略,always:AlwaysCommitOffsetPolicyspring-doc.cadn.net.cn

cdc.offset.storagespring-doc.cadn.net.cn

cdc.config.offset.storagespring-doc.cadn.net.cn

metadata:MetadataStoreOffsetBackingStore、file:文件偏移备份存储,kafka:KafkaOffsetBackingStore,memory: 内存偏移备份存储spring-doc.cadn.net.cn

cdc.flattening.drop-tombstonesspring-doc.cadn.net.cn

cdc.config.drop.墓碑spring-doc.cadn.net.cn

cdc.flattening.delete-handling-modespring-doc.cadn.net.cn

cdc.config.delete.handling.modespring-doc.cadn.net.cn

none:没有drop:落rewrite:重写spring-doc.cadn.net.cn

5.1.2. 数据库支持

CDC Source使用 Debezium 实用程序,目前支持 CDC 的五个数据存储:MySQL,PostgreSQL,MongoDB,OracleSQL Server数据库。spring-doc.cadn.net.cn

5.1.3. 示例和测试

[CdcSourceIntegrationTest]() 、[CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 集成测试使用在本地计算机上运行的测试数据库夹具。 我们使用预构建的 debezium docker 数据库镜像。 Maven 构建借助docker-maven-plugin.spring-doc.cadn.net.cn

要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明说明介绍了如何从 Docker 镜像运行预配置的测试数据库。spring-doc.cadn.net.cn

MySQL

启动debezium/example-mysql在 docker 中:spring-doc.cadn.net.cn

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0

(可选)用mysql客户端连接到数据库并创建debezium具有所需凭据的用户:spring-doc.cadn.net.cn

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

使用以下属性将 CDC 源连接到 MySQL 数据库:spring-doc.cadn.net.cn

cdc.connector=mysql (1)

cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)

cdc.config.database.user=debezium  (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)

cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 将 CDC 源配置为使用 MySqlConnector。(相当于cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector).
2 元数据用于识别和调度传入事件。
3 连接到运行在localhost:3306debezium用户。
4 SourceRecord事件。
5 启用 CDC 事件平展化。

您还可以运行CdcSourceIntegrationTests#CdcMysqlTests使用此 MySQL 配置。spring-doc.cadn.net.cn

PostgreSQL

debezium/example-postgres:1.0Docker 镜像:spring-doc.cadn.net.cn

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0

您可以像这样连接到此服务器:spring-doc.cadn.net.cn

psql -U postgres -h localhost -p 5432

使用以下属性将 CDC 源连接到 PostgreSQL:spring-doc.cadn.net.cn

cdc.connector=postgres (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=postgres  (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 配置CDC Source以使用 PostgresConnector。等效于cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector.
2 配置 Debezium 引擎以使用memory(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)支持偏移量存储。
3 元数据用于识别和调度传入事件。
4 连接到运行在localhost:5432postgres用户。
5 SourceRecord事件。
6 启用 CDC 事件平展化。

您还可以运行CdcSourceIntegrationTests#CdcPostgresTests使用此 MySQL 配置。spring-doc.cadn.net.cn

Mongo数据库

debezium/example-mongodb:0.10Docker 镜像:spring-doc.cadn.net.cn

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:0.10

初始化库存集合spring-doc.cadn.net.cn

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

mongodb终端输出,搜索像host: "3f95a8a6516e:27017":spring-doc.cadn.net.cn

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

127.0.0.1 3f95a8a6516e进入您的/etc/hostsspring-doc.cadn.net.cn

使用以下属性将 CDC 源连接到 MongoDB:spring-doc.cadn.net.cn

cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)

cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)

cdc.config.tasks.max=1 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 配置CDC Source使用 MongoDB 连接器。这映射到cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector.
2 配置 Debezium 引擎以使用memory(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)支持偏移量存储。
3 连接到运行在localhost:27017debezium用户。
4 debezium.io/docs/connectors/mongodb/#tasks
5 SourceRecord事件。
6 启用 CDC 事件平展化。

您还可以运行CdcSourceIntegrationTests#CdcPostgresTests使用此 MySQL 配置。spring-doc.cadn.net.cn

SQL 服务器

开始一个sqlserverdebezium/example-postgres:1.0Docker 镜像:spring-doc.cadn.net.cn

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

从 debezium 的 sqlserver 教程中填充示例数据:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

使用以下属性将 CDC 源连接到 SQLServer:spring-doc.cadn.net.cn

cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=sa  (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 配置CDC Source以使用 SqlServerConnector。等效于设置cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector.
2 配置 Debezium 引擎以使用memory(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)支持偏移量存储。
3 元数据用于识别和调度传入事件。
4 连接到运行在localhost:1433sa用户。

您还可以运行CdcSourceIntegrationTests#CdcSqlServerTests使用此 MySQL 配置。spring-doc.cadn.net.cn

神谕

从 localhost 启动可访问的 Oracle,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置spring-doc.cadn.net.cn

从 Debezium 的 Oracle 教程中填充示例数据:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. 文件源

此应用程序轮询目录并将新文件或其内容发送到输出通道。 默认情况下,文件源将 File 的内容作为字节数组提供。 但是,可以使用 --file.supplier.mode 选项对其进行自定义:spring-doc.cadn.net.cn

使用时--file.supplier.mode=lines,您还可以提供附加选项--file.supplier.withMarkers=true. 如果设置为 true,则基础 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.如果未显式设置,则选项 withMarkers 默认为 false。spring-doc.cadn.net.cn

5.2.1. 选项

文件源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

文件.consumer
标记-json

当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:<none>,可能的值:ref,lines,contents)spring-doc.cadn.net.cn

带标记

设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:<none>)spring-doc.cadn.net.cn

文件.提供商
空时延迟

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

目录

要轮询新文件的目录。(文件,默认:<none>)spring-doc.cadn.net.cn

文件名模式

一个简单的Ant模式来匹配文件。(字符串,默认:<none>)spring-doc.cadn.net.cn

文件名-正则表达式

用于匹配文件的正则表达式模式。(模式,默认:<none>)spring-doc.cadn.net.cn

预防重复

设置为 true 以包含防止重复的 AcceptOnceFileListFilter。(布尔值,默认:true)spring-doc.cadn.net.cn

5.3. FTP 源

此源应用程序支持使用 FTP 协议传输文件。 文件从remote目录到local部署应用的目录。 默认情况下,源发出的消息作为字节数组提供。但是,这可能是 使用--mode选择:spring-doc.cadn.net.cn

使用时--mode=lines,您还可以提供附加选项--withMarkers=true. 如果设置为true,基础FileSplitter将在实际数据之前和之后发出额外的文件开始文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.选项withMarkers默认为false如果未显式设置。spring-doc.cadn.net.cn

另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。spring-doc.cadn.net.cn

5.3.1. 输入

N/A(从 FTP 服务器获取文件)。spring-doc.cadn.net.cn

5.3.2. 输出

模式 = 内容
头:
有效载荷:

一个byte[]填充文件内容。spring-doc.cadn.net.cn

模式 = 线
头:
有效载荷:

一个String对于每行。spring-doc.cadn.net.cn

第一行前面有一条消息,其中有START标记有效载荷。 最后一行后面有一条消息,其中有END标记有效载荷。spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json性能。spring-doc.cadn.net.cn

模式 = ref
头:
有效载荷:

一个java.io.File对象。spring-doc.cadn.net.cn

5.3.3. 选项

ftp 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

文件.consumer
标记-json

当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:<none>,可能的值:ref,lines,contents)spring-doc.cadn.net.cn

带标记

设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:<none>)spring-doc.cadn.net.cn

ftp.工厂
缓存会话

缓存会话。(布尔值,默认:<none>)spring-doc.cadn.net.cn

客户端模式

用于 FTP 会话的客户端模式。(ClientMode,默认:<none>,可能的值:ACTIVE,PASSIVE)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

用于连接到服务器的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:21)spring-doc.cadn.net.cn

用户名

用于连接到服务器的用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

ftp.supplier
自动创建本地目录

设置为 true 以创建本地目录(如果不存在)。(布尔值,默认:true)spring-doc.cadn.net.cn

空时延迟

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

删除远程文件

设置为 true 可在成功传输后删除远程文件。(布尔值,默认:false)spring-doc.cadn.net.cn

文件名模式

一种筛选器模式,用于匹配要传输的文件的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

文件名-正则表达式

用于匹配要传输的文件名称的过滤器正则表达式模式。(模式,默认:<none>)spring-doc.cadn.net.cn

本地目录

用于文件传输的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

保留时间戳

设置为 true 以保留原始时间戳。(布尔值,默认:true)spring-doc.cadn.net.cn

远程目录

远程 FTP 目录。(字符串,默认:/)spring-doc.cadn.net.cn

远程文件分隔符

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-文件后缀

传输过程中要使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

创建重试

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

读取容量

表上的读取容量。(长,默认:1)spring-doc.cadn.net.cn

桌子

元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

生存时间

表条目的 TTL。(整数,默认:<none>)spring-doc.cadn.net.cn

写入容量

表上的写入容量。(长,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
地区

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.jdbc
地区

在此存储中保留的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

表前缀

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis
钥匙

元数据的 Redis 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:<none>,可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

元数据.store.zookeeper
连接字符串

Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:127.0.0.1:2181)spring-doc.cadn.net.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

重试间隔

Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:1000)spring-doc.cadn.net.cn

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

5.3.4. 示例

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

5.4. 晶洞源

Geode 源将发出从 Apache Geode EntryEvents 或 CqEvents 中提取的对象流。spring-doc.cadn.net.cn

5.4.1. 选项

晶洞源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

geode.客户端
pdx 读取序列化

将 Geode 对象反序列化为 PdxInstance 而不是域类。(布尔值,默认:false)spring-doc.cadn.net.cn

geode.池
连接类型

指定连接类型:“服务器”或“定位器”。(ConnectType,默认:<none>,可能的值:locator,server)spring-doc.cadn.net.cn

主机地址

指定一个或多个格式为 [host]:[port] 的 Gemfire 定位器或服务器地址。(InetSocketAddress[],默认:<none>)spring-doc.cadn.net.cn

已启用订阅

设置为 true 以启用客户端池的订阅。需要将更新同步到客户端缓存。(布尔值,默认:false)spring-doc.cadn.net.cn

geode.region
区域名称

区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.安全
密码

缓存密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

缓存用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.security.ssl (地理洞.安全.ssl)
密码

将用于安全套接字连接的 SSL 密码配置为有效密码名称的数组。(字符串,默认:any)spring-doc.cadn.net.cn

密钥存储类型

标识用于 SSL 通信的密钥库类型(例如 JKS、PKCS11 等)。(字符串,默认:JKS)spring-doc.cadn.net.cn

密钥存储 uri

用于连接到 Geode 集群的预先创建的密钥库 URI 的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

SSL-密钥存储密码

用于访问密钥信任库的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

SSL-信任存储密码

用于访问信任存储的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

信任存储类型

标识用于 SSL 通信的信任库类型(例如 JKS、PKCS11 等)。(字符串,默认:JKS)spring-doc.cadn.net.cn

信任存储 uri

用于连接到 Geode 集群的预先创建的信任库 URI 的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

用户主目录

本地目录,用于缓存从 truststoreUri 和 keystoreUri 位置下载的信任库和密钥库文件。(字符串,默认:user.home)spring-doc.cadn.net.cn

geode.提供商
事件表达式

SpEL 表达式,用于从 {@link org.apache.geode.cache.EntryEvent} 或 {@link org.apache.geode.cache.query.CqEvent} 中提取数据。(表达式,默认值:<none>)spring-doc.cadn.net.cn

查询

OQL 查询。如果提供,这将启用连续查询。(字符串,默认:<none>)spring-doc.cadn.net.cn

5.5. Http 源代码

侦听 HTTP 请求并将正文作为消息有效负载发出的源应用程序。 如果 Content-Type 匹配text/*application/json,有效负载将是一个字符串, 否则,有效负载将是一个字节数组。spring-doc.cadn.net.cn

有效载荷:

如果内容类型匹配text/*application/jsonspring-doc.cadn.net.cn

如果内容类型不匹配text/*application/jsonspring-doc.cadn.net.cn

5.5.2. 选项

http 源支持以下配置属性:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

http.cors
允许凭据

浏览器是否应包含与要注释的请求域关联的任何 cookie。(布尔值,默认:<none>)spring-doc.cadn.net.cn

允许的标头

可在实际请求期间使用的请求标头列表。(String[],默认:<none>)spring-doc.cadn.net.cn

允许的来源

允许的来源列表,例如 https://domain1.com。(String[],默认:<none>)spring-doc.cadn.net.cn

http
映射请求标头

将要映射的标头。(String[],默认:<none>)spring-doc.cadn.net.cn

路径模式

HTTP 端点路径映射。(字符串,默认:/)spring-doc.cadn.net.cn

服务器
端口

服务器 HTTP 端口。(整数,默认:8080)spring-doc.cadn.net.cn

5.6. JDBC 源代码

此源轮询来自 RDBMS 的数据。此源完全基于DataSourceAutoConfiguration,因此请参阅 Spring Boot JDBC 支持 了解更多信息。spring-doc.cadn.net.cn

有效载荷
  • Map<String, Object>什么时候jdbc.split == true(默认)和List<Map<String, Object>>否则spring-doc.cadn.net.cn

5.6.2. 选项

jdbc 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jdbc.supplier
最大行数

要处理查询的最大行数。(整数,默认:0)spring-doc.cadn.net.cn

查询

用于选择数据的查询。(字符串,默认:<none>)spring-doc.cadn.net.cn

分裂

是否将 SQL 结果拆分为单个消息。(布尔值,默认:true)spring-doc.cadn.net.cn

更新

要执行的 SQL 更新语句,用于将轮询的消息标记为“已见”。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.stream.轮询器
克罗恩

Cron 表达式值。(字符串,默认:<none>)spring-doc.cadn.net.cn

固定延迟

修复了默认轮询器的延迟。(长,默认:1000)spring-doc.cadn.net.cn

初始延迟

定期触发器的初始延迟。(整数,默认:0)spring-doc.cadn.net.cn

每次轮询的最大消息数

默认轮询程序的每次轮询的最大消息数。(长,默认:1)spring-doc.cadn.net.cn

时间单位

要应用于延迟值的 TimeUnit。(TimeUnit,默认:<none>,可能的值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)spring-doc.cadn.net.cn

spring.datasource
数据

数据 (DML) 脚本资源引用。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

驱动程序类名称

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

初始化模式

确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化时要应用的模式。(DataSourceInitializationMode,默认:embedded,可能的值:ALWAYS,EMBEDDED,NEVER)spring-doc.cadn.net.cn

密码

数据库的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

图式

架构 (DDL) 脚本资源引用。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

网址

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库的登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档以获取补充DataSourceproperties 和TriggerPropertiesMaxMessagesProperties用于轮询选项。spring-doc.cadn.net.cn

5.7. JMS 源代码

JMS 源允许从 JMS 接收消息。spring-doc.cadn.net.cn

5.7.1. 选项

JMS 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jms.supplier
客户端 ID

持久订阅的客户端 ID。(字符串,默认:<none>)spring-doc.cadn.net.cn

目的地

接收消息的目标(队列或主题)。(字符串,默认:<none>)spring-doc.cadn.net.cn

消息选择器

消息的选择器。(字符串,默认:<none>)spring-doc.cadn.net.cn

会话交易

True 启用事务并选择 DefaultMessageListenerContainer,false 选择 SimpleMessageListenerContainer。(布尔值,默认:true)spring-doc.cadn.net.cn

订阅持久

对于持久订阅,则为 true。(布尔值,默认:<none>)spring-doc.cadn.net.cn

订阅名称

持久订阅或共享订阅的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

订阅共享

对于共享订阅,则为 true。(布尔值,默认:<none>)spring-doc.cadn.net.cn

spring.jms
jndi-name

连接工厂 JNDI 名称。设置后,优先于其他连接工厂自动配置。(字符串,默认:<none>)spring-doc.cadn.net.cn

pub-sub-domain

默认目标类型是否为 topic。(布尔值,默认:false)spring-doc.cadn.net.cn

spring.jms.listener
确认模式

容器的确认模式。默认情况下,侦听器通过自动确认进行交易。(AcknowledgeMode,默认:<none>,可能的值:AUTO,CLIENT,DUPS_OK)spring-doc.cadn.net.cn

自动启动

启动时自动启动容器。(布尔值,默认:true)spring-doc.cadn.net.cn

并发

最小并发使用者数。(整数,默认:<none>)spring-doc.cadn.net.cn

最大并发

最大并发使用者数。(整数,默认:<none>)spring-doc.cadn.net.cn

接收超时

用于接听呼叫的超时。使用 -1 表示无等待接收,或使用 0 表示根本没有超时。后者仅在未在事务管理器中运行时才可行,并且通常不鼓励使用,因为它会阻止完全关闭。(持续时间,默认:1s)spring-doc.cadn.net.cn

5.8. 负载发生器源

发送生成的数据并将其分派到流的源。spring-doc.cadn.net.cn

5.8.1. 选项

负载发生器源具有以下选项:spring-doc.cadn.net.cn

load-generator.generate-timestamp

是否生成时间戳。(布尔值,默认:false)spring-doc.cadn.net.cn

加载生成器.消息计数

消息计数。(整数,默认:1000)spring-doc.cadn.net.cn

load-generator.message-size

消息大小。(整数,默认:1000)spring-doc.cadn.net.cn

load-generator.producers

生产者数量。(整数,默认:1)spring-doc.cadn.net.cn

5.9. 邮件源

侦听电子邮件并将邮件正文作为邮件有效负载发出的源应用程序。spring-doc.cadn.net.cn

5.9.1. 选项

邮件源具有以下选项:spring-doc.cadn.net.cn

mail.supplier.charset

byte[] 邮件到字符串转换的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

邮件.提供商.delete

设置为 true 可在下载后删除电子邮件。(布尔值,默认:false)spring-doc.cadn.net.cn

邮件.提供商表达式

配置 SpEL 表达式以选择消息。(字符串,默认:true)spring-doc.cadn.net.cn

邮件.supplier.idle-imap

设置为 true 以使用 IdleImap 配置。(布尔值,默认:false)spring-doc.cadn.net.cn

mail.supplier.java-邮件属性

JavaMail 属性作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

mail.supplier.标记为已读

设置为 true 将电子邮件标记为已读。(布尔值,默认:false)spring-doc.cadn.net.cn

邮件.supplier.url

用于连接到邮件服务器的邮件连接 URL,例如“imaps://username:[email protected]:993/Inbox”。(URLName,默认:<none>)spring-doc.cadn.net.cn

mail.supplier.user-flag

当服务器不支持 \Recent 时标记邮件的标志。(字符串,默认:<none>)spring-doc.cadn.net.cn

5.10. MongoDB 源代码

此源轮询来自 MongoDB 的数据。 此来源完全基于MongoDataAutoConfiguration,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。spring-doc.cadn.net.cn

5.10.1. 选项

mongodb 源代码具有以下选项:spring-doc.cadn.net.cn

mongodb.supplier.collection

要查询的 MongoDB 集合。(字符串,默认:<none>)spring-doc.cadn.net.cn

mongodb.supplier.查询

MongoDB 查询。(字符串,默认:{ })spring-doc.cadn.net.cn

mongodb.supplier.query-expression

MongoDB查询DSL样式中的SpEL表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

mongodb.supplier.split

是否将查询结果拆分为单个消息。(布尔值,默认:true)spring-doc.cadn.net.cn

mongodb.supplier.update-expression

MongoDB 中的 SpEL 表达式更新了 DSL 样式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档以获取更多内容MongoProperties性能。 See 和TriggerProperties用于轮询选项。spring-doc.cadn.net.cn

5.11. MQTT 源代码

启用从 MQTT 接收消息的源。spring-doc.cadn.net.cn

有效载荷:

5.11.2. 选项

mqtt 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

MQTT
清洁会话

客户端和服务器是否应记住重新启动和重新连接的状态。(布尔值,默认:true)spring-doc.cadn.net.cn

连接超时

连接超时(以秒为单位)。(整数,默认:30)spring-doc.cadn.net.cn

保持活动间隔

ping间隔(以秒为单位)。(整数,默认:60)spring-doc.cadn.net.cn

密码

连接到代理时要使用的密码。(字符串,默认:guest)spring-doc.cadn.net.cn

坚持

'内存'或'文件'。(字符串,默认:memory)spring-doc.cadn.net.cn

持久性目录

持久性目录。(字符串,默认:/tmp/paho)spring-doc.cadn.net.cn

网址

MQTT 代理的位置(逗号分隔的列表)。(String[],默认:[tcp://localhost:1883])spring-doc.cadn.net.cn

用户名

连接到代理时要使用的用户名。(字符串,默认:guest)spring-doc.cadn.net.cn

mqtt.supplier
二元的

true 将有效负载保留为字节。(布尔值,默认:false)spring-doc.cadn.net.cn

字符集

用于将字节转换为 String 的字符集(当二进制为 false 时)。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

客户端 ID

标识客户端。(字符串,默认:stream.client.id.source)spring-doc.cadn.net.cn

QOS

QOs;所有主题的单个值或与主题匹配的逗号分隔列表。(Integer[],默认:[0])spring-doc.cadn.net.cn

主题

源将订阅的主题(逗号分隔)。(String[],默认:[stream.mqtt])spring-doc.cadn.net.cn

5.12. RabbitMQ 源代码

“rabbit”源允许从 RabbitMQ 接收消息。spring-doc.cadn.net.cn

在部署流之前,队列必须存在;它们不会自动创建。 您可以使用 RabbitMQ Web UI 轻松创建队列。spring-doc.cadn.net.cn

5.12.1. 输入

5.12.2. 输出

有效载荷

5.12.3. 选项

子源有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

兔子.提供商
启用重试

true 启用重试。(布尔值,默认:false)spring-doc.cadn.net.cn

初始重试间隔

启用重试时的初始重试间隔。(整数,默认:1000)spring-doc.cadn.net.cn

映射请求标头

将要映射的标头。(String[],默认:[STANDARD_REQUEST_HEADERS])spring-doc.cadn.net.cn

最大尝试次数

启用重试时的最大传递尝试次数。(整数,默认:3)spring-doc.cadn.net.cn

最大重试间隔

启用重试时的最大重试间隔。(整数,默认:30000)spring-doc.cadn.net.cn

自己的连接

如果为 true,请根据启动属性使用单独的连接。(布尔值,默认:false)spring-doc.cadn.net.cn

队列

源将侦听消息的队列。(String[],默认:<none>)spring-doc.cadn.net.cn

重新排队

是否应将被拒绝的邮件重新排队。(布尔值,默认:true)spring-doc.cadn.net.cn

重试乘数

启用重试时重试回退乘数。(双精度,默认:2)spring-doc.cadn.net.cn

交易

通道是否已交易。(布尔值,默认:false)spring-doc.cadn.net.cn

spring.rabbitmq
地址

客户端应连接到的地址的逗号分隔列表。设置后,将忽略主机和端口。(字符串,默认:<none>)spring-doc.cadn.net.cn

连接超时

连接超时。将其设置为零以永久等待。(持续时间,默认:<none>)spring-doc.cadn.net.cn

主机

RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

登录以对代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

端口

RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则默认为 5671。(整数,默认:<none>)spring-doc.cadn.net.cn

发布者确认类型

确认使用的发布者类型。(ConfirmType,默认:<none>,可能的值:SIMPLE,CORRELATED,NONE)spring-doc.cadn.net.cn

发布者返回

是否启用发布商退货。(布尔值,默认:false)spring-doc.cadn.net.cn

请求通道最大值

客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认:2047)spring-doc.cadn.net.cn

请求心跳

请求的检测信号超时;零表示无。如果未指定持续时间后缀,则将使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户以向代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

虚拟主机

连接到代理时要使用的虚拟主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。spring-doc.cadn.net.cn

关于重试的说明
使用默认的 ackModeAUTO) 和重新排队true) 选项,将重试失败的消息传递 无限期。 由于兔源没有太多的加工,因此源本身失败的风险很小,除非 下游Binder由于某种原因未连接。 将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到死信 Exchange/Queue(如果代理已如此配置)。 enableRetry 选项允许配置重试参数,以便可以重试失败的消息传递,并且 当重试用尽时,最终被丢弃(或死信)。 在重试间隔期间,传递线程将挂起。 重试选项包括 enableRetrymaxAttemptsinitialRetryIntervalretryMultiplermaxRetryInterval。 永远不会重试因 MessageConversionException 而失败的消息传递;假设如果消息 第一次尝试时无法转换,后续尝试也将失败。 此类消息将被丢弃(或死信)。

5.12.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.12.5. 示例

java -jar rabbit-source.jar --rabbit.queues=

5.13. Amazon S3 源代码

此源应用程序支持使用 Amazon S3 协议传输文件。 文件从remote目录(S3 存储桶)到local部署应用程序的目录。spring-doc.cadn.net.cn

默认情况下,源发出的消息作为字节数组提供。但是,这可能是 使用--mode选择:spring-doc.cadn.net.cn

使用时--mode=lines,您还可以提供附加选项--withMarkers=true. 如果设置为true,基础FileSplitter将在实际数据之前和之后发出额外的文件开始文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.选项withMarkers默认为false如果未显式设置。spring-doc.cadn.net.cn

另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。spring-doc.cadn.net.cn

模式 = 线
头:
有效载荷:

一个String对于每行。spring-doc.cadn.net.cn

第一行前面有一条消息,其中有START标记有效载荷。 最后一行后面有一条消息,其中有END标记有效载荷。spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json性能。spring-doc.cadn.net.cn

模式 = ref
头:
有效载荷:

一个java.io.File对象。spring-doc.cadn.net.cn

5.13.3. 选项

s3 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

文件.consumer
标记-json

当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:<none>,可能的值:ref,lines,contents)spring-doc.cadn.net.cn

带标记

设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:<none>)spring-doc.cadn.net.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

创建重试

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

读取容量

表上的读取容量。(长,默认:1)spring-doc.cadn.net.cn

桌子

元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

生存时间

表条目的 TTL。(整数,默认:<none>)spring-doc.cadn.net.cn

写入容量

表上的写入容量。(长,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
地区

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.jdbc
地区

在此存储中保留的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

表前缀

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis
钥匙

元数据的 Redis 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:<none>,可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

元数据.store.zookeeper
连接字符串

Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:127.0.0.1:2181)spring-doc.cadn.net.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

重试间隔

Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:1000)spring-doc.cadn.net.cn

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

s3.common
端点网址

用于连接到 s3 兼容存储的可选端点 url。(字符串,默认:<none>)spring-doc.cadn.net.cn

路径样式访问

使用路径样式访问。(布尔值,默认:false)spring-doc.cadn.net.cn

S3.提供商
自动创建本地目录

创建或不创建本地目录。(布尔值,默认:true)spring-doc.cadn.net.cn

删除远程文件

处理后删除或不删除远程文件。(布尔值,默认:false)spring-doc.cadn.net.cn

文件名模式

过滤远程文件的模式。(字符串,默认:<none>)spring-doc.cadn.net.cn

文件名-正则表达式

用于过滤远程文件的正则表达式。(模式,默认:<none>)spring-doc.cadn.net.cn

仅列表

设置为 true 可返回 s3 对象元数据,而不将文件复制到本地目录。(布尔值,默认:false)spring-doc.cadn.net.cn

本地目录

用于存储文件的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

保留时间戳

将远程文件的时间戳传输到本地文件。(布尔值,默认:true)spring-doc.cadn.net.cn

远程目录

AWS S3 存储桶资源。(字符串,默认:bucket)spring-doc.cadn.net.cn

远程文件分隔符

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-文件后缀

临时文件后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

5.13.4. Amazon AWS 通用选项

Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置 类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。spring-doc.cadn.net.cn

其中一些是关于 AWS 凭证的:spring-doc.cadn.net.cn

其他适用于 AWSRegion定义:spring-doc.cadn.net.cn

对于 AWSStack:spring-doc.cadn.net.cn

5.13.5. 示例

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines

5.14. SFTP 源

此源应用程序支持使用 SFTP 协议传输文件。文件从remote目录到local部署应用的目录。 默认情况下,源发出的消息作为字节数组提供。但是,这可能是 使用--mode选择:spring-doc.cadn.net.cn

使用时--mode=lines,您还可以提供附加选项--withMarkers=true. 如果设置为true,基础FileSplitter将在实际数据之前和之后发出额外的文件开始文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.选项withMarkers默认为false如果未显式设置。spring-doc.cadn.net.cn

sftp-supplier以获取高级配置选项。spring-doc.cadn.net.cn

另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。spring-doc.cadn.net.cn

5.14.1. 输入

N/A(从 SFTP 服务器获取文件)。spring-doc.cadn.net.cn

5.14.2. 输出

模式 = 内容
头:
有效载荷:

一个byte[]填充文件内容。spring-doc.cadn.net.cn

模式 = 线
头:
有效载荷:

一个String对于每行。spring-doc.cadn.net.cn

第一行前面有一条消息,其中有START标记有效载荷。 最后一行后面有一条消息,其中有END标记有效载荷。spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json性能。spring-doc.cadn.net.cn

模式 = ref
头:
有效载荷:

一个java.io.File对象。spring-doc.cadn.net.cn

5.14.3. 选项

ftp 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

文件.consumer
标记-json

当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:<none>,可能的值:ref,lines,contents)spring-doc.cadn.net.cn

带标记

设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:<none>)spring-doc.cadn.net.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

创建重试

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

读取容量

表上的读取容量。(长,默认:1)spring-doc.cadn.net.cn

桌子

元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

生存时间

表条目的 TTL。(整数,默认:<none>)spring-doc.cadn.net.cn

写入容量

表上的写入容量。(长,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
地区

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.jdbc
地区

在此存储中保留的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

表前缀

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis
钥匙

元数据的 Redis 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:<none>,可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

元数据.store.zookeeper
连接字符串

Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:127.0.0.1:2181)spring-doc.cadn.net.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

重试间隔

Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:1000)spring-doc.cadn.net.cn

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

SFTP.提供商
自动创建本地目录

设置为 true 以创建本地目录(如果不存在)。(布尔值,默认:true)spring-doc.cadn.net.cn

空时延迟

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

删除远程文件

设置为 true 可在成功传输后删除远程文件。(布尔值,默认:false)spring-doc.cadn.net.cn

目录

工厂“name.directory”对的列表。(String[],默认:<none>)spring-doc.cadn.net.cn

工厂

工厂名称到工厂的映射。(Map<String, Factory>,默认值:<none>)spring-doc.cadn.net.cn

公平

对于多个服务器/目录的公平轮换为 True。默认情况下,这是 false,因此如果一个源有多个条目,则将在访问其他源之前收到这些条目。(布尔值,默认:false)spring-doc.cadn.net.cn

文件名模式

一种筛选器模式,用于匹配要传输的文件的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

文件名-正则表达式

用于匹配要传输的文件名称的过滤器正则表达式模式。(模式,默认:<none>)spring-doc.cadn.net.cn

仅列表

设置为 true 以返回不包含整个有效负载的文件元数据。(布尔值,默认:false)spring-doc.cadn.net.cn

本地目录

用于文件传输的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

最大获取

每次轮询要获取的最大远程文件数;默认无限制。在列出文件或构建任务启动请求时不适用。(整数,默认:<none>)spring-doc.cadn.net.cn

保留时间戳

设置为 true 以保留原始时间戳。(布尔值,默认:true)spring-doc.cadn.net.cn

远程目录

远程 FTP 目录。(字符串,默认:/)spring-doc.cadn.net.cn

远程文件分隔符

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

将远程文件重命名为

成功传输后,必须将解析为新名称远程文件的 SpEL 表达式重命名为。(表达式,默认值:<none>)spring-doc.cadn.net.cn

设置为 true 以流式传输文件而不是复制到本地目录。(布尔值,默认:false)spring-doc.cadn.net.cn

tmp-文件后缀

传输过程中要使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

sftp.提供商.工厂
允许未知键

True 表示允许未知或更改的键。(布尔值,默认:false)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

已知主机表达式

解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

密码短语

用户私钥的密码。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

密码

用于连接到服务器的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:22)spring-doc.cadn.net.cn

私钥

用户私钥的资源位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

用户名

用于连接到服务器的用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

sftp.supplier.sort-by
属性

要排序依据的文件列表条目的属性(FILENAME、ATIME:上次访问时间、MTIME:上次修改时间)。(属性,默认:<none>)spring-doc.cadn.net.cn

迪尔

分拣方向(ASC 或 DESC)。(Dir,默认:<none>)spring-doc.cadn.net.cn

5.14.4. 示例

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --ftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. 系统日志

系统日志源通过 UDP、TCP 或两者接收 SYSLOG 数据包。支持RFC3164 (BSD) 和 RFC5424 格式。spring-doc.cadn.net.cn

5.15.1. 选项

syslog.supplier.buffer-size

解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认:2048)spring-doc.cadn.net.cn

系统日志.supplier.nio

是否使用蔚来(支持大量连接时)。(布尔值,默认:false)spring-doc.cadn.net.cn

系统日志.supplier.port

要侦听的端口。(整数,默认:1514)spring-doc.cadn.net.cn

syslog.supplier.protocol 系统日志

用于 SYSLOG(tcp 或 udp)的协议。(协议,默认:<none>,可能的值:tcp,udp,both)spring-doc.cadn.net.cn

syslog.supplier.反向查找

是否对传入套接字执行反向查找。(布尔值,默认:false)spring-doc.cadn.net.cn

syslog.supplier.rfc

'5424' 或 '3164' - 根据 RFC 的系统日志格式;3164 又名“BSD”格式。(字符串,默认:3164)spring-doc.cadn.net.cn

syslog.supplier.socket-超时

套接字超时。(整数,默认:0)spring-doc.cadn.net.cn

5.16. TCP的

tcp源充当服务器,允许远程方连接到它并通过原始 TCP 套接字提交数据。spring-doc.cadn.net.cn

TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多解码器是 可用,默认值为与 Telnet 兼容的“CRLF”。spring-doc.cadn.net.cn

TCP 源应用程序生成的消息具有byte[]有效载荷。spring-doc.cadn.net.cn

5.16.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

TCP
蔚来

是否使用蔚来。(布尔值,默认:false)spring-doc.cadn.net.cn

端口

要收听的端口;0 表示作系统选择端口。(整数,默认:1234)spring-doc.cadn.net.cn

反向查找

对远程 IP 地址执行反向 DNS 查找;如果为 false,则邮件标头中仅包含 IP 地址。(布尔值,默认:false)spring-doc.cadn.net.cn

套接字超时

未收到数据时关闭套接字之前的超时 (ms)。(整数,默认:120000)spring-doc.cadn.net.cn

使用直接缓冲区

是否使用直接缓冲区。(布尔值,默认:false)spring-doc.cadn.net.cn

tcp.提供商
缓冲区大小

解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认:2048)spring-doc.cadn.net.cn

译码器

接收消息时要使用的解码器。(编码,默认:<none>,可能的值:CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)spring-doc.cadn.net.cn

5.16.2. 可用的解码器

文本数据
CRLF(默认)

以回车符 (0x0d) 结尾的文本,后跟换行符 (0x0a)spring-doc.cadn.net.cn

如果

以换行符结尾的文本 (0x0a)spring-doc.cadn.net.cn

以空字节 (0x00) 结尾的文本spring-doc.cadn.net.cn

STXETX

前面是 STX (0x02) 并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据

no structure - 客户端通过关闭套接字来指示完整的消息spring-doc.cadn.net.cn

L1

前面有一个 1 字节(无符号)长度字段的数据(最多支持 255 字节)spring-doc.cadn.net.cn

L2

数据前面有一个两个字节(无符号)长度字段(最多 216-1 字节)spring-doc.cadn.net.cn

L4

数据前面有一个四字节(有符号)长度字段(最多 231-1 字节)spring-doc.cadn.net.cn

5.17. 时间源

时间源将只是每隔一段时间发出一个包含当前时间的字符串。spring-doc.cadn.net.cn

5.17.1. 选项

时间源具有以下选项:spring-doc.cadn.net.cn

spring.cloud.stream.poller.cron

Cron 表达式值。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.stream.poller.fixed-delay

修复了默认轮询器的延迟。(长,默认:1000)spring-doc.cadn.net.cn

spring.cloud.stream.poller.initial-delay

定期触发器的初始延迟。(整数,默认:0)spring-doc.cadn.net.cn

每次轮询spring.cloud.stream.poller.max消息

默认轮询程序的每次轮询的最大消息数。(长,默认:1)spring-doc.cadn.net.cn

spring.cloud.stream.poller.time-unit

要应用于延迟值的 TimeUnit。(TimeUnit,默认:<none>,可能的值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)spring-doc.cadn.net.cn

time.date-format

日期值的格式。(字符串,默认:MM/dd/yy HH:mm:ss)spring-doc.cadn.net.cn

5.18. Twitter 消息源

重复检索过去 30 天内的直接消息(已发送和已接收),按时间倒序排序。 释放的消息被缓存(在MetadataStorecache)以防止重复。 默认情况下,内存中的SimpleMetadataStore被使用。spring-doc.cadn.net.cn

twitter.message.source.count控制返回的消息数。spring-doc.cadn.net.cn

spring.cloud.stream.poller属性控制消息轮询间隔。 必须与使用的 API 速率限制保持一致spring-doc.cadn.net.cn

5.18.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

spring.cloud.stream.轮询器
克罗恩

Cron 表达式值。(字符串,默认:<none>)spring-doc.cadn.net.cn

固定延迟

修复了默认轮询器的延迟。(长,默认:1000)spring-doc.cadn.net.cn

初始延迟

定期触发器的初始延迟。(整数,默认:0)spring-doc.cadn.net.cn

每次轮询的最大消息数

默认轮询程序的每次轮询的最大消息数。(长,默认:1)spring-doc.cadn.net.cn

时间单位

要应用于延迟值的 TimeUnit。(TimeUnit,默认:<none>,可能的值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)spring-doc.cadn.net.cn

推特连接
访问Tokens

您的 Twitter Tokens。(字符串,默认:<none>)spring-doc.cadn.net.cn

访问Tokens密钥

你的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

消费者键

您的 Twitter 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

消费者机密

你的 Twitter 秘密。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用调试

启用 Twitter4J 调试模式。(布尔值,默认:false)spring-doc.cadn.net.cn

原始 JSON

启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认:true)spring-doc.cadn.net.cn

推特.message.source
计数

要返回的最大事件数。默认值为 20。最多 50 人(整数,默认:20)spring-doc.cadn.net.cn

5.19. Twitter 搜索源

Twitter 的标准搜索 API(搜索/推文)允许对最近或流行的推文的索引进行简单查询。这Source提供针对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。spring-doc.cadn.net.cn

返回与指定查询匹配的相关推文的集合。spring-doc.cadn.net.cn

使用spring.cloud.stream.poller属性来控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 req / 10 秒)spring-doc.cadn.net.cn

twitter.search查询属性允许按关键字查询并按时间和地理位置过滤结果。spring-doc.cadn.net.cn

twitter.search.counttwitter.search.page根据搜索 API 控制结果分页。spring-doc.cadn.net.cn

注意:Twitter 的搜索服务以及搜索 API 并不意味着成为推文的详尽来源。并非所有推文都会被索引或通过搜索界面提供。spring-doc.cadn.net.cn

5.19.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

spring.cloud.stream.轮询器
克罗恩

Cron 表达式值。(字符串,默认:<none>)spring-doc.cadn.net.cn

固定延迟

修复了默认轮询器的延迟。(长,默认:1000)spring-doc.cadn.net.cn

初始延迟

定期触发器的初始延迟。(整数,默认:0)spring-doc.cadn.net.cn

每次轮询的最大消息数

默认轮询程序的每次轮询的最大消息数。(长,默认:1)spring-doc.cadn.net.cn

时间单位

要应用于延迟值的 TimeUnit。(TimeUnit,默认:<none>,可能的值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)spring-doc.cadn.net.cn

推特连接
访问Tokens

您的 Twitter Tokens。(字符串,默认:<none>)spring-doc.cadn.net.cn

访问Tokens密钥

你的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

消费者键

您的 Twitter 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

消费者机密

你的 Twitter 秘密。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用调试

启用 Twitter4J 调试模式。(布尔值,默认:false)spring-doc.cadn.net.cn

原始 JSON

启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认:true)spring-doc.cadn.net.cn

计数

每页要返回的推文数量(例如,每个请求),最多 100 条。(整数,默认:100)spring-doc.cadn.net.cn

将搜索到的推文限制为给定语言,由 http://en.wikipedia.org/wiki/ISO_639-1 给出。(字符串,默认:<none>)spring-doc.cadn.net.cn

在再次从最近的推文开始搜索之前,要向后搜索(从最近的推文到最旧的推文)的页面数(例如请求)。向后搜索的推文总量为(页数*数)(整数,默认:3)spring-doc.cadn.net.cn

查询

按搜索查询字符串搜索推文。(字符串,默认:<none>)spring-doc.cadn.net.cn

从最近的空响应重新启动

从空回复的最新推文重新开始搜索。仅在第一次重启后应用(例如,当since_id != UNBOUNDED 时)(布尔值,默认:false)spring-doc.cadn.net.cn

结果类型

指定您希望接收的搜索结果类型。当前默认值为“混合”。有效值包括:混合 :在响应中同时包含热门结果和实时结果。最近 :仅返回响应中的最新结果 popular :仅返回响应中最受欢迎的结果(ResultType,默认:<none>,可能的值:popular,mixed,recent)spring-doc.cadn.net.cn

因为

如果指定,则返回自给定日期以来的推文。日期的格式应为 YYYY-MM-DD。(字符串,默认:<none>)spring-doc.cadn.net.cn

推特.search.geocode
纬度

用户的纬度。(双精度,默认:-1)spring-doc.cadn.net.cn

经度

用户的经度。(双精度,默认:-1)spring-doc.cadn.net.cn

半径

(纬度、经度)点周围的半径(以公里为单位)。(双精度,默认:-1)spring-doc.cadn.net.cn

5.20. Twitter 流源

实时推文流筛选器示例 API 支持。spring-doc.cadn.net.cn

  • Filter API返回与一个或多个过滤器谓词匹配的公共状态。 多个参数允许使用与流式处理API的单个连接。 提示:这track,followlocations字段与 OR 运算符组合! 查询track=foofollow=1234返回匹配的推文test 由用户创建1234.spring-doc.cadn.net.cn

  • Sample API返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此端点,它们将看到相同的推文。spring-doc.cadn.net.cn

默认访问级别允许多达 400 个跟踪关键字、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。spring-doc.cadn.net.cn

5.20.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

推特连接
访问Tokens

您的 Twitter Tokens。(字符串,默认:<none>)spring-doc.cadn.net.cn

访问Tokens密钥

你的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

消费者键

您的 Twitter 密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

消费者机密

你的 Twitter 秘密。(字符串,默认:<none>)spring-doc.cadn.net.cn

启用调试

启用 Twitter4J 调试模式。(布尔值,默认:false)spring-doc.cadn.net.cn

原始 JSON

启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认:true)spring-doc.cadn.net.cn

推特.stream.filter
计数

指示在过渡到实时流之前要流化的先前状态数。(整数,默认:0)spring-doc.cadn.net.cn

过滤器级别

筛选器级别将推文限制为具有最小 filterLevel 属性值的推文。无、低或中之一。(FilterLevel,默认:<none>)spring-doc.cadn.net.cn

跟随

按 ID 指定要从中接收公开推文的用户。(List<Long>,默认值:<none>)spring-doc.cadn.net.cn

语言

指定流的推文语言。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

地点

要跟踪的位置。内部表示为 2D 数组。边界框无效:52.38、4.90、51.51、-0.12。第一对必须是盒子的SW角(List<BoundingBox>,默认值:<none>)spring-doc.cadn.net.cn

跟踪

指定要跟踪的关键字。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

推特.stream
类型

<缺少文档>(StreamType,默认:<none>,可能的值:sample,filter,firehose,link)spring-doc.cadn.net.cn

5.21. Websocket 源代码

Websocket通过 Web 套接字生成消息的源。spring-doc.cadn.net.cn

5.21.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

websocket.supplier
允许的来源

允许的来源。(字符串,默认:*)spring-doc.cadn.net.cn

路径

服务器 WebSocket 处理程序公开的路径。(字符串,默认:/websocket)spring-doc.cadn.net.cn

websocket.supplier.sock-js
使

在服务器上启用 SockJS 服务。默认值为“false”(布尔值,默认:false)spring-doc.cadn.net.cn

5.21.2. 示例

要验证 websocket-source 是否从 Websocket 客户端接收消息,您可以使用以下简单的端到端设置。spring-doc.cadn.net.cn

第 1 步:启动 kafka
第 2 步:部署websocket-source在特定端口上,比如 8080
第 3 步:在端口 8080 路径“/websocket”上连接一个 websocket 客户端,并发送一些消息。

您可以启动 kafka 控制台使用者并在那里查看消息。spring-doc.cadn.net.cn

6. 处理器

6.1. 聚合处理器

聚合器处理器使应用程序能够将传入消息聚合到组中,并将它们释放到输出目标中。spring-doc.cadn.net.cn

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbcspring-doc.cadn.net.cn

如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。spring-doc.cadn.net.cn

有效载荷

6.1.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

聚合
集合体

用于聚集策略的 SpEL 表达。默认值为有效负载的集合。(表达式,默认值:<none>)spring-doc.cadn.net.cn

相关

相关键的 SpEL 表达式。默认为 correlationId 标头。(表达式,默认值:<none>)spring-doc.cadn.net.cn

组超时

SpEL 表达式,用于超时到即将过期的未完成组。(表达式,默认值:<none>)spring-doc.cadn.net.cn

消息存储实体

持久性消息存储实体:RDBMS 中的表前缀、MongoDb 中的集合名称等。(字符串,默认:<none>)spring-doc.cadn.net.cn

消息存储类型

消息存储类型。(字符串,默认:<none>)spring-doc.cadn.net.cn

释放

用于释放策略的 SpEL 表达式。默认值基于 sequenceSize 标头。(表达式,默认值:<none>)spring-doc.cadn.net.cn

spring.data.mongodb
身份验证数据库

身份验证数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

自动索引创建

是否启用自动索引创建。(布尔值,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

字段命名策略

要使用的 FieldNamingStrategy 的完全限定名称。(Class<?>,默认:<none>)spring-doc.cadn.net.cn

网格 FS 数据库

GridFS 数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

主机

Mongo 服务器主机。无法使用 URI 进行设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

mongo 服务器的登录密码。不能使用 URI 设置。(Character[],默认:<none>)spring-doc.cadn.net.cn

端口

Mongo 服务器端口。无法使用 URI 设置。(整数,默认:<none>)spring-doc.cadn.net.cn

副本集名称

集群所需的副本集名称。无法使用 URI 进行设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

URI

Mongo 数据库 URI。不能使用主机、端口、凭据和副本集名称进行设置。(字符串,默认:mongodb://localhost/test)spring-doc.cadn.net.cn

用户名

mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-表示

将 UUID 转换为 BSON 二进制值时使用的表示。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)spring-doc.cadn.net.cn

spring.datasource
出错时继续

在初始化数据库时发生错误时是否停止。(布尔值,默认:false)spring-doc.cadn.net.cn

数据

数据 (DML) 脚本资源引用。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

数据密码

用于执行 DML 脚本的数据库密码(如果不同)。(字符串,默认:<none>)spring-doc.cadn.net.cn

数据用户名

用于执行 DML 脚本的数据库的用户名(如果不同)。(字符串,默认:<none>)spring-doc.cadn.net.cn

驱动程序类名称

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

生成唯一名称

是否生成随机数据源名称。(布尔值,默认:true)spring-doc.cadn.net.cn

初始化模式

确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化时要应用的模式。(DataSourceInitializationMode,默认:embedded,可能的值:ALWAYS,EMBEDDED,NEVER)spring-doc.cadn.net.cn

jndi-name

数据源的 JNDI 位置。设置时忽略类、url、用户名和密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

名称

数据源的名称。使用嵌入式数据库时默认为“testdb”。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

数据库的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

平台

要在 DDL 或 DML 脚本中使用的平台(例如 schema-${platform}.sql 或 data-${platform}.sql)。(字符串,默认:all)spring-doc.cadn.net.cn

图式

架构 (DDL) 脚本资源引用。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

模式密码

用于执行 DDL 脚本的数据库密码(如果不同)。(字符串,默认:<none>)spring-doc.cadn.net.cn

架构用户名

用于执行 DDL 脚本的数据库的用户名(如果不同)。(字符串,默认:<none>)spring-doc.cadn.net.cn

分隔符

SQL 初始化脚本中的语句分隔符。(字符串,默认:;)spring-doc.cadn.net.cn

sql-script-encoding

SQL 脚本编码。(字符集,默认:<none>)spring-doc.cadn.net.cn

类型

要使用的连接池实现的完全限定名称。默认情况下,它是从类路径中自动检测到的。(Class<DataSource>,默认值:<none>)spring-doc.cadn.net.cn

网址

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库的登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.mongodb.embedded
特征

要启用的功能的逗号分隔列表。默认使用已配置版本的默认值。(Set<Feature>,默认值:[sync_delay])spring-doc.cadn.net.cn

版本

要使用的 Mongo 版本。(字符串,默认:3.5.5)spring-doc.cadn.net.cn

spring.redis 的
客户端名称

要在与 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

数据库

连接工厂使用的数据库索引。(整数,默认:0)spring-doc.cadn.net.cn

主机

Redis 服务器主机。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

redis 服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

Redis 服务器端口。(整数,默认:6379)spring-doc.cadn.net.cn

SSL的

是否启用 SSL 支持。(布尔值,默认:false)spring-doc.cadn.net.cn

超时

连接超时。(持续时间,默认:<none>)spring-doc.cadn.net.cn

网址

连接 URL。覆盖主机、端口和密码。用户被忽略。示例:redis://user:[email protected]:6379(字符串,默认:<none>)spring-doc.cadn.net.cn

6.2. 桥接处理器

一种处理器,只需将传入有效负载传递给出站,即可桥接输入和输出。spring-doc.cadn.net.cn

有效载荷

6.3. 滤波处理器

筛选处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。 例如,如果传入有效负载的类型为String如果您想过滤掉任何少于五个字符的内容,您可以运行过滤处理器,如下所示。spring-doc.cadn.net.cn

java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4spring-doc.cadn.net.cn

如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。spring-doc.cadn.net.cn

有效载荷

您可以将任何类型作为有效负载传递,然后对其应用 SpEL 表达式进行过滤。如果传入类型为byte[]并将内容类型设置为text/plainapplication/json,则应用程序将byte[]String.spring-doc.cadn.net.cn

6.3.2. 选项

filter.function.expression

布尔 SpEL 表达式,用于应用于要过滤的请求消息。(表达式,默认值:<none>)spring-doc.cadn.net.cn

6.4. Groovy 处理器

对消息应用 Groovy 脚本的处理器。spring-doc.cadn.net.cn

6.4.1. 选项

groovy-processor 处理器具有以下选项:spring-doc.cadn.net.cn

6.5. 标头富集处理器

使用标头扩充器应用添加邮件标头。spring-doc.cadn.net.cn

标头以换行符分隔键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。 例如--headers='foo=payload.someProperty \n bar=payload.otherProperty'.spring-doc.cadn.net.cn

6.5.1. 选项

标头扩充处理器具有以下选项:spring-doc.cadn.net.cn

header.enricher.headers

\n 表示标头的分隔属性,其中值是 SpEL 表达式,例如 foo='bar' \n baz=payload.baz。(属性,默认值:<none>)spring-doc.cadn.net.cn

header.enricher.overwrite

设置为 true 以覆盖任何现有邮件标头。(布尔值,默认:false)spring-doc.cadn.net.cn

6.6. Http 请求处理器

向 HTTP 资源发出请求并将响应正文作为消息有效负载发出的处理器应用程序。spring-doc.cadn.net.cn

6.6.1. 输入

任何必需的 HTTP 标头都必须通过headersheaders-expression财产。请参阅下面的示例。 标头值也可用于构造:spring-doc.cadn.net.cn

有效载荷

默认情况下,有效负载用作 POST 请求的请求正文,并且可以是任何 Java 类型。 它应该是 GET 请求的空字符串。 有效负载还可用于构造:spring-doc.cadn.net.cn

底层 WebClient 支持 Jackson JSON 序列化,以支持任何请求和响应类型(如有必要)。 这expected-response-type属性String.class默认情况下,可以设置为应用程序类路径中的任何类。 请注意,用户定义的有效负载类型需要将所需的依赖项添加到您的 pom 文件中。spring-doc.cadn.net.cn

6.6.2. 输出

没有 HTTP 消息头映射到出站消息。spring-doc.cadn.net.cn

有效载荷

原始输出对象是 ResponseEntity<?>其任何字段(例如body,headers) 或访问器方法 (statusCode) 可以作为reply-expression. 默认情况下,出站消息有效负载是响应正文。 请注意,ResponseEntity(由表达式#root) 默认情况下不能被 Jackson 反序列化,但可以呈现为HashMap.spring-doc.cadn.net.cn

6.6.3. 选项

http-request 处理器具有以下选项:spring-doc.cadn.net.cn

6.6.4. 选项

http.request.body-expression

一个 SpEL 表达式,用于从传入消息派生请求正文。(表达式,默认值:<none>)spring-doc.cadn.net.cn

http.request.expected-response-type

用于解释响应的类型。(Class<?>,默认:<none>)spring-doc.cadn.net.cn

http.request.headers-expression

用于派生要使用的 http 标头映射的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

http.request.http-方法表达式

一个 SpEL 表达式,用于从传入消息派生请求方法。(表达式,默认值:<none>)spring-doc.cadn.net.cn

http.request.maximum-buffer-size (英语)

为输入流缓冲区分配的最大缓冲区大小(以字节为单位)。默认为 256k。根据需要增加发布或获取大型二进制内容。(整数,默认:0)spring-doc.cadn.net.cn

http.request.reply-expression

用于计算最终结果的 SpEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:<none>)spring-doc.cadn.net.cn

http.request.timeout

请求超时(以毫秒为单位)。(长,默认:30000)spring-doc.cadn.net.cn

http.request.url 表达式

针对传入消息的 SpEL 表达式,用于确定要使用的 URL。(表达式,默认值:<none>)spring-doc.cadn.net.cn

6.7. 图像识别处理器

使用 Inception 模型对 在实时图像中分为不同的类别(例如标签)。spring-doc.cadn.net.cn

模型实现深度卷积神经网络,可以在硬视觉识别任务上实现合理的性能 - 在图像识别等某些领域与人类的表现相当或超过人类。spring-doc.cadn.net.cn

模型的输入是作为二进制数组的图像。spring-doc.cadn.net.cn

输出是以下格式的 JSON 消息:spring-doc.cadn.net.cn

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

结果包含已识别类别的名称(例如标签)以及图像代表此类别的置信度(例如置信度)。spring-doc.cadn.net.cn

如果response-seize设置为高于 1 的值,则结果将包括顶部response-seize可能的标签。例如response-size=3将返回:spring-doc.cadn.net.cn

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}
有效载荷

如果传入类型为byte[]并将内容类型设置为application/octet-stream,则应用程序处理输入byte[]图像输入并输出增强byte[]图像有效负载和 JSON 标头。spring-doc.cadn.net.cn

6.7.2. 选项

image.recognition.cache-model

缓存预训练的 TensorFlow 模型。(布尔值,默认:true)spring-doc.cadn.net.cn

image.recognition.debug-output

<缺少文档>(布尔值,默认:false)spring-doc.cadn.net.cn

image.recognition.debug-output-path

<缺少文档>(字符串,默认:image-recognition-result.png)spring-doc.cadn.net.cn

image.recognition.model(图像识别模型)

预训练的 TensorFlow 图像识别模型。请注意,模型必须与所选模型类型匹配!(字符串,默认:https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb)spring-doc.cadn.net.cn

image.recognition.model-type

支持三种不同的预训练 tensorflow 图像识别模型:Inception、MobileNetV1 和 MobileNetV2 1。Inception 图使用“输入”作为输入,使用“输出”作为输出。2. MobileNetV2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 归一化图像大小始终为正方形(例如 H=W) - 图形使用“input”作为输入,使用“MobilenetV2/Predictions/Reshape_1”作为输出。3. MobileNetV1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - graph 使用“input”作为输入,“MobilenetV1/Predictions/Reshape_1”作为输出。(ModelType,默认:<none>,可能的值:inception,mobilenetv1,mobilenetv2)spring-doc.cadn.net.cn

image.recognition.normalized-image-size

标准化图像大小。(整数,默认:224)spring-doc.cadn.net.cn

image.recognition.response-size

识别的图像数量。(整数,默认:5)spring-doc.cadn.net.cn

6.8. 对象检测处理器

对象检测处理器为 TensorFlow 对象检测 API 提供开箱即用的支持。它允许实时定位和识别单个图像或图像流中的多个对象。对象检测处理器建立在对象检测功能之上。spring-doc.cadn.net.cn

您必须向处理器提供预训练的对象检测模型和相应的对象标签spring-doc.cadn.net.cn

以下是一些合理的配置默认值:spring-doc.cadn.net.cn

下图显示了一个 Spring Cloud Data Flow,流管道,它实时预测输入图像流中的对象类型。spring-doc.cadn.net.cn

scdf tensorflow object detection arch

处理器的输入是一个图像字节数组,输出是一个增强图像和一个标头,称为detected_objects,提供检测到的对象的文本描述:spring-doc.cadn.net.cn

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

detected_objects标头格式为:spring-doc.cadn.net.cn

  • object-nameconfidence - 检测到的对象(例如标签)的人类可读名称,其置信度为 [0-1] 之间的浮点数spring-doc.cadn.net.cn

  • x1y1x2y2 - 响应还提供检测到的对象的边界框,表示为(x1, y1, x2, y2). 坐标是相对于图像大小的。spring-doc.cadn.net.cn

  • cid - 提供的标签配置文件中定义的分类标识符。spring-doc.cadn.net.cn

有效载荷

传入类型为byte[],内容类型为application/octet-stream. 处理器处理输入byte[]图像并输出增强的byte[]图像有效负载和 JSON 标头 (detected_objects).spring-doc.cadn.net.cn

6.8.2. 选项

对象.检测.缓存模型

<缺少文档>(布尔值,默认:true)spring-doc.cadn.net.cn

对象.检测.置信度

<缺少文档>(浮点,默认:0.4)spring-doc.cadn.net.cn

object.detection.debug-output

<缺少文档>(布尔值,默认:false)spring-doc.cadn.net.cn

对象.检测.调试输出路径

<缺少文档>(字符串,默认:object-detection-result.png)spring-doc.cadn.net.cn

对象.检测.标签

标记 URI。(字符串,默认:https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt)spring-doc.cadn.net.cn

对象.检测.模型

预训练的 TensorFlow 对象检测模型。(字符串,默认:https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb)spring-doc.cadn.net.cn

对象.检测.响应大小

<缺少文档>(整数,默认:<none>)spring-doc.cadn.net.cn

对象.detection.with-masks

<缺少文档>(布尔值,默认:false)spring-doc.cadn.net.cn

6.9. 语义分割处理器

基于最先进的 DeepLab Tensorflow 模型的图像语义分割。spring-doc.cadn.net.cn

Semantic Segmentation是将图像的每个像素与类标签(例如花、人、道路、天空、海洋或汽车)相关联的过程。 与Instance Segmentation,它会生成实例感知区域掩码,则Semantic Segmentation生成类感知掩码。 用于实施Instance Segmentation请改为咨询对象检测服务spring-doc.cadn.net.cn

Semantic Segmentation Processor使用语义分割函数库和 TensorFlow 服务spring-doc.cadn.net.cn

有效载荷

传入类型为byte[],内容类型为application/octet-stream. 处理器处理输入byte[]图像和输出增强byte[]图像有效负载和 JSON 标头。spring-doc.cadn.net.cn

处理器的输入是一个图像字节数组,输出是一个增强的图像字节数组和一个 JSON 标头semantic_segmentation在这种格式中:spring-doc.cadn.net.cn

[
    [ 0, 0, 0 ],
    [ 127, 127, 127 ],
    [ 255, 255, 255 ]
    ...
]

输出标头 json 格式表示从输入图像计算的颜色像素贴图。spring-doc.cadn.net.cn

6.9.2. 选项

semantic.segmentation.color-map-uri

每个预训练模型都基于某些对象颜色图。预定义的选项是: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json(字符串,默认:classpath:/colormap/citymap_colormap.json)spring-doc.cadn.net.cn

semantic.segmentation.debug-output

保存本地 debugOutputPath 路径中的输出映像。(布尔值,默认:false)spring-doc.cadn.net.cn

semantic.segmentation.debug-output-path

<缺少文档>(字符串,默认:semantic-segmentation-result.png)spring-doc.cadn.net.cn

semantic.segmentation.mask-transparency

计算的分割掩码图像的 alpha 颜色。(浮点,默认:0.45)spring-doc.cadn.net.cn

语义.segmentation.model

预训练的 TensorFlow 语义分割模型。(字符串,默认:https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb)spring-doc.cadn.net.cn

semantic.segmentation.output-type

指定输出图像类型。您可以返回带有计算的掩码叠加的输入图像,也可以单独返回掩码。(OutputType,默认:<none>,可能的值:blended,mask)spring-doc.cadn.net.cn

6.10. 脚本处理器

使用脚本转换消息的处理器。脚本正文直接提供 作为属性值。可以指定脚本的语言(groovy/javascript/ruby/python)。spring-doc.cadn.net.cn

6.10.1. 选项

脚本处理器处理器具有以下选项:spring-doc.cadn.net.cn

脚本处理器.language

script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认:<none>)spring-doc.cadn.net.cn

脚本处理器.script

脚本文本。(字符串,默认:<none>)spring-doc.cadn.net.cn

脚本处理器.变量

变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

脚本处理器.变量位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

6.11. 拆分器处理器

拆分器应用程序建立在 Spring Integration 中同名的概念之上,并允许将单个消息拆分为多个不同的消息。 处理器使用一个函数,该函数接受Message<?>作为输入,然后生成一个List<Message<?>作为基于各种属性的输出(见下文)。 您可以使用 SpEL 表达式或分隔符来指定要如何拆分传入消息。spring-doc.cadn.net.cn

有效载荷

如果传入类型为byte[]并将内容类型设置为text/plainapplication/json,则应用程序将byte[]String.spring-doc.cadn.net.cn

6.11.2. 选项

splitter.apply-sequence

在标头中添加相关性/序列信息,以方便以后聚合。(布尔值,默认:true)spring-doc.cadn.net.cn

splitter.charset

将基于文本的文件中的字节转换为 String 时要使用的字符集。(字符串,默认:<none>)spring-doc.cadn.net.cn

splitter.delimiters

当表达式为 null 时,标记 {@link String} 有效负载时要使用的分隔符。(字符串,默认:<none>)spring-doc.cadn.net.cn

splitter.expression

用于拆分有效负载的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

splitter.文件标记

设置为 true 或 false 以使用包含(或不包含)文件开头/结尾标记的 {@code FileSplitter}(按行拆分基于文本的文件)。(布尔值,默认:<none>)spring-doc.cadn.net.cn

splitter.markers-json

当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:true)spring-doc.cadn.net.cn

6.12. 变换处理器

Transformer 处理器允许您根据 SpEL 表达式转换消息有效负载结构。spring-doc.cadn.net.cn

下面是如何运行此应用程序的示例。spring-doc.cadn.net.cn

java -jar filter-processor-kafka-<version>.jar --spel.function.expression=toUpperCase()spring-doc.cadn.net.cn

如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。spring-doc.cadn.net.cn

有效载荷

传入消息可以包含任何类型的有效负载。spring-doc.cadn.net.cn

6.12.2. 选项

spel.function.expression

要应用的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

6.13. Twitter 趋势和趋势位置处理器

可以返回趋势主题或趋势主题的位置的处理器。 这twitter.trend.trend-query-type属性允许来选择查询类型。spring-doc.cadn.net.cn

对于此模式集twitter.trend.trend-query-typetrend.spring-doc.cadn.net.cn

基于趋势 API 的处理器。 返回特定纬度、经度位置附近的趋势主题spring-doc.cadn.net.cn

6.13.2. 检索趋势位置

对于此模式集twitter.trend.trend-query-typetrendLocation.spring-doc.cadn.net.cn

按位置检索热门主题的完整或附近位置列表。spring-doc.cadn.net.cn

如果latitude,longitude参数不提供处理器执行可用趋势 API 并返回 Twitter 具有趋势主题信息的位置。spring-doc.cadn.net.cn

如果latitude,longitude参数,则处理器执行 Trends Closest API 并返回 Twitter 具有最接近指定位置的趋势主题信息的位置。spring-doc.cadn.net.cn

Response 是locations对该位置的 WOEID 和其他一些人类可读信息(例如该位置所属的规范名称和国家/地区)进行编码。spring-doc.cadn.net.cn

6.13.3. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

推特.trend.closest
纬度

如果提供了长参数,则可用的趋势位置将按距离(从最近到最远)到坐标对进行排序。经度的有效范围为 -180.0 到 +180.0(西为负,东为正)(含)。(表达式,默认值:<none>)spring-doc.cadn.net.cn

离子

如果提供了纬度参数,则可用的趋势位置将按距离(从最近到最远)到坐标对进行排序。经度的有效范围为 -180.0 到 +180.0(西为负,东为正)(含)。(表达式,默认值:<none>)spring-doc.cadn.net.cn

推特.trend
位置 ID

雅虎!要返回趋势信息的位置的地球上 ID。使用 1 作为 WOEID 即可获得全局信息。(表达式,默认值:payload)spring-doc.cadn.net.cn

趋势查询类型

<缺少文档>(TrendQueryType,默认:<none>,可能的值:trend,trendLocation)spring-doc.cadn.net.cn

7. 水槽

7.1. 卡桑德拉水槽

此接收器应用程序将收到的每条消息的内容写入 Cassandra。spring-doc.cadn.net.cn

它需要 JSON 字符串的有效负载,并使用其属性映射到表列。spring-doc.cadn.net.cn

有效载荷

表示要持久化的实体(或实体列表)的 JSON 字符串或字节数组。spring-doc.cadn.net.cn

7.1.2. 选项

cassandra 水槽具有以下选项:spring-doc.cadn.net.cn

spring.data.cassandra.cluster-name

<缺少文档>(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.compression

Cassandra 二进制协议支持压缩。(压缩,默认:none,可能的值:LZ4,SNAPPY,NONE)spring-doc.cadn.net.cn

spring.data.cassandra.connect-timeout

<缺少文档>(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.consistency-level

<缺少文档>(DefaultConsistencyLevel,默认:<none>,可能的值:ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)spring-doc.cadn.net.cn

spring.data.cassandra.contact-points

集群节点地址格式为“host:port”,或简单的“host”,以使用配置的端口。(List<String>,默认值:[127.0.0.1:9042])spring-doc.cadn.net.cn

spring.data.cassandra.fetch-size

<缺少文档>(整数,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.keyspace-name

要使用的键空间名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.local-datacenter

被视为“本地”的数据中心。联系点应来自此数据中心。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.密码

服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.port

如果触点未指定一个端口,则要使用的端口。(整数,默认:9042)spring-doc.cadn.net.cn

spring.data.cassandra.read-timeout

<缺少文档>(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.schema-action

启动时要执行的架构作。(字符串,默认:none)spring-doc.cadn.net.cn

spring.data.cassandra.serial-consistency-level

<缺少文档>(DefaultConsistencyLevel,默认:<none>,可能的值:ANY,ONE,TWO,THREE,QUORUM,ALL,LOCAL_ONE,LOCAL_QUORUM,EACH_QUORUM,SERIAL,LOCAL_SERIAL)spring-doc.cadn.net.cn

spring.data.cassandra.会话名称

Cassandra 会话的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.ssl

启用 SSL 支持。(布尔值,默认:false)spring-doc.cadn.net.cn

spring.data.cassandra.用户名

服务器的登录用户。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.2. 分析接收器

Sink 应用程序,构建在 Analytics 使用者之上,它根据输入消息计算分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开了 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标名称、值和标签。spring-doc.cadn.net.cn

分析接收器可以生成两种指标类型:spring-doc.cadn.net.cn

  • 计数器 - 报告单个指标,即计数,该指标按固定的正量递增。计数器可用于计算数据随时间变化的速率。spring-doc.cadn.net.cn

  • 仪表 - 报告当前值。仪表的典型示例是集合或映射的大小或处于运行状态的线程数。spring-doc.cadn.net.cn

仪表(例如计数器或仪表)由其唯一标识namedimensions(术语“尺寸”和“标记”可以互换使用)。维度允许对特定命名指标进行切片,以向下钻取和推理数据。spring-doc.cadn.net.cn

作为指标,由其namedimensions,您可以为每个指标分配多个标签(e.g. key/值对),但之后不能随机更改这些标签!Prometheus 等监控系统会抱怨,如果具有相同名称的指标具有不同的标签集。

使用analytics.nameanalytics.name-expression属性 设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。spring-doc.cadn.net.cn

使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>,属性,用于向指标添加一个或多个标记。这TAG_NAME属性定义中使用的将在指标中显示为标记名称。TAG_VALUE是一个SpEL从传入消息动态计算标记值的表达式。spring-doc.cadn.net.cn

SpEL表达式使用headerspayload关键字来访问邮件的标头和有效负载值。spring-doc.cadn.net.cn

您可以使用文字(例如'fixed value') 设置具有固定值的标签。

所有 Stream 应用程序都支持,盒子的 ouf,三个最流行的监控系统,Wavefront,PrometheusInfluxDB并且您可以以声明方式启用它们中的每一个。 只需将 micrometer meter-registry 依赖项添加到Analytics Sink应用。spring-doc.cadn.net.cn

请访问 Spring Cloud Data Flow Stream Monitoring 以获取配置监控系统的详细说明。以下快速片段可以帮助您入门。spring-doc.cadn.net.cn

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink将重用提供的指标配置。

下图说明了如何Analytics Sink帮助收集企业内部信息,用于证券交易所,实时管道。spring-doc.cadn.net.cn

Analytics Architecture
有效载荷

传入消息可以包含任何类型的有效负载。spring-doc.cadn.net.cn

7.2.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

分析学
金额表达式

用于计算输出指标值(例如金额)的 SpEL 表达式。默认为 1.0(表达式,默认值:<none>)spring-doc.cadn.net.cn

仪表型

千分尺计类型,用于向后端报告指标。(MeterType,默认:<none>,可能的值:counter,gauge)spring-doc.cadn.net.cn

名称

输出指标的名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(字符串,默认:<none>)spring-doc.cadn.net.cn

名称表达式

一个 SpEL 表达式,用于从输入消息计算输出指标名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(表达式,默认值:<none>)spring-doc.cadn.net.cn

analytics.tag
表达

根据 SpEL 表达式计算标签。单个 SpEL 表达式可以生成值数组,这反过来又意味着不同的名称/值标签。每个名称/值标签都会产生单独的仪表增量。代码表达式格式为:analytics.tag.expression。[标签名称]=[SpEL表达式](Map<String, Expression>,默认值:<none>)spring-doc.cadn.net.cn

固定

已弃用:请将 analytics.tag.expression 与文字 SpEL 表达式一起使用。自定义、固定标签。这些标记具有常量值,创建一次,然后与每个已发布的指标一起发送。定义固定标记的约定为:<code>analytics.tag.fixed。[标签名称]=[标签值] </code>(Map<String, String>,默认:<none>)spring-doc.cadn.net.cn

7.3. Elasticsearch Sink

将文档索引到 Elasticsearch 中的 Sink。spring-doc.cadn.net.cn

此 Elasticsearch 接收器仅支持索引 JSON 文档。 它使用来自输入目标的数据,然后将其索引到 Elasticsearch。 输入数据可以是纯 json 字符串,也可以是java.util.Map表示 JSON。 它还接受 Elasticsearch 提供的数据XContentBuilder. 但是,这是一种罕见的情况,因为中间件不太可能将记录保留为XContentBuilder. 这主要用于直接调用消费者。spring-doc.cadn.net.cn

7.3.1. 选项

Elasticsearch 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

弹性搜索.consumer
异步

指示索引作是否异步。默认情况下,索引是同步完成的。(布尔值,默认:false)spring-doc.cadn.net.cn

批量大小

每个请求要索引的项数。默认为 1。对于大于 1 的值,将使用批量索引 API。(整数,默认:1)spring-doc.cadn.net.cn

组超时

超时(以毫秒为单位),之后在批量索引处于活动状态时刷新消息组。它默认为 -1,这意味着不会自动刷新空闲消息组。(长,默认:-1)spring-doc.cadn.net.cn

id

要编制索引的文档的 ID。如果设置,则INDEX_ID标头值将基于每条消息覆盖此属性。(表达式,默认值:<none>)spring-doc.cadn.net.cn

指数

索引的名称。如果设置,则INDEX_NAME标头值将基于每条消息覆盖此属性。(字符串,默认:<none>)spring-doc.cadn.net.cn

路由

指示要路由到的分片。如果未提供,Elasticsearch 将默认为文档 ID 的哈希值。(字符串,默认:<none>)spring-doc.cadn.net.cn

超时秒

分片可用的超时。如果未设置,则默认为 Elasticsearch 客户端设置的 1 分钟。(长,默认:0)spring-doc.cadn.net.cn

spring.elasticsearch.rest
连接超时

连接超时。(持续时间,默认:1s)spring-doc.cadn.net.cn

密码

凭据密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

读取超时

读取超时。(持续时间,默认:30s)spring-doc.cadn.net.cn

尿素

要使用的 Elasticsearch 实例的逗号分隔列表。(List<String>,默认值:[http://localhost:9200])spring-doc.cadn.net.cn

用户名

凭据用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.3.2. 运行此接收器的示例

  1. 从文件夹elasticsearch-sink:./mvnw clean packagespring-doc.cadn.net.cn

  2. CD 应用程序spring-doc.cadn.net.cn

  3. cd 到正确的 Binder 生成的应用程序(Kafka 或 RabbitMQ)spring-doc.cadn.net.cn

  4. ./mvnw clean packagespring-doc.cadn.net.cn

  5. 确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2spring-doc.cadn.net.cn

  6. 如果中间件尚未运行,请启动中间件(Kafka 或 RabbitMQ)。spring-doc.cadn.net.cn

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testingspring-doc.cadn.net.cn

  8. 将一些 JSON 数据发送到中间件目标。例如:"foo":"bar"}spring-doc.cadn.net.cn

  9. 验证数据是否已编制索引:curl localhost:9200/testing/_searchspring-doc.cadn.net.cn

7.4. 文件接收器

文件接收器应用将其收到的每条消息写入文件。spring-doc.cadn.net.cn

有效载荷

7.4.2. 选项

file-sink具有以下选项:spring-doc.cadn.net.cn

文件.消费者.二进制文件

一个标志,指示是否应禁止在写入后添加换行符。(布尔值,默认:false)spring-doc.cadn.net.cn

file.consumer.charset

写入文本内容时要使用的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

文件.消费者.目录

目标文件的父目录。(文件,默认:<none>)spring-doc.cadn.net.cn

文件.消费者.目录表达式

要计算目标文件的父目录的表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

文件.消费者.mode

如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认:<none>,可能的值:APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

file.consumer.name

目标文件的名称。(字符串,默认:file-consumer)spring-doc.cadn.net.cn

文件.消费者.名称表达式

要计算目标文件名称的表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

file.consumer.suffix

要附加到文件名的后缀。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

7.5. FTP 接收器

FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。spring-doc.cadn.net.cn

它使用ftp-outbound-adapter,因此传入消息可以是java.io.File对象,一个String(文件内容) 或数组bytes(文件内容也是如此)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和一个密码才能登录。spring-doc.cadn.net.cn

默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator如果未指定任何内容。DefaultFileNameGenerator将确定文件名 基于file_name标头(如果存在)在MessageHeaders,或者如果Message已经是一个java.io.File,那么它会 使用该文件的原始名称。
有效载荷

7.5.3. 输出

N/A(写入 FTP 服务器)。spring-doc.cadn.net.cn

7.5.4. 选项

ftp 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

ftp.consumer 的
自动创建目录

是否创建远程目录。(布尔值,默认:true)spring-doc.cadn.net.cn

文件名表达式

用于生成远程文件名的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

模式

如果远程文件已存在,则要执行的作。(FileExistsMode,默认:<none>,可能的值:APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

远程目录

远程 FTP 目录。(字符串,默认:/)spring-doc.cadn.net.cn

远程文件分隔符

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

临时远程目录

如果 '#isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-文件后缀

传输过程中要使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

使用临时文件名

是否写入临时文件并重命名。(布尔值,默认:true)spring-doc.cadn.net.cn

ftp.工厂
缓存会话

缓存会话。(布尔值,默认:<none>)spring-doc.cadn.net.cn

客户端模式

用于 FTP 会话的客户端模式。(ClientMode,默认:<none>,可能的值:ACTIVE,PASSIVE)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

用于连接到服务器的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:21)spring-doc.cadn.net.cn

用户名

用于连接到服务器的用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.6. 晶洞水槽

Geode 接收器会将消息内容写入 Geode 区域。spring-doc.cadn.net.cn

7.6.1. 选项

晶洞接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

geode.消费者
json

指示 Geode 区域是否将 json 对象存储为 PdxInstance。(布尔值,默认:false)spring-doc.cadn.net.cn

键表达式

SpEL 表达式以用作缓存键。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.池
连接类型

指定连接类型:“服务器”或“定位器”。(ConnectType,默认:<none>,可能的值:locator,server)spring-doc.cadn.net.cn

主机地址

指定一个或多个格式为 [host]:[port] 的 Gemfire 定位器或服务器地址。(InetSocketAddress[],默认:<none>)spring-doc.cadn.net.cn

已启用订阅

设置为 true 以启用客户端池的订阅。需要将更新同步到客户端缓存。(布尔值,默认:false)spring-doc.cadn.net.cn

geode.region
区域名称

区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.安全
密码

缓存密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

缓存用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.security.ssl (地理洞.安全.ssl)
密码

将用于安全套接字连接的 SSL 密码配置为有效密码名称的数组。(字符串,默认:any)spring-doc.cadn.net.cn

密钥存储类型

标识用于 SSL 通信的密钥库类型(例如 JKS、PKCS11 等)。(字符串,默认:JKS)spring-doc.cadn.net.cn

密钥存储 uri

用于连接到 Geode 集群的预先创建的密钥库 URI 的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

SSL-密钥存储密码

用于访问密钥信任库的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

SSL-信任存储密码

用于访问信任存储的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

信任存储类型

标识用于 SSL 通信的信任库类型(例如 JKS、PKCS11 等)。(字符串,默认:JKS)spring-doc.cadn.net.cn

信任存储 uri

用于连接到 Geode 集群的预先创建的信任库 URI 的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

用户主目录

本地目录,用于缓存从 truststoreUri 和 keystoreUri 位置下载的信任库和密钥库文件。(字符串,默认:user.home)spring-doc.cadn.net.cn

7.7. JDBC Sink

JDBC 接收器允许您将传入的有效负载持久化到 RDBMS 数据库中。spring-doc.cadn.net.cn

jdbc.consumer.columns属性表示COLUMN_NAME[:EXPRESSION_FOR_VALUE]哪里EXPRESSION_FOR_VALUE(连同冒号)是可选的。 在这种情况下,值是通过生成的表达式计算的,例如payload.COLUMN_NAME,因此这样我们就可以从对象属性直接映射到表列。 例如,我们有一个 JSON 有效负载,如下所示:spring-doc.cadn.net.cn

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

因此,我们可以将其插入到表格中name,citystreet结构使用配置:spring-doc.cadn.net.cn

--jdbc.consumer.columns=name,city:address.city,street:address.street

此接收器支持批量插入,只要基础 JDBC 驱动程序支持。 批次插入通过batch-sizeidle-timeout性能: 传入消息将聚合到batch-size消息存在,然后作为批处理插入。 如果idle-timeout毫秒过去没有新消息,则插入聚合的批次,即使它小于batch-size,限制最大延迟。spring-doc.cadn.net.cn

该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用

7.7.1. 示例

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
有效载荷

7.7.2. 选项

jdbc 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jdbc.consumer
批量大小

将数据刷新到数据库表时消息数的阈值。(整数,默认:1)spring-doc.cadn.net.cn

逗号分隔了基于冒号的列名和 SpEL 表达式对,用于插入/更新的值。初始化时使用名称来发出 DDL。(字符串,默认:payload:payload.toString())spring-doc.cadn.net.cn

空闲超时

数据自动刷新到数据库表时的空闲超时(以毫秒为单位)。(长,默认:-1)spring-doc.cadn.net.cn

初始化

'true'、'false' 或表的自定义初始化脚本的位置。(字符串,默认:false)spring-doc.cadn.net.cn

表名

要写入的表的名称。(字符串,默认:messages)spring-doc.cadn.net.cn

spring.datasource
数据

数据 (DML) 脚本资源引用。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

驱动程序类名称

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

初始化模式

确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化时要应用的模式。(DataSourceInitializationMode,默认:embedded,可能的值:ALWAYS,EMBEDDED,NEVER)spring-doc.cadn.net.cn

密码

数据库的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

图式

架构 (DDL) 脚本资源引用。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

网址

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库的登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.8. 原木汇

logSink 使用应用程序记录器输出数据以供检查。spring-doc.cadn.net.cn

敬请谅解log接收器使用无类型处理程序,这会影响实际日志记录的执行方式。 这意味着,如果内容类型是文本的,则原始有效负载字节将转换为 String,否则将记录原始字节。 请参阅用户指南中的更多信息。spring-doc.cadn.net.cn

7.8.1. 选项

日志接收器具有以下选项:spring-doc.cadn.net.cn

日志表达式

一个 SpEL 表达式(针对传入消息)以计算为记录的消息。(字符串,默认:payload)spring-doc.cadn.net.cn

日志级别

记录消息的级别。(级别,默认:<none>,可能的值:FATAL,ERROR,WARN,INFO,DEBUG,TRACE)spring-doc.cadn.net.cn

log.name

要使用的记录器的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.9. MongoDB 接收器

此接收器应用程序将传入数据摄取到 MongoDB 中。此应用程序完全基于MongoDataAutoConfiguration,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。spring-doc.cadn.net.cn

7.9.1. 输入

7.9.2. 选项

mongodb 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mongodb.consumer
收集

用于存储数据的 MongoDB 集合。(字符串,默认:<none>)spring-doc.cadn.net.cn

集合表达式

用于评估 MongoDB 集合的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

spring.data.mongodb
身份验证数据库

身份验证数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

自动索引创建

是否启用自动索引创建。(布尔值,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

字段命名策略

要使用的 FieldNamingStrategy 的完全限定名称。(Class<?>,默认:<none>)spring-doc.cadn.net.cn

网格 FS 数据库

GridFS 数据库名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

主机

Mongo 服务器主机。无法使用 URI 进行设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

mongo 服务器的登录密码。不能使用 URI 设置。(Character[],默认:<none>)spring-doc.cadn.net.cn

端口

Mongo 服务器端口。无法使用 URI 设置。(整数,默认:<none>)spring-doc.cadn.net.cn

副本集名称

集群所需的副本集名称。无法使用 URI 进行设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

URI

Mongo 数据库 URI。不能使用主机、端口、凭据和副本集名称进行设置。(字符串,默认:mongodb://localhost/test)spring-doc.cadn.net.cn

用户名

mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-表示

将 UUID 转换为 BSON 二进制值时使用的表示。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)spring-doc.cadn.net.cn

7.10. MQTT 接收器

该模块向 MQTT 发送消息。spring-doc.cadn.net.cn

有效载荷:

7.10.2. 选项

mqtt 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

MQTT
清洁会话

客户端和服务器是否应记住重新启动和重新连接的状态。(布尔值,默认:true)spring-doc.cadn.net.cn

连接超时

连接超时(以秒为单位)。(整数,默认:30)spring-doc.cadn.net.cn

保持活动间隔

ping间隔(以秒为单位)。(整数,默认:60)spring-doc.cadn.net.cn

密码

连接到代理时要使用的密码。(字符串,默认:guest)spring-doc.cadn.net.cn

坚持

'内存'或'文件'。(字符串,默认:memory)spring-doc.cadn.net.cn

持久性目录

持久性目录。(字符串,默认:/tmp/paho)spring-doc.cadn.net.cn

网址

MQTT 代理的位置(逗号分隔的列表)。(String[],默认:[tcp://localhost:1883])spring-doc.cadn.net.cn

用户名

连接到代理时要使用的用户名。(字符串,默认:guest)spring-doc.cadn.net.cn

mqtt.consumer 的
异步

是否使用异步发送。(布尔值,默认:false)spring-doc.cadn.net.cn

字符集

用于将 String 有效负载转换为 byte[] 的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

客户端 ID

标识客户端。(字符串,默认:stream.client.id.sink)spring-doc.cadn.net.cn

QOS

使用的服务质量。(整数,默认:1)spring-doc.cadn.net.cn

保留

是否设置“保留”标志。(布尔值,默认:false)spring-doc.cadn.net.cn

主题

接收器将发布到的主题。(字符串,默认:stream.mqtt)spring-doc.cadn.net.cn

7.11. Pgcopy 接收器

使用 PostgreSQL COPY 命令将其传入有效负载写入 RDBMS 的模块。spring-doc.cadn.net.cn

7.11.1. 输入

有效载荷

列表达式将根据消息进行计算,并且表达式通常仅与一种类型(例如 Map 或 bean 等)兼容。spring-doc.cadn.net.cn

7.11.2. 输出

7.11.3. 选项

jdbc 接收器具有以下选项:spring-doc.cadn.net.cn

spring.datasource.driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.password

数据库的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.用户名

数据库的登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用

7.11.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

要运行集成测试,请在本地主机上启动 PostgreSQL 数据库:spring-doc.cadn.net.cn

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. 示例

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test

7.12. RabbitMQ Sink

该模块向 RabbitMQ 发送消息。spring-doc.cadn.net.cn

7.12.1. 选项

子水槽有以下选项:spring-doc.cadn.net.cn

(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

转换器 bean-name

自定义消息转换器的 bean 名称;如果省略,则使用 SimpleMessageConverter。如果是 'jsonConverter',则将为您创建一个 Jackson2JsonMessageConverter bean。(字符串,默认:<none>)spring-doc.cadn.net.cn

交换

Exchange 名称 - 由 exchangeNameExpression(如果提供)覆盖。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

交换表达式

计算结果为交换名称的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

标头映射最后

映射出站邮件的标头时,请确定是在转换邮件之前还是之后映射标头。(布尔值,默认:true)spring-doc.cadn.net.cn

映射请求标头

将要映射的标头。(String[],默认:[*])spring-doc.cadn.net.cn

自己的连接

如果为 true,请根据启动属性使用单独的连接。(布尔值,默认:false)spring-doc.cadn.net.cn

持久交付模式

“amqp_deliveryMode”标头不存在时的默认传递模式,对于 PERSISTENT。(布尔值,默认:false)spring-doc.cadn.net.cn

路由键

路由键 - 由 routingKeyExpression(如果提供)覆盖。(字符串,默认:<none>)spring-doc.cadn.net.cn

路由键表达式

计算结果为路由键的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

spring.rabbitmq
地址

客户端应连接到的地址的逗号分隔列表。设置后,将忽略主机和端口。(字符串,默认:<none>)spring-doc.cadn.net.cn

连接超时

连接超时。将其设置为零以永久等待。(持续时间,默认:<none>)spring-doc.cadn.net.cn

主机

RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

登录以对代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

端口

RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则默认为 5671。(整数,默认:<none>)spring-doc.cadn.net.cn

发布者确认类型

确认使用的发布者类型。(ConfirmType,默认:<none>,可能的值:SIMPLE,CORRELATED,NONE)spring-doc.cadn.net.cn

发布者返回

是否启用发布商退货。(布尔值,默认:false)spring-doc.cadn.net.cn

请求通道最大值

客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认:2047)spring-doc.cadn.net.cn

请求心跳

请求的检测信号超时;零表示无。如果未指定持续时间后缀,则将使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户以向代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

虚拟主机

连接到代理时要使用的虚拟主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.13. Redis 接收器

向 Redis 发送消息。spring-doc.cadn.net.cn

7.13.1. 选项

Redis 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

redis.consumer
钥匙

存储到键时要使用的文字键名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

键表达式

用于存储到键的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

队列

存储在队列中时要使用的文字队列名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

队列表达式

用于队列的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

主题

发布到主题时要使用的文字主题名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

主题表达式

用于主题的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.redis 的
客户端名称

要在与 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

数据库

连接工厂使用的数据库索引。(整数,默认:0)spring-doc.cadn.net.cn

主机

Redis 服务器主机。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

redis 服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

Redis 服务器端口。(整数,默认:6379)spring-doc.cadn.net.cn

SSL的

是否启用 SSL 支持。(布尔值,默认:false)spring-doc.cadn.net.cn

超时

连接超时。(持续时间,默认:<none>)spring-doc.cadn.net.cn

网址

连接 URL。覆盖主机、端口和密码。用户被忽略。示例:redis://user:[email protected]:6379(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.redis.jedis.池
最大活动

池在给定时间可以分配的最大连接数。使用负值表示无限制。(整数,默认:8)spring-doc.cadn.net.cn

最大空闲

池中“空闲”连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认:8)spring-doc.cadn.net.cn

最大等待

在池耗尽时引发异常之前,连接分配应阻止的最长时间。使用负值无限期阻止。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

最小空闲

在池中维护的最小空闲连接数的目标。仅当此设置和逐出运行之间的时间均为正时,此设置才有效。(整数,默认:0)spring-doc.cadn.net.cn

驱逐运行之间的时间

空闲对象 evictor 线程运行之间的时间。当为正时,空闲对象 evictor 线程将启动,否则不会执行空闲对象逐出。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.redis.lettuce.pool
最大活动

池在给定时间可以分配的最大连接数。使用负值表示无限制。(整数,默认:8)spring-doc.cadn.net.cn

最大空闲

池中“空闲”连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认:8)spring-doc.cadn.net.cn

最大等待

在池耗尽时引发异常之前,连接分配应阻止的最长时间。使用负值无限期阻止。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

最小空闲

在池中维护的最小空闲连接数的目标。仅当此设置和逐出运行之间的时间均为正时,此设置才有效。(整数,默认:0)spring-doc.cadn.net.cn

驱逐运行之间的时间

空闲对象 evictor 线程运行之间的时间。当为正时,空闲对象 evictor 线程将启动,否则不会执行空闲对象逐出。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.redis.哨兵
主人

Redis 服务器的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

节点

逗号分隔的“host:port”对列表。(List<String>,默认值:<none>)spring-doc.cadn.net.cn

密码

用于向哨兵进行身份验证的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.14. 路由器接收器

此应用程序将消息路由到命名通道。spring-doc.cadn.net.cn

7.14.1. 选项

路由器接收器具有以下选项:spring-doc.cadn.net.cn

router.default-output-channel

在何处发送不可路由的消息。(字符串,默认:nullChannel)spring-doc.cadn.net.cn

router.destination-mappings

目标映射作为以名称-值对为分隔的新行字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

router.expression

要应用于消息以确定要路由到的通道的表达式。请注意,text、json 或 xml 等内容类型的有效负载线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容,请参阅文档。(表达式,默认值:<none>)spring-doc.cadn.net.cn

router.refresh-delay

检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认:60000)spring-doc.cadn.net.cn

需要 router.resolution-required

是否需要通道分辨率。(布尔值,默认:false)spring-doc.cadn.net.cn

路由器.脚本

返回通道或通道映射分辨率键的 groovy 脚本的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

router.变量

变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

router.变量位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

由于这是一个动态路由器,因此会根据需要创建目标;因此,默认情况下,defaultOutputChannelresolutionRequired仅当Binder绑定到目标时存在一些问题。

您可以使用spring.cloud.stream.dynamicDestinations财产。 默认情况下,所有已解析的目的地都将动态绑定;如果此属性具有逗号分隔的列表 目标名称,则只有这些名称才会被绑定。 解析到不在此列表中的目标的消息将被路由到defaultOutputChannel哪 还必须出现在列表中。spring-doc.cadn.net.cn

destinationMappings用于将评估结果映射到实际的目标名称。spring-doc.cadn.net.cn

7.14.2. 基于 SpEL 的路由

该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。spring-doc.cadn.net.cn

有关更多信息,请参阅 Spring 中的“路由器和 Spring 表达式语言 (SpEL)”小节 集成参考手册配置(通用)路由器部分spring-doc.cadn.net.cn

从 Spring Cloud Stream 2.0 开始,消息线格式json,textxml内容类型是byte[]String! 这是与 SCSt 1.x 相比的一个改变,它将这些类型视为字符串! 取决于内容类型,处理byte[]有效载荷可用。对于普通text内容类型,可以使用new String(payload)SpEL 表达。为json键入 jsonPath() SpEL 实用程序 已经支持字符串和字节数组内容互换。这同样适用于xml内容类型和 #xpath() SpEL 实用程序。

例如,对于text应该使用的内容类型:spring-doc.cadn.net.cn

 new String(payload).contains('a')

json内容类型 SpEL 表达式,如下所示:spring-doc.cadn.net.cn

 #jsonPath(payload, '$.person.name')

7.14.3. 基于 Groovy 的路由

除了 SpEL 表达式,还可以使用 Groovy 脚本。让我们在文件系统中创建一个 Groovy 脚本,位于 “file:/my/path/router.groovy”,或“classpath:/my/path/router.groovy”:spring-doc.cadn.net.cn

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

如果要将变量值传递给脚本,可以使用变量选项或 (可选)使用 propertiesLocation 选项将路径传递到包含绑定的属性文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定变量和 propertiesLocation在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,有效负载标头是隐式绑定的,以允许您访问消息中包含的数据。spring-doc.cadn.net.cn

有关更多信息,请参阅 Spring Integration 参考手册 Groovy 支持spring-doc.cadn.net.cn

7.15. RSocket 水槽

RSocket 接收器,使用 RSocket 协议的触发并忘记策略发送数据。spring-doc.cadn.net.cn

7.15.1. 选项

rsocket 接收器具有以下选项:spring-doc.cadn.net.cn

rsocket.consumer.host

RSocket 主机。(字符串,默认:localhost)spring-doc.cadn.net.cn

rsocket.consumer.port

RSocket 端口。(整数,默认:7000)spring-doc.cadn.net.cn

rsocket.consumer.route

用于 RSocket 的路由。(字符串,默认:<none>)spring-doc.cadn.net.cn

rsocket.consumer.uri 的

可用于基于 websocket 的传输的 URI。(URI,默认:<none>)spring-doc.cadn.net.cn

7.16. 亚马逊 S3 接收器

此接收器应用程序支持将对象传输到 Amazon S3 存储桶。 文件有效负载(和目录递归)将传输到remote目录(S3 存储桶)到local部署应用程序的目录。spring-doc.cadn.net.cn

此接收器接受的消息必须包含payload如:spring-doc.cadn.net.cn

7.16.1. 选项

s3 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

s3.common
端点网址

用于连接到 s3 兼容存储的可选端点 url。(字符串,默认:<none>)spring-doc.cadn.net.cn

路径样式访问

使用路径样式访问。(布尔值,默认:false)spring-doc.cadn.net.cn

s3.消费者
前交叉韧带

S3 对象访问控制列表。(CannedAccessControlList,默认:<none>,可能的值:private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read)spring-doc.cadn.net.cn

ACL 表达

用于评估 S3 对象访问控制列表的表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

用于存储目标文件的 AWS 存储桶。(字符串,默认:<none>)spring-doc.cadn.net.cn

桶表达式

用于评估 AWS 存储桶名称的表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

键表达式

用于计算 S3 对象键的表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

目标生成的应用程序基于AmazonS3SinkConfiguration可以通过S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener,这些被注入S3MessageHandler豆。 有关更多详细信息,请参阅 Spring Integration AWS 支持。spring-doc.cadn.net.cn

7.16.2. Amazon AWS 通用选项

Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置 类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。spring-doc.cadn.net.cn

其中一些是关于 AWS 凭证的:spring-doc.cadn.net.cn

其他适用于 AWSRegion定义:spring-doc.cadn.net.cn

对于 AWSStack:spring-doc.cadn.net.cn

例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar

7.17. 任务Starters水槽

此模块使用来自PollableMessageSource并使用数据流 REST 客户端在配置的任务平台上启动已注册的任务。 必须将客户端配置为连接到远程数据流服务器,包括任何必需的身份验证(请参阅下面的配置选项)。spring-doc.cadn.net.cn

此应用程序使用数据流服务器 REST API 启动已注册的任务定义。spring-doc.cadn.net.cn

7.17.1. 输入

任务启动请求,包括:spring-doc.cadn.net.cn

消息有效负载可以是 JSON 文档:spring-doc.cadn.net.cn

{
  "name":"foo",
  "deploymentProps": {"key1":"val1","key2":"val2"},
  "args":["--debug", "--foo", "bar"]
}

至少,它必须包含任务名称。spring-doc.cadn.net.cn

{"name":"foo"}
选项

tasklauncher-dataflow 接收器支持以下配置属性:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

spring.cloud.dataflow.client.authentication
访问Tokens

OAuth2 访问Tokens。(字符串,默认:<none>)spring-doc.cadn.net.cn

客户端 ID

OAuth2 客户端 ID。(字符串,默认:<none>)spring-doc.cadn.net.cn

客户端密钥

OAuth2 客户端密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

范围

OAuth2 作用域。(Set<String>,默认值:<none>)spring-doc.cadn.net.cn

Tokens uri

OAuth2 Tokens URI。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.dataflow.client.authentication.basic
密码

登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.dataflow.client.authentication.oauth2
客户端注册 ID

<缺少文档>(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

<缺少文档>(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

<缺少文档>(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.dataflow.client
启用-DSL

启用数据流 DSL 访问。(布尔值,默认:false)spring-doc.cadn.net.cn

服务器 uri

数据流服务器 URI。(字符串,默认:http://localhost:9393)spring-doc.cadn.net.cn

跳过 SSL 验证

跳过 SSL 验证。(布尔值,默认:false)spring-doc.cadn.net.cn

任务Starters
平台名称

用于启动任务的 Spring Cloud Data Flow 平台。(字符串,默认:default)spring-doc.cadn.net.cn

触发
初始延迟

初始延迟(以毫秒为单位)。(整数,默认:1000)spring-doc.cadn.net.cn

最大周期

最大轮询周期(以毫秒为单位)。如果 period > maxPeriod,则将设置为 period。(整数,默认:30000)spring-doc.cadn.net.cn

时期

轮询周期(以毫秒为单位)。(整数,默认:1000)spring-doc.cadn.net.cn

7.17.2. 使用 TaskLauncher

tasklauncher 接收器消耗LaunchRequest消息,如上所述,并使用目标数据流服务器(由--spring.cloud.dataflow.client.server-uri). 任务Starters会定期轮询其启动请求的输入源,但当平台达到其并发任务执行限制时,将暂停轮询,由spring.cloud.dataflow.task.platform.<platform-type>.accounts[<account-name>].maximum-concurrent-tasks. 这可以防止 SCDF 部署者的部署平台在繁重的任务负载下耗尽其资源。 轮询器使用DynamicPeriodicTrigger.默认情况下,初始轮询速率为 1 秒,但可以配置为任意持续时间。当轮询暂停时,或者如果不存在启动请求,则触发周期将增加,应用指数退避,最多达到配置的最大值(默认为 30 秒)。spring-doc.cadn.net.cn

此版本的数据流任务Starters需要 SCDF 版本 2.4.x 或更高版本

SCDF 服务器可以配置为在多个平台上启动任务。 每个任务Starters实例都针对单个平台进行配置,由platformName属性 (default如果未指定)。 强制执行此限制是因为如果服务器配置了多个任务平台,则其某些任务平台可能处于限制,而有些则未达到限制。 在这种情况下,只有知道下一个启动请求针对哪个任务平台,我们才能使用下一个启动请求。 因此,如果 SCDF 服务器配置为多个任务平台(或单个非默认平台),则我们假设所有启动请求都针对该平台。 任务Starters将设置所需的部署属性spring.cloud.dataflow.task.platformName如果请求未提供。spring-doc.cadn.net.cn

如果请求包含部署属性spring.cloud.dataflow.task.platformName,并且该值与 tasklauncher 的platformName,则任务Starters将抛出异常。

要在多个平台上启动任务,您必须为每个平台配置一个任务Starters实例,并使用路由器接收器分区策略将请求路由到正确的实例。spring-doc.cadn.net.cn

当轮询器暂停时,它会施加压力 因此,在极端情况下需要进行一些调整以平衡资源利用率。
客户端身份验证

如果数据流服务器需要身份验证,则客户端必须传递具有授权的凭据才能启动任务。 数据流客户端支持基本身份验证和 OAuth2 身份验证。spring-doc.cadn.net.cn

对于基本身份验证,请设置用户名和密码:spring-doc.cadn.net.cn

--spring.cloud.dataflow.client.authentication.basic.username=<username> --spring.cloud.dataflow.client.authentication.basic.password=<password>

对于 OAuth2 身份验证,请将client-id,client-secrettoken-uri至少。这些值对应于 SCDF 服务器的 OAuth2 配置中设置的值。 有关更多详细信息,请参阅数据流参考中的“安全性”部分spring-doc.cadn.net.cn

--spring.cloud.dataflow.client.authentication.client-id=<client-id> --spring.cloud.dataflow.client.authentication.client-secret=<client-secret> spring.cloud.dataflow.client.authentication.token-uri: <token-uri>

7.18. SFTP 接收器

SFTP 接收器是一个简单的选项,用于将文件从传入邮件推送到 SFTP 服务器。spring-doc.cadn.net.cn

它使用sftp-outbound-adapter,因此传入消息可以是java.io.File对象,一个String(文件内容) 或数组bytes(文件内容也是如此)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和一个密码才能登录。spring-doc.cadn.net.cn

默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator如果未指定任何内容。DefaultFileNameGenerator将确定文件名 基于file_name标头(如果存在)在MessageHeaders,或者如果Message已经是一个java.io.File,那么它会 使用该文件的原始名称。

配置sftp.factory.known-hosts-expression选项,则评估的根对象是应用程序上下文,例如可能是sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.spring-doc.cadn.net.cn

7.18.1. 输入

有效载荷

7.18.2. 输出

N/A(写入 SFTP 服务器)。spring-doc.cadn.net.cn

7.18.3. 选项

sftp 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

sftp.消费者
自动创建目录

是否创建远程目录。(布尔值,默认:true)spring-doc.cadn.net.cn

文件名表达式

用于生成远程文件名的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

模式

如果远程文件已存在,则要执行的作。(FileExistsMode,默认:<none>,可能的值:APPEND,APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

远程目录

远程 FTP 目录。(字符串,默认:/)spring-doc.cadn.net.cn

远程文件分隔符

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

临时远程目录

如果 'isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-文件后缀

传输过程中要使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

使用临时文件名

是否写入临时文件并重命名。(布尔值,默认:true)spring-doc.cadn.net.cn

sftp.消费者.工厂
允许未知键

True 表示允许未知或更改的键。(布尔值,默认:false)spring-doc.cadn.net.cn

缓存会话

缓存会话。(布尔值,默认:<none>)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

已知主机表达式

解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

密码短语

用户私钥的密码。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

密码

用于连接到服务器的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:22)spring-doc.cadn.net.cn

私钥

用户私钥的资源位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

用户名

用于连接到服务器的用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.19. TCP接收器

此模块使用编码器将消息写入 TCP。spring-doc.cadn.net.cn

TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多编码器是 available,默认值为“CRLF”。spring-doc.cadn.net.cn

7.19.1. 选项

tcp 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

tcp.consumer
字符集

从字节转换为字符串时使用的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

关闭

是否在每条消息后关闭套接字。(布尔值,默认:false)spring-doc.cadn.net.cn

编码器

发送消息时要使用的编码器。(编码,默认:<none>,可能的值:CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)spring-doc.cadn.net.cn

主机

此接收器将连接到的主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

TCP
蔚来

是否使用蔚来。(布尔值,默认:false)spring-doc.cadn.net.cn

端口

要收听的端口;0 表示作系统选择端口。(整数,默认:1234)spring-doc.cadn.net.cn

反向查找

对远程 IP 地址执行反向 DNS 查找;如果为 false,则邮件标头中仅包含 IP 地址。(布尔值,默认:false)spring-doc.cadn.net.cn

套接字超时

未收到数据时关闭套接字之前的超时 (ms)。(整数,默认:120000)spring-doc.cadn.net.cn

使用直接缓冲区

是否使用直接缓冲区。(布尔值,默认:false)spring-doc.cadn.net.cn

7.19.2. 可用的编码器

文本数据
CRLF(默认)

以回车符 (0x0d) 结尾的文本,后跟换行符 (0x0a)spring-doc.cadn.net.cn

如果

以换行符结尾的文本 (0x0a)spring-doc.cadn.net.cn

以空字节 (0x00) 结尾的文本spring-doc.cadn.net.cn

STXETX

前面是 STX (0x02) 并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据

no structure - 客户端通过关闭套接字来指示完整的消息spring-doc.cadn.net.cn

L1

前面有一个 1 字节(无符号)长度字段的数据(最多支持 255 字节)spring-doc.cadn.net.cn

L2

数据前面有一个两个字节(无符号)长度字段(最多 216-1 字节)spring-doc.cadn.net.cn

L4

数据前面有一个四字节(有符号)长度字段(最多 231-1 字节)spring-doc.cadn.net.cn

7.20. 吞吐量接收器

接收器,它将按选定的时间间隔对消息进行计数并记录观察到的吞吐量。spring-doc.cadn.net.cn

7.20.1. 选项

吞吐量接收器具有以下选项:spring-doc.cadn.net.cn

吞吐量.report-every-ms

报告的频率。(整数,默认:1000)spring-doc.cadn.net.cn

7.21. Twitter 消息接收器

从身份验证用户向指定用户发送直接消息。需要 JSON POST 正文和Content-Typeheader 设置为application/json.spring-doc.cadn.net.cn

收到来自用户的消息时,您可以在 24 小时窗口内最多发送 5 条消息作为响应。收到的每条消息都会重置 24 小时窗口和 5 条分配的消息。在 24 小时窗口内发送第 6 条消息或在 24 小时窗口之外发送消息将计入速率限制。此行为仅适用于使用 POST direct_messages/events/new 端点时。

SpEL 表达式用于从输入消息中计算请求参数。spring-doc.cadn.net.cn

7.21.1. 选项

使用单引号 () 将'SpEL表达式属性。 例如,要设置固定消息文本,请使用text='Fixed Text'. 对于固定目标 userId,请使用userId='666'.
推特.message.update.media-id

要与消息关联的媒体 ID。私信只能引用单个媒体 ID。(表达式,默认值:<none>)spring-doc.cadn.net.cn

推特.message.update.屏幕名称

向其发送直接消息的用户的屏幕名称。(表达式,默认值:<none>)spring-doc.cadn.net.cn

推特.message.update.text

直接消息文本。根据需要对 URL 进行编码。最大长度为 10,000 个字符。(表达式,默认值:payload)spring-doc.cadn.net.cn

推特.message.update.user-id

向其发送直接消息的用户的用户 ID。(表达式,默认值:<none>)spring-doc.cadn.net.cn

7.22. Twitter 更新接收器

更新身份验证用户的当前文本(例如推文)。spring-doc.cadn.net.cn

对于每次更新尝试,都会将更新文本与身份验证用户最近的推文进行比较。 任何可能导致重复的尝试都将被阻止,从而导致 403 错误。 用户不能连续提交两次相同的文本。

虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。 标准 API 的更新限制为 3 小时窗口内 300 次。 如果用户发布的更新数达到当前允许的限制,此方法将返回 HTTP 403 错误。spring-doc.cadn.net.cn

7.22.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

推特.update
附件网址

(SpEL 表达式)为了使 URL 不计入扩展推文的文本正文中,请提供 URL 作为推文附件。此 URL 必须是推文永久链接或私信深层链接。任意非 Twitter URL 必须保留在文本文本中。传递给 attachment_url 参数的 URL 与推文永久链接或私信深度链接不匹配的 URL 将在创建推文时失败并导致异常。(表达式,默认值:<none>)spring-doc.cadn.net.cn

显示坐标

(SpEL表达式)是否在发送推文的确切坐标上放置图钉。(表达式,默认值:<none>)spring-doc.cadn.net.cn

回复状态 ID

(SpEL表达式)更新要回复的现有文本的 ID。注意:除非文本中提及此参数引用的推文的作者,否则此参数将被忽略。因此,您必须在更新中包含@username,其中用户名是引用推文的作者。设置 inReplyToStatusId 后,也会自动设置auto_populate_reply_metadata。稍后确保从原始推文中查找主要@mentions,并从那里添加到新推文中。随着回复链的增长,这会将@mentions附加到扩展推文的元数据中,直到达到@mentions限制。如果原始推文已被删除,回复将失败。(表达式,默认值:<none>)spring-doc.cadn.net.cn

媒体 ID

(SpEL表达式)要与推文关联的media_ids的逗号分隔列表。您最多可以在推文中添加 4 张照片或 1 个动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认值:<none>)spring-doc.cadn.net.cn

地点 ID

(SpEL表达式)世界上的一个地方。(表达式,默认值:<none>)spring-doc.cadn.net.cn

文本

(SpEL表达式)文本更新的文本。根据需要对 URL 进行编码。t.co 链接换行将影响字符数。默认为消息的有效负载(表达式,默认值:payload)spring-doc.cadn.net.cn

推特.update.location
纬度

此推文所指位置的纬度。除非该参数在 -90.0 到 +90.0(北方为正)范围内,否则将忽略此参数。如果没有相应的 long 参数,它也会被忽略。(表达式,默认值:<none>)spring-doc.cadn.net.cn

离子

此推文所指位置的经度。经度的有效范围为 -180.0 到 +180.0(东为正),包括在内。如果超出该范围、如果它不是数字、如果禁用geo_enabled或没有相应的 lat 参数,则将忽略此参数。(表达式,默认值:<none>)spring-doc.cadn.net.cn

7.23. 波前汇

Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。spring-doc.cadn.net.cn

支持常见的 ETL 用例,其中必须清理、转换现有(历史)指标数据并将其存储在 Wavefront 中以进行进一步分析。spring-doc.cadn.net.cn

7.23.1. 选项

波前接收器具有以下选项:spring-doc.cadn.net.cn

wavefront.api Tokens

Wavefront API 访问Tokens。(字符串,默认:<none>)spring-doc.cadn.net.cn

wavefront.metric-expression

计算结果为度量值的 SpEL 表达式。(表达式,默认值:<none>)spring-doc.cadn.net.cn

wavefront.metric-name

指标的名称。默认为应用程序名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

波前.proxy-uri

Wavefront 代理的 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

波前.source

发出指标的唯一应用程序、主机、容器或实例。(字符串,默认:<none>)spring-doc.cadn.net.cn

wavefront.tag表达式

与指标关联的自定义元数据的集合。点标记不能为空。键的有效字符:字母数字、连字符 ('-')、下划线('_')、点 ('.')。对于值,允许使用任何字符,包括空格。要包含双引号,请用反斜杠转义它,反斜杠不能是标签值中的最后一个字符。点标记键和值组合的最大允许长度为 254 个字符(255 个字符,包括分隔键和值的 '=')。如果该值较长,则拒绝并记录该点(Map<String, Expression>,默认值:<none>)spring-doc.cadn.net.cn

wavefront.timestamp 表达式

计算结果为指标时间戳的 SpEL 表达式(可选)。(表达式,默认值:<none>)spring-doc.cadn.net.cn

波前线.uri

Wavefront 环境的 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.24. Websocket Sink

一个简单的 Websocket Sink 实现。spring-doc.cadn.net.cn

7.24.1. 选项

支持以下选项:spring-doc.cadn.net.cn

websocket.consumer.log级

netty 通道的 logLevel 。默认值为 <tt>WARN</tt>(字符串,默认:<none>)spring-doc.cadn.net.cn

websocket.consumer.path

WebsocketSink 使用者需要连接的路径。默认值为 <tt>/websocket</tt>(字符串,默认:/websocket)spring-doc.cadn.net.cn

websocket.consumer.port

Netty 服务器侦听的端口。默认值为 <tt>9292</tt>(整数,默认:9292)spring-doc.cadn.net.cn

websocket.consumer.ssl 的

是否创建 {@link io.netty.handler.ssl.SslContext}。(布尔值,默认:false)spring-doc.cadn.net.cn

websocket.consumer.threads

Netty {@link io.netty.channel.EventLoopGroup} 的线程数。默认值为 <tt>1</tt>(整数,默认:1)spring-doc.cadn.net.cn

7.24.2. 示例

要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,您可以使用 遵循简单的端到端设置。spring-doc.cadn.net.cn

第 1 步:启动 Rabbitmq
第 2 步:部署time-source
步骤 3:部署websocket-sink

最后在trace模式,以便您看到由time-source在日志中:spring-doc.cadn.net.cn

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:spring-doc.cadn.net.cn

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.24.3. 执行器

有一个Endpoint您可以使用它来访问最后一个n发送和接收的消息。你必须 通过提供--endpoints.websocketconsumertrace.enabled=true.默认情况下,它通过host:port/websocketconsumertrace.下面是一个示例输出:spring-doc.cadn.net.cn

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

还有一个简单的 HTML 页面,您可以在其中看到文本区域中转发的消息。您可以访问 它直接通过host:port在您的浏览器中。spring-doc.cadn.net.cn