应用程序

5. 源码

5.1. CDC 源

变更数据捕获(CDC)source可以捕获并从各种数据库中流式传输更改事件。
目前,它支持MySQLPostgreSQLMongoDBOracleSQL Server数据库。spring-doc.cadn.net.cn

基于Debezium Embedded ConnectorCDC Source允许捕获和流式传输数据库更改,通过不同的消息绑定器如Apache Kafka、RabbitMQ以及所有支持Spring Cloud Stream的代理服务器。spring-doc.cadn.net.cn

它支持所有 Debezium 配置属性。只需将cdc.config.前缀添加到现有的 Debezium 属性中即可。例如,要设置 Debezium 的 connector.class 属性,请使用 cdc.config.connector.class 源属性。spring-doc.cadn.net.cn

我们提供了最常用的 Debezium 属性的便捷快捷方式。例如,您可以使用我们的 cdc.connector=mysql 快捷方式代替长格式的cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector Debezium 属性。下表列出了所有可用的快捷方式及其所代表的 Debezium 属性。
Debezium 属性(如cdc.config.XXX)始终优先于快捷方式!spring-doc.cadn.net.cn

CDC Source 引入了一个新的默认 BackingOffsetStore 配置,该配置基于 MetadataStore 服务。后续还提供了各种微服务友好的方式来存储偏移元数据。spring-doc.cadn.net.cn

5.1.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

cdc
配置

Spring 德贝兹伊姆配置属性的传递包装器。所有以 'cdc.config.' 前缀开头的属性都是德贝兹伊姆原生属性。前缀被移除,将其转换为 Debezium io.debezium.config.Configuration。(Map<String, String>, 默认值: <none>)spring-doc.cadn.net.cn

连接器

cdc.config.connector.class 属性的快捷方式。只要它们不相互矛盾,就可以使用其中任何一个。(ConnectorType,默认值:<none>,可能的取值:mysqlpostgresmongodboraclesqlserver)spring-doc.cadn.net.cn

姓名

此 sourceConnector 实例的唯一名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

架构

将模式作为出站消息的一部分。(布尔值,默认:falsespring-doc.cadn.net.cn

cdc.flattening
add-fields

以逗号分隔的元数据字段列表,将添加到展平的消息中。这些字段将根据结构规范以“__”或“__[<]struct]__”为前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

add-headers

用逗号分隔的列表,指定要添加到展平消息报头中的元数据字段。这些字段将以"__"或"__[struct]__"为前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

delete-handling-mode

处理已删除记录的选项:(1) 不进行任何操作 - 将记录传递通过,(2) 删除 - 移除记录以及 (3) 重写 - 给记录添加一个 '__deleted' 字段。 (DeleteHandlingMode,默认值:<none>,可选值:droprewritenone)spring-doc.cadn.net.cn

drop-tombstones

默认情况下,Debezium会生成墓碑记录,以便对删除的记录进行Kafka压缩。dropTombstones可以抑制墓碑记录。(布尔值,默认:true)spring-doc.cadn.net.cn

启用

启用展平源记录事件(https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认:truespring-doc.cadn.net.cn

cdc.offset
commit-timeout

在取消过程并恢复要提交的偏移量数据以便在未来尝试中再次提交之前,等待记录刷新并将分区偏移量数据提交到偏移存储的最大毫秒数。(持续时间,默认:5000ms)spring-doc.cadn.net.cn

flush-interval

尝试提交偏移量的时间间隔。默认为1分钟。(持续时间,默认:60000ms)spring-doc.cadn.net.cn

policy

偏移量存储提交策略。(OffsetPolicy,默认:<none>)spring-doc.cadn.net.cn

存储

Kafka 连接器会跟踪已处理记录的数量,并定期将计数(作为“偏移量”)存储在预先配置的元数据存储中。重启时,连接器从上次记录的数据源偏移量处恢复读取。(OffsetStorageType,默认值:<none>,可选值:memoryfilekafkametadata)spring-doc.cadn.net.cn

cdc.stream.header
convert-connect-headers

当为 true 时,{@link org.apache.kafka.connect.header.Header} 将转换为消息头,其中 {@link org.apache.kafka.connect.header.Header#key()} 作为名称,{@link org.apache Kafka.connect.header.Header#value()} 作为值。(布尔类型,默认:true)spring-doc.cadn.net.cn

偏移量

将源记录的偏移量元数据序列化到输出消息头中的 cdc.offset。 (布尔值,默认:falsespring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
区域

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值: <none>, 可能的值: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

Debezium 属性 快捷映射

下表列出了所有可用的快捷键及其所代表的 Debezium 属性。spring-doc.cadn.net.cn

表1. 表快捷属性映射
快捷方式 原始 描述

cdc.connectorspring-doc.cadn.net.cn

cdc.config.connector.classspring-doc.cadn.net.cn

mysql : MySQL连接器,postgres : PostgreSQL连接器,mongodb : MongoDB数据源连接器,oracle : Oracle连接器,sqlserver : SQL Server连接器spring-doc.cadn.net.cn

cdc.namespring-doc.cadn.net.cn

cdc.config.namespring-doc.cadn.net.cn

cdc.offset.flush-intervalspring-doc.cadn.net.cn

cdc.config.offset.flush.interval.msspring-doc.cadn.net.cn

cdc.offset.commit-timeoutspring-doc.cadn.net.cn

cdc.config.offset.flush.timeout.msspring-doc.cadn.net.cn

cdc.offset.policyspring-doc.cadn.net.cn

cdc.config.offset.commit.policyspring-doc.cadn.net.cn

periodic:PeriodicCommitOffsetPolicy,always:AlwaysCommitOffsetPolicyspring-doc.cadn.net.cn

cdc.offset.storagespring-doc.cadn.net.cn

cdc.config.offset.storagespring-doc.cadn.net.cn

metadata: 元数据存储偏移量后备存储,file: 文件偏移量后备存储,kafka: Kafka 偏移量后备存储,memory: 内存偏移量后备存储spring-doc.cadn.net.cn

cdc.flattening.drop-tombstonesspring-doc.cadn.net.cn

cdc.config.drop.tombstonesspring-doc.cadn.net.cn

cdc.flattening.delete-handling-modespring-doc.cadn.net.cn

cdc.config.delete.handling.modespring-doc.cadn.net.cn

none : 无, drop : 下拉, rewrite : 重写spring-doc.cadn.net.cn

5.1.2. 数据库支持

CDC Source 使用 Debezium 实用工具,目前支持五种数据存储的 CDC:MySQLPostgreSQLMongoDBOracleSQL Server 数据库。spring-doc.cadn.net.cn

5.1.3. 示例和测试

集成测试 [CdcSourceIntegrationTest](), [CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 使用在本地计算机上运行的数据库固定数据。
我们使用预先构建好的 debezium docker 数据库镜像。
Maven 构建借助 docker-maven-plugin 创建测试数据库固定数据。spring-doc.cadn.net.cn

要在您的 IDE 中运行和调试测试,需要从命令行部署所需的数据库映像。下面的说明解释了如何从 Docker 映像运行预配置的测试数据库。spring-doc.cadn.net.cn

MySQL

在 docker 中启动 debezium/example-mysqlspring-doc.cadn.net.cn

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0

(可选) 使用 mysql 客户端连接到数据库并创建具有所需凭据的 debezium 个用户:
spring-doc.cadn.net.cn

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

使用以下属性连接 CDC 源到 MySQL 数据库:spring-doc.cadn.net.cn

cdc.connector=mysql (1)

cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)

cdc.config.database.user=debezium  (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)

cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 配置 CDC 源以使用MySqlConnector。(相当于设置cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector)。
2 用于识别和分派传入事件的元数据。
3 连接到运行在localhost:3306上的MySQL服务器,作为debezium用户。
4 SourceRecord事件中包含更改事件值模式。
5 启用CDC事件扁平化

您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcMysqlTestsspring-doc.cadn.net.cn

PostgreSQL

debezium/example-postgres:1.0 Docker镜像启动一个预配置的postgres服务器:spring-doc.cadn.net.cn

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0

您可以像这样连接到此服务器:spring-doc.cadn.net.cn

psql -U postgres -h localhost -p 5432

使用以下属性连接 CDC 源与 PostgreSQL:spring-doc.cadn.net.cn

cdc.connector=postgres (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=postgres  (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 配置CDC Source使用PostgresConnector。相当于设置cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector
2 配置Debezium引擎使用memory(例如`cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore`)作为后端偏移存储。
3 用于识别和分派传入事件的元数据。
4 连接到运行在localhost:5432上的PostgreSQL服务器,作为postgres用户。
5 SourceRecord事件中包含更改事件值模式。
6 启用CDC事件扁平化

您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcPostgresTestsspring-doc.cadn.net.cn

MongoDB

debezium/example-mongodb:0.10Docker镜像启动一个预配置的mongodb:spring-doc.cadn.net.cn

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:0.10

初始化库存集合spring-doc.cadn.net.cn

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

mongodb 终端输出中,搜索类似于 host: "3f95a8a6516e:27017" 的日志条目:spring-doc.cadn.net.cn

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

在你的/etc/hosts中添加127.0.0.1 3f95a8a6516e条目spring-doc.cadn.net.cn

使用以下属性连接 CDC 源到 MongoDB:<br>spring-doc.cadn.net.cn

cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)

cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)

cdc.config.tasks.max=1 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 配置CDC Source以使用MongoDB连接器。这会映射到cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector
2 配置Debezium引擎使用memory(例如`cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore`)作为后端偏移存储。
3 连接到运行在localhost:27017上的MongoDB,作为debezium用户。
4 debezium.io/docs/connectors/mongodb/#tasks
5 SourceRecord事件中包含更改事件值模式。
6 启用CDC事件扁平化

您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcPostgresTestsspring-doc.cadn.net.cn

SQL Server

debezium/example-postgres:1.0 Docker 镜像启动一个 sqlserverspring-doc.cadn.net.cn

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

使用 debezium 的 sqlserver 教程中的示例数据填充表单:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

使用以下属性将 CDC 源连接到 SQL Server:spring-doc.cadn.net.cn

cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=sa  (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 配置 CDC Source 使用 SqlServerConnector。相当于设置 cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
2 配置Debezium引擎使用memory(例如`cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore`)作为后端偏移存储。
3 用于识别和分派传入事件的元数据。
4 连接到在localhost:1433上运行的SQL Server,作为sa用户。

您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcSqlServerTestsspring-doc.cadn.net.cn

Oracle

启动Oracle并从localhost访问,按照Debezium Vagrant设置中描述的配置、用户和授权进行设置spring-doc.cadn.net.cn

使用 Debezium 的 Oracle 教程中的示例数据填充表单:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. 文件源

此应用程序轮询一个目录,并将新文件或其内容发送到输出通道。<br/>默认情况下,文件源会提供文件的内容作为字节数组。<br/>但是,可以使用 --file.supplier.mode 选项进行自定义:spring-doc.cadn.net.cn

在使用 --file.supplier.mode=lines 时,还可以提供附加选项 --file.supplier.withMarkers=true。如果将其设置为 true,则底层的 FileSplitter 在实际数据之前和之后会发出额外的文件开始和结束标记消息。这两个额外标记消息的有效载荷类型为 FileSplitter.FileMarker。如果未明确设置,则 withMarkers 选项默认为 false。spring-doc.cadn.net.cn

5.2.1. 选项

文件源具有以下选项:filespring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

file.supplier
delay-when-empty

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

目录

要轮询新文件的目录。(文件,默认:<none>)spring-doc.cadn.net.cn

filename-pattern

一个简单的通配符模式,用于匹配文件。 (字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于匹配文件的正则表达式模式。(Pattern,默认值:<none>)spring-doc.cadn.net.cn

prevent-duplicates

设置为 true 可包含一个 AcceptOnceFileListFilter,防止重复。(布尔值,默认:true)spring-doc.cadn.net.cn

5.3. FTP源

此源应用程序支持使用FTP协议传输文件。 文件从remote目录传输到部署应用的local目录中。 默认情况下,源发出的消息以字节数组形式提供。但是,可以使用--mode选项进行自定义:spring-doc.cadn.net.cn

使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarkerwithMarkers 的默认值是 false,除非明确设置。spring-doc.cadn.net.cn

另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
spring-doc.cadn.net.cn

5.3.1. 输入

N/A(从FTP服务器获取文件)。spring-doc.cadn.net.cn

5.3.2. 输出

模式 = 内容
标题:
负载:

用文件内容填充的 byte[]spring-doc.cadn.net.cn

模式 = 行
标题:
负载:

每行一个Stringspring-doc.cadn.net.cn

第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json属性确定。spring-doc.cadn.net.cn

模式 = 参考
标题:
负载:

一个 java.io.File 对象。spring-doc.cadn.net.cn

5.3.3. 选项

ftp源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

ftp.factory
cache-sessions

缓存会话。 (布尔值,默认:<none>spring-doc.cadn.net.cn

client-mode

FTP会话使用的客户端模式。(ClientMode,默认:<none>,可选值:ACTIVEPASSIVE)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:21)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

ftp.supplier
auto-create-local-dir

如果不存在,则设置为 true 以创建本地目录。(Boolean,默认:true)spring-doc.cadn.net.cn

delay-when-empty

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

delete-remote-files

设置为true表示在成功传输后删除远程文件。(布尔类型,默认:false)spring-doc.cadn.net.cn

filename-pattern

一个过滤器模式,用于匹配要传输的文件名。(字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于匹配要传输的文件名的过滤正则表达式模式。(Pattern,默认值:<none>)spring-doc.cadn.net.cn

local-dir

用于文件传输的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

preserve-timestamp

设置为 true 可保留原始时间戳。(布尔类型,默认值:true)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
区域

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值: <none>, 可能的值: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

5.3.4. 示例

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

5.4. Geode 源

Geode 源将从 Apache Geode EntryEvents 或 CqEvents 中提取对象并生成流。spring-doc.cadn.net.cn

5.4.1. 选项

Geode 源具有以下选项:geodespring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

geode.client
pdx-read-serialized

将 Geode 对象反序列化为 PdxInstance 而不是域类。(布尔值,默认:false)spring-doc.cadn.net.cn

geode.pool
connect-type

指定连接类型:serverlocator(ConnectType,默认值:<none>,可能的取值:locatorserver)spring-doc.cadn.net.cn

host-addresses

指定一个或多个Gemfire定位器或服务器地址,格式为[主机]:[端口]。(InetSocketAddress[],默认:<none>)spring-doc.cadn.net.cn

subscription-enabled

设置为 true 可以启用客户端池的订阅。这是同步更新到客户端缓存所必需的。(Boolean,默认:false)spring-doc.cadn.net.cn

geode.region
region-name

区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.security
密码

缓存密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

缓存用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.security.ssl
密码

为安全套接字连接配置使用的 SSL 密码,作为有效密码名称的数组。(字符串,默认:any)spring-doc.cadn.net.cn

keystore-type

标识用于SSL通信的Keystore类型(例如:JKS,PKCS11等)。 (String,默认值:JKS)spring-doc.cadn.net.cn

keystore-uri

用于连接到 Geode 集群的预创建 Keystore URI。 (Resource, 默认值: <none>)spring-doc.cadn.net.cn

ssl-keystore-password

访问密钥库 truststore 的密码。 (String,默认值: <none>)spring-doc.cadn.net.cn

ssl-truststore-password

访问信任存储库的密码。 (String, 默认值: <none>)spring-doc.cadn.net.cn

truststore-type

标识用于SSL通信的信任存储类型(例如 JKS,PKCS11 等)。 (String,默认值: JKS)spring-doc.cadn.net.cn

truststore-uri

用于连接到 Geode 集群的预创建信任库 URI 的位置。 (资源, 默认: <none>)spring-doc.cadn.net.cn

user-home-directory

本地缓存从truststoreUri和keystoreUri位置下载的truststore和keystore文件的目录。 (String, 默认值: user.home)spring-doc.cadn.net.cn

geode.supplier
event-expression

SpEL 表达式用于从一个 {@link org.apache.geode.cache.EntryEvent} 或 {@link org.apache.geode.cache.query.CqEvent} 中提取数据。 (表达式, 默认值: <none>)spring-doc.cadn.net.cn

查询

一个OQL查询。如果提供了该查询将启用连续查询。 (String, 默认值: <none>)spring-doc.cadn.net.cn

5.5. HTTP 源

一个监听HTTP请求并将其主体作为消息负载发出的应用程序。 如果Content-Type匹配text/*application/json,负载将是一个字符串, 否则负载将是一个字节数组。spring-doc.cadn.net.cn

负载:

If content type matches text/*application/jsonspring-doc.cadn.net.cn

如果内容类型不匹配 text/*application/jsonspring-doc.cadn.net.cn

5.5.2. 选项

The http source 支持以下配置属性:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

http.cors
allow-credentials

是否应包含与被注解请求的域关联的任何cookie。 (布尔值,默认: <none>)spring-doc.cadn.net.cn

allowed-headers

实际请求中可以使用的请求头列表。 (String[], 默认: <none>)spring-doc.cadn.net.cn

allowed-origins

允许的 origins 列表,例如 https://domain1.com。 (String[], 默认值: <none>)spring-doc.cadn.net.cn

HTTP
mapped-request-headers

要映射的标头。 (String[], 默认值: <none>)spring-doc.cadn.net.cn

path-pattern

HTTP端点路径映射。 (String, 默认值: /)spring-doc.cadn.net.cn

服务器
端口

服务器HTTP端口。 (整数, 默认值: 8080)spring-doc.cadn.net.cn

5.6. JDBC 源

此源从关系型数据库管理系统(RDBMS)中轮询数据。 此源完全基于 DataSourceAutoConfiguration,有关更多信息,请参阅 Spring Boot JDBC 支持spring-doc.cadn.net.cn

负载
  • Map<String, Object>jdbc.split == true(默认)且 List<Map<String, Object>> 其他情况时spring-doc.cadn.net.cn

5.6.2. 选项

数据源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jdbc.supplier
max-rows

Max numbers of rows to process for query。 (Integer, 默认值: 0)spring-doc.cadn.net.cn

查询

用于选择数据的查询。 (String,默认值: <none>)spring-doc.cadn.net.cn

分割

是否将SQL查询结果拆分为单独的消息。 (布尔值, 默认: true)spring-doc.cadn.net.cn

更新

用于将轮询的消息标记为“已查看”的SQL更新语句。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.stream.poller
定时任务

Cron表达式值用于Cron触发器。(字符串,默认:<none>)spring-doc.cadn.net.cn

fixed-delay

默认轮询程序的固定延迟时间。(Long,默认值:1000)spring-doc.cadn.net.cn

initial-delay

周期性触发器的初始延迟。(整数,默认值:0)spring-doc.cadn.net.cn

max-messages-per-poll

默认轮询器每次轮询的最大消息数。(Long,默认:1)spring-doc.cadn.net.cn

time-unit

应用于延迟值的时间单位。(TimeUnit,默认:<none>,有效值:NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTESHOURSDAYS)spring-doc.cadn.net.cn

spring.datasource
数据

数据(DML)脚本资源引用。(List<String>,默认:<none>spring-doc.cadn.net.cn

driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

initialization-mode

用于确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化的模式。(DataSourceInitializationMode,默认值:embedded,可选值:ALWAYSEMBEDDEDNEVER)spring-doc.cadn.net.cn

密码

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

架构

Schema (DDL) script resource references. (List<String>, default: <none>)spring-doc.cadn.net.cn

url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于额外的 DataSource 属性和 TriggerProperties 以及 MaxMessagesProperties 的轮询选项。spring-doc.cadn.net.cn

5.7. JMS 源

JMS 源可接收来自 JMS 的消息。spring-doc.cadn.net.cn

5.7.1. 选项

JMS 的源有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jms.supplier
client-id

用于持久订阅的客户端 ID。(字符串,默认:<none>)spring-doc.cadn.net.cn

目的地

要从中接收消息的目标(队列或主题)。(字符串,默认:<none>)spring-doc.cadn.net.cn

message-selector

消息的选择器。(字符串,默认:<none>)spring-doc.cadn.net.cn

session-transacted

启用事务并选择 DefaultMessageListenerContainer,不启用则选择 SimpleMessageListenerContainer。 (布尔类型,默认:true)spring-doc.cadn.net.cn

subscription-durable

持久订阅为真。 (布尔类型,默认:<none>)spring-doc.cadn.net.cn

subscription-name

持久或共享订阅的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

subscription-shared

共享订阅为真。(布尔值,默认:<none>spring-doc.cadn.net.cn

spring.jms
jndi-name

连接工厂JNDI名称。设置后,优先于其他连接工厂自动配置。(字符串,默认:<none>)spring-doc.cadn.net.cn

pub-sub-domain

默认的目标类型是否为主题。(布尔值,默认:false)spring-doc.cadn.net.cn

spring.jms.listener
acknowledge-mode

容器的确认模式。默认情况下,监听器是事务性的,并使用自动确认。(AcknowledgeMode,默认值:<none>,可选值:AUTOCLIENTDUPS_OK)spring-doc.cadn.net.cn

auto-startup

启动时自动启动容器。(布尔类型,默认值:true)spring-doc.cadn.net.cn

并发

最少并发消费者数量。(整数,默认:<none>)spring-doc.cadn.net.cn

max-concurrency

最大并发消费者数。(整型,默认:<none>)spring-doc.cadn.net.cn

receive-timeout

接收调用时使用的超时时间。使用-1表示无等待接收,0表示不设置超时。如果不在事务管理器内运行,则仅可能设置为0,并且通常不建议这样做,因为它会阻止干净关闭。(持续时间,默认:1s)spring-doc.cadn.net.cn

5.8. 加载生成器源码

一个产生生成数据并将其发送到流的源。spring-doc.cadn.net.cn

5.8.1. 选项

The load-generator 源代码具有以下选项:spring-doc.cadn.net.cn

load-generator.generate-timestamp

是否生成时间戳。 (布尔值, 默认: false)spring-doc.cadn.net.cn

load-generator.message-count

消息数量. (Integer, 默认值: 1000)spring-doc.cadn.net.cn

load-generator.message-size

消息大小。 (整数, 默认值: 1000)spring-doc.cadn.net.cn

load-generator.producers

生产者数量。 (整数, 默认值: 1)spring-doc.cadn.net.cn

5.9. 邮件发送源

一个监听邮件并将其消息正文作为消息负载发出的应用程序源代码。spring-doc.cadn.net.cn

5.9.1. 选项

邮件 有以下选项:spring-doc.cadn.net.cn

mail.supplier.charset

字节数组邮件到字符串转换的字符集。 (字符串,默认:UTF-8)spring-doc.cadn.net.cn

mail.supplier.delete

设置为 true 表示下载邮件后将其删除。(布尔类型,默认:false)spring-doc.cadn.net.cn

mail.supplier.expression

配置一个 SpEL 表达式来选择消息。(字符串,默认:true)spring-doc.cadn.net.cn

mail.supplier.idle-imap

设置为 true 以使用 IdleImap 配置。(布尔类型,默认:false)spring-doc.cadn.net.cn

mail.supplier.java-mail-properties

JavaMail 属性作为由换行符分隔的 name-value 对字符串,例如 'foo=bar baz=car'。 (Properties, 默认: <none>)spring-doc.cadn.net.cn

mail.supplier.mark-as-read

设置为 true 可标记邮件为已读。(布尔类型,默认:false)spring-doc.cadn.net.cn

mail.supplier.url

邮件服务器连接的URL,例如 'imaps://username:[[email protected]:993/Inbox'。 (URLName,默认:<none>)spring-doc.cadn.net.cn

mail.supplier.user-flag

当服务器不支持 ecent时标记消息的标志。(字符串,默认:<none>)spring-doc.cadn.net.cn

5.10. MongoDB 数据源

此源从MongoDB获取数据。 此源完全基于MongoDataAutoConfiguration,请参阅 Spring Boot MongoDB支持 以获取更多信息。spring-doc.cadn.net.cn

5.10.1. 选项

The mongodb source 有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mongodb.supplier
集合

要查询的MongoDB集合。(字符串,默认:<none>)spring-doc.cadn.net.cn

查询

MongoDB 查询。 (字符串,默认:{ })spring-doc.cadn.net.cn

query-expression

MongoDB 查询 DSL 样式的 SpEL 表达式。(表达式,默认:<none>spring-doc.cadn.net.cn

分割

是否将查询结果拆分为单个消息。(布尔类型,默认:true)spring-doc.cadn.net.cn

update-expression

MongoDB 更新 DSL 样式的 SpEL 表达式。(表达式,默认:<none>spring-doc.cadn.net.cn

spring.data.mongodb
authentication-database

认证数据库名称。(字符串,默认:<none>spring-doc.cadn.net.cn

auto-index-creation

是否启用自动索引创建。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名称。 (字符串,默认:<none>)spring-doc.cadn.net.cn

field-naming-strategy

要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

grid-fs-database

<文档缺失> (字符串,默认:<none>spring-doc.cadn.net.cn

主机

Mongo服务器主机。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:<none>)spring-doc.cadn.net.cn

端口

Mongo服务器端口。不能与URI一起设置。(Integer,默认:<none>spring-doc.cadn.net.cn

replica-set-name

群集所需的副本集名称。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uri

Mongo 数据库连接字符串。不能与主机、端口、凭据和副本集名称一起设置。(字符串,默认:mongodb://localhost/test)spring-doc.cadn.net.cn

用户名

登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-representation

转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIEDSTANDARDC_SHARP_LEGACYJAVA_LEGACYPYTHON_LEGACY)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 以了解额外的 MongoProperties 属性。 查看并 TriggerProperties 了解轮询选项。spring-doc.cadn.net.cn

5.11. MQTT 源

源代码可用于接收来自 MQTT 的消息。spring-doc.cadn.net.cn

负载:

5.11.2. 选项

MQTT 源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mqtt
clean-session

客户端和服务器是否应记住重启和重新连接之间的状态。(布尔值,默认:true)spring-doc.cadn.net.cn

connection-timeout

连接超时时间(秒)。 (Integer, 默认值:30)spring-doc.cadn.net.cn

keep-alive-interval

ping间隔,单位为秒。(整数,默认值:60)spring-doc.cadn.net.cn

密码

连接到代理时要使用的密码。(字符串,默认:guest)spring-doc.cadn.net.cn

持久化

'memory' 或 'file'。 (字符串,默认:memory)spring-doc.cadn.net.cn

persistence-directory

持久化目录。 (字符串,默认:/tmp/pahospring-doc.cadn.net.cn

url

MQTT代理的位置(以逗号分隔的列表)。(字符串数组,默认:[tcp://localhost:1883])spring-doc.cadn.net.cn

用户名

连接到代理时使用的用户名。(字符串,默认:guest)spring-doc.cadn.net.cn

mqtt.supplier
二进制

将负载保留为字节形式。 (布尔值,默认:false)spring-doc.cadn.net.cn

编码

用于将字节转换为字符串的字符集(当二进制为假时)。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

client-id

标识客户端。(字符串,默认:stream.client.id.source)spring-doc.cadn.net.cn

服务质量

qos;适用于所有主题的单一值或逗号分隔的列表以匹配主题。(Integer[],默认:[0])spring-doc.cadn.net.cn

主题

源将要订阅的主题(用逗号分隔)。(String[],默认:[stream.mqtt])spring-doc.cadn.net.cn

5.12. RabbitMQ 源

The "rabbit" 源启用接收来自RabbitMQ的消息。spring-doc.cadn.net.cn

The queue(s) 必须在流部署之前存在;它们不会自动创建。 您可以轻松地使用 RabbitMQ 网页界面创建一个队列。spring-doc.cadn.net.cn

5.12.1. 输入

5.12.2. 输出

负载

5.12.3. 选项

Rabbit 源代码有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

rabbit.supplier
enable-retry

启用重试的布尔值。(Boolean,默认值:false)spring-doc.cadn.net.cn

initial-retry-interval

启用重试时的初始重试间隔。(整数,默认值:1000)spring-doc.cadn.net.cn

mapped-request-headers

要映射的标头。 (String[], 默认值: [STANDARD_REQUEST_HEADERS])spring-doc.cadn.net.cn

max-attempts

重试启用时的最大交付尝试次数。(整数,默认:3)spring-doc.cadn.net.cn

max-retry-interval

启用重试时的最大重试间隔。(整数,默认:30000)spring-doc.cadn.net.cn

own-connection

为 true 时,使用基于引导属性的单独连接。(布尔类型,默认值:false)spring-doc.cadn.net.cn

队列

源将监听消息的队列。(字符串数组,默认:<none>)spring-doc.cadn.net.cn

重新排队

拒绝的消息是否应重新排队。(布尔值,默认:true)spring-doc.cadn.net.cn

retry-multiplier

启用重试时的重试退避倍数。(Double,默认:2)spring-doc.cadn.net.cn

事务性

渠道是否已交易。(布尔型,默认:false)spring-doc.cadn.net.cn

spring.rabbitmq
address-shuffle-mode

用于打乱配置地址的模式。(AddressShuffleMode,默认值:none,可选值:NONERANDOMINORDER)spring-doc.cadn.net.cn

地址

客户端应连接的地址列表,用逗号分隔。设置后,主机和端口将被忽略。(字符串,默认:<none>)spring-doc.cadn.net.cn

channel-rpc-timeout

通道中RPC调用的继续超时时间。将其设置为零以无限期等待。(持续时间,默认:10m)spring-doc.cadn.net.cn

connection-timeout

连接超时时间。将其设置为零以无限期等待。(持续时间,默认:<none>)spring-doc.cadn.net.cn

主机

RabbitMQ主机。如果设置了地址,则忽略此设置。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

登录以向代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

端口

RabbitMQ 端口。如果设置了地址,则忽略此设置。默认为 5672,如果启用了 SSL,则默认为 5671。(Integer,默认:<none>)spring-doc.cadn.net.cn

publisher-confirm-type

发布者类型确认要使用的。(ConfirmType,默认值:<none>,可能的取值:SIMPLECORRELATEDNONE)spring-doc.cadn.net.cn

publisher-returns

是否启用发布商返还。(布尔值,默认:falsespring-doc.cadn.net.cn

requested-channel-max

客户端请求的每个连接的通道数。使用0表示无限制。(整数,默认:2047)spring-doc.cadn.net.cn

requested-heartbeat

请求的心跳超时时间;零表示无。如果未指定持续时间后缀,则使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户以对经纪人进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

virtual-host

连接到代理时使用的虚拟主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。spring-doc.cadn.net.cn

一个关于重试的注意事项
在默认的 ackModeAUTO)和 requeuetrue)选项下,失败的消息投递将无限次重试。由于Rabbit源本身处理的内容不多,因此源本身出现故障的风险较小,除非下游的Binder因某种原因未连接。将 重试 设置为 false 将导致消息在首次尝试时被拒绝(如果代理配置了死信交换/队列,则可能发送到该位置)。enableRetry 选项允许配置重试参数,以便在消息传递失败时进行重试,并在重试次数用尽后最终丢弃(或死信)。在重试间隔期间,交付线程被挂起。重试选项包括enableRetrymaxAttemptsinitialRetryIntervalretryMultipliermaxRetryInterval。使用 MessageConversionException 导致的消息传递失败永远不会重试;假设如果消息在第一次尝试时无法转换,后续尝试也会失败。此类消息将被丢弃(或转入死信队列)。

5.12.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.12.5. 示例

java -jar rabbit-source.jar --rabbit.queues=

5.13. 亚马逊S3源

此源应用程序支持使用 Amazon S3 协议传输文件。 文件从 remote 目录(S3 存储桶)传输到部署应用程序的 local 目录。spring-doc.cadn.net.cn

源发出的消息默认以字节数组提供。但是,可以使用--mode选项进行自定义:spring-doc.cadn.net.cn

使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarkerwithMarkers 的默认值是 false,除非明确设置。spring-doc.cadn.net.cn

另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
spring-doc.cadn.net.cn

模式 = 行
标题:
负载:

每行一个Stringspring-doc.cadn.net.cn

第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json属性确定。spring-doc.cadn.net.cn

模式 = 参考
标题:
负载:

一个 java.io.File 对象。spring-doc.cadn.net.cn

5.13.3. 选项

s3 源具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
区域

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值: <none>, 可能的值: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

s3.common
endpoint-url

连接到与 S3 兼容的存储的可选端点 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

path-style-access

使用路径样式访问。(Boolean,默认值:false)spring-doc.cadn.net.cn

s3.supplier
auto-create-local-dir

是否创建本地目录。(布尔类型,默认:true)spring-doc.cadn.net.cn

delete-remote-files

处理完成后是否删除远程文件。(布尔值,默认:false)spring-doc.cadn.net.cn

filename-pattern

用于过滤远程文件的模式。(字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于筛选远程文件的正则表达式。(模式,默认:<none>)spring-doc.cadn.net.cn

list-only

设置为 true 可以返回 s3 对象元数据而不需将文件复制到本地目录。(Boolean,默认:false)spring-doc.cadn.net.cn

local-dir

本地目录,用于存储文件。(File,默认:<none>)spring-doc.cadn.net.cn

preserve-timestamp

是否将远程文件的时间戳传递给本地文件。(布尔类型,默认:true)spring-doc.cadn.net.cn

remote-dir

AWS S3存储桶资源。(字符串,默认:bucketspring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。 (字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

临时文件后缀。(字符串,默认:.tmpspring-doc.cadn.net.cn

5.13.4. Amazon AWS 常用选项

Amazon S3 源(与其他所有 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,并且其自动配置类由 Spring Boot 自动使用。有关所需和有用的自动配置属性,请参阅他们的文档。spring-doc.cadn.net.cn

其中一些是关于AWS凭据的:spring-doc.cadn.net.cn

其他用于 AWS Region 定义:spring-doc.cadn.net.cn

对于 AWS Stackspring-doc.cadn.net.cn

5.13.5. 示例

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines

5.14. SFTP 源

此源应用程序支持使用SFTP协议传输文件。文件从remote目录传输到部署应用的local目录。默认情况下,源发出的消息为字节数组。但是,可以使用--mode选项进行自定义:spring-doc.cadn.net.cn

使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarkerwithMarkers 的默认值是 false,除非明确设置。spring-doc.cadn.net.cn

有关高级配置选项,请参阅sftp-supplierspring-doc.cadn.net.cn

另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
spring-doc.cadn.net.cn

5.14.1. 输入

N/A(从SFTP服务器获取文件)。spring-doc.cadn.net.cn

5.14.2. 输出

模式 = 内容
标题:
负载:

用文件内容填充的 byte[]spring-doc.cadn.net.cn

模式 = 行
标题:
负载:

每行一个Stringspring-doc.cadn.net.cn

第一行可选地由带有START标记有效负载的消息前导。
最后一行可选地由带有END标记有效负载的消息后置。
spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json属性确定。spring-doc.cadn.net.cn

模式 = 参考
标题:
负载:

一个 java.io.File 对象。spring-doc.cadn.net.cn

5.14.3. 选项

ftp源具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

file.consumer
markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

模式

用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:<none>,可能的值:reflinescontents)spring-doc.cadn.net.cn

with-markers

设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:<none>)spring-doc.cadn.net.cn

metadata.store.dynamo-db
create-delay

创建表重试之间的延迟。(整数,默认:1)spring-doc.cadn.net.cn

create-retries

创建表请求的重试次数。(整数,默认:25)spring-doc.cadn.net.cn

read-capacity

表上的读取容量。(长整型,默认:1spring-doc.cadn.net.cn

表格

用于元数据的表名。(字符串,默认:<none>)spring-doc.cadn.net.cn

time-to-live

表项的TTL。 (整数,默认:<none>spring-doc.cadn.net.cn

write-capacity

表上的写入容量。(长整型,默认:1)spring-doc.cadn.net.cn

metadata.store.gemfire
区域

元数据的 Gemfire 区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.jdbc
区域

使用此存储持久化的消息的唯一分组标识符。(字符串,默认:DEFAULT)spring-doc.cadn.net.cn

table-prefix

自定义表名的前缀。(字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store.mongo-db
集合

元数据的 MongoDB 集合名称。(字符串,默认:metadataStore)spring-doc.cadn.net.cn

metadata.store.redis

元数据的 Redis 键。 (字符串,默认:<none>)spring-doc.cadn.net.cn

metadata.store
类型

指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值: <none>, 可能的值: mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)spring-doc.cadn.net.cn

metadata.store.zookeeper
connect-string

Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:127.0.0.1:2181spring-doc.cadn.net.cn

编码

存储数据到Zookeeper时使用的编码。(字符集,默认:UTF-8)spring-doc.cadn.net.cn

retry-interval

Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:1000)spring-doc.cadn.net.cn

根目录

根节点 - 存储条目是此节点的子节点。(字符串,默认:/SpringIntegration-MetadataStore)spring-doc.cadn.net.cn

sftp.supplier
auto-create-local-dir

如果不存在,则设置为 true 以创建本地目录。(Boolean,默认:true)spring-doc.cadn.net.cn

delay-when-empty

未检测到新文件时的延迟持续时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

delete-remote-files

设置为true表示在成功传输后删除远程文件。(布尔类型,默认:false)spring-doc.cadn.net.cn

目录

工厂“name.directory”对的列表。(String[],默认:<none>)spring-doc.cadn.net.cn

工厂

一个工厂名称到工厂的映射。(Map<String, Factory>, 默认值: <none>)spring-doc.cadn.net.cn

公平

多个服务器/目录公平轮询。默认为false,因此如果源有多个条目,则在访问其他源之前会先接收这些条目。(布尔型,默认:false)spring-doc.cadn.net.cn

filename-pattern

一个过滤器模式,用于匹配要传输的文件名。(字符串,默认:<none>)spring-doc.cadn.net.cn

filename-regex

用于匹配要传输的文件名的过滤正则表达式模式。(Pattern,默认值:<none>)spring-doc.cadn.net.cn

list-only

设置为 true 可以在不包含整个有效负载的情况下返回文件元数据。(布尔值,默认:false)spring-doc.cadn.net.cn

local-dir

用于文件传输的本地目录。(文件,默认:<none>)spring-doc.cadn.net.cn

max-fetch

每次轮询要获取的远程文件的最大数量;默认为无限制。在列出文件或构建任务启动请求时无效。(整数,默认:<none>)spring-doc.cadn.net.cn

preserve-timestamp

设置为 true 可保留原始时间戳。(布尔类型,默认值:true)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

rename-remote-files-to

一个 SpEL 表达式,解析为新名称,在远程文件成功传输后必须重命名。(表达式,默认:<none>)spring-doc.cadn.net.cn

设置为 true 可以流式传输文件,而不是复制到本地目录。(Boolean,默认:false)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

sftp.supplier.factory
allow-unknown-keys

允许未知或已更改的密钥。 (Boolean,默认:false)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

known-hosts-expression

一个解析为已知主机文件位置的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

pass-phrase

用户私钥的密码。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:22)spring-doc.cadn.net.cn

private-key

用户私钥的位置。 (资源,默认:<none>)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

sftp.supplier.sort-by
属性

文件列表条目排序的属性(FILENAME、ATIME:上次访问时间、MTIME:上次修改时间)。(属性,默认:<none>spring-doc.cadn.net.cn

目录

排序方向(升序或降序)。 (Dir,默认:<none>spring-doc.cadn.net.cn

5.14.4. 示例

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --ftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. 系统日志

syslog源通过UDP、TCP或两者同时接收SYSLOG数据包。支持RFC3164(BSD)和RFC5424格式。spring-doc.cadn.net.cn

5.15.1. 选项

syslog.supplier.buffer-size

解码消息时使用的缓冲区大小;较大的消息将被拒绝。(Integer,默认:2048)spring-doc.cadn.net.cn

syslog.supplier.nio

是否使用NIO(在支持大量连接时)。(布尔值,默认:falsespring-doc.cadn.net.cn

syslog.supplier.port

要监听的端口。(整数,默认:1514)spring-doc.cadn.net.cn

syslog.supplier.protocol

用于 SYSLOG 的协议(tcp 或 udp)。(协议,缺省:<none>,有效值:tcpudpboth)spring-doc.cadn.net.cn

syslog.supplier.reverse-lookup

是否对传入的套接字执行反向查找。(布尔值,默认:false)spring-doc.cadn.net.cn

syslog.supplier.rfc

"5424" 或 "3164" - 根据 RFC 的 syslog 格式;3164 又称作 "BSD" 格式。 (字符串,默认值:3164)spring-doc.cadn.net.cn

syslog.supplier.socket-timeout

套接字超时时间。(整型,默认:0)spring-doc.cadn.net.cn

5.16. TCP

源代码tcp用作服务器,允许远程方连接到它并通过原始tcp套接字提交数据。spring-doc.cadn.net.cn

TCP是一种流式协议,需要某种机制来在传输线上对消息进行定界。提供了多种解码器,默认的是'CRLF',与Telnet兼容。spring-doc.cadn.net.cn

由TCP源应用程序生成的消息的byte[]有效负载。spring-doc.cadn.net.cn

5.16.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

TCP
中文

是否使用 NIO。(布尔值,默认:falsespring-doc.cadn.net.cn

端口

要监听的端口号;0 表示操作系统选择一个端口。(整数,默认:1234)spring-doc.cadn.net.cn

reverse-lookup

对远程IP地址执行反向DNS查找;如果为false,则仅在消息头中包含IP地址。(布尔值,默认:falsespring-doc.cadn.net.cn

socket-timeout

套接字在未收到数据前超时关闭的时间(毫秒)。(Integer,缺省:120000)spring-doc.cadn.net.cn

use-direct-buffers

是否使用直接缓冲区。(布尔值,默认:false)spring-doc.cadn.net.cn

tcp.supplier
buffer-size

解码消息时使用的缓冲区大小;较大的消息将被拒绝。(Integer,默认:2048)spring-doc.cadn.net.cn

解码器

接收消息时使用的解码器。(编码,默认:<none>,可选值:CRLFLFNULLSTXETXRAWL1L2L4)spring-doc.cadn.net.cn

5.16.2. 可用解码器

文本数据
CRLF(默认)

文本由回车符(0x0d)后跟换行符(0x0a)终止spring-doc.cadn.net.cn

LF

以换行符终止的文本(0x0a)spring-doc.cadn.net.cn

null

以空字节(0x00)终止的文本spring-doc.cadn.net.cn

STXETX

以 STX (0x02) 开头并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据
原始内容

无结构 - 客户端通过关闭套接字来表示一条完整的消息spring-doc.cadn.net.cn

L1

以一个字节(无符号)长度字段开头的数据(支持最多255字节)spring-doc.cadn.net.cn

L2

以两个字节(无符号)长度字段开头的数据(最多为216-1个字节)spring-doc.cadn.net.cn

L4

以四个字节(有符号)长度字段开头的数据(最多可达231-1字节)spring-doc.cadn.net.cn

5.17. 时间源

时间源将每隔一段时间简单地发出包含当前时间的字符串。spring-doc.cadn.net.cn

5.17.1. 可选项

时间源具有以下选项:timespring-doc.cadn.net.cn

spring.cloud.stream.poller.cron

Cron表达式值用于Cron触发器。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.cloud.stream.poller.fixed-delay

默认轮询程序的固定延迟时间。(Long,默认值:1000)spring-doc.cadn.net.cn

spring.cloud.stream.poller.initial-delay

周期性触发器的初始延迟。(整数,默认值:0)spring-doc.cadn.net.cn

spring.cloud.stream.poller.max-messages-per-poll

默认轮询器每次轮询的最大消息数。(Long,默认:1)spring-doc.cadn.net.cn

spring.cloud.stream.poller.time-unit

应用于延迟值的时间单位。(TimeUnit,默认:<none>,有效值:NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTESHOURSDAYS)spring-doc.cadn.net.cn

time.date-format

日期值的格式。(字符串,默认:MM/dd/yy HH:mm:ss)spring-doc.cadn.net.cn

5.18. Twitter 消息源

重复获取最近30天内的直接消息(包括已发送和接收的消息),并按时间倒序排列。
缓解后的消息被缓存(在MetadataStore缓存中)以防止重复。
默认使用内存中的SimpleMetadataStorespring-doc.cadn.net.cn

twitter.message.source.count控制返回消息的数量。spring-doc.cadn.net.cn

属性spring.cloud.stream.poller控制 the message poll interval。
必须与使用的API速率限制一致
spring-doc.cadn.net.cn

5.18.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

spring.cloud.stream.poller
定时任务

Cron表达式值用于Cron触发器。(字符串,默认:<none>)spring-doc.cadn.net.cn

fixed-delay

默认轮询程序的固定延迟时间。(Long,默认值:1000)spring-doc.cadn.net.cn

initial-delay

周期性触发器的初始延迟。(整数,默认值:0)spring-doc.cadn.net.cn

max-messages-per-poll

默认轮询器每次轮询的最大消息数。(Long,默认:1)spring-doc.cadn.net.cn

time-unit

应用于延迟值的时间单位。(TimeUnit,默认:<none>,有效值:NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTESHOURSDAYS)spring-doc.cadn.net.cn

twitter.connection
access-token

您的 Twitter Tokens。 (字符串,默认值:<none>spring-doc.cadn.net.cn

access-token-secret

您的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

consumer-key

您的 Twitter 密钥。(字符串,默认:<none>spring-doc.cadn.net.cn

consumer-secret

您的 Twitter 密钥。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

debug-enabled

启用 Twitter4J 调试模式。(布尔值,默认:falsespring-doc.cadn.net.cn

raw-json

启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:truespring-doc.cadn.net.cn

twitter.message.source
计数

要返回的事件最大数量。默认为20,最多50。(整数,默认:20)spring-doc.cadn.net.cn

5.19. Twitter搜索源

Twitter 的 标准搜索 API(search/tweets)允许对最近或热门推文的索引进行简单查询。这个 Source 提供了针对过去 7 天内发布的新近推文抽样的连续搜索。是 'public' API 集合的一部分。spring-doc.cadn.net.cn

返回与指定查询匹配的相关推文集合。spring-doc.cadn.net.cn

使用spring.cloud.stream.poller属性来控制连续搜索请求之间的间隔。速率限制-每30分钟窗口内180个请求(例如,约6次/分钟,约1次/10秒)spring-doc.cadn.net.cn

twitter.search查询属性允许通过关键字进行查询,并根据时间和地理位置过滤结果。spring-doc.cadn.net.cn

代码twitter.search.counttwitter.search.page控制搜索API的结果分页。spring-doc.cadn.net.cn

注意:Twitter 的搜索服务以及由此扩展的搜索 API 并不打算作为推文的全面来源。并非所有的推文都会被索引或通过搜索界面提供。spring-doc.cadn.net.cn

5.19.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

spring.cloud.stream.poller
定时任务

Cron表达式值用于Cron触发器。(字符串,默认:<none>)spring-doc.cadn.net.cn

fixed-delay

默认轮询程序的固定延迟时间。(Long,默认值:1000)spring-doc.cadn.net.cn

initial-delay

周期性触发器的初始延迟。(整数,默认值:0)spring-doc.cadn.net.cn

max-messages-per-poll

默认轮询器每次轮询的最大消息数。(Long,默认:1)spring-doc.cadn.net.cn

time-unit

应用于延迟值的时间单位。(TimeUnit,默认:<none>,有效值:NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTESHOURSDAYS)spring-doc.cadn.net.cn

twitter.connection
access-token

您的 Twitter Tokens。 (字符串,默认值:<none>spring-doc.cadn.net.cn

access-token-secret

您的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

consumer-key

您的 Twitter 密钥。(字符串,默认:<none>spring-doc.cadn.net.cn

consumer-secret

您的 Twitter 密钥。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

debug-enabled

启用 Twitter4J 调试模式。(布尔值,默认:falsespring-doc.cadn.net.cn

raw-json

启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:truespring-doc.cadn.net.cn

计数

每页返回的推文数量(例如,每次单一请求),最多为100个。(整数,默认:100spring-doc.cadn.net.cn

语言

将搜索到的推文限制为指定语言,由 http://en.wikipedia.org/wiki/ISO_639-1 定义。(字符串,默认:<none>spring-doc.cadn.net.cn

页面

从最近的推文开始搜索之前,要向后搜索的页面数量(例如请求),以在最近年份的推文中再次启动搜索。总共向后搜索的推文数为(页码 * 数量)(整数,默认:3spring-doc.cadn.net.cn

查询

通过搜索查询字符串搜索推文。(字符串,默认:<none>)spring-doc.cadn.net.cn

restart-from-most-recent-on-empty-response

在空响应时,从最近的推文重新开始搜索。仅在第一次重启后应用(例如,当 since_id != UNBOUNDED 时)(布尔值,默认:falsespring-doc.cadn.net.cn

result-type

指定您偏好的搜索结果类型。当前默认值为“混合”。有效值包括:混合:在响应中同时包含热门和实时结果。最近:仅在响应中返回最新的结果。热门:仅在响应中返回最热门的结果。(ResultType,默认:<none>,可能值:popularmixedrecent)spring-doc.cadn.net.cn

Since

如果指定,返回自给定日期以来的推文。日期应按 YYYY-MM-DD 格式设置。(字符串,默认:<none>spring-doc.cadn.net.cn

twitter.search.geocode
纬度

用户的纬度。 (双精度浮点数,默认值:-1)spring-doc.cadn.net.cn

经度

用户的经度。(双精度浮点数,默认:-1spring-doc.cadn.net.cn

半径

以(纬度,经度)点为中心的半径(单位:千米)。(双精度浮点数,默认:-1spring-doc.cadn.net.cn

5.20. Twitter 流源

实时推文流 过滤采样 API 支持。spring-doc.cadn.net.cn

  • The Filter API 返回与一个或多个过滤器谓词匹配的公共状态。 使用多个参数可以使用单个连接到流式API。 提示:trackfollowlocations 字段用运算符组合! 包含 track=foofollow=1234 的查询返回匹配 test由用户 1234 创建的推文。spring-doc.cadn.net.cn

  • Sample API 返回所有公共状态的小随机样本。
    默认访问级别的推文返回的结果相同,因此如果两个不同的客户端连接到此端点,则它们会看到相同的推文。spring-doc.cadn.net.cn

默认访问级别允许最多400个跟踪关键字、5,000个关注用户ID以及25个0.1-360度的位置框。spring-doc.cadn.net.cn

5.20.1. 可选项

按前缀分组的属性:spring-doc.cadn.net.cn

twitter.connection
access-token

您的 Twitter Tokens。 (字符串,默认值:<none>spring-doc.cadn.net.cn

access-token-secret

您的 Twitter Tokens密钥。(字符串,默认:<none>)spring-doc.cadn.net.cn

consumer-key

您的 Twitter 密钥。(字符串,默认:<none>spring-doc.cadn.net.cn

consumer-secret

您的 Twitter 密钥。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

debug-enabled

启用 Twitter4J 调试模式。(布尔值,默认:falsespring-doc.cadn.net.cn

raw-json

启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:truespring-doc.cadn.net.cn

twitter.stream.filter
计数

指示在转换到实时流之前要流式传输的先前状态的数量。(整数,默认:0spring-doc.cadn.net.cn

filter-level

过滤器级别限制流中显示的推文,仅包括具有最小filterLevel属性值的推文。可以是none、low或medium之一。(FilterLevel,默认:<none>)spring-doc.cadn.net.cn

关注

指定要接收公共推文的用户,按ID。(List<Long>, 默认:<none>)spring-doc.cadn.net.cn

语言

指定流的推文语言。(List<String>, 默认: <none>)spring-doc.cadn.net.cn

位置

要跟踪的位置。内部表示为二维数组。边界框无效:52.38,4.90,51.51,-0.12。第一对必须是框的西南角(List<BoundingBox>,默认:<none>)spring-doc.cadn.net.cn

跟踪

指定要跟踪的关键词。(List<String>, 默认: <none>)spring-doc.cadn.net.cn

twitter.stream
类型

<documentation missing> (数据流类型,默认值:<none>,可选值:samplefilterfirehoselink)spring-doc.cadn.net.cn

5.21. WebSocket 源

通过 WebSocket 生产消息的 Websocket 源。spring-doc.cadn.net.cn

5.21.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

websocket.supplier
allowed-origins

允许的来源。 (字符串,默认:*)spring-doc.cadn.net.cn

路径

服务器WebSocket处理器暴露的路径。(字符串,默认:/websocket)spring-doc.cadn.net.cn

websocket.supplier.sock-js
启用

在服务器上启用 SockJS 服务。默认为 'false'(Boolean,默认:false)spring-doc.cadn.net.cn

5.21.2. 示例

要验证 websocket-source 是否从 WebSocket 客户端接收消息,可以使用以下简单的端到端设置。spring-doc.cadn.net.cn

步骤1:启动kafka
第二步:部署websocket-source在特定端口上,例如8080
第3步:在端口8080路径“/websocket”上连接一个WebSocket客户端,并发送一些消息。

您可以启动一个kafka控制台消费者并在那里查看消息。spring-doc.cadn.net.cn

5.22. ZeroMQ源

“zeromq”源启用从ZeroMQ接收消息。spring-doc.cadn.net.cn

5.22.1. 输入

5.22.2. 输出

负载

5.22.3. 选项

Java开发Spring框架的英文网站有以下选项:zeromqspring-doc.cadn.net.cn

zeromq.supplier.bind-port

创建 ZeroMQ 套接字时绑定的端口;0 表示选择一个随机端口。(整数,默认:0)spring-doc.cadn.net.cn

zeromq.supplier.connect-url

连接到 ZeroMQ 套接字的 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

zeromq.supplier.consume-delay

当 ZeroMQ Socket 没有接收到数据时,消费的延迟。(持续时间,默认:1s)spring-doc.cadn.net.cn

zeromq.supplier.socket-type

连接应建立的套接字类型。(SocketType,默认:<none>,可能值:PAIRPUBSUBREQREPDEALERROUTERPULLPUSHXPUBXSUBSTREAM)spring-doc.cadn.net.cn

zeromq.supplier.topics

要订阅的主题。(字符串数组,默认:[])spring-doc.cadn.net.cn

也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。spring-doc.cadn.net.cn

5.22.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.22.5. 示例

java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=

6. 处理器

6.1. 聚合处理器

聚合处理器使应用程序能够将传入的消息分组并释放到输出目标中。spring-doc.cadn.net.cn

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbcspring-doc.cadn.net.cn

如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。spring-doc.cadn.net.cn

负载

6.1.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

聚合器
聚合

用于聚合策略的SpEL表达式。默认为负载集合。(表达式,默认:<none>)spring-doc.cadn.net.cn

相关性

用于关联键的 SpEL 表达式。默认为 correlationId 标头。(表达式,默认:<none>)spring-doc.cadn.net.cn

group-timeout

SpEL 表达式用于设置超时未完成组的过期时间。(表达式,默认:<none>)spring-doc.cadn.net.cn

message-store-entity

持久化消息存储实体:关系型数据库中的表前缀,MongoDb 中的集合名称等。(字符串,默认:<none>)spring-doc.cadn.net.cn

message-store-type

消息存储类型。 (字符串,默认:<none>spring-doc.cadn.net.cn

发布

发布策略的 SpEL 表达式。默认情况下基于 sequenceSize 请求头。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb
authentication-database

认证数据库名称。(字符串,默认:<none>spring-doc.cadn.net.cn

auto-index-creation

是否启用自动索引创建。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名称。 (字符串,默认:<none>)spring-doc.cadn.net.cn

field-naming-strategy

要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

grid-fs-database

<文档缺失> (字符串,默认:<none>spring-doc.cadn.net.cn

主机

Mongo服务器主机。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:<none>)spring-doc.cadn.net.cn

端口

Mongo服务器端口。不能与URI一起设置。(Integer,默认:<none>spring-doc.cadn.net.cn

replica-set-name

群集所需的副本集名称。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uri

Mongo 数据库连接字符串。不能与主机、端口、凭据和副本集名称一起设置。(字符串,默认:mongodb://localhost/test)spring-doc.cadn.net.cn

用户名

登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-representation

转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIEDSTANDARDC_SHARP_LEGACYJAVA_LEGACYPYTHON_LEGACY)spring-doc.cadn.net.cn

spring.datasource
continue-on-error

初始化数据库时如果发生错误是否停止。(布尔类型,默认:falsespring-doc.cadn.net.cn

数据

数据(DML)脚本资源引用。(List<String>,默认:<none>spring-doc.cadn.net.cn

data-password

执行DML脚本时的数据库密码(如果不同)。(字符串,默认:<none>spring-doc.cadn.net.cn

data-username

要执行DML脚本的数据库用户名(如果不同)。(字符串,默认:<none>spring-doc.cadn.net.cn

driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

embedded-database-connection

嵌入式数据库的连接详情。如果类路径中存在可用的最合适的嵌入式数据库,则默认使用该数据库。(EmbeddedDatabaseConnection,默认值:<none>,可能的值:NONEH2DERBYHSQLHSQLDB)spring-doc.cadn.net.cn

generate-unique-name

是否生成随机数据源名称。(布尔型,默认:true)spring-doc.cadn.net.cn

initialization-mode

用于确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化的模式。(DataSourceInitializationMode,默认值:embedded,可选值:ALWAYSEMBEDDEDNEVER)spring-doc.cadn.net.cn

jndi-name

JNDI位置的数据源。设置后,类、URL、用户名和密码将被忽略。(字符串,默认:<none>spring-doc.cadn.net.cn

姓名

如果 "generate-unique-name" 为 false,则要使用的数据源名称。当使用嵌入式数据库时,默认值为 "testdb",否则为 null。(String,默认:<none>)spring-doc.cadn.net.cn

密码

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

平台

DDL 或 DML 脚本中使用的平台(例如 schema-${platform}.sql 或 data-${platform}.sql)。(字符串,默认:all)spring-doc.cadn.net.cn

架构

Schema (DDL) script resource references. (List<String>, default: <none>)spring-doc.cadn.net.cn

schema-password

执行 DDL 脚本的数据库密码(如果不同)。(字符串,默认:<none>spring-doc.cadn.net.cn

schema-username

要执行DDL脚本的数据库的用户名(如果不同的话)。(字符串,默认:<none>spring-doc.cadn.net.cn

分隔符

SQL初始化脚本中的语句分隔符。(字符串,默认:;)spring-doc.cadn.net.cn

sql-script-encoding

SQL 脚本编码。 (字符集,默认:<none>)spring-doc.cadn.net.cn

类型

要使用的连接池实现的完全限定名。默认情况下,它会自动从类路径中检测。(Class<DataSource>,默认:<none>spring-doc.cadn.net.cn

url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.mongodb.embedded
功能

要启用的功能的逗号分隔列表。默认情况下使用配置版本的默认值。(Set<Feature>, 默认:[sync_delay])spring-doc.cadn.net.cn

版本

要使用的 Mongo 版本。(字符串,默认:3.5.5)spring-doc.cadn.net.cn

spring.redis
client-name

连接上要设置的客户端名称,通过 CLIENT SETNAME 设置。 (字符串,默认:<none>)spring-doc.cadn.net.cn

client-type

要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认值:<none>,可能的取值:LETTUCEJEDIS)spring-doc.cadn.net.cn

connect-timeout

连接超时。(持续时间,默认:<none>)spring-doc.cadn.net.cn

数据库

连接工厂使用的数据库索引。(整数,默认:0)spring-doc.cadn.net.cn

主机

Redis服务器主机。(字符串,默认:localhostspring-doc.cadn.net.cn

密码

Redis服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

Redis服务器端口。(整数,默认:6379spring-doc.cadn.net.cn

SSL

是否启用SSL支持。(布尔类型,默认:falsespring-doc.cadn.net.cn

timeout

读取超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

url

连接URL。覆盖主机、端口和密码。用户被忽略。例如:redis://user:[email protected]:6379 (字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

登录 Redis 服务器的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

6.2. 桥接处理器

一个处理器,通过简单地将传入的有效负载传递到出站来连接输入和输出。spring-doc.cadn.net.cn

负载

6.3 过滤器处理器

过滤器处理器允许应用程序检查传入的有效载荷,然后对其应用谓词以决定是否需要继续记录。 例如,如果传入的有效载荷类型为 String 并且您想筛选掉少于五个字符的内容,则可以按如下方式运行过滤器处理器。spring-doc.cadn.net.cn

java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4spring-doc.cadn.net.cn

如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。spring-doc.cadn.net.cn

负载

您可以传递任何类型的负载,然后对其应用SpEL表达式进行过滤。如果传入的类型是byte[]且内容类型设置为text/plainapplication/json,则应用程序会将byte[]转换为Stringspring-doc.cadn.net.cn

6.3.2. 选项

filter.function.expression

针对请求消息进行过滤的布尔型SpEL表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

6.4. Groovy 处理器

一个处理器,用于在消息上应用 Groovy 脚本。spring-doc.cadn.net.cn

6.4.1. 选项

groovy-processor 处理器具有以下选项:spring-doc.cadn.net.cn

groovy-processor.script

指向用于处理消息的脚本的引用。(资源,缺省:<none>)spring-doc.cadn.net.cn

groovy-processor.variables

变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:<none>)spring-doc.cadn.net.cn

groovy-processor.variables-location

包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:<none>)spring-doc.cadn.net.cn

6.5. Header Enricher Processor

使用 header-enricher 应用添加消息头。spring-doc.cadn.net.cn

标题以换行符分隔的键值对形式提供,其中键是标题名称,值为SpEL表达式。
例如 --headers='foo=payload.someProperty \n bar=payload.otherProperty'spring-doc.cadn.net.cn

6.5.1. 选项

带有 header-enricher 处理器具有以下选项:spring-doc.cadn.net.cn

header.enricher.headers

换行分隔的属性表示标题,其中值是SpEL表达式,例如 foo='bar' baz=payload.baz。 (属性,默认:<none>)spring-doc.cadn.net.cn

header.enricher.overwrite

设置为 true 可覆盖任何现有的消息头。(布尔类型,默认:false)spring-doc.cadn.net.cn

6.6. Http 请求处理器

一个处理器应用程序,该程序向HTTP资源发出请求,并将响应正文作为消息有效负载进行发布。spring-doc.cadn.net.cn

6.6.1. 输入

headers

任何必需的 HTTP 头部必须通过 headersheaders-expression 属性显式设置。请参见下面的示例。
头部值也可用于构造:spring-doc.cadn.net.cn

负载

默认情况下,负载用作 POST 请求的请求体,并且可以是任何 Java 类型。对于 GET 请求,它应该是一个空字符串。负载还可以用于构建:spring-doc.cadn.net.cn

底层的WebClient支持Jackson JSON序列化,以支持任何请求和响应类型。expected-response-type属性,默认为String.class,可以设置为您应用程序类路径中的任何类。请注意,用户定义的有效负载类型需要向您的pom文件添加所需的依赖项。spring-doc.cadn.net.cn

6.6.2. 输出

headers

没有HTTP消息头被映射到出站Message。spring-doc.cadn.net.cn

负载

原始输出对象是ResponseEntity<?>,其任何字段(例如:bodyheaders)或访问器方法(statusCode)都可以作为reply-expression的一部分引用。默认情况下,传出消息的有效负载是响应体。请注意,Jackson 默认无法反序列化 ResponseEntity(由表达式 #root 引用),但可以将其渲染为HashMapspring-doc.cadn.net.cn

6.6.3. 选项

The http-request processor has the following options:spring-doc.cadn.net.cn

6.6.4. 选项

http.request.body-expression

从传入消息中推导请求体的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

http.request.expected-response-type

用于解释响应的类型。(Class<?>,默认:<none>spring-doc.cadn.net.cn

http.request.headers-expression

用于推导要使用的 HTTP 头部映射的 SpEL 表达式。 (表达式,默认:<none>spring-doc.cadn.net.cn

http.request.http-method-expression

从传入的消息推导请求方法的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

http.request.maximum-buffer-size

输入流缓冲区分配的最大字节数。默认值为256k。如需发布或获取大二进制内容,请适当增加。(Integer,缺省:0)spring-doc.cadn.net.cn

http.request.reply-expression

用于计算最终结果的SpEL表达式,应用于整个http {@link org.springframework.http.ResponseEntity}。 (表达式,默认:<none>)spring-doc.cadn.net.cn

http.request.timeout

请求超时时间(毫秒)。(Long,默认:30000spring-doc.cadn.net.cn

http.request.url-expression

对传入消息进行SpEL表达式计算以确定要使用的URL。(表达式,默认:<none>)spring-doc.cadn.net.cn

6.7. 图像识别处理器

一个使用Inception模型对实时图像进行分类的处理器,可以将图像分为不同的类别(例如标签)。spring-doc.cadn.net.cn

模型实现了一个深度的卷积神经网络,可以在困难的视觉识别任务上达到合理性能- 匹配或超过某些领域(如图像识别)中的人类表现。spring-doc.cadn.net.cn

模型的输入是二进制数组形式的图像。spring-doc.cadn.net.cn

输出为以下格式的JSON消息:spring-doc.cadn.net.cn

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

Result 包含识别类别的名称(例如 label)以及该图像代表此类别的置信度(例如 confidence)。spring-doc.cadn.net.cn

如果将response-seize设置为高于1的值,则结果将包括最可能的前response-seize个标签。例如,response-size=3将返回:spring-doc.cadn.net.cn

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}
负载

如果传入类型为 byte[] 且内容类型设置为 application/octet-stream,则应用程序将输入的 byte[] 图像处理并输出增强后的 byte[] 图像负载和 JSON 标头。spring-doc.cadn.net.cn

6.7.2. 选项

image.recognition.cache-model

缓存预训练的 TensorFlow 模型。(布尔类型,默认:true)spring-doc.cadn.net.cn

image.recognition.debug-output

<文档缺失> (布尔值,默认: false)spring-doc.cadn.net.cn

image.recognition.debug-output-path

<文档缺失> (字符串,默认:image-recognition-result.pngspring-doc.cadn.net.cn

image.recognition.model

预训练的TensorFlow图像识别模型。请注意,该模型必须与所选的模型类型匹配!(字符串,默认:https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb)spring-doc.cadn.net.cn

image.recognition.model-type

支持三种不同的预训练 tensorflow 图像识别模型:inception、mobilenetv1 和 mobilenetv2。1. inception图使用“input”作为输入,“output”作为输出。2. mobilenetv2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 标准化图像尺寸始终为正方形(例如 h=w)- 图使用“input”作为输入和“mobilenetv2/predictions/reshape_1”作为输出。3. mobilenetv1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图使用“input”作为输入和“mobilenetv1/predictions/reshape_1”作为输出。(modeltype,默认:<none>,可能值:inceptionmobilenetv1mobilenetv2)spring-doc.cadn.net.cn

image.recognition.normalized-image-size

归一化图像大小。(整数,默认:224)spring-doc.cadn.net.cn

image.recognition.response-size

已识别图片的数量。(整数,默认值:5)spring-doc.cadn.net.cn

6.8. 对象检测处理器

对象检测处理器提供开箱即用的 TensorFlow 对象检测 API 支持。它允许在单个图像或图像流中实时定位和识别多个对象。对象检测处理器是基于 对象检测功能 构建的。spring-doc.cadn.net.cn

您必须向处理器提供一个预训练的目标检测模型,以及相应的对象标签spring-doc.cadn.net.cn

这里是一些合理的配置默认值:spring-doc.cadn.net.cn

下图显示了一个 Spring Cloud 数据流,用于实时预测输入图像流中的对象类型。spring-doc.cadn.net.cn

scdf tensorflow object detection arch

处理器的输入是一个图像字节数组,输出是增强后的图像和一个标题,称为detected_objects,它提供了检测到的对象的文字描述:spring-doc.cadn.net.cn

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

0 头格式是:
spring-doc.cadn.net.cn

  • 对象名称:置信度 - 检测到的对象的人类可读名称(例如,标签)及其置信度,置信度为[0-1]之间的浮点数spring-doc.cadn.net.cn

  • x1y1x2y2 - 响应还提供了检测到的对象的边界框,表示为 (x1, y1, x2, y2)。坐标相对于图像大小进行相对定位。spring-doc.cadn.net.cn

  • cid-分类标识符,如所提供的标签配置文件中定义。spring-doc.cadn.net.cn

负载

传入的类型是 byte[],内容类型是 application/octet-stream。处理器处理输入的 byte[] 图像,并输出一个增强的 byte[] 图像负载和一个 JSON 头部(detected_objects)。spring-doc.cadn.net.cn

6.8.2. 选项

object.detection.cache-model

<文档缺失> (布尔值,默认: true)spring-doc.cadn.net.cn

object.detection.confidence

<文档缺失> (浮点数,默认:0.4)spring-doc.cadn.net.cn

object.detection.debug-output

<文档缺失> (布尔值,默认: false)spring-doc.cadn.net.cn

object.detection.debug-output-path

<文档缺失> (字符串,默认:object-detection-result.pngspring-doc.cadn.net.cn

object.detection.labels

标签 URI。 (字符串,默认:https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt)spring-doc.cadn.net.cn

object.detection.model

预训练的 TensorFlow 对象检测模型。(字符串,默认:https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb)spring-doc.cadn.net.cn

object.detection.response-size

<documentation missing> (Integer,默认:<none>spring-doc.cadn.net.cn

object.detection.with-masks

<文档缺失> (布尔值,默认: false)spring-doc.cadn.net.cn

6.9. 语义分割处理器

基于最先进的DeepLab TensorFlow模型的图像语义分割。spring-doc.cadn.net.cn

图像分割是将图像中的每个像素与一个类别标签(如花朵、人、道路、天空、海洋或汽车)关联起来的过程。Semantic Segmentation不同于Instance Segmentation,后者生成实例感知的区域掩码,而Semantic Segmentation则生成类别感知的掩码。要实现Instance Segmentation,请参阅目标检测服务spring-doc.cadn.net.cn

Semantic Segmentation Processor 使用 语义分割功能 库和 TensorFlow 服务spring-doc.cadn.net.cn

负载

传入的类型是 byte[],内容类型是 application/octet-stream。处理器处理输入的 byte[] 图像,并输出增强的 byte[] 图像负载和 json 头。spring-doc.cadn.net.cn

处理器的输入是图像字节数组,输出是增强后的图像字节数组,并且包含一个JSON标头semantic_segmentation,格式如下:spring-doc.cadn.net.cn

[
    [ 0, 0, 0 ],
    [ 127, 127, 127 ],
    [ 255, 255, 255 ]
    ...
]

输出头的json格式表示从输入图像计算出的颜色像素图。spring-doc.cadn.net.cn

6.9.2. 选项

semantic.segmentation.color-map-uri

每个预训练模型都基于某些对象颜色映射。预定义的选项有:- classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (字符串,默认:classpath:/colormap/citymap_colormap.json)spring-doc.cadn.net.cn

semantic.segmentation.debug-output

将输出图像保存在本地 debugOutputPath 路径下。(布尔类型,默认:falsespring-doc.cadn.net.cn

semantic.segmentation.debug-output-path

<文档缺失> (字符串,默认:semantic-segmentation-result.pngspring-doc.cadn.net.cn

semantic.segmentation.mask-transparency

计算分割掩码图像的alpha颜色。(浮点数,默认:0.45)spring-doc.cadn.net.cn

semantic.segmentation.model

预训练的TensorFlow语义分割模型。(字符串,默认:https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb)spring-doc.cadn.net.cn

semantic.segmentation.output-type

指定输出图像类型。您可以返回带有计算掩码叠加的输入图像,或者仅返回掩码。(OutputType,默认值:<none>,可选值:blendedmask)spring-doc.cadn.net.cn

6.10. 脚本处理器

使用脚本转换消息的处理器。脚本正文作为属性值直接提供。可以指定脚本语言(groovy/javascript/ruby/python)。spring-doc.cadn.net.cn

6.10.1. 选项

脚本处理器有以下选项:script-processorspring-doc.cadn.net.cn

script-processor.language

脚本属性中文本的语言。支持的类型为:groovy、javascript、ruby、python。(字符串,默认值:<none>spring-doc.cadn.net.cn

script-processor.script

脚本文本。(字符串,默认:<none>)spring-doc.cadn.net.cn

script-processor.variables

变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:<none>)spring-doc.cadn.net.cn

script-processor.variables-location

包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:<none>)spring-doc.cadn.net.cn

6.11. 分割处理器

拆分器应用程序建立在Spring Integration中的同名概念之上,允许将单个消息拆分为多个不同的消息。
处理器使用一个函数作为输入Message<?>,然后根据各种属性产生输出List<Message<?>(见下文)。
您可以使用SpEL表达式或定界符来指定如何拆分传入的消息。spring-doc.cadn.net.cn

负载

如果传入的类型是 byte[] 并且内容类型设置为 text/plainapplication/json,则应用程序会将 byte[] 转换为 Stringspring-doc.cadn.net.cn

6.11.2. 选项

splitter.apply-sequence

在标头中添加关联/序列信息,以方便稍后的聚合。(布尔值,默认:true)spring-doc.cadn.net.cn

splitter.charset

在将基于文本的文件中的字节转换为字符串时要使用的字符集。(String,默认:<none>)spring-doc.cadn.net.cn

splitter.delimiters

当表达式为 null 时,用于对 {@link String} 负载进行标记化的分隔符。 (字符串,默认值:<none>)spring-doc.cadn.net.cn

splitter.expression

用于拆分有效负载的 SpEL 表达式。(字符串,默认值:<none>)spring-doc.cadn.net.cn

splitter.file-markers

设置为 true 或 false,以使用一个 {@code FileSplitter}(按行拆分基于文本的文件)来包含(或不包含)文件开头/结尾标记。(布尔型,默认:<none>spring-doc.cadn.net.cn

splitter.markers-json

当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:true)spring-doc.cadn.net.cn

6.12. 转换处理器

转换器处理器允许您基于SpEL表达式来转换消息负载结构。spring-doc.cadn.net.cn

以下是运行此应用程序的示例。spring-doc.cadn.net.cn

java -jar filter-processor-kafka-<version>.jar --spel.function.expression=toUpperCase()spring-doc.cadn.net.cn

如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。spring-doc.cadn.net.cn

负载

传入的消息可以包含任何类型的负载。spring-doc.cadn.net.cn

6.12.2. 选项

spel.function.expression

要应用的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

6.13. Twitter趋势和趋势位置处理器

能够返回热门话题或热门话题位置的处理器。
twitter.trend.trend-query-type属性允许选择查询类型。spring-doc.cadn.net.cn

在此模式下,将twitter.trend.trend-query-type设置为trendspring-doc.cadn.net.cn

基于趋势API的处理器。返回特定纬度、经度位置附近的流行话题spring-doc.cadn.net.cn

6.13.2. 获取趋势位置

在此模式下,将twitter.trend.trend-query-type设置为trendLocationspring-doc.cadn.net.cn

通过位置检索热门话题的完整列表或附近的位置列表。spring-doc.cadn.net.cn

如果未提供latitudelongitude参数,则处理器执行可用趋势API,并返回Twitter具有话题趋势信息的位置。spring-doc.cadn.net.cn

如果提供了latitudelongitude参数,处理器会执行趋势最近API并返回Twitter具有趋势话题信息的位置,这些位置最接近指定的位置。spring-doc.cadn.net.cn

响应是一个 locations 数组,用于编码位置的 WOEID 和其他人类可读信息,例如规范名称和该位置所属国家。spring-doc.cadn.net.cn

6.13.3. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

twitter.trend.closest
纬度

如果提供了长参数,可用趋势位置将以距离坐标对的距离排序,从最近到最远。经度的有效范围是 -180.0 到 +180.0(西为负,东为正),包含边界。(表达式,默认:<none>)spring-doc.cadn.net.cn

如果提供了 lat 参数,则可用的趋势位置将按照距离排序,最近到最远,对应坐标对。经度的有效范围为 -180.0 到 +180.0(西为负,东为正),包括端点。(表达式,默认:<none>)spring-doc.cadn.net.cn

twitter.trend
location-id

要返回趋势信息的位置的 Yahoo! 地方地球 ID。全球信息可使用 1 作为 WOEID 获取。(表达式,默认:payload)spring-doc.cadn.net.cn

trend-query-type

<文档缺失> (趋势查询类型,默认值: <none>,可能的值: trend,trendLocation)spring-doc.cadn.net.cn

7. 消息通道

7.1. Cassandra Sink(Cassandra 接收器)

此接收应用程序将接收到的每个消息的内容写入Cassandra。spring-doc.cadn.net.cn

它期望一个JSON字符串作为负载,并使用其属性来映射到表列。spring-doc.cadn.net.cn

负载

表示要持久化的实体(或实体列表)的 JSON 字符串或字节数组。spring-doc.cadn.net.cn

7.1.2. 选项

cassandra接收器具有以下选项:
spring-doc.cadn.net.cn

spring.data.cassandra.compression

通过Cassandra二进制协议支持的压缩。(压缩,默认:none,可能值:LZ4SNAPPYNONEspring-doc.cadn.net.cn

spring.data.cassandra.config

要使用的配置文件的位置。(资源,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.contact-points

集群节点地址,格式为“host:port”,或者仅使用配置端口的简单“host”。(List<String>, 默认:[127.0.0.1:9042])spring-doc.cadn.net.cn

spring.data.cassandra.keyspace-name

要使用的 Keyspace 名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.local-datacenter

被视为“本地”的数据中心。联系点应来自此数据中心。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.password

服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.data.cassandra.port

如果联系点未指定,则使用的端口。(整数,默认:9042)spring-doc.cadn.net.cn

spring.data.cassandra.schema-action

启动时要采取的架构操作。(字符串,默认:none)spring-doc.cadn.net.cn

spring.data.cassandra.session-name

命名 Cassandra 会话。(字符串,默认值:<none>spring-doc.cadn.net.cn

spring.data.cassandra.ssl

启用 SSL 支持。(布尔类型,默认:falsespring-doc.cadn.net.cn

spring.data.cassandra.username

服务器登录用户。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.2. 分析接收器

基于分析消费者构建的接收应用程序,该程序从输入消息中计算分析,并将分析结果作为指标发布到各种监控系统。它利用micrometer库在最流行的监控系统上提供统一的编程体验,并公开Spring表达式语言(SpEL)属性,用于定义如何从输入数据中计算度量名称、值和标签。spring-doc.cadn.net.cn

分析接收器可以生成两种指标类型:spring-doc.cadn.net.cn

一个 计量器(例如计数器或仪表)由其唯一的 namedimensions 确定(术语维度和标签可以互换使用)。维度允许对特定命名指标进行切片,以便深入分析和推理数据。spring-doc.cadn.net.cn

由于指标由其namedimensions唯一标识,因此您可以为每个指标分配多个标签(例如键/值对),但之后不能随意更改这些标签!如果具有相同名称的指标具有不同的标签集,则监控系统(如Prometheus)会发出警告。

使用analytics.nameanalytics.name-expression属性设置输出分析指标的名称。如果没有设置,指标名称默认为应用程序的名称。spring-doc.cadn.net.cn

使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>属性可以向您的指标添加一个或多个标签。TAG_NAME在属性定义中将作为标签名称出现在指标中。TAG_VALUE是SpEL表达式,该表达式从传入的消息动态计算出标签值。spring-doc.cadn.net.cn

表达式使用SpELheaders关键字来访问消息的头和有效载荷值。spring-doc.cadn.net.cn

您可以使用字面量(例如 'fixed value')来设置具有固定值的标签。

所有流应用程序开箱即用地支持三种最流行的监控系统,WavefrontPrometheusInfluxDB,您可以通过声明方式启用每种系统。通过只需向 Analytics Sink 应用程序添加它们的 Micrometer Meter-Registry 依赖项即可添加对其他监控系统的支持。spring-doc.cadn.net.cn

请访问 Spring Cloud Data Flow 流监控 获取配置监控系统的详细说明。以下快速片段可帮助您开始。spring-doc.cadn.net.cn

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink将重用提供的度量配置。

下图说明了Analytics Sink如何帮助收集股票交易所和实时数据管道的业务内部信息。spring-doc.cadn.net.cn

Analytics Architecture
负载

传入的消息可以包含任何类型的负载。spring-doc.cadn.net.cn

7.2.2. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

分析
amount-expression

用于计算输出指标值(例如金额)的 SpEL 表达式。默认值为 1.0 (表达式,默认:<none>)spring-doc.cadn.net.cn

meter-type

用于向后端报告指标的 Micrometer 计量类型。(MeterType,默认值:<none>,可能的取值:countergauge)spring-doc.cadn.net.cn

姓名

输出指标的名称。'name' 和 'nameExpression' 相互排斥,只能设置其中一个。(字符串,默认:<none>)spring-doc.cadn.net.cn

name-expression

用于从输入消息计算输出指标名称的SpEL表达式。'name' 和 'nameExpression' 是互斥的,只能设置其中一个。(表达式,默认:<none>)spring-doc.cadn.net.cn

analytics.tag
表达式

从 SpEL 表达式计算标签。单个 SpEL 表达式可以生成值数组,这意味着不同的名称/值标签。每个名称/值标签都会产生单独的计量器增量。标签表达式格式是:analytics.tag.expression.[tag-name]=[SpEL 表达式] (Map<String, Expression>, 默认: <none>)spring-doc.cadn.net.cn

固定

已弃用:请使用 analytics.tag.expression 并带有字面量 SpEL 表达式。自定义固定标签。这些标签具有常数值,创建一次后会随每次发布的指标一起发送。定义固定标签的约定是:<code>analytics.tag.fixed.[tag-name]=[tag-value]</code>(Map<String, String>, 默认值:<none>)spring-doc.cadn.net.cn

7.3. Elasticsearch 写入器

将文档索引到Elasticsearch中的接收器。<br>spring-doc.cadn.net.cn

此Elasticsearch接收器仅支持索引JSON文档。
它从输入目标获取数据,然后将其索引到Elasticsearch中。
输入数据可以是普通的json字符串,或者表示JSON的java.util.Map
它还接受作为Elasticsearch提供的XContentBuilder的数据。
然而,在中间件不将记录保存为XContentBuilder的情况下,这种情况很少见。
这主要是为了直接调用消费者而提供的。spring-doc.cadn.net.cn

7.3.1. 选项

Spring 框架的 Elasticsearch 接收器具有以下选项:spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

elasticsearch.consumer
异步

指示索引操作是异步还是同步。默认情况下,索引操作是同步进行的。(布尔值,默认:false)spring-doc.cadn.net.cn

batch-size

每次请求要索引的项目数量。默认值为1。如果设置大于1,则会使用批量索引API。(整数,默认:1)spring-doc.cadn.net.cn

group-timeout

批量索引激活时,在超时毫秒后将消息组刷新。默认值为-1,表示不会自动刷新空闲的消息组。(Long,默认:-1)spring-doc.cadn.net.cn

id

要索引的文档的id。如果设置了,则每条消息的INDEX_ID报头值将覆盖此属性。(表达式,默认:<none>)spring-doc.cadn.net.cn

索引

索引名称。如果设置,则每个消息的 INDEX_NAME 头值将覆盖此属性。(字符串,默认:<none>)spring-doc.cadn.net.cn

路由

指示要路由到的分片。如果没有提供,Elasticsearch 将默认使用文档 ID 的哈希。(字符串,默认:<none>)spring-doc.cadn.net.cn

timeout-seconds

分片可用的超时时间。如果未设置,则由Elasticsearch客户端默认设为1分钟。(Long,默认:0)spring-doc.cadn.net.cn

spring.elasticsearch.rest
connection-timeout

连接超时。(持续时间,默认:1s)spring-doc.cadn.net.cn

密码

凭证密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

read-timeout

读取超时时间。(持续时间,默认:30s)spring-doc.cadn.net.cn

统一资源标识符(URIs)

用作逗号分隔的 Elasticsearch 实例列表。 (List<String>, 默认值: [http://localhost:9200])spring-doc.cadn.net.cn

用户名

凭证用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.3.2. 运行此接收器的示例

  1. 从文件夹 elasticsearch-sink./mvnw clean packagespring-doc.cadn.net.cn

  2. cd appsspring-doc.cadn.net.cn

  3. 切换到正确的绑定器生成的应用程序(Kafka 或 RabbitMQ)spring-doc.cadn.net.cn

  4. ./mvnw clean packagespring-doc.cadn.net.cn

  5. 请确保您已运行Elasticsearch。例如,您可以使用以下命令将其作为docker容器运行。docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2spring-doc.cadn.net.cn

  6. 如果中间件(Kafka 或 RabbitMQ)尚未运行,请先启动它。spring-doc.cadn.net.cn

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testingspring-doc.cadn.net.cn

  8. 将一些JSON数据发送到中间件目的地。例如:"foo":"bar"}spring-doc.cadn.net.cn

  9. 验证数据是否已索引:curl localhost:9200/testing/_searchspring-doc.cadn.net.cn

7.4. 文件接收器

文件接收应用程序将接收到的每个消息写入到一个文件中。spring-doc.cadn.net.cn

7.4.2. 选项

代码file-sink有以下选项:spring-doc.cadn.net.cn

file.consumer.binary

一个标志,用于指示是否应抑制在写入后添加换行符。(布尔值,默认:false)spring-doc.cadn.net.cn

file.consumer.charset

写入文本内容时使用的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

file.consumer.directory

目标文件的父目录。(File,缺省:<none>)spring-doc.cadn.net.cn

file.consumer.directory-expression

要评估的目标文件的父目录表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

file.consumer.mode

如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认值:<none>,可选值:APPENDAPPEND_NO_FLUSHFAILIGNOREREPLACEREPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

file.consumer.name

目标文件的名称。(字符串,默认:file-consumer)spring-doc.cadn.net.cn

file.consumer.name-expression

要计算的目标文件名的表达式。(字符串,默认值:<none>)spring-doc.cadn.net.cn

file.consumer.suffix

文件名后缀。 (字符串,默认:<empty string>)spring-doc.cadn.net.cn

7.5. FTP 接收器

FTP接收器是从传入的消息推送到FTP服务器的简单选项。spring-doc.cadn.net.cn

它使用ftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和密码来登录。spring-doc.cadn.net.cn


默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会
使用该文件的原始名称。
headers

7.5.3. 输出

不适用(写入FTP服务器)。spring-doc.cadn.net.cn

7.5.4. 可选项

FTP 接收器 具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

ftp.consumer
auto-create-dir

是否创建远程目录。(布尔值,默认:true)spring-doc.cadn.net.cn

filename-expression

用于生成远程文件名的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

模式

远程文件已存在时要采取的操作。(FileExistsMode,默认:<none>,可选值:APPENDAPPEND_NO_FLUSHFAILIGNOREREPLACEREPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

temporary-remote-dir

如果 '#isUseTemporaryFilename()' 为 true,则文件将被写入的临时目录。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

use-temporary-filename

是否写入临时文件并重命名。(布尔类型,默认:true)spring-doc.cadn.net.cn

ftp.factory
cache-sessions

缓存会话。 (布尔值,默认:<none>spring-doc.cadn.net.cn

client-mode

FTP会话使用的客户端模式。(ClientMode,默认:<none>,可选值:ACTIVEPASSIVE)spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:21)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

7.6. Geode接收器

Geode 接收器将消息内容写入 Geode 区域。spring-doc.cadn.net.cn

7.6.1. 可选项

geode 接收器有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

geode.consumer
JSON

指示 Geode 区域是否将 JSON 对象存储为 PdxInstance。(布尔值,默认:false)spring-doc.cadn.net.cn

key-expression

作为缓存键使用的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.pool
connect-type

指定连接类型:serverlocator(ConnectType,默认值:<none>,可能的取值:locatorserver)spring-doc.cadn.net.cn

host-addresses

指定一个或多个Gemfire定位器或服务器地址,格式为[主机]:[端口]。(InetSocketAddress[],默认:<none>)spring-doc.cadn.net.cn

subscription-enabled

设置为 true 可以启用客户端池的订阅。这是同步更新到客户端缓存所必需的。(Boolean,默认:false)spring-doc.cadn.net.cn

geode.region
region-name

区域名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.security
密码

缓存密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

缓存用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

geode.security.ssl
密码

为安全套接字连接配置使用的 SSL 密码,作为有效密码名称的数组。(字符串,默认:any)spring-doc.cadn.net.cn

keystore-type

标识用于SSL通信的Keystore类型(例如:JKS,PKCS11等)。 (String,默认值:JKS)spring-doc.cadn.net.cn

keystore-uri

用于连接到 Geode 集群的预创建 Keystore URI。 (Resource, 默认值: <none>)spring-doc.cadn.net.cn

ssl-keystore-password

访问密钥库 truststore 的密码。 (String,默认值: <none>)spring-doc.cadn.net.cn

ssl-truststore-password

访问信任存储库的密码。 (String, 默认值: <none>)spring-doc.cadn.net.cn

truststore-type

标识用于SSL通信的信任存储类型(例如 JKS,PKCS11 等)。 (String,默认值: JKS)spring-doc.cadn.net.cn

truststore-uri

用于连接到 Geode 集群的预创建信任库 URI 的位置。 (资源, 默认: <none>)spring-doc.cadn.net.cn

user-home-directory

本地缓存从truststoreUri和keystoreUri位置下载的truststore和keystore文件的目录。 (String, 默认值: user.home)spring-doc.cadn.net.cn

7.7. JDBC 处理程序

JDBC接收器允许您将传入的有效负载持久保存到关系型数据库管理系统(RDBMS)数据库中。spring-doc.cadn.net.cn

jdbc.consumer.columns 属性表示 COLUMN_NAME[:EXPRESSION_FOR_VALUE] 的键值对,其中 EXPRESSION_FOR_VALUE(连同冒号)是可选的。 在这种情况下,该值通过生成的表达式(如 payload.COLUMN_NAME)进行评估,因此我们可以通过这种方式实现从对象属性到表列的直接映射。 例如,我们有一个如下所示的 JSON 负载:spring-doc.cadn.net.cn

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

因此,我们可以使用以下配置将数据插入到具有 namecitystreet 结构的表中:spring-doc.cadn.net.cn

--jdbc.consumer.columns=name,city:address.city,street:address.street

此接收器支持批量插入,具体取决于底层的 JDBC 驱动程序。 通过 batch-sizeidle-timeout 属性配置批量插入: 当接收到的消息达到 batch-size 条时进行聚合,并作为一批次插入。 如果在没有新消息的情况下经过了 idle-timeout 毫秒,则即使批次大小小于 batch-size,也会插入已聚合的数据,以此来限制最大延迟。spring-doc.cadn.net.cn

该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。

7.7.1. 示例

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
负载

7.7.2. 选项

jdbc接收器有以下选项:jdbcspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

jdbc.consumer
batch-size

当消息数量达到阈值时,数据将被刷新到数据库表中。(整数,默认:1)spring-doc.cadn.net.cn

逗号分隔的基于冒号的列名和 SpEL 表达式对,用于插入/更新值。在初始化时使用这些名称来发布 DDL。(字符串,默认:payload:payload.toString())spring-doc.cadn.net.cn

idle-timeout

数据自动刷新到数据库表时的空闲超时时间(毫秒)。(Long,默认:-1)spring-doc.cadn.net.cn

初始化

'true'、'false' 或表格的自定义初始化脚本的位置。(字符串,默认:false)spring-doc.cadn.net.cn

table-name

要写入的表名。(字符串,默认:messages)spring-doc.cadn.net.cn

spring.datasource
数据

数据(DML)脚本资源引用。(List<String>,默认:<none>spring-doc.cadn.net.cn

driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

initialization-mode

用于确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化的模式。(DataSourceInitializationMode,默认值:embedded,可选值:ALWAYSEMBEDDEDNEVER)spring-doc.cadn.net.cn

密码

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

架构

Schema (DDL) script resource references. (List<String>, default: <none>)spring-doc.cadn.net.cn

url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.8. 日志接收器

log 接收器使用应用程序记录器输出数据以供检查。spring-doc.cadn.net.cn

请理解,log 接收器使用无类型的处理器,这会影响实际的日志记录方式。这意味着如果内容类型为文本,则原始负载字节将被转换为字符串;否则将记录原始字节。用户指南中包含更多信息。spring-doc.cadn.net.cn

7.8.1. 参数<br>

日志接收器具有以下选项:logspring-doc.cadn.net.cn

log.expression

要作为日志消息记录的消息上的SpEL表达式。(字符串,默认:payload)spring-doc.cadn.net.cn

log.level

日志消息记录的级别。(级别,默认:<none>,可能值:FATALERRORWARNINFODEBUGTRACE)spring-doc.cadn.net.cn

log.name

要使用的记录器的名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

7.9. MongoDB Sink

此接收应用程序将传入数据存入MongoDB。
该应用程序完全基于MongoDataAutoConfiguration,因此请参阅Spring Boot MongoDB支持获取更多信息。spring-doc.cadn.net.cn

7.9.1. 输入

7.9.2. 选项

MongoDB 接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mongodb.consumer
集合

存储数据的 MongoDB 集合。(字符串,默认:<none>)spring-doc.cadn.net.cn

collection-expression

要评估 MongoDB 集合的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.data.mongodb
authentication-database

认证数据库名称。(字符串,默认:<none>spring-doc.cadn.net.cn

auto-index-creation

是否启用自动索引创建。(布尔类型,默认:<none>)spring-doc.cadn.net.cn

数据库

数据库名称。 (字符串,默认:<none>)spring-doc.cadn.net.cn

field-naming-strategy

要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:<none>)spring-doc.cadn.net.cn

grid-fs-database

<文档缺失> (字符串,默认:<none>spring-doc.cadn.net.cn

主机

Mongo服务器主机。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

密码

登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:<none>)spring-doc.cadn.net.cn

端口

Mongo服务器端口。不能与URI一起设置。(Integer,默认:<none>spring-doc.cadn.net.cn

replica-set-name

群集所需的副本集名称。不能与URI一起设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uri

Mongo 数据库连接字符串。不能与主机、端口、凭据和副本集名称一起设置。(字符串,默认:mongodb://localhost/test)spring-doc.cadn.net.cn

用户名

登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:<none>)spring-doc.cadn.net.cn

uuid-representation

转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:java-legacy,可能的值:UNSPECIFIEDSTANDARDC_SHARP_LEGACYJAVA_LEGACYPYTHON_LEGACY)spring-doc.cadn.net.cn

7.10. MQTT接收器

此模块用于向MQTT发送消息。spring-doc.cadn.net.cn

7.10.2. 选项

MQTT接收器具有以下选项:mqttspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

mqtt
clean-session

客户端和服务器是否应记住重启和重新连接之间的状态。(布尔值,默认:true)spring-doc.cadn.net.cn

connection-timeout

连接超时时间(秒)。 (Integer, 默认值:30)spring-doc.cadn.net.cn

keep-alive-interval

ping间隔,单位为秒。(整数,默认值:60)spring-doc.cadn.net.cn

密码

连接到代理时要使用的密码。(字符串,默认:guest)spring-doc.cadn.net.cn

持久化

'memory' 或 'file'。 (字符串,默认:memory)spring-doc.cadn.net.cn

persistence-directory

持久化目录。 (字符串,默认:/tmp/pahospring-doc.cadn.net.cn

url

MQTT代理的位置(以逗号分隔的列表)。(字符串数组,默认:[tcp://localhost:1883])spring-doc.cadn.net.cn

用户名

连接到代理时使用的用户名。(字符串,默认:guest)spring-doc.cadn.net.cn

mqtt.consumer
异步

是否使用异步发送。(布尔类型,默认:false)spring-doc.cadn.net.cn

编码

用于将字符串负载转换为字节的字符集。(字符串,默认:UTF-8)spring-doc.cadn.net.cn

client-id

标识客户端。(字符串,默认:stream.client.id.sink)spring-doc.cadn.net.cn

服务质量

要使用的服务的质量。(整数,默认:1)spring-doc.cadn.net.cn

保留

是否设置 'retained' 标志。 (布尔值,默认:false)spring-doc.cadn.net.cn

主题

接收器将发布的主题。(字符串,默认:stream.mqtt)spring-doc.cadn.net.cn

7.11. Pgcopy 填充器

一个模块,使用PostgreSQL COPY命令将传入的有效负载写入RDBMS。spring-doc.cadn.net.cn

7.11.1. 输入

headers
负载

列表达式将根据消息进行评估,该表达式通常仅与一种类型(如 Map 或 bean 等)兼容。spring-doc.cadn.net.cn

7.11.2. 输出

7.11.3. 选项

jdbc接收器有以下选项:jdbcspring-doc.cadn.net.cn

spring.datasource.driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.password

数据库登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.url

数据库的 JDBC URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.datasource.username

数据库登录用户名。(字符串,默认:<none>)spring-doc.cadn.net.cn

该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。

7.11.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

你可以在这里找到相应的活页夹基础项目。 然后,你可以进入其中一个文件夹并进行构建:spring-doc.cadn.net.cn

$ ./mvnw clean package

要运行集成测试,请在本地主机上启动一个 PostgreSQL 数据库:spring-doc.cadn.net.cn

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. 示例

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test

7.12. RabbitMQ Sink

此模块向RabbitMQ发送消息。spring-doc.cadn.net.cn

7.12.1. 选项

rabbit 接收器有以下选项:
spring-doc.cadn.net.cn

(请参阅Spring Boot文档中的RabbitMQ连接属性)spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

兔子
converter-bean-name

自定义消息转换器的bean名称;如果省略,则使用SimpleMessageConverter。如果为'jsonConverter',将为您创建一个Jackson2JsonMessageConverter bean。(字符串,默认:<none>)spring-doc.cadn.net.cn

交换

交换名称 - 如果提供了 exchangeNameExpression,则会被覆盖。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

exchange-expression

一个计算结果为交换名称的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

headers-mapped-last

在映射传出消息的标头时,确定是在转换消息之前还是之后进行标头映射。(布尔型,默认:true)spring-doc.cadn.net.cn

mapped-request-headers

要映射的标头。 (String[], 默认值: [*])spring-doc.cadn.net.cn

own-connection

为 true 时,使用基于引导属性的单独连接。(布尔类型,默认值:false)spring-doc.cadn.net.cn

persistent-delivery-mode

当没有 'amqp_deliveryMode' 标头时的默认传递模式,PERSISTENT 表示为 true。(Boolean,默认:false)spring-doc.cadn.net.cn

routing-key

路由键 - 如果提供了 routingKeyExpression,则会被覆盖。(字符串,默认:<none>)spring-doc.cadn.net.cn

routing-key-expression

一个计算路由键的SpEL表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

spring.rabbitmq
address-shuffle-mode

用于打乱配置地址的模式。(AddressShuffleMode,默认值:none,可选值:NONERANDOMINORDER)spring-doc.cadn.net.cn

地址

客户端应连接的地址列表,用逗号分隔。设置后,主机和端口将被忽略。(字符串,默认:<none>)spring-doc.cadn.net.cn

channel-rpc-timeout

通道中RPC调用的继续超时时间。将其设置为零以无限期等待。(持续时间,默认:10m)spring-doc.cadn.net.cn

connection-timeout

连接超时时间。将其设置为零以无限期等待。(持续时间,默认:<none>)spring-doc.cadn.net.cn

主机

RabbitMQ主机。如果设置了地址,则忽略此设置。(字符串,默认:localhost)spring-doc.cadn.net.cn

密码

登录以向代理进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

端口

RabbitMQ 端口。如果设置了地址,则忽略此设置。默认为 5672,如果启用了 SSL,则默认为 5671。(Integer,默认:<none>)spring-doc.cadn.net.cn

publisher-confirm-type

发布者类型确认要使用的。(ConfirmType,默认值:<none>,可能的取值:SIMPLECORRELATEDNONE)spring-doc.cadn.net.cn

publisher-returns

是否启用发布商返还。(布尔值,默认:falsespring-doc.cadn.net.cn

requested-channel-max

客户端请求的每个连接的通道数。使用0表示无限制。(整数,默认:2047)spring-doc.cadn.net.cn

requested-heartbeat

请求的心跳超时时间;零表示无。如果未指定持续时间后缀,则使用秒。(持续时间,默认:<none>)spring-doc.cadn.net.cn

用户名

登录用户以对经纪人进行身份验证。(字符串,默认:guest)spring-doc.cadn.net.cn

virtual-host

连接到代理时使用的虚拟主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.13. Redis存储

向Redis发送消息。spring-doc.cadn.net.cn

7.13.1. 选项

Redis接收器具有以下选项:redisspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

redis.consumer

存储密钥时使用的字面量键名。(字符串,默认:<none>)spring-doc.cadn.net.cn

key-expression

A SpEL 表达式,用于存储到键中。(字符串,默认:<none>spring-doc.cadn.net.cn

队列

存储到队列时要使用的队列名称字面量。(字符串,默认:<none>)spring-doc.cadn.net.cn

queue-expression

用于队列的 SpEL 表达式。(字符串,默认值:<none>)spring-doc.cadn.net.cn

主题

发布到主题时使用的文字主题名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

topic-expression

用于主题的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

spring.redis
client-name

连接上要设置的客户端名称,通过 CLIENT SETNAME 设置。 (字符串,默认:<none>)spring-doc.cadn.net.cn

client-type

要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认值:<none>,可能的取值:LETTUCEJEDIS)spring-doc.cadn.net.cn

connect-timeout

连接超时。(持续时间,默认:<none>)spring-doc.cadn.net.cn

数据库

连接工厂使用的数据库索引。(整数,默认:0)spring-doc.cadn.net.cn

主机

Redis服务器主机。(字符串,默认:localhostspring-doc.cadn.net.cn

密码

Redis服务器的登录密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

Redis服务器端口。(整数,默认:6379spring-doc.cadn.net.cn

SSL

是否启用SSL支持。(布尔类型,默认:falsespring-doc.cadn.net.cn

timeout

读取超时时间。(持续时间,默认:<none>)spring-doc.cadn.net.cn

url

连接 URL。覆盖主机、端口和密码。用户被忽略。示例:redis://user:[email protected]:6379 (字符串,默认:<none>)spring-doc.cadn.net.cn

用户名

登录 Redis 服务器的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

spring.redis.jedis.pool
max-active

连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:8)spring-doc.cadn.net.cn

max-idle

连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:8)spring-doc.cadn.net.cn

max-wait

连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

min-idle

池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:0)spring-doc.cadn.net.cn

time-between-eviction-runs

空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.redis.lettuce.pool
max-active

连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:8)spring-doc.cadn.net.cn

max-idle

连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:8)spring-doc.cadn.net.cn

max-wait

连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:-1ms)spring-doc.cadn.net.cn

min-idle

池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:0)spring-doc.cadn.net.cn

time-between-eviction-runs

空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:<none>)spring-doc.cadn.net.cn

spring.redis.sentinel
master

Redis服务器的名称。(字符串,默认:<none>)spring-doc.cadn.net.cn

节点

用冒号分隔的“主机:端口”对列表。(List<String>, 默认:<none>)spring-doc.cadn.net.cn

密码

用于与哨兵(sentinel)进行身份验证的密码。(字符串,默认:<none>spring-doc.cadn.net.cn

7.14. 路由器接收器

此应用程序将消息路由到命名通道。spring-doc.cadn.net.cn

7.14.1. 选项

路由器接收器具有以下选项:routerspring-doc.cadn.net.cn

router.default-output-channel

无法路由的消息发送到何处。(字符串,默认:nullChannel)spring-doc.cadn.net.cn

router.destination-mappings

目的地映射作为以换行符分隔的名称-值对字符串,例如 'foo=bar\ baz=car'。 (属性,默认:<none>)spring-doc.cadn.net.cn

router.expression

应用于消息以确定路由到的通道的表达式。请注意,对于文本、JSON 或 XML 等内容类型,有效负载的字节格式为 byte[] 而不是 String!请查阅文档了解如何处理字节数组有效负载内容。(表达式,默认:<none>)spring-doc.cadn.net.cn

router.refresh-delay

检查脚本更改的时间间隔(毫秒);如果为负数,则不刷新。(Integer,默认值:60000)spring-doc.cadn.net.cn

router.resolution-required

是否需要通道解析。(布尔值,默认:false)spring-doc.cadn.net.cn

router.script

返回通道或通道映射解析键的 Groovy 脚本的位置。(资源,默认:<none>)spring-doc.cadn.net.cn

router.variables

变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:<none>)spring-doc.cadn.net.cn

router.variables-location

包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:<none>)spring-doc.cadn.net.cn

由于这是一个动态路由器,目的地是按需创建的;因此,默认情况下,只有当Binder无法绑定到目标时,才会使用defaultOutputChannelresolutionRequired

您可以使用spring.cloud.stream.dynamicDestinations属性限制动态绑定的创建。默认情况下,所有解析的目标都将被动态绑定;如果此属性具有逗号分隔的目标名称列表,则只有这些目标会被绑定。解析到不在该列表中的目标的消息将路由到defaultOutputChannel,它也必须出现在列表中。spring-doc.cadn.net.cn

destinationMappings 用于将评估结果映射到实际的目标名称。spring-doc.cadn.net.cn

7.14.2. 基于 SpEL 的路由

该表达式针对消息进行求值,并返回一个频道名称,或者是一个频道名称映射表的键。spring-doc.cadn.net.cn

有关更多详细信息,请参阅Spring集成参考手册中的“路由器和Spring表达式语言(SpEL)”小节。配置通用路由器部分。spring-doc.cadn.net.cn

从 Spring Cloud Stream 2.0 开始,jsontextxml 内容类型的消息线格式为 byte[] 而不是 String! 这是与 SCSt 1.x 的变更,后者将这些类型视为字符串! 根据内容类型的不同,处理 byte[] 负载有不同的技术。对于普通的 text 内容类型,可以使用 new String(payload) SpEL 表达式将八位字节负载转换成字符串。 对于 json 类型,jsonPath() SpEL 工具 已经支持字符串和字节数组内容的互换。 同样的规则适用于 xml 内容类型和 #xpath() SpEL 工具。

例如对于 text 内容类型,应使用:
spring-doc.cadn.net.cn

 new String(payload).contains('a')

并且对于 json 内容类型,SpEL 表达式如下:
spring-doc.cadn.net.cn

 #jsonPath(payload, '$.person.name')

7.14.3. 基于Groovy的路由

除了使用 SpEL 表达式,还可以使用 Groovy 脚本。让我们在文件系统中创建一个 Groovy 脚本,路径为“file:/my/path/router.groovy”,或者“classpath:/my/path/router.groovy”:spring-doc.cadn.net.cn

println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

如果您想向脚本传递变量值,可以使用variables选项静态绑定值,或者选择性地通过propertiesLocation选项传递包含绑定属性文件的路径。文件中的所有属性都将作为变量提供给脚本。您可以同时指定variablespropertiesLocation,在这种情况下,作为variables提供的任何重复值将覆盖在propertiesLocation中提供的值。

请注意,payloadheaders是隐式绑定的,以便您能够访问消息中包含的数据。spring-doc.cadn.net.cn

有关更多信息,请参阅 Spring Integration 参考手册Groovy 支持spring-doc.cadn.net.cn

7.15. RSocket Sink

RSocket 使用 RSocket 协议的 fire-and-forget 策略将数据发送到消息接收器。spring-doc.cadn.net.cn

7.15.1. 选项

rsocket 接收器具有以下选项:
spring-doc.cadn.net.cn

rsocket.consumer.host

RSocket 主机。 (字符串,默认值:localhostspring-doc.cadn.net.cn

rsocket.consumer.port

RSocket 端口。(整数,默认值:7000spring-doc.cadn.net.cn

rsocket.consumer.route

用于 RSocket 的路由。(字符串,默认:<none>)spring-doc.cadn.net.cn

rsocket.consumer.uri

可用于基于 WebSocket 的传输的 URI。(URI,默认:<none>)spring-doc.cadn.net.cn

7.16. Amazon S3 接收器

此接收应用程序支持将对象传输到Amazon S3存储桶。
文件负载(以及递归目录)被传输到remote目录(S3存储桶),然后传输到应用程序部署所在的local目录。spring-doc.cadn.net.cn

此接收器接受的消息必须包含以下之一:payloadspring-doc.cadn.net.cn

7.16.1. 选项

s3接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

s3.common
endpoint-url

连接到与 S3 兼容的存储的可选端点 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

path-style-access

使用路径样式访问。(Boolean,默认值:false)spring-doc.cadn.net.cn

s3.consumer
访问控制列表

S3 对象访问控制列表。(预定义访问控制列表,缺省值:<none>,可取值:privatepublic-readpublic-read-writeauthenticated-readlog-delivery-writebucket-owner-readbucket-owner-full-controlaws-exec-read)spring-doc.cadn.net.cn

acl-expression

要评估的 S3 对象访问控制列表表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

目标文件存储的 AWS 存储桶。(字符串,默认:<none>spring-doc.cadn.net.cn

bucket-expression

要计算的AWS存储桶名称表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

key-expression

要评估的 S3 对象键表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

基于AmazonS3SinkConfiguration生成的目标应用程序可以使用S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener进行增强,这两个组件被注入到S3MessageHandler Bean中。有关详细信息,请参阅Spring集成AWS支持。spring-doc.cadn.net.cn

7.16.2. 亚马逊AWS通用选项

Amazon S3 连接器(如同所有其他 Amazon AWS 应用程序)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。请查阅相关文档了解所需和有用的自动配置属性。spring-doc.cadn.net.cn

其中一些是关于AWS凭据的:spring-doc.cadn.net.cn

其他用于 AWS Region 定义:spring-doc.cadn.net.cn

对于 AWS Stackspring-doc.cadn.net.cn

示例
java -jar s3-sink.jar --s3.bucket=/tmp/bar

7.17. SFTP接收器

SFTP接收器是从传入消息向SFTP服务器推送文件的简单选项。spring-doc.cadn.net.cn

它使用sftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和密码来登录。spring-doc.cadn.net.cn


默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会
使用该文件的原始名称。

配置 sftp.factory.known-hosts-expression 选项时,评估的根对象是应用程序上下文,例如sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'spring-doc.cadn.net.cn

7.17.1. 输入

headers

7.17.2. 输出

N/A (写入到 SFTP 服务器)。spring-doc.cadn.net.cn

7.17.3. 选项

sftp 接收器具有以下选项:
spring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

sftp.consumer
auto-create-dir

是否创建远程目录。(布尔值,默认:true)spring-doc.cadn.net.cn

filename-expression

用于生成远程文件名的 SpEL 表达式。(字符串,默认:<none>)spring-doc.cadn.net.cn

模式

远程文件已存在时要采取的操作。(FileExistsMode,默认:<none>,可选值:APPENDAPPEND_NO_FLUSHFAILIGNOREREPLACEREPLACE_IF_MODIFIED)spring-doc.cadn.net.cn

remote-dir

远程 FTP 目录。(字符串,默认:/spring-doc.cadn.net.cn

remote-file-separator

远程文件分隔符。(字符串,默认:/)spring-doc.cadn.net.cn

temporary-remote-dir

如果 isUseTemporaryFilename() 为 true,则文件将被写入的临时目录。(字符串,默认:/)spring-doc.cadn.net.cn

tmp-file-suffix

传输进行期间使用的后缀。(字符串,默认:.tmp)spring-doc.cadn.net.cn

use-temporary-filename

是否写入临时文件并重命名。(布尔类型,默认:true)spring-doc.cadn.net.cn

sftp.consumer.factory
allow-unknown-keys

允许未知或已更改的密钥。 (Boolean,默认:false)spring-doc.cadn.net.cn

cache-sessions

缓存会话。 (布尔值,默认:<none>spring-doc.cadn.net.cn

主机

服务器的主机名。(字符串,默认:localhost)spring-doc.cadn.net.cn

known-hosts-expression

一个解析为已知主机文件位置的 SpEL 表达式。(表达式,默认:<none>)spring-doc.cadn.net.cn

pass-phrase

用户私钥的密码。(字符串,默认:<empty string>)spring-doc.cadn.net.cn

密码

连接到服务器时使用的密码。(字符串,默认:<none>)spring-doc.cadn.net.cn

端口

服务器的端口。(整数,默认:22)spring-doc.cadn.net.cn

private-key

用户私钥的位置。 (资源,默认:<none>)spring-doc.cadn.net.cn

用户名

连接服务器时要使用的用户名。(字符串,默认值:<none>)spring-doc.cadn.net.cn

7.18. TCP接收器

此模块使用编码器通过TCP写入消息。spring-doc.cadn.net.cn

TCP是一个流式协议,需要一些机制来在传输线上封装消息。提供了多种编码器,默认的是'CRLF'。spring-doc.cadn.net.cn

7.18.1. 选项

TCP接收器具有以下选项:tcpspring-doc.cadn.net.cn

按前缀分组的属性:spring-doc.cadn.net.cn

tcp.consumer
编码

转换字节为字符串时使用的字符集。(String,默认:UTF-8)spring-doc.cadn.net.cn

关闭

每条消息后是否关闭套接字。(布尔值,默认:false)spring-doc.cadn.net.cn

编码器

发送消息时使用的编码器。(编码,缺省:<none>,可用值:CRLFLFNULLSTXETXRAWL1L2L4)spring-doc.cadn.net.cn

主机

此接收器将要连接的主机。(字符串,默认:<none>)spring-doc.cadn.net.cn

TCP
中文

是否使用 NIO。(布尔值,默认:falsespring-doc.cadn.net.cn

端口

要监听的端口号;0 表示操作系统选择一个端口。(整数,默认:1234)spring-doc.cadn.net.cn

reverse-lookup

对远程IP地址执行反向DNS查找;如果为false,则仅在消息头中包含IP地址。(布尔值,默认:falsespring-doc.cadn.net.cn

socket-timeout

套接字在未收到数据前超时关闭的时间(毫秒)。(Integer,缺省:120000)spring-doc.cadn.net.cn

use-direct-buffers

是否使用直接缓冲区。(布尔值,默认:false)spring-doc.cadn.net.cn

7.18.2. 可用编解码器

文本数据
CRLF(默认)

文本由回车符(0x0d)后跟换行符(0x0a)终止spring-doc.cadn.net.cn

LF

以换行符终止的文本(0x0a)spring-doc.cadn.net.cn

null

以空字节(0x00)终止的文本spring-doc.cadn.net.cn

STXETX

以 STX (0x02) 开头并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据
原始内容

无结构 - 客户端通过关闭套接字来表示一条完整的消息spring-doc.cadn.net.cn

L1

以一个字节(无符号)长度字段开头的数据(支持最多255字节)spring-doc.cadn.net.cn

L2

以两个字节(无符号)长度字段开头的数据(最多为216-1个字节)spring-doc.cadn.net.cn

L4

以四个字节(有符号)长度字段开头的数据(最多可达231-1字节)spring-doc.cadn.net.cn

7.19. 吞吐量接收器

收集器将计算消息数量,并在选定的时间间隔内记录观察到的吞吐量。spring-doc.cadn.net.cn

7.19.1. 选项

通过吞吐量接收器具有以下选项:spring-doc.cadn.net.cn

throughput.report-every-ms

报告频率。(整数,默认:1000spring-doc.cadn.net.cn

7.20. Twitter 消息接收器

从认证用户向指定用户发送直接消息。需要JSON POST请求体,并将Content-Type头部设置为application/jsonspring-doc.cadn.net.cn

当您收到用户的消息时,可以在24小时内回复最多5条消息。每收到一条消息都会重置24小时的时间窗口和5条消息的配额。在24小时内发送第6条消息或在24小时时间窗口外发送消息会被计入速率限制。此行为仅适用于使用POST direct_messages/events/new端点时。

Spring Expression Language (SpEL) 表达式用于从输入消息计算请求参数。spring-doc.cadn.net.cn

7.20.1. 选项

使用单引号 (') 来包裹 SpEL 表达式属性的字面值。 例如,要设置固定的消息文本,请使用 text='Fixed Text'。 对于固定的目标 userId,请使用 userId='666'
twitter.message.update.media-id

与消息关联的媒体ID。直接消息只能引用单个媒体ID。(表达式,默认:<none>)spring-doc.cadn.net.cn

twitter.message.update.screen-name

要发送直聊的用户的屏幕名称。(表达式,默认:<none>)spring-doc.cadn.net.cn

twitter.message.update.text

直接消息文本。如有必要,请进行URL编码。最大长度为10,000个字符。(表达式,默认:payload)spring-doc.cadn.net.cn

twitter.message.update.user-id

要发送直聊的消息的用户 ID。 (表达式,默认:<none>)spring-doc.cadn.net.cn

7.21. 推特更新接收器

更新认证用户的当前文本(例如发布推文)。spring-doc.cadn.net.cn

每次更新尝试时,都会将更新文本与认证用户的近期推文进行比较。任何会导致重复的尝试都将被阻止,并导致返回403错误。用户不能连续两次提交相同的文本。

虽然API本身没有进行速率限制,但用户在一次时间内可以创建的推文数量是有限制的。标准API的更新限制为每3小时窗口内最多300条。如果该用户的更新次数达到当前允许的上限,此方法将返回HTTP 403错误。spring-doc.cadn.net.cn

7.21.1. 选项

按前缀分组的属性:spring-doc.cadn.net.cn

twitter.update
attachment-url

(SpEL 表达式) 要使 URL 不计入扩展推文的文本正文中,请提供作为推文附件的 URL。该 URL 必须是推文永久链接或直接消息深度链接。任意非 Twitter 的 URL 必须保留在文本中。传递给 attachment_url 参数且不符合任何推文永久链接或直接消息深度链接的 URL 将在创建推文时失败并引发异常。(表达式,默认:<none>)spring-doc.cadn.net.cn

display-coordinates

(SpEL表达式) 是否在发送推文的确切坐标上放置一个标记。 (表达式,默认:<none>)spring-doc.cadn.net.cn

in-reply-to-status-id

(SpEL 表达式) 已存在的文本ID,更新将是对此文本的回复。注意:除非在此参数引用的推文作者在文本中被提及,否则将忽略此参数。因此,您必须包含@username,其中username是所引用推文的作者,在更新中。当设置inReplyToStatusId时,auto_populate_reply_metadata也会自动设置。这确保从原始推文中查找前导@mentions,并将其添加到新推文中。这将在扩展推文的元数据中追加@mentions,直到达到@mentions的限制。如果原始推文已被删除,则回复将会失败。(表达式,默认:<none>)spring-doc.cadn.net.cn

media-ids

(SpEL 表达式) 与推文关联的媒体 ID 列表,用逗号分隔。您可以在一条推文中包含最多 4 张照片或 1 个动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认:<none>)spring-doc.cadn.net.cn

place-id

(SpEL表达式)世界上的一个地方。(表达式,默认:<none>spring-doc.cadn.net.cn

text

(SpEL 表达式) 文本更新的文本。如有需要,请进行 URL 编码。t.co 链接包裹将影响字符计数。默认为消息的有效负载(表达式,默认:payload)spring-doc.cadn.net.cn

twitter.update.location
纬度

此推文引用位置的纬度。除非该参数位于-90.0到+90.0(北为正)范围内,否则将被忽略。如果不存在相应的经度参数,则也会被忽略。(表达式,默认:<none>)spring-doc.cadn.net.cn

此推文所指位置的经度。有效的经度范围为-180.0到+180.0(东为正)。如果超出该范围,不是数字,geo_enabled被禁用,或者没有对应的纬度参数,则忽略该参数。(表达式,默认:<none>)spring-doc.cadn.net.cn

7.22. 波前接收器

Wavefront 接收器消耗 Message<?>,将其转换为波形数据格式中的指标,然后直接发送到 Wavefront 或者 Wavefront 代理。Wavefront 数据格式spring-doc.cadn.net.cn

支持常见的ETL用例,其中现有的(历史)指标数据必须经过清洗、转换并存储在Wavefront中以供进一步分析。spring-doc.cadn.net.cn

7.22.1. 选项

波形图接收器具有以下选项:Wavefrontspring-doc.cadn.net.cn

wavefront.api-token

Wavefront API 访问Tokens。(字符串,默认:<none>spring-doc.cadn.net.cn

wavefront.metric-expression

一个SpEL表达式,用于计算指标值。(表达式,默认:<none>)spring-doc.cadn.net.cn

wavefront.metric-name

指标的名称,默认为应用程序名称。(字符串,默认值:<none>)spring-doc.cadn.net.cn

wavefront.proxy-uri

波形代理的URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

wavefront.source

发出指标的唯一应用程序、主机、容器或实例。(字符串,默认:<none>)spring-doc.cadn.net.cn

wavefront.tag-expression

与指标点标记关联的自定义元数据集合不能为空。键的有效字符为字母数字、连字符('-')、下划线('_')、点('.')。对于值,任何字符都允许,包括空格。要包含双引号,请用反斜杠转义,反斜杠不能是标记值中的最后一个字符。点标记键和值组合的最大长度为254个字符(包括分隔键和值的 '=' 号,共255个字符)。如果值较长,则拒绝该点并记录日志(Map<String, Expression>, 默认: <none>)spring-doc.cadn.net.cn

wavefront.timestamp-expression

一个 SpEL 表达式,用于计算指标的时间戳(可选)。(表达式,默认:<none>)spring-doc.cadn.net.cn

wavefront.uri

Wavefront 环境的 URL。(字符串,默认:<none>)spring-doc.cadn.net.cn

7.23. WebSocket接收器

一个简单的 Websocket Sink 实现。spring-doc.cadn.net.cn

7.23.1. 选项

支持以下选项:<br/>spring-doc.cadn.net.cn

websocket.consumer.log-level

netty通道的日志级别。默认为<tt>WARN</tt>(字符串,默认:<none>)spring-doc.cadn.net.cn

websocket.consumer.path

WebSocketSink 消费者需要连接的路径。默认值为 <tt>/websocket</tt> (字符串,默认:/websocket)spring-doc.cadn.net.cn

websocket.consumer.port

Netty服务器监听的端口。默认为9292(整数,默认:9292)spring-doc.cadn.net.cn

websocket.consumer.ssl

是否创建一个 {@link io.netty.handler.ssl.SslContext}。 (Boolean,缺省值: false)spring-doc.cadn.net.cn

websocket.consumer.threads

Netty {@link io.netty.channel.EventLoopGroup} 的线程数。默认是 <tt>1</tt> (Integer,默认:1)spring-doc.cadn.net.cn

7.23.2. 示例

为了验证 websocket-sink 是否从其他 spring-cloud-stream 应用接收消息,您可以使用以下简单的端到端设置。spring-doc.cadn.net.cn

第一步:启动Rabbitmq
步骤 2:部署一个time-source
步骤 3:部署<br>websocket-sink

最后,以trace模式启动一个websocket接收器,以便在日志中看到由time-source产生的消息:spring-doc.cadn.net.cn

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

你应该开始在启动WebsocketSink的控制台中看到日志消息,如下所示:spring-doc.cadn.net.cn

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.23.3. 执行器

您可以使用Endpoint来访问最后发送和接收的n条消息。您必须通过提供--endpoints.websocketconsumertrace.enabled=true来启用它。默认情况下,它会通过host:port/websocketconsumertrace显示最后100条消息。以下是样本输出:spring-doc.cadn.net.cn

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

还有一个简单的HTML页面,您可以在文本区域中看到转发的消息。您可以直接通过浏览器中的host:port访问它。spring-doc.cadn.net.cn

7.24. ZeroMQ接收器

"zeromq"接收器启用向ZeroMQ套接字发送消息。spring-doc.cadn.net.cn

7.24.1. 输入

7.24.2. 输出

7.24.3. 选项

zeromq 接收器有以下选项:
spring-doc.cadn.net.cn

zeromq.consumer.connect-url

连接到 ZeroMQ 套接字的连接 URL。(字符串,默认值:<none>)spring-doc.cadn.net.cn

zeromq.consumer.socket-type

连接应建立的套接字类型。(SocketType,默认:<none>,可能值:PAIRPUBSUBREQREPDEALERROUTERPULLPUSHXPUBXSUBSTREAM)spring-doc.cadn.net.cn

zeromq.consumer.topic

发送消息给订阅者之前,用于评估主题的表达式。 (表达式,默认:<none>)spring-doc.cadn.net.cn

7.24.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在以下位置找到相应的绑定器项目。<br>然后进入其中一个文件夹并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

7.24.5. 示例

java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=