应用
5. 来源
5.1. CDC 来源
更改数据捕获 (CDC)source
从各种数据库捕获和流式传输更改事件。
目前,它支持MySQL
,PostgreSQL
,MongoDB
,Oracle
和SQL Server
数据库。
基于 Debezium 嵌入式连接器构建,CDC Source
允许通过不同的消息绑定器(如 Apache Kafka、RabbitMQ 和所有 Spring Cloud Stream 支持代理)捕获和流式传输数据库更改。
它支持所有 Debezium 配置属性。只需将cdc.config.
前缀添加到现有的 Debezium 属性。例如,要将 Debezium 的connector.class
属性使用cdc.config.connector.class
source 属性。
我们为最常用的 Debezium 属性提供了方便的快捷方式。例如,代替长cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
Debezium 属性,您可以使用我们的cdc.connector=mysql
捷径。下表列出了所有可用的快捷方式以及它们所代表的 Debezium 属性。
Debezium 属性(例如cdc.config.XXX
) 始终优先于快捷方式!
CDC Source 引入了新的默认值BackingOffsetStore
配置,基于 MetadataStore 服务。Later 提供了各种微服务友好的方式来存储偏移元数据。
5.1.1. 选项
按前缀分组的属性:
疾病预防控制中心
- 配置
-
用于 debezium 配置属性的 Spring pass-trough 包装器。所有带有“cdc.config.”前缀的属性都是原生 Debezium 属性。删除前缀,将它们转换为 Debezium io.debezium.config.Configuration。(Map<String, String>,默认:
<none>
) - 连接器
-
cdc.config.connector.class 属性的快捷方式。只要它们不相互矛盾,就可以使用其中任何一个。(ConnectorType,默认:
<none>
,可能的值:mysql
,postgres
,mongodb
,oracle
,sqlserver
) - 名称
-
此 sourceConnector 实例的唯一名称。(字符串,默认:
<none>
) - 图式
-
将架构作为出站消息的一部分包含在内。(布尔值,默认:
false
)
cdc.扁平化
- 添加字段
-
要添加到扁平化邮件的元数据字段的逗号分隔列表。这些字段将以“__”或“__[<]struct]__”为前缀,具体取决于结构的规范。(字符串,默认:
<none>
) - 添加标头
-
逗号分隔列表指定要添加到扁平邮件标头的元数据字段列表。这些字段将以“__”或“__[struct]__”为前缀。(字符串,默认:
<none>
) - 删除处理模式
-
处理已删除记录的选项:(1) 无 - 传递记录,(2) 删除 - 删除记录和 (3) 重写 - 向记录添加“__deleted”字段。(DeleteHandlingMode,默认:
<none>
,可能的值:drop
,rewrite
,none
) - 掉落墓碑
-
默认情况下,Debezium 会生成逻辑删除记录以对已删除的记录启用 Kafka 压缩。dropTombstones 可以禁止显示墓碑记录。(布尔值,默认:
true
) - 启用
-
启用平展源记录事件 (https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认:
true
)
cdc.偏移
- 提交超时
-
在取消进程并恢复将来尝试提交的偏移数据之前,等待刷新记录并将要提交到偏移存储的偏移数据的最大毫秒数。(持续时间,默认:
5000ms
) - 冲洗间隔
-
尝试提交偏移量的时间间隔。默认值为 1 分钟。(持续时间,默认:
60000ms
) - 政策
-
偏移存储提交策略。(OffsetPolicy,默认:
<none>
) - 存储
-
Kafka 连接器跟踪已处理的记录数,并定期将计数(作为“偏移量”)存储在预配置的元数据存储中。重新启动时,连接器会从上次记录的源偏移量恢复读取。(OffsetStorageType,默认:
<none>
,可能的值:memory
,file
,kafka
,metadata
)
cdc.stream.header
- 转换连接标头
-
当 true 时,{@link org.apache.kafka.connect.header.Header} 将转换为消息头,名称为 {@link org.apache.kafka.connect.header.Header#key()},{@link org.apache.kafka.connect.header.Header#value()}。(布尔值,默认:
true
) - 抵消
-
将源记录的偏移量元数据序列化到 cdc.offset 下的出站邮件标头中。(布尔值,默认:
false
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认:
25
) - 读取容量
-
表上的读取容量。(长,默认:
1
) - 桌子
-
元数据的表名。(字符串,默认:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认:
<none>
) - 写入容量
-
表上的写入容量。(长,默认:
1
)
metadata.store
- 类型
-
指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:
<none>
,可能的值:mongodb
,gemfire
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
元数据.store.zookeeper
- 连接字符串
-
Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认:
UTF-8
) - 重试间隔
-
Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:
1000
) - 根
-
根节点 - 存储条目是此节点的子节点。(字符串,默认:
/SpringIntegration-MetadataStore
)
Debezium 属性快捷方式映射
下表列出了所有可用的快捷方式以及它们所代表的 Debezium 属性。
捷径 | 源语言 | 描述 |
---|---|---|
cdc.connector |
cdc.config.connector.class |
|
cdc.name |
cdc.config.name |
|
cdc.offset.flush-interval |
cdc.config.offset.flush.interval.ms |
|
cdc.offset.commit-timeout |
cdc.config.offset.flush.timeout.ms |
|
cdc.offset.policy |
cdc.config.offset.commit.policy |
|
cdc.offset.storage |
cdc.config.offset.storage |
|
cdc.flattening.drop-tombstones |
cdc.config.drop.墓碑 |
|
cdc.flattening.delete-handling-mode |
cdc.config.delete.handling.mode |
|
5.1.2. 数据库支持
这CDC Source
使用 Debezium 实用程序,目前支持 CDC 的五个数据存储:MySQL
,PostgreSQL
,MongoDB
,Oracle
和SQL Server
数据库。
5.1.3. 示例和测试
[CdcSourceIntegrationTest]() 、[CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 集成测试使用在本地计算机上运行的测试数据库夹具。我们使用预构建的 debezium docker 数据库镜像。Maven 构建借助docker-maven-plugin
.
要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。以下说明说明说明了如何从 Docker 映像运行预配置的测试数据库。
MySQL
启动debezium/example-mysql
在 docker 中:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
(可选)用
|
使用以下属性将 CDC 源连接到 MySQL DB:
cdc.connector=mysql (1)
cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)
cdc.config.database.user=debezium (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)
cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 | 将 CDC 源配置为使用 MySqlConnector。(相当于cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector ). |
2 | 元数据用于识别和调度传入事件。 |
3 | 连接到运行在localhost:3306 如debezium 用户。 |
4 | 在SourceRecord 事件。 |
5 | 启用 CDC 事件平展化。 |
您还可以运行CdcSourceIntegrationTests#CdcMysqlTests
使用此 MySQL 配置。
PostgreSQL
从debezium/example-postgres:1.0
Docker 镜像:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将 CDC 源连接到 PostgreSQL:
cdc.connector=postgres (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=postgres (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 | 配置CDC Source 使用 PostgresConnector。等效于设置cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector . |
2 | 配置 Debezium 引擎以使用memory (例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)支持偏移量存储。 |
3 | 元数据用于识别和调度传入事件。 |
4 | 连接到运行在localhost:5432 如postgres 用户。 |
5 | 在SourceRecord 事件。 |
6 | 启用 CDC 事件平展化。 |
您还可以运行CdcSourceIntegrationTests#CdcPostgresTests
使用此 MySQL 配置。
Mongo数据库
从debezium/example-mongodb:0.10
Docker 镜像:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
初始化库存集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在mongodb
终端输出,搜索像host: "3f95a8a6516e:27017"
:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
加127.0.0.1 3f95a8a6516e
进入您的/etc/hosts
使用以下属性将 CDC 源连接到 MongoDB:
cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)
cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)
cdc.config.tasks.max=1 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 | 配置CDC Source 使用 MongoDB 连接器。这映射到cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector . |
2 | 配置 Debezium 引擎以使用memory (例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)支持偏移量存储。 |
3 | 连接到运行在localhost:27017 如debezium 用户。 |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | 在SourceRecord 事件。 |
6 | 启用 CDC 事件平展化。 |
您还可以运行CdcSourceIntegrationTests#CdcPostgresTests
使用此 MySQL 配置。
SQL 服务器
开始一个sqlserver
从debezium/example-postgres:1.0
Docker 镜像:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
从 debezium 的 sqlserver 教程中填充示例数据:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 CDC 源连接到 SQLServer:
cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=sa (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 | 配置CDC Source 以使用 SqlServerConnector。等效于设置cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector . |
2 | 配置 Debezium 引擎以使用memory (例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)支持偏移量存储。 |
3 | 元数据用于识别和调度传入事件。 |
4 | 连接到运行在localhost:1433 如sa 用户。 |
您还可以运行CdcSourceIntegrationTests#CdcSqlServerTests
使用此 MySQL 配置。
神谕
从 localhost 启动可访问的 Oracle,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置
从 Debezium 的 Oracle 教程中填充示例数据:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询目录并将新文件或其内容发送到输出通道。默认情况下,文件源将 File 的内容作为字节数组提供。但是,可以使用 --file.supplier.mode 选项对其进行自定义:
-
ref 提供 java.io.File 引用
-
行 将逐行拆分文件并为每行发出一条新消息
-
内容默认值。将文件的内容作为字节数组提供
使用时--file.supplier.mode=lines
,您还可以提供附加选项--file.supplier.withMarkers=true
. 如果设置为 true,则基础 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。这 2 条附加标记消息的有效负载类型为FileSplitter.FileMarker
. 如果未显式设置,则选项 withMarkers 默认为 false。
5.2.1. 选项
文件源具有以下选项:
按前缀分组的属性:
文件.consumer
- 标记-json
-
当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:
<none>
,可能的值:ref
,lines
,contents
) - 带标记
-
设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:
<none>
)
文件.提供商
- 空时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1s
) - 目录
-
要轮询新文件的目录。(文件,默认:
<none>
) - 文件名模式
-
一个简单的Ant模式来匹配文件。(字符串,默认:
<none>
) - 文件名-正则表达式
-
用于匹配文件的正则表达式模式。(模式,默认:
<none>
) - 预防重复
-
设置为 true 以包含防止重复的 AcceptOnceFileListFilter。(布尔值,默认:
true
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认:
25
) - 读取容量
-
表上的读取容量。(长,默认:
1
) - 桌子
-
元数据的表名。(字符串,默认:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认:
<none>
) - 写入容量
-
表上的写入容量。(长,默认:
1
)
5.3. FTP 源
此源应用程序支持使用 FTP 协议传输文件。
文件从remote
目录到local
部署应用的目录。
默认情况下,源发出的消息作为字节数组提供。但是,这可能是
使用--mode
选择:
-
裁判 提供一个
java.io.File
参考 -
线 将逐行拆分文件并为每一行发出一条新消息
-
内容 默认值。将文件的内容作为字节数组提供
使用时--mode=lines
,您还可以提供附加选项--withMarkers=true
. 如果设置为true
,基础FileSplitter
将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。这 2 条附加标记消息的有效负载类型为FileSplitter.FileMarker
. 选项withMarkers
默认为false
如果未显式设置。
另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。
5.3.2. 输出
5.3.3. 选项
ftp 源具有以下选项:
按前缀分组的属性:
文件.consumer
- 标记-json
-
当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:
<none>
,可能的值:ref
,lines
,contents
) - 带标记
-
设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:
<none>
)
ftp.工厂
- 缓存会话
-
缓存会话。(布尔值,默认:
<none>
) - 客户端模式
-
用于 FTP 会话的客户端模式。(ClientMode,默认:
<none>
,可能的值:ACTIVE
,PASSIVE
) - 主机
-
服务器的主机名。(字符串,默认:
localhost
) - 密码
-
用于连接到服务器的密码。(字符串,默认:
<none>
) - 端口
-
服务器的端口。(整数,默认:
21
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认:
<none>
)
ftp.supplier
- 自动创建本地目录
-
设置为 true 以创建本地目录(如果不存在)。(布尔值,默认:
true
) - 空时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1s
) - 删除远程文件
-
设置为 true 可在成功传输后删除远程文件。(布尔值,默认:
false
) - 文件名模式
-
一种筛选器模式,用于匹配要传输的文件的名称。(字符串,默认:
<none>
) - 文件名-正则表达式
-
用于匹配要传输的文件名称的过滤器正则表达式模式。(模式,默认:
<none>
) - 本地目录
-
用于文件传输的本地目录。(文件,默认:
<none>
) - 保留时间戳
-
设置为 true 以保留原始时间戳。(布尔值,默认:
true
) - 远程目录
-
远程 FTP 目录。(字符串,默认:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认:
/
) - tmp-文件后缀
-
传输过程中要使用的后缀。(字符串,默认:
.tmp
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认:
25
) - 读取容量
-
表上的读取容量。(长,默认:
1
) - 桌子
-
元数据的表名。(字符串,默认:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认:
<none>
) - 写入容量
-
表上的写入容量。(长,默认:
1
)
5.4. 晶洞源
Geode 源将发出从 Apache Geode EntryEvents 或 CqEvents 中提取的对象流。
5.4.1. 选项
晶洞源具有以下选项:
按前缀分组的属性:
geode.池
- 连接类型
-
指定连接类型:“服务器”或“定位器”。(ConnectType,默认:
<none>
,可能的值:locator
,server
) - 主机地址
-
指定一个或多个格式为 [host]:[port] 的 Gemfire 定位器或服务器地址。(InetSocketAddress[],默认:
<none>
) - 已启用订阅
-
设置为 true 以启用客户端池的订阅。需要将更新同步到客户端缓存。(布尔值,默认:
false
)
geode.security.ssl (地理洞.安全.ssl)
- 密码
-
将用于安全套接字连接的 SSL 密码配置为有效密码名称的数组。(字符串,默认:
any
) - 密钥存储类型
-
标识用于 SSL 通信的密钥库类型(例如 JKS、PKCS11 等)。(字符串,默认:
JKS
) - 密钥存储 uri
-
用于连接到 Geode 集群的预先创建的密钥库 URI 的位置。(资源,默认值:
<none>
) - SSL-密钥存储密码
-
用于访问密钥信任库的密码。(字符串,默认:
<none>
) - SSL-信任存储密码
-
用于访问信任存储的密码。(字符串,默认:
<none>
) - 信任存储类型
-
标识用于 SSL 通信的信任库类型(例如 JKS、PKCS11 等)。(字符串,默认:
JKS
) - 信任存储 uri
-
用于连接到 Geode 集群的预先创建的信任库 URI 的位置。(资源,默认值:
<none>
) - 用户主目录
-
本地目录,用于缓存从 truststoreUri 和 keystoreUri 位置下载的信任库和密钥库文件。(字符串,默认:
user.home
)
5.5. Http 源代码
侦听 HTTP 请求并将正文作为消息有效负载发出的源应用程序。
如果 Content-Type 匹配text/*
或application/json
,有效负载将是一个字符串,
否则,有效负载将是一个字节数组。
5.6. JDBC 源代码
此源轮询来自 RDBMS 的数据。此源完全基于DataSourceAutoConfiguration
,因此请参阅 Spring Boot JDBC 支持 了解更多信息。
5.6.2. 选项
jdbc 源具有以下选项:
按前缀分组的属性:
jdbc.supplier
- 最大行数
-
要处理查询的最大行数。(整数,默认:
0
) - 查询
-
用于选择数据的查询。(字符串,默认:
<none>
) - 分裂
-
是否将 SQL 结果拆分为单个消息。(布尔值,默认:
true
) - 更新
-
要执行的 SQL 更新语句,用于将轮询的消息标记为“已见”。(字符串,默认:
<none>
)
spring.datasource
- 数据
-
数据 (DML) 脚本资源引用。(List<String>,默认值:
<none>
) - 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:
<none>
) - 初始化模式
-
确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化时要应用的模式。(DataSourceInitializationMode,默认:
embedded
,可能的值:ALWAYS
,EMBEDDED
,NEVER
) - 密码
-
数据库的登录密码。(字符串,默认:
<none>
) - 图式
-
架构 (DDL) 脚本资源引用。(List<String>,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认:
<none>
)
spring.integration.轮询器
- 克罗恩
-
用于轮询的 Cron 表达式。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认:
<none>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:
<none>
) - 初始延迟
-
轮询初始延迟。申请 'fixedDelay' 和 'fixedRate';“cron”被忽略。(持续时间,默认:
<none>
) - 每次轮询的最大消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认:
<none>
) - 接收超时
-
轮询消息的等待时间。(持续时间,默认:
1s
)
另请参阅 Spring Boot 文档以获取补充DataSource
properties 和TriggerProperties
和MaxMessagesProperties
用于轮询选项。
5.7. JMS 源代码
JMS 源允许从 JMS 接收消息。
5.7.1. 选项
JMS 源具有以下选项:
按前缀分组的属性:
jms.supplier
- 客户端 ID
-
持久订阅的客户端 ID。(字符串,默认:
<none>
) - 目的地
-
接收消息的目标(队列或主题)。(字符串,默认:
<none>
) - 消息选择器
-
消息的选择器。(字符串,默认:
<none>
) - 会话交易
-
True 启用事务并选择 DefaultMessageListenerContainer,false 选择 SimpleMessageListenerContainer。(布尔值,默认:
true
) - 订阅持久
-
对于持久订阅,则为 true。(布尔值,默认:
<none>
) - 订阅名称
-
持久订阅或共享订阅的名称。(字符串,默认:
<none>
) - 订阅共享
-
对于共享订阅,则为 true。(布尔值,默认:
<none>
)
5.9. 邮件源
侦听电子邮件并将邮件正文作为邮件有效负载发出的源应用程序。
5.9.1. 选项
邮件源具有以下选项:
- mail.supplier.charset
-
byte[] 邮件到字符串转换的字符集。(字符串,默认:
UTF-8
) - 邮件.提供商.delete
-
设置为 true 可在下载后删除电子邮件。(布尔值,默认:
false
) - 邮件.提供商表达式
-
配置 SpEL 表达式以选择消息。(字符串,默认:
true
) - 邮件.supplier.idle-imap
-
设置为 true 以使用 IdleImap 配置。(布尔值,默认:
false
) - mail.supplier.java-邮件属性
-
JavaMail 属性作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - mail.supplier.标记为已读
-
设置为 true 将电子邮件标记为已读。(布尔值,默认:
false
) - 邮件.supplier.url
-
用于连接到邮件服务器的邮件连接 URL,例如“imaps://username:[email protected]:993/Inbox”。(URLName,默认:
<none>
) - mail.supplier.user-flag
-
当服务器不支持 \Recent 时标记邮件的标志。(字符串,默认:
<none>
)
5.10. MongoDB 源代码
此源轮询来自 MongoDB 的数据。
此来源完全基于MongoDataAutoConfiguration
,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。
5.10.1. 选项
mongodb 源代码具有以下选项:
按前缀分组的属性:
mongodb.supplier
- 收集
-
要查询的 MongoDB 集合。(字符串,默认:
<none>
) - 查询
-
MongoDB 查询。(字符串,默认:
{ }
) - 查询表达式
-
MongoDB查询DSL样式中的SpEL表达式。(表达式,默认值:
<none>
) - 分裂
-
是否将查询结果拆分为单个消息。(布尔值,默认:
true
) - 更新表达式
-
MongoDB 中的 SpEL 表达式更新了 DSL 样式。(表达式,默认值:
<none>
)
spring.data.mongodb
- 身份验证数据库
-
身份验证数据库名称。(字符串,默认:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认:
<none>
) - 数据库
-
数据库名称。(字符串,默认:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(Class<?>,默认:
<none>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(Character[],默认:
<none>
) - 端口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认:
<none>
) - URI
-
Mongo 数据库 URI。不能使用主机、端口、凭据和副本集名称进行设置。(字符串,默认:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认:
<none>
) - uuid-表示
-
将 UUID 转换为 BSON 二进制值时使用的表示。(UuidRepresentation,默认:
java-legacy
,可能的值:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
另请参阅 Spring Boot 文档以获取更多内容MongoProperties
性能。
See 和TriggerProperties
用于轮询选项。
5.11. MQTT 源代码
启用从 MQTT 接收消息的源。
5.11.2. 选项
mqtt 源具有以下选项:
按前缀分组的属性:
MQTT
- 清洁会话
-
客户端和服务器是否应记住重新启动和重新连接的状态。(布尔值,默认:
true
) - 连接超时
-
连接超时(以秒为单位)。(整数,默认:
30
) - 保持活动间隔
-
ping间隔(以秒为单位)。(整数,默认:
60
) - 密码
-
连接到代理时要使用的密码。(字符串,默认:
guest
) - 坚持
-
'内存'或'文件'。(字符串,默认:
memory
) - 持久性目录
-
持久性目录。(字符串,默认:
/tmp/paho
) - SSL-属性
-
MQTT 客户端 SSL 属性。(Map<String, String>,默认:
<none>
) - 网址
-
MQTT 代理的位置(逗号分隔的列表)。(String[],默认:
[tcp://localhost:1883]
) - 用户名
-
连接到代理时要使用的用户名。(字符串,默认:
guest
)
5.12. RabbitMQ 源代码
“rabbit”源允许从 RabbitMQ 接收消息。
在部署流之前,队列必须存在;它们不会自动创建。 您可以使用 RabbitMQ Web UI 轻松创建队列。
5.12.3. 选项
兔子源有以下选项:
按前缀分组的属性:
兔子.提供商
- 启用重试
-
true 启用重试。(布尔值,默认:
false
) - 初始重试间隔
-
启用重试时的初始重试间隔。(整数,默认:
1000
) - 映射请求标头
-
将要映射的标头。(String[],默认:
[STANDARD_REQUEST_HEADERS]
) - 最大尝试次数
-
启用重试时的最大传递尝试次数。(整数,默认:
3
) - 最大重试间隔
-
启用重试时的最大重试间隔。(整数,默认:
30000
) - 自己的连接
-
如果为 true,请根据启动属性使用单独的连接。(布尔值,默认:
false
) - 队列
-
源将侦听消息的队列。(String[],默认:
<none>
) - 重新排队
-
是否应将被拒绝的邮件重新排队。(布尔值,默认:
true
) - 重试乘数
-
启用重试时重试回退乘数。(双精度,默认:
2
) - 交易
-
通道是否已交易。(布尔值,默认:
false
)
spring.rabbitmq
- 地址随机模式
-
用于打乱已配置地址的模式。(AddressShuffleMode,默认:
none
,可能的值:NONE
,RANDOM
,INORDER
) - 地址
-
客户端应连接到的地址的逗号分隔列表。设置后,将忽略主机和端口。(字符串,默认:
<none>
) - 通道 rpc 超时
-
通道中 RPC 调用的延续超时。将其设置为零以永久等待。(持续时间,默认:
10m
) - 连接超时
-
连接超时。将其设置为零以永久等待。(持续时间,默认:
<none>
) - 主机
-
RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认:
localhost
) - 密码
-
登录以对代理进行身份验证。(字符串,默认:
guest
) - 端口
-
RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则默认为 5671。(整数,默认:
<none>
) - 发布者确认类型
-
确认使用的发布者类型。(ConfirmType,默认:
<none>
,可能的值:SIMPLE
,CORRELATED
,NONE
) - 发布者返回
-
是否启用发布商退货。(布尔值,默认:
false
) - 请求通道最大值
-
客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认:
2047
) - 请求心跳
-
请求的检测信号超时;零表示无。如果未指定持续时间后缀,则将使用秒。(持续时间,默认:
<none>
) - 用户名
-
登录用户以向代理进行身份验证。(字符串,默认:
guest
) - 虚拟主机
-
连接到代理时要使用的虚拟主机。(字符串,默认:
<none>
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。
关于重试的说明
使用默认的 ackMode (AUTO) 和重新排队 (true) 选项,将重试失败的消息传递
无限期。
由于兔源没有太多的加工,因此源本身失败的风险很小,除非
下游Binder 由于某种原因未连接。
将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到死信
Exchange/Queue(如果代理已如此配置)。
enableRetry 选项允许配置重试参数,以便可以重试失败的消息传递,并且
当重试用尽时,最终被丢弃(或死信)。
在重试间隔期间,传递线程将挂起。
重试选项包括 enableRetry、maxAttempts、initialRetryInterval、retryMultipler 和 maxRetryInterval。
永远不会重试因 MessageConversionException 而失败的消息传递;假设如果消息
第一次尝试时无法转换,后续尝试也将失败。
此类消息将被丢弃(或死信)。 |
5.13. Amazon S3 源代码
此源应用程序支持使用 Amazon S3 协议传输文件。
文件从remote
目录(S3 存储桶)到local
部署应用程序的目录。
默认情况下,源发出的消息作为字节数组提供。但是,这可能是
使用--mode
选择:
-
裁判 提供一个
java.io.File
参考 -
线 将逐行拆分文件并为每一行发出一条新消息
-
内容 默认值。将文件的内容作为字节数组提供
使用时--mode=lines
,您还可以提供附加选项--withMarkers=true
. 如果设置为true
,基础FileSplitter
将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。这 2 条附加标记消息的有效负载类型为FileSplitter.FileMarker
. 选项withMarkers
默认为false
如果未显式设置。
另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。
模式 = 线
5.13.3. 选项
s3 源具有以下选项:
按前缀分组的属性:
文件.consumer
- 标记-json
-
当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:
<none>
,可能的值:ref
,lines
,contents
) - 带标记
-
设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:
<none>
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认:
25
) - 读取容量
-
表上的读取容量。(长,默认:
1
) - 桌子
-
元数据的表名。(字符串,默认:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认:
<none>
) - 写入容量
-
表上的写入容量。(长,默认:
1
)
metadata.store
- 类型
-
指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:
<none>
,可能的值:mongodb
,gemfire
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
元数据.store.zookeeper
- 连接字符串
-
Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认:
UTF-8
) - 重试间隔
-
Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:
1000
) - 根
-
根节点 - 存储条目是此节点的子节点。(字符串,默认:
/SpringIntegration-MetadataStore
)
S3.提供商
- 自动创建本地目录
-
创建或不创建本地目录。(布尔值,默认:
true
) - 删除远程文件
-
处理后删除或不删除远程文件。(布尔值,默认:
false
) - 文件名模式
-
过滤远程文件的模式。(字符串,默认:
<none>
) - 文件名-正则表达式
-
用于过滤远程文件的正则表达式。(模式,默认:
<none>
) - 仅列表
-
设置为 true 可返回 s3 对象元数据,而不将文件复制到本地目录。(布尔值,默认:
false
) - 本地目录
-
用于存储文件的本地目录。(文件,默认:
<none>
) - 保留时间戳
-
将远程文件的时间戳传输到本地文件。(布尔值,默认:
true
) - 远程目录
-
AWS S3 存储桶资源。(字符串,默认:
bucket
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认:
/
) - tmp-文件后缀
-
临时文件后缀。(字符串,默认:
.tmp
)
5.13.4. Amazon AWS 通用选项
Amazon S3 Source(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。有关所需和有用的自动配置属性,请参阅他们的文档。
其中一些是关于 AWS 凭证的:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他适用于 AWSRegion
定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
对于 AWSStack
:
-
云.aws.stack.auto
-
cloud.aws.stack.name
5.14. SFTP 源
此源应用程序支持使用 SFTP 协议传输文件。
文件从remote
目录到local
部署应用的目录。
默认情况下,源发出的消息作为字节数组提供。但是,这可能是
使用--mode
选择:
-
裁判 提供一个
java.io.File
参考 -
线 将逐行拆分文件并为每一行发出一条新消息
-
内容 默认值。将文件的内容作为字节数组提供
使用时--mode=lines
,您还可以提供附加选项--withMarkers=true
. 如果设置为true
,基础FileSplitter
将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。这 2 条附加标记消息的有效负载类型为FileSplitter.FileMarker
. 选项withMarkers
默认为false
如果未显式设置。
看sftp-supplier
以获取高级配置选项。
另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。
5.14.2. 输出
模式 = 内容
模式 = 线
5.14.3. 选项
ftp 源具有以下选项:
按前缀分组的属性:
文件.consumer
- 标记-json
-
当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值是 'ref' - File 对象,'lines' - 每行一条消息,或 'contents' - 以字节形式显示的内容。(FileReadingMode,默认:
<none>
,可能的值:ref
,lines
,contents
) - 带标记
-
设置为 true 以在数据之前/之后发出文件开始/文件结束标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认:
<none>
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认:
25
) - 读取容量
-
表上的读取容量。(长,默认:
1
) - 桌子
-
元数据的表名。(字符串,默认:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认:
<none>
) - 写入容量
-
表上的写入容量。(长,默认:
1
)
metadata.store
- 类型
-
指示要配置的元数据存储类型(默认值为“内存”)。您必须包含相应的 Spring Integration 依赖项才能使用持久存储。(StoreType,默认:
<none>
,可能的值:mongodb
,gemfire
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
元数据.store.zookeeper
- 连接字符串
-
Zookeeper 连接字符串,格式为 HOST:PORT。(字符串,默认:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认:
UTF-8
) - 重试间隔
-
Zookeeper作的重试间隔(以毫秒为单位)。(整数,默认:
1000
) - 根
-
根节点 - 存储条目是此节点的子节点。(字符串,默认:
/SpringIntegration-MetadataStore
)
SFTP.提供商
- 自动创建本地目录
-
设置为 true 以创建本地目录(如果不存在)。(布尔值,默认:
true
) - 空时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1s
) - 删除远程文件
-
设置为 true 可在成功传输后删除远程文件。(布尔值,默认:
false
) - 目录
-
工厂“name.directory”对的列表。(String[],默认:
<none>
) - 工厂
-
工厂名称到工厂的映射。(Map<String, Factory>,默认值:
<none>
) - 公平
-
对于多个服务器/目录的公平轮换为 True。默认情况下,这是 false,因此如果一个源有多个条目,则将在访问其他源之前收到这些条目。(布尔值,默认:
false
) - 文件名模式
-
一种筛选器模式,用于匹配要传输的文件的名称。(字符串,默认:
<none>
) - 文件名-正则表达式
-
用于匹配要传输的文件名称的过滤器正则表达式模式。(模式,默认:
<none>
) - 仅列表
-
设置为 true 以返回不包含整个有效负载的文件元数据。(布尔值,默认:
false
) - 本地目录
-
用于文件传输的本地目录。(文件,默认:
<none>
) - 最大获取
-
每次轮询要获取的最大远程文件数;默认无限制。在列出文件或构建任务启动请求时不适用。(整数,默认:
<none>
) - 保留时间戳
-
设置为 true 以保留原始时间戳。(布尔值,默认:
true
) - 远程目录
-
远程 FTP 目录。(字符串,默认:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认:
/
) - 将远程文件重命名为
-
成功传输后,必须将解析为新名称远程文件的 SpEL 表达式重命名为。(表达式,默认值:
<none>
) - 流
-
设置为 true 以流式传输文件而不是复制到本地目录。(布尔值,默认:
false
) - tmp-文件后缀
-
传输过程中要使用的后缀。(字符串,默认:
.tmp
)
5.15. 系统日志
系统日志源通过 UDP、TCP 或两者接收 SYSLOG 数据包。支持 RFC3164 (BSD) 和 RFC5424 格式。
5.15.1. 选项
- syslog.supplier.buffer-size
-
解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认:
2048
) - 系统日志.supplier.nio
-
是否使用蔚来(支持大量连接时)。(布尔值,默认:
false
) - 系统日志.supplier.port
-
要侦听的端口。(整数,默认:
1514
) - syslog.supplier.protocol 系统日志
-
用于 SYSLOG(tcp 或 udp)的协议。(协议,默认:
<none>
,可能的值:tcp
,udp
,both
) - syslog.supplier.反向查找
-
是否对传入套接字执行反向查找。(布尔值,默认:
false
) - syslog.supplier.rfc
-
'5424' 或 '3164' - 根据 RFC 的系统日志格式;3164 又名“BSD”格式。(字符串,默认:
3164
) - syslog.supplier.socket-超时
-
套接字超时。(整数,默认:
0
)
5.16. TCP的
这tcp
源充当服务器,允许远程方连接到它并通过原始 TCP 套接字提交数据。
TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多解码器是 可用,默认值为与 Telnet 兼容的“CRLF”。
TCP 源应用程序生成的消息具有byte[]
有效载荷。
5.17. 时间源
时间源将只是每隔一段时间发出一个包含当前时间的字符串。
5.17.1. 选项
时间源具有以下选项:
- spring.integration.poller.cron
-
用于轮询的 Cron 表达式。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认:
<none>
) - spring.integration.poller.fixed-delay
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认:
<none>
) - spring.integration.poller.固定速率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:
<none>
) - spring.integration.poller.initial-delay
-
轮询初始延迟。申请 'fixedDelay' 和 'fixedRate';“cron”被忽略。(持续时间,默认:
<none>
) - 每次轮询spring.integration.poller.max消息
-
每个轮询周期要轮询的最大消息数。(整数,默认:
<none>
) - spring.integration.poller.receive-timeout
-
轮询消息的等待时间。(持续时间,默认:
1s
) - time.date-format
-
日期值的格式。(字符串,默认:
MM/dd/yy HH:mm:ss
)
5.18. Twitter 消息源
重复检索过去 30 天内的直接消息(已发送和已接收),按时间倒序排序。
释放的消息被缓存(在MetadataStore
cache)以防止重复。
默认情况下,内存中的SimpleMetadataStore
被使用。
这twitter.message.source.count
控制返回的消息数。
这spring.cloud.stream.poller
属性控制消息轮询间隔。
必须与使用的 API 速率限制保持一致
5.18.1. 选项
按前缀分组的属性:
spring.integration.轮询器
- 克罗恩
-
用于轮询的 Cron 表达式。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认:
<none>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:
<none>
) - 初始延迟
-
轮询初始延迟。申请 'fixedDelay' 和 'fixedRate';“cron”被忽略。(持续时间,默认:
<none>
) - 每次轮询的最大消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认:
<none>
) - 接收超时
-
轮询消息的等待时间。(持续时间,默认:
1s
)
推特连接
- 访问Tokens
-
您的 Twitter Tokens。(字符串,默认:
<none>
) - 访问Tokens密钥
-
你的 Twitter Tokens密钥。(字符串,默认:
<none>
) - 消费者键
-
您的 Twitter 密钥。(字符串,默认:
<none>
) - 消费者机密
-
你的 Twitter 秘密。(字符串,默认:
<none>
) - 启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认:
false
) - 原始 JSON
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认:
true
)
5.19. Twitter 搜索源
Twitter 的标准搜索 API(搜索/推文)允许对最近或流行的推文的索引进行简单查询。这Source
提供针对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。
返回与指定查询匹配的相关推文的集合。
使用spring.cloud.stream.poller
属性来控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 req / 10 秒)
这twitter.search
查询属性允许按关键字查询并按时间和地理位置过滤结果。
这twitter.search.count
和twitter.search.page
根据搜索 API 控制结果分页。
注意:Twitter 的搜索服务以及搜索 API 并不意味着成为推文的详尽来源。并非所有推文都会被索引或通过搜索界面提供。
5.19.1. 选项
按前缀分组的属性:
spring.integration.轮询器
- 克罗恩
-
用于轮询的 Cron 表达式。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认:
<none>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:
<none>
) - 初始延迟
-
轮询初始延迟。申请 'fixedDelay' 和 'fixedRate';“cron”被忽略。(持续时间,默认:
<none>
) - 每次轮询的最大消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认:
<none>
) - 接收超时
-
轮询消息的等待时间。(持续时间,默认:
1s
)
推特连接
- 访问Tokens
-
您的 Twitter Tokens。(字符串,默认:
<none>
) - 访问Tokens密钥
-
你的 Twitter Tokens密钥。(字符串,默认:
<none>
) - 消费者键
-
您的 Twitter 密钥。(字符串,默认:
<none>
) - 消费者机密
-
你的 Twitter 秘密。(字符串,默认:
<none>
) - 启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认:
false
) - 原始 JSON
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认:
true
)
推特.search
- 计数
-
每页要返回的推文数量(例如,每个请求),最多 100 条。(整数,默认:
100
) - 朗
-
将搜索到的推文限制为给定语言,由 http://en.wikipedia.org/wiki/ISO_639-1 给出。(字符串,默认:
<none>
) - 页
-
在再次开始从最近的推文开始搜索之前,要向后搜索(从最近到最旧的推文)的页面数(例如请求)。向后搜索的推文总数为(页面 * 计数)(整数,默认:
3
) - 查询
-
按搜索查询字符串搜索推文。(字符串,默认:
<none>
) - 从最近的空响应重新启动
-
在空响应时从最近的推文重新开始搜索。仅在第一次重新启动后应用(例如,当since_id != UNBOUNDED 时)(布尔值,默认:
false
) - 结果类型
-
指定您希望接收的搜索结果类型。当前默认值为“混合”。有效值包括: 混合 :在响应中同时包含热门结果和实时结果。最近 :仅返回响应中的最新结果 常用 :仅返回响应中最受欢迎的结果(ResultType,默认:
<none>
,可能的值:popular
,mixed
,recent
) - 因为
-
如果指定,则返回自给定日期以来的推文。日期的格式应为 YYYY-MM-DD。(字符串,默认:
<none>
)
5.20. Twitter 流源
-
这
Filter API
返回与一个或多个过滤器谓词匹配的公共状态。多个参数允许使用与流式处理API的单个连接。提示:track
,follow
和locations
字段与 OR 运算符组合!使用track=foo
和follow=1234
返回匹配的推文test
或由用户创建1234
. -
这
Sample API
返回所有公共状态的小型随机样本。默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此端点,它们将看到相同的推文。
默认访问级别允许多达 400 个跟踪关键字、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。
5.20.1. 选项
按前缀分组的属性:
推特连接
- 访问Tokens
-
您的 Twitter Tokens。(字符串,默认:
<none>
) - 访问Tokens密钥
-
你的 Twitter Tokens密钥。(字符串,默认:
<none>
) - 消费者键
-
您的 Twitter 密钥。(字符串,默认:
<none>
) - 消费者机密
-
你的 Twitter 秘密。(字符串,默认:
<none>
) - 启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认:
false
) - 原始 JSON
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认:
true
)
推特.stream.filter
- 计数
-
指示在过渡到实时流之前要流化的先前状态数。(整数,默认:
0
) - 过滤器级别
-
筛选器级别将推文显示在流中限制为具有最小 filterLevel 属性值的推文。无、低或中之一。(FilterLevel,默认:
<none>
) - 跟随
-
按 ID 指定要从中接收公开推文的用户。(List<Long>,默认值:
<none>
) - 语言
-
指定流的推文语言。(List<String>,默认值:
<none>
) - 地点
-
要跟踪的位置。内部表示为 2D 数组。边界框无效:52.38、4.90、51.51、-0.12。第一对必须是框的 SW 角(List<BoundingBox>,默认值:
<none>
) - 跟踪
-
指定要跟踪的关键字。(List<String>,默认值:
<none>
)
5.22. ZeroMQ 源代码
“zeromq”源允许从 ZeroMQ 接收消息。
5.22.3. 选项
zeromq 源具有以下选项:
- zeromq.supplier.bind-port
-
用于创建 ZeroMQ 套接字的绑定端口;0 选择一个随机端口。(整数,默认:
0
) - zeromq.supplier.connect-url
-
到 ZeroMQ 套接字的连接 URL。(字符串,默认:
<none>
) - zeromq.supplier.消费延迟
-
未收到数据时从 ZeroMQ 套接字消费的延迟。(持续时间,默认:
1s
) - zeromq.supplier.socket-type
-
连接应建立的套接字类型。(SocketType,默认:
<none>
,可能的值:PAIR
,PUB
,SUB
,REQ
,REP
,DEALER
,ROUTER
,PULL
,PUSH
,XPUB
,XSUB
,STREAM
) - zeromq.supplier.topics
-
要订阅的主题。(String[],默认:
[]
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。
6. 处理器
6.1. 聚合处理器
聚合器处理器使应用程序能够将传入消息聚合到组中,并将它们释放到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。
6.1.2. 选项
按前缀分组的属性:
聚合
- 集合体
-
用于聚集策略的 SpEL 表达。默认值为有效负载的集合。(表达式,默认值:
<none>
) - 相关
-
相关键的 SpEL 表达式。默认为 correlationId 标头。(表达式,默认值:
<none>
) - 组超时
-
SpEL 表达式,用于超时到即将过期的未完成组。(表达式,默认值:
<none>
) - 消息存储实体
-
持久性消息存储实体:RDBMS 中的表前缀、MongoDb 中的集合名称等。(字符串,默认:
<none>
) - 消息存储类型
-
消息存储类型。(字符串,默认:
<none>
) - 释放
-
用于释放策略的 SpEL 表达式。默认值基于 sequenceSize 标头。(表达式,默认值:
<none>
)
spring.data.mongodb
- 身份验证数据库
-
身份验证数据库名称。(字符串,默认:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认:
<none>
) - 数据库
-
数据库名称。(字符串,默认:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(Class<?>,默认:
<none>
) - 网格 FS 数据库
-
<缺少文档>(字符串,默认:
<none>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(Character[],默认:
<none>
) - 端口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认:
<none>
) - URI
-
Mongo 数据库 URI。不能使用主机、端口、凭据和副本集名称进行设置。(字符串,默认:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认:
<none>
) - uuid-表示
-
将 UUID 转换为 BSON 二进制值时使用的表示。(UuidRepresentation,默认:
java-legacy
,可能的值:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
spring.datasource
- 出错时继续
-
在初始化数据库时发生错误时是否停止。(布尔值,默认:
false
) - 数据
-
数据 (DML) 脚本资源引用。(List<String>,默认值:
<none>
) - 数据密码
-
用于执行 DML 脚本的数据库密码(如果不同)。(字符串,默认:
<none>
) - 数据用户名
-
用于执行 DML 脚本的数据库的用户名(如果不同)。(字符串,默认:
<none>
) - 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:
<none>
) - 嵌入式数据库连接
-
嵌入式数据库的连接详细信息。默认为类路径上可用的最合适的嵌入式数据库。(EmbeddedDatabaseConnection,默认:
<none>
,可能的值:NONE
,H2
,DERBY
,HSQL
,HSQLDB
) - 生成唯一名称
-
是否生成随机数据源名称。(布尔值,默认:
true
) - 初始化模式
-
确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化时要应用的模式。(DataSourceInitializationMode,默认:
embedded
,可能的值:ALWAYS
,EMBEDDED
,NEVER
) - jndi-name
-
数据源的 JNDI 位置。设置时忽略类、url、用户名和密码。(字符串,默认:
<none>
) - 名称
-
如果“generate-unique-name”为 false,则要使用的数据源名称。使用嵌入式数据库时默认为“testdb”,否则为 null。(字符串,默认:
<none>
) - 密码
-
数据库的登录密码。(字符串,默认:
<none>
) - 平台
-
要在 DDL 或 DML 脚本中使用的平台(例如 schema-${platform}.sql 或 data-${platform}.sql)。(字符串,默认:
all
) - 图式
-
架构 (DDL) 脚本资源引用。(List<String>,默认值:
<none>
) - 模式密码
-
用于执行 DDL 脚本的数据库密码(如果不同)。(字符串,默认:
<none>
) - 架构用户名
-
用于执行 DDL 脚本的数据库的用户名(如果不同)。(字符串,默认:
<none>
) - 分隔符
-
SQL 初始化脚本中的语句分隔符。(字符串,默认:
;
) - sql-script-encoding
-
SQL 脚本编码。(字符集,默认:
<none>
) - 类型
-
要使用的连接池实现的完全限定名称。默认情况下,它是从类路径中自动检测到的。(Class<DataSource>,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认:
<none>
)
spring.mongodb.embedded
- 特征
-
要启用的功能的逗号分隔列表。默认使用已配置版本的默认值。(Set<Feature>,默认值:
[sync_delay]
) - 版本
-
要使用的 Mongo 版本。(字符串,默认:
3.5.5
)
spring.redis 的
- 客户端名称
-
要在与 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认:
<none>
) - 客户端类型
-
要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认:
<none>
,可能的值:LETTUCE
,JEDIS
) - 连接超时
-
连接超时。(持续时间,默认:
<none>
) - 数据库
-
连接工厂使用的数据库索引。(整数,默认:
0
) - 主机
-
Redis 服务器主机。(字符串,默认:
localhost
) - 密码
-
redis 服务器的登录密码。(字符串,默认:
<none>
) - 端口
-
Redis 服务器端口。(整数,默认:
6379
) - SSL的
-
是否启用 SSL 支持。(布尔值,默认:
false
) - 超时
-
读取超时。(持续时间,默认:
<none>
) - 网址
-
连接 URL。覆盖主机、端口和密码。用户被忽略。示例:redis://user:[email protected]:6379(字符串,默认:
<none>
) - 用户名
-
redis 服务器的登录用户名。(字符串,默认:
<none>
)
6.3. 滤波处理器
筛选处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。
例如,如果传入有效负载的类型为String
如果您想过滤掉任何少于五个字符的内容,您可以运行过滤处理器,如下所示。
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。
6.5. 标头富集处理器
使用标头扩充器应用添加邮件标头。
标头以换行符分隔键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。 例如--headers='foo=payload.someProperty \n bar=payload.otherProperty'
.
6.6. Http 请求处理器
向 HTTP 资源发出请求并将响应正文作为消息有效负载发出的处理器应用程序。
6.6.1. 输入
头
任何必需的 HTTP 标头都必须通过headers
或headers-expression
财产。请参阅下面的示例。
标头值也可用于构造:
-
请求正文在
body-expression
财产。 -
HTTP 方法在
http-method-expression
财产。 -
在
url-expression
财产。
有效载荷
默认情况下,有效负载用作 POST 请求的请求正文,并且可以是任何 Java 类型。 它应该是 GET 请求的空字符串。 有效负载还可用于构造:
-
请求正文在
body-expression
财产。 -
HTTP 方法在
http-method-expression
财产。 -
在
url-expression
财产。
底层 WebClient 支持 Jackson JSON 序列化,以支持任何请求和响应类型(如有必要)。
这expected-response-type
属性String.class
默认情况下,可以设置为应用程序类路径中的任何类。
请注意,用户定义的有效负载类型需要将所需的依赖项添加到您的 pom 文件中。
6.6.2. 输出
有效载荷
原始输出对象是 ResponseEntity<?>其任何字段(例如body
,headers
) 或访问器方法 (statusCode
) 可以作为reply-expression
.
默认情况下,出站消息有效负载是响应正文。
请注意,ResponseEntity(由表达式#root
) 默认情况下不能被 Jackson 反序列化,但可以呈现为HashMap
.
6.6.4. 选项
- http.request.body-expression
-
一个 SpEL 表达式,用于从传入消息派生请求正文。(表达式,默认值:
<none>
) - http.request.expected-response-type
-
用于解释响应的类型。(Class<?>,默认:
<none>
) - http.request.headers-expression
-
用于派生要使用的 http 标头映射的 SpEL 表达式。(表达式,默认值:
<none>
) - http.request.http-方法表达式
-
一个 SpEL 表达式,用于从传入消息派生请求方法。(表达式,默认值:
<none>
) - http.request.maximum-buffer-size (英语)
-
分配给输入流缓冲区的最大缓冲区大小(以字节为单位)。默认为 256k。根据需要增加发布或获取大型二进制内容。(整数,默认:
0
) - http.request.reply-expression
-
用于计算最终结果的 SpEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:
<none>
) - http.request.timeout
-
请求超时(以毫秒为单位)。(长,默认:
30000
) - http.request.url 表达式
-
针对传入消息的 SpEL 表达式,用于确定要使用的 URL。(表达式,默认值:
<none>
)
6.7. 图像识别处理器
使用 Inception 模型对实时图像进行分类的处理器(例如标签)。
模型的输入是作为二进制数组的图像。
输出是以下格式的 JSON 消息:
{
"labels" : [
{"giant panda":0.98649305}
]
}
结果包含已识别类别的名称(例如标签)以及图像代表此类别的置信度(例如置信度)。
如果response-seize
设置为高于 1 的值,则结果将包括顶部response-seize
可能的标签。例如response-size=3
将返回:
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
6.7.2. 选项
- image.recognition.cache-model
-
缓存预训练的 TensorFlow 模型。(布尔值,默认:
true
) - image.recognition.debug-output
-
<缺少文档>(布尔值,默认:
false
) - image.recognition.debug-output-path
-
<缺少文档>(字符串,默认:
image-recognition-result.png
) - image.recognition.model(图像识别模型)
-
预训练的 TensorFlow 图像识别模型。请注意,模型必须与所选模型类型匹配!(字符串,默认:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb
) - image.recognition.model-type
-
支持三种不同的预训练 tensorflow 图像识别模型:Inception、MobileNetV1 和 MobileNetV2 1。Inception 图使用“输入”作为输入,使用“输出”作为输出。2. MobileNetV2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 归一化图像大小始终为正方形(例如 H=W) - 图形使用“input”作为输入,使用“MobilenetV2/Predictions/Reshape_1”作为输出。3. MobileNetV1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - graph 使用“input”作为输入,“MobilenetV1/Predictions/Reshape_1”作为输出。(ModelType,默认:
<none>
,可能的值:inception
,mobilenetv1
,mobilenetv2
) - image.recognition.normalized-image-size
-
标准化图像大小。(整数,默认:
224
) - image.recognition.response-size
-
识别的图像数量。(整数,默认:
5
)
6.8. 对象检测处理器
对象检测处理器为 TensorFlow 对象检测 API 提供开箱即用的支持。它允许实时定位和识别单个图像或图像流中的多个对象。对象检测处理器建立在对象检测功能之上。
以下是一些合理的配置默认值:
-
object.detection.model
:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
-
object.detection.labels
:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
-
object.detection.with-masks
:false
下图显示了一个 Spring Cloud Data Flow,流管道,它实时预测输入图像流中的对象类型。

处理器的输入是一个图像字节数组,输出是一个增强图像和一个标头,称为detected_objects
,提供检测到的对象的文本描述:
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
这detected_objects
标头格式为:
-
object-name:confidence - 检测到的对象(例如标签)的人类可读名称,其置信度为 [0-1] 之间的浮点数
-
x1, y1, x2, y2 - 响应还提供检测到的对象的边界框,表示为
(x1, y1, x2, y2)
.坐标是相对于图像大小的。 -
cid - 提供的标签配置文件中定义的分类标识符。
有效载荷
传入类型为byte[]
,内容类型为application/octet-stream
.处理器处理输入byte[]
图像并输出增强的byte[]
图像有效负载和 JSON 标头 (detected_objects
).
6.8.2. 选项
- 对象.检测.缓存模型
-
<缺少文档>(布尔值,默认:
true
) - 对象.检测.置信度
-
<缺少文档>(浮点,默认:
0.4
) - object.detection.debug-output
-
<缺少文档>(布尔值,默认:
false
) - 对象.检测.调试输出路径
-
<缺少文档>(字符串,默认:
object-detection-result.png
) - 对象.检测.标签
-
标记 URI。(字符串,默认:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
) - 对象.检测.模型
-
预训练的 TensorFlow 对象检测模型。(字符串,默认:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb
) - 对象.检测.响应大小
-
<缺少文档>(整数,默认:
<none>
) - 对象.detection.with-masks
-
<缺少文档>(布尔值,默认:
false
)
6.9. 语义分割处理器
基于最先进的 DeepLab Tensorflow 模型的图像语义分割。
这Semantic Segmentation
是将图像的每个像素与类标签(例如花、人、道路、天空、海洋或汽车)相关联的过程。与Instance Segmentation
,它会生成实例感知区域掩码,则Semantic Segmentation
生成类感知掩码。用于实现Instance Segmentation
请改为咨询对象检测服务。
这Semantic Segmentation Processor
使用语义分割函数库和 TensorFlow 服务。
有效载荷
传入类型为byte[]
,内容类型为application/octet-stream
.处理器处理输入byte[]
图像和输出增强byte[]
图像有效负载和 JSON 标头。
处理器的输入是一个图像字节数组,输出是一个增强的图像字节数组和一个 JSON 标头semantic_segmentation
在这种格式中:
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ]
...
]
输出标头 json 格式表示从输入图像计算的颜色像素贴图。
6.9.2. 选项
- semantic.segmentation.color-map-uri
-
每个预训练模型都基于某些对象颜色图。预定义的选项是: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json(字符串,默认:
classpath:/colormap/citymap_colormap.json
) - semantic.segmentation.debug-output
-
保存本地 debugOutputPath 路径中的输出映像。(布尔值,默认:
false
) - semantic.segmentation.debug-output-path
-
<缺少文档>(字符串,默认:
semantic-segmentation-result.png
) - semantic.segmentation.mask-transparency
-
计算的分割掩码图像的 alpha 颜色。(浮点,默认:
0.45
) - 语义.segmentation.model
-
预训练的 TensorFlow 语义分割模型。(字符串,默认:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb
) - semantic.segmentation.output-type
-
指定输出图像类型。您可以返回带有计算的掩码叠加的输入图像,也可以单独返回掩码。(OutputType,默认:
<none>
,可能的值:blended
,mask
)
6.11. 拆分器处理器
拆分器应用程序建立在 Spring Integration 中同名的概念之上,并允许将单个消息拆分为多个不同的消息。
处理器使用一个函数,该函数接受Message<?>
作为输入,然后生成一个List<Message<?>
作为基于各种属性的输出(见下文)。
您可以使用 SpEL 表达式或分隔符来指定要如何拆分传入消息。
有效载荷
-
传入有效载荷 -
Message<?
>
如果传入类型为byte[]
并将内容类型设置为text/plain
或application/json
,则应用程序将byte[]
到String
.
-
传出有效载荷 -
List<Message<?>
6.11.2. 选项
- splitter.apply-sequence
-
在标头中添加相关性/序列信息,以方便以后聚合。(布尔值,默认:
true
) - splitter.charset
-
将基于文本的文件中的字节转换为 String 时要使用的字符集。(字符串,默认:
<none>
) - splitter.delimiters
-
当表达式为 null 时,标记 {@link String} 有效负载时要使用的分隔符。(字符串,默认:
<none>
) - splitter.expression
-
用于拆分有效负载的 SpEL 表达式。(字符串,默认:
<none>
) - splitter.文件标记
-
设置为 true 或 false 以使用包含(或不包含)文件开头/结尾标记的 {@code FileSplitter}(按行拆分基于文本的文件)。(布尔值,默认:
<none>
) - splitter.markers-json
-
当 'fileMarkers == true' 时,指定它们是否应生成为 FileSplitter.FileMarker 对象或 JSON。(布尔值,默认:
true
)
6.12. 变换处理器
Transformer 处理器允许您根据 SpEL 表达式转换消息有效负载结构。
下面是如何运行此应用程序的示例。
java -jar transform-processor-kafka-<version>.jar --spel.function.expression=payload.toUpperCase()
如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。
6.13. Twitter 趋势和趋势位置处理器
可以返回趋势主题或趋势主题的位置的处理器。
这twitter.trend.trend-query-type
属性允许来选择查询类型。
6.13.2. 检索趋势位置
对于此模式集twitter.trend.trend-query-type
自trendLocation
.
按位置检索热门主题的完整或附近位置列表。
如果latitude
,longitude
参数不提供处理器执行可用趋势 API 并返回 Twitter 具有趋势主题信息的位置。
如果latitude
,longitude
参数,则处理器执行 Trends Closest API 并返回 Twitter 具有最接近指定位置的趋势主题信息的位置。
Response 是locations
对该位置的 WOEID 和其他一些人类可读信息(例如该位置所属的规范名称和国家/地区)进行编码。
7. 水槽
7.1. 卡桑德拉水槽
此接收器应用程序将收到的每条消息的内容写入 Cassandra。
它需要 JSON 字符串的有效负载,并使用其属性映射到表列。
7.1.2. 选项
cassandra 水槽具有以下选项:
- spring.data.cassandra.compression
-
Cassandra 二进制协议支持压缩。(压缩,默认:
none
,可能的值:LZ4
,SNAPPY
,NONE
) - spring.data.cassandra.config
-
要使用的配置文件的位置。(资源,默认值:
<none>
) - spring.data.cassandra.contact-points
-
集群节点地址格式为“host:port”,或简单的“host”,以使用配置的端口。(List<String>,默认值:
[127.0.0.1:9042]
) - spring.data.cassandra.keyspace-name
-
要使用的键空间名称。(字符串,默认:
<none>
) - spring.data.cassandra.local-datacenter
-
被视为“本地”的数据中心。联系点应来自此数据中心。(字符串,默认:
<none>
) - spring.data.cassandra.密码
-
服务器的登录密码。(字符串,默认:
<none>
) - spring.data.cassandra.port
-
如果触点未指定一个端口,则要使用的端口。(整数,默认:
9042
) - spring.data.cassandra.schema-action
-
启动时要执行的架构作。(字符串,默认:
none
) - spring.data.cassandra.会话名称
-
Cassandra 会话的名称。(字符串,默认:
<none>
) - spring.data.cassandra.ssl
-
启用 SSL 支持。(布尔值,默认:
false
) - spring.data.cassandra.用户名
-
服务器的登录用户。(字符串,默认:
<none>
)
7.2. 分析接收器
Sink 应用程序,构建在 Analytics 使用者之上,它根据输入消息计算分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开了 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标名称、值和标签。
分析接收器可以生成两种指标类型:
仪表(例如计数器或仪表)由其唯一标识name
和dimensions
(术语“尺寸”和“标记”可以互换使用)。维度允许对特定命名指标进行切片,以向下钻取和推理数据。
作为指标,由其name 和dimensions ,您可以为每个指标分配多个标签(e.g. key/值对),但之后不能随机更改这些标签!Prometheus 等监控系统会抱怨,如果具有相同名称的指标具有不同的标签集。 |
使用analytics.name
或analytics.name-expression
属性 设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。
使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
,属性,用于向指标添加一个或多个标记。这TAG_NAME
属性定义中使用的将在指标中显示为标记名称。TAG_VALUE是一个SpEL
从传入消息动态计算标记值的表达式。
这SpEL
表达式使用headers
和payload
关键字来访问邮件的标头和有效负载值。
您可以使用文字(例如'fixed value' ) 设置具有固定值的标签。 |
所有 Stream 应用程序都支持,盒子的 ouf,三个最流行的监控系统,Wavefront
,Prometheus
和InfluxDB
并且您可以以声明方式启用它们中的每一个。
只需将 micrometer meter-registry 依赖项添加到Analytics Sink
应用。
请访问 Spring Cloud Data Flow Stream Monitoring 以获取配置监控系统的详细说明。以下快速片段可以帮助您入门。
-
要启用 Prometheus 仪表注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用 Wavefront 仪表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 仪表注册表,请设置以下属性。
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink 将重用提供的指标配置。 |
下图说明了如何Analytics Sink
帮助收集企业内部信息,用于证券交易所,实时管道。

7.2.2. 选项
按前缀分组的属性:
分析学
- 金额表达式
-
用于计算输出指标值(例如金额)的 SpEL 表达式。默认为 1.0(表达式,默认值:
<none>
) - 仪表型
-
千分尺计类型,用于向后端报告指标。(MeterType,默认:
<none>
,可能的值:counter
,gauge
) - 名称
-
输出指标的名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(字符串,默认:
<none>
) - 名称表达式
-
一个 SpEL 表达式,用于从输入消息计算输出指标名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(表达式,默认值:
<none>
)
analytics.tag
- 表达
-
根据 SpEL 表达式计算标签。单个 SpEL 表达式可以生成值数组,这反过来又意味着不同的名称/值标签。每个名称/值标签都会产生单独的仪表增量。代码表达式格式为:analytics.tag.expression。[标签名称]=[SpEL表达式](Map<String, Expression>,默认值:
<none>
) - 固定
-
已弃用:请将 analytics.tag.expression 与文字 SpEL 表达式一起使用。自定义、固定标签。这些标记具有常量值,创建一次,然后与每个已发布的指标一起发送。定义固定标记的约定为:<code>analytics.tag.fixed。[标签名称]=[标签值] </code>(Map<String, String>,默认:
<none>
)
7.3. Elasticsearch Sink
将文档索引到 Elasticsearch 中的 Sink。
此 Elasticsearch 接收器仅支持索引 JSON 文档。
它使用来自输入目标的数据,然后将其索引到 Elasticsearch。
输入数据可以是纯 json 字符串,也可以是java.util.Map
表示 JSON。
它还接受 Elasticsearch 提供的数据XContentBuilder
.
但是,这是一种罕见的情况,因为中间件不太可能将记录保留为XContentBuilder
.
这主要用于直接调用消费者。
7.3.1. 选项
Elasticsearch 接收器具有以下选项:
- elasticsearch.consumer.async
-
指示索引作是否异步。默认情况下,索引是同步完成的。(布尔值,默认:
false
) - elasticsearch.consumer.batch-size
-
每个请求要索引的项数。默认为 1。对于大于 1 的值,将使用批量索引 API。(整数,默认:
1
) - elasticsearch.consumer.group-超时
-
超时(以毫秒为单位),之后在批量索引处于活动状态时刷新消息组。它默认为 -1,这意味着不会自动刷新空闲消息组。(长,默认:
-1
) - elasticsearch.consumer.id
-
要编制索引的文档的 ID。如果设置,则INDEX_ID标头值将基于每条消息覆盖此属性。(表达式,默认值:
<none>
) - elasticsearch.consumer.index
-
索引的名称。如果设置,则INDEX_NAME标头值将基于每条消息覆盖此属性。(字符串,默认:
<none>
) - elasticsearch.consumer.routing
-
指示要路由到的分片。如果未提供,Elasticsearch 将默认为文档 ID 的哈希值。(字符串,默认:
<none>
) - elasticsearch.consumer.timeout-秒
-
分片可用的超时。如果未设置,则默认为 Elasticsearch 客户端设置的 1 分钟。(长,默认:
0
)
7.3.2. 运行此接收器的示例
-
从文件夹
elasticsearch-sink
:./mvnw clean package
-
CD 应用程序
-
cd 到正确的 Binder 生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package
-
确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
如果中间件尚未运行,请启动中间件(Kafka 或 RabbitMQ)。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
将一些 JSON 数据发送到中间件目标。例如:
"foo":"bar"}
-
验证数据是否已编制索引:
curl localhost:9200/testing/_search
7.4. 文件接收器
文件接收器应用将其收到的每条消息写入文件。
7.4.2. 选项
这file-sink
具有以下选项:
- 文件.消费者.二进制文件
-
一个标志,指示是否应禁止在写入后添加换行符。(布尔值,默认:
false
) - file.consumer.charset
-
写入文本内容时要使用的字符集。(字符串,默认:
UTF-8
) - 文件.消费者.目录
-
目标文件的父目录。(文件,默认:
<none>
) - 文件.消费者.目录表达式
-
要计算目标文件的父目录的表达式。(字符串,默认:
<none>
) - 文件.消费者.mode
-
如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认:
<none>
,可能的值:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - file.consumer.name
-
目标文件的名称。(字符串,默认:
file-consumer
) - 文件.消费者.名称表达式
-
要计算目标文件名称的表达式。(字符串,默认:
<none>
) - file.consumer.suffix
-
要附加到文件名的后缀。(字符串,默认:
<empty string>
)
7.5. FTP 接收器
FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。
它使用ftp-outbound-adapter
,因此传入消息可以是java.io.File
对象,一个String
(文件内容)
或数组bytes
(文件内容也是如此)。
要使用此接收器,您需要一个用户名和一个密码才能登录。
默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator 如果未指定任何内容。DefaultFileNameGenerator 将确定文件名
基于file_name 标头(如果存在)在MessageHeaders ,或者如果Message 已经是一个java.io.File ,那么它会
使用该文件的原始名称。 |
7.5.4. 选项
ftp 接收器具有以下选项:
按前缀分组的属性:
ftp.consumer 的
- 自动创建目录
-
是否创建远程目录。(布尔值,默认:
true
) - 文件名表达式
-
用于生成远程文件名的 SpEL 表达式。(字符串,默认:
<none>
) - 模式
-
如果远程文件已存在,则要执行的作。(FileExistsMode,默认:
<none>
,可能的值:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - 远程目录
-
远程 FTP 目录。(字符串,默认:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认:
/
) - 临时远程目录
-
如果 '#isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认:
/
) - tmp-文件后缀
-
传输过程中要使用的后缀。(字符串,默认:
.tmp
) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认:
true
)
7.6. 晶洞水槽
Geode 接收器会将消息内容写入 Geode 区域。
7.6.1. 选项
晶洞接收器具有以下选项:
按前缀分组的属性:
geode.消费者
- json
-
指示 Geode 区域是否将 json 对象存储为 PdxInstance。(布尔值,默认:
false
) - 键表达式
-
SpEL 表达式以用作缓存键。(字符串,默认:
<none>
)
geode.池
- 连接类型
-
指定连接类型:“服务器”或“定位器”。(ConnectType,默认:
<none>
,可能的值:locator
,server
) - 主机地址
-
指定一个或多个格式为 [host]:[port] 的 Gemfire 定位器或服务器地址。(InetSocketAddress[],默认:
<none>
) - 已启用订阅
-
设置为 true 以启用客户端池的订阅。需要将更新同步到客户端缓存。(布尔值,默认:
false
)
geode.security.ssl (地理洞.安全.ssl)
- 密码
-
将用于安全套接字连接的 SSL 密码配置为有效密码名称的数组。(字符串,默认:
any
) - 密钥存储类型
-
标识用于 SSL 通信的密钥库类型(例如 JKS、PKCS11 等)。(字符串,默认:
JKS
) - 密钥存储 uri
-
用于连接到 Geode 集群的预先创建的密钥库 URI 的位置。(资源,默认值:
<none>
) - SSL-密钥存储密码
-
用于访问密钥信任库的密码。(字符串,默认:
<none>
) - SSL-信任存储密码
-
用于访问信任存储的密码。(字符串,默认:
<none>
) - 信任存储类型
-
标识用于 SSL 通信的信任库类型(例如 JKS、PKCS11 等)。(字符串,默认:
JKS
) - 信任存储 uri
-
用于连接到 Geode 集群的预先创建的信任库 URI 的位置。(资源,默认值:
<none>
) - 用户主目录
-
本地目录,用于缓存从 truststoreUri 和 keystoreUri 位置下载的信任库和密钥库文件。(字符串,默认:
user.home
)
7.7. JDBC Sink
JDBC 接收器允许您将传入的有效负载持久化到 RDBMS 数据库中。
这jdbc.consumer.columns
属性表示COLUMN_NAME[:EXPRESSION_FOR_VALUE]
哪里EXPRESSION_FOR_VALUE
(连同冒号)是可选的。
在这种情况下,值是通过生成的表达式计算的,例如payload.COLUMN_NAME
,因此这样我们就可以从对象属性直接映射到表列。
例如,我们有一个 JSON 有效负载,如下所示:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以将其插入到表格中name
,city
和street
结构使用配置:
--jdbc.consumer.columns=name,city:address.city,street:address.street
此接收器支持批量插入,只要基础 JDBC 驱动程序支持。
批次插入通过batch-size
和idle-timeout
性能:
传入消息将聚合到batch-size
消息存在,然后作为批处理插入。
如果idle-timeout
毫秒过去没有新消息,则插入聚合的批次,即使它小于batch-size
,限制最大延迟。
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用。 |
7.7.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.7.2. 选项
jdbc 接收器具有以下选项:
按前缀分组的属性:
jdbc.consumer
- 批量大小
-
将数据刷新到数据库表时消息数的阈值。(整数,默认:
1
) - 列
-
逗号分隔了基于冒号的列名和 SpEL 表达式对,用于插入/更新的值。初始化时使用名称来发出 DDL。(字符串,默认:
payload:payload.toString()
) - 空闲超时
-
数据自动刷新到数据库表时的空闲超时(以毫秒为单位)。(长,默认:
-1
) - 初始化
-
'true'、'false' 或表的自定义初始化脚本的位置。(字符串,默认:
false
) - 表名
-
要写入的表的名称。(字符串,默认:
messages
)
spring.datasource
- 数据
-
数据 (DML) 脚本资源引用。(List<String>,默认值:
<none>
) - 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:
<none>
) - 初始化模式
-
确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化时要应用的模式。(DataSourceInitializationMode,默认:
embedded
,可能的值:ALWAYS
,EMBEDDED
,NEVER
) - 密码
-
数据库的登录密码。(字符串,默认:
<none>
) - 图式
-
架构 (DDL) 脚本资源引用。(List<String>,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认:
<none>
)
7.8. 原木汇
这log
Sink 使用应用程序记录器输出数据以供检查。
敬请谅解log
接收器使用无类型处理程序,这会影响实际日志记录的执行方式。
这意味着,如果内容类型是文本的,则原始有效负载字节将转换为 String,否则将记录原始字节。
请参阅用户指南中的更多信息。
7.9. MongoDB 接收器
此接收器应用程序将传入数据摄取到 MongoDB 中。
此应用程序完全基于MongoDataAutoConfiguration
,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。
7.9.2. 选项
mongodb 接收器具有以下选项:
按前缀分组的属性:
mongodb.consumer
- 收集
-
用于存储数据的 MongoDB 集合。(字符串,默认:
<none>
) - 集合表达式
-
用于评估 MongoDB 集合的 SpEL 表达式。(表达式,默认值:
<none>
)
spring.data.mongodb
- 身份验证数据库
-
身份验证数据库名称。(字符串,默认:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认:
<none>
) - 数据库
-
数据库名称。(字符串,默认:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(Class<?>,默认:
<none>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(Character[],默认:
<none>
) - 端口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认:
<none>
) - URI
-
Mongo 数据库 URI。不能使用主机、端口、凭据和副本集名称进行设置。(字符串,默认:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认:
<none>
) - uuid-表示
-
将 UUID 转换为 BSON 二进制值时使用的表示。(UuidRepresentation,默认:
java-legacy
,可能的值:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
7.10. MQTT 接收器
该模块向 MQTT 发送消息。
7.10.2. 选项
mqtt 接收器具有以下选项:
按前缀分组的属性:
MQTT
- 清洁会话
-
客户端和服务器是否应记住重新启动和重新连接的状态。(布尔值,默认:
true
) - 连接超时
-
连接超时(以秒为单位)。(整数,默认:
30
) - 保持活动间隔
-
ping间隔(以秒为单位)。(整数,默认:
60
) - 密码
-
连接到代理时要使用的密码。(字符串,默认:
guest
) - 坚持
-
'内存'或'文件'。(字符串,默认:
memory
) - 持久性目录
-
持久性目录。(字符串,默认:
/tmp/paho
) - SSL-属性
-
MQTT 客户端 SSL 属性。(Map<String, String>,默认:
<none>
) - 网址
-
MQTT 代理的位置(逗号分隔的列表)。(String[],默认:
[tcp://localhost:1883]
) - 用户名
-
连接到代理时要使用的用户名。(字符串,默认:
guest
)
7.11. Pgcopy 接收器
使用 PostgreSQL COPY 命令将其传入有效负载写入 RDBMS 的模块。
7.11.3. 选项
jdbc 接收器具有以下选项:
- spring.datasource.driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认:
<none>
) - spring.datasource.password
-
数据库的登录密码。(字符串,默认:
<none>
) - spring.datasource.url
-
数据库的 JDBC URL。(字符串,默认:
<none>
) - spring.datasource.用户名
-
数据库的登录用户名。(字符串,默认:
<none>
)
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用。 |
7.12. RabbitMQ Sink
该模块向 RabbitMQ 发送消息。
7.12.1. 选项
兔子水槽有以下选项:
(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)
按前缀分组的属性:
兔
- 转换器 bean-name
-
自定义消息转换器的 bean 名称;如果省略,则使用 SimpleMessageConverter。如果是 'jsonConverter',则将为您创建一个 Jackson2JsonMessageConverter bean。(字符串,默认:
<none>
) - 交换
-
Exchange 名称 - 由 exchangeNameExpression(如果提供)覆盖。(字符串,默认:
<empty string>
) - 交换表达式
-
计算结果为交换名称的 SpEL 表达式。(表达式,默认值:
<none>
) - 标头映射最后
-
映射出站邮件的标头时,请确定是在转换邮件之前还是之后映射标头。(布尔值,默认:
true
) - 映射请求标头
-
将要映射的标头。(String[],默认:
[*]
) - 自己的连接
-
如果为 true,请根据启动属性使用单独的连接。(布尔值,默认:
false
) - 持久交付模式
-
“amqp_deliveryMode”标头不存在时的默认传递模式,对于 PERSISTENT。(布尔值,默认:
false
) - 路由键
-
路由键 - 由 routingKeyExpression(如果提供)覆盖。(字符串,默认:
<none>
) - 路由键表达式
-
计算结果为路由键的 SpEL 表达式。(表达式,默认值:
<none>
)
spring.rabbitmq
- 地址随机模式
-
用于打乱已配置地址的模式。(AddressShuffleMode,默认:
none
,可能的值:NONE
,RANDOM
,INORDER
) - 地址
-
客户端应连接到的地址的逗号分隔列表。设置后,将忽略主机和端口。(字符串,默认:
<none>
) - 通道 rpc 超时
-
通道中 RPC 调用的延续超时。将其设置为零以永久等待。(持续时间,默认:
10m
) - 连接超时
-
连接超时。将其设置为零以永久等待。(持续时间,默认:
<none>
) - 主机
-
RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认:
localhost
) - 密码
-
登录以对代理进行身份验证。(字符串,默认:
guest
) - 端口
-
RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则默认为 5671。(整数,默认:
<none>
) - 发布者确认类型
-
确认使用的发布者类型。(ConfirmType,默认:
<none>
,可能的值:SIMPLE
,CORRELATED
,NONE
) - 发布者返回
-
是否启用发布商退货。(布尔值,默认:
false
) - 请求通道最大值
-
客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认:
2047
) - 请求心跳
-
请求的检测信号超时;零表示无。如果未指定持续时间后缀,则将使用秒。(持续时间,默认:
<none>
) - 用户名
-
登录用户以向代理进行身份验证。(字符串,默认:
guest
) - 虚拟主机
-
连接到代理时要使用的虚拟主机。(字符串,默认:
<none>
)
7.13. Redis 接收器
向 Redis 发送消息。
7.13.1. 选项
Redis 接收器具有以下选项:
按前缀分组的属性:
redis.consumer
- 钥匙
-
存储到键时要使用的文字键名称。(字符串,默认:
<none>
) - 键表达式
-
用于存储到键的 SpEL 表达式。(字符串,默认:
<none>
) - 队列
-
存储在队列中时要使用的文字队列名称。(字符串,默认:
<none>
) - 队列表达式
-
用于队列的 SpEL 表达式。(字符串,默认:
<none>
) - 主题
-
发布到主题时要使用的文字主题名称。(字符串,默认:
<none>
) - 主题表达式
-
用于主题的 SpEL 表达式。(字符串,默认:
<none>
)
spring.redis 的
- 客户端名称
-
要在与 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认:
<none>
) - 客户端类型
-
要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认:
<none>
,可能的值:LETTUCE
,JEDIS
) - 连接超时
-
连接超时。(持续时间,默认:
<none>
) - 数据库
-
连接工厂使用的数据库索引。(整数,默认:
0
) - 主机
-
Redis 服务器主机。(字符串,默认:
localhost
) - 密码
-
redis 服务器的登录密码。(字符串,默认:
<none>
) - 端口
-
Redis 服务器端口。(整数,默认:
6379
) - SSL的
-
是否启用 SSL 支持。(布尔值,默认:
false
) - 超时
-
读取超时。(持续时间,默认:
<none>
) - 网址
-
连接 URL。覆盖主机、端口和密码。用户被忽略。示例:redis://user:[email protected]:6379(字符串,默认:
<none>
) - 用户名
-
redis 服务器的登录用户名。(字符串,默认:
<none>
)
spring.redis.jedis.池
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在哨兵模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认:
<none>
) - 最大活动
-
池在给定时间可以分配的最大连接数。使用负值表示无限制。(整数,默认:
8
) - 最大空闲
-
池中“空闲”连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认:
8
) - 最大等待
-
在池耗尽时引发异常之前,连接分配应阻止的最长时间。使用负值无限期阻止。(持续时间,默认:
-1ms
) - 最小空闲
-
在池中维护的最小空闲连接数的目标。仅当此设置和逐出运行之间的时间均为正时,此设置才有效。(整数,默认:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正时,空闲对象 evictor 线程将启动,否则不会执行空闲对象逐出。(持续时间,默认:
<none>
)
spring.redis.lettuce.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在哨兵模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认:
<none>
) - 最大活动
-
池在给定时间可以分配的最大连接数。使用负值表示无限制。(整数,默认:
8
) - 最大空闲
-
池中“空闲”连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认:
8
) - 最大等待
-
在池耗尽时引发异常之前,连接分配应阻止的最长时间。使用负值无限期阻止。(持续时间,默认:
-1ms
) - 最小空闲
-
在池中维护的最小空闲连接数的目标。仅当此设置和逐出运行之间的时间均为正时,此设置才有效。(整数,默认:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正时,空闲对象 evictor 线程将启动,否则不会执行空闲对象逐出。(持续时间,默认:
<none>
)
7.14. 路由器接收器
此应用程序将消息路由到命名通道。
7.14.1. 选项
路由器接收器具有以下选项:
- router.default-output-channel
-
在何处发送不可路由的消息。(字符串,默认:
nullChannel
) - router.destination-mappings
-
目标映射作为以名称-值对为分隔的新行字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - router.expression
-
要应用于消息以确定要路由到的通道的表达式。请注意,text、json 或 xml 等内容类型的有效负载线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容,请参阅文档。(表达式,默认值:
<none>
) - router.refresh-delay
-
检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认:
60000
) - 需要 router.resolution-required
-
是否需要通道分辨率。(布尔值,默认:
false
) - 路由器.脚本
-
返回通道或通道映射分辨率键的 groovy 脚本的位置。(资源,默认值:
<none>
) - router.变量
-
变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - router.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<none>
)
由于这是一个动态路由器,因此会根据需要创建目标;因此,默认情况下,defaultOutputChannel 和resolutionRequired 仅当Binder 绑定到目标时存在一些问题。 |
您可以使用spring.cloud.stream.dynamicDestinations
财产。
默认情况下,所有已解析的目的地都将动态绑定;如果此属性具有逗号分隔的列表
目标名称,则只有这些名称才会被绑定。
解析到不在此列表中的目标的消息将被路由到defaultOutputChannel
哪
还必须出现在列表中。
destinationMappings
用于将评估结果映射到实际的目标名称。
7.14.2. 基于 SpEL 的路由
该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。
有关更多信息,请参阅 Spring 中的“路由器和 Spring 表达式语言 (SpEL)”小节 集成参考手册配置通用路由器部分。
从 Spring Cloud Stream 2.0 开始,消息线格式json ,text 和xml 内容类型是byte[] 不String !
这是与 SCSt 1.x 相比的一个改变,它将这些类型视为字符串!
取决于内容类型,处理byte[] 有效载荷可用。对于普通text 内容类型,可以使用new String(payload) SpEL 表达。为json 键入 jsonPath() SpEL 实用程序
已经支持字符串和字节数组内容互换。这同样适用于xml 内容类型和 #xpath() SpEL 实用程序。 |
例如,对于text
应该使用的内容类型:
new String(payload).contains('a');
和json
内容类型 SpEL 表达式,如下所示:
#jsonPath(payload, '$.person.name')
7.14.3. 基于 Groovy 的路由
除了 SpEL 表达式,还可以使用 Groovy 脚本。让我们在文件系统中创建一个 Groovy 脚本,位于 “file:/my/path/router.groovy”,或“classpath:/my/path/router.groovy”:
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果要将变量值传递给脚本,可以使用变量选项或 (可选)使用 propertiesLocation 选项将路径传递到包含绑定的属性文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定变量和 propertiesLocation,在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,有效负载和标头是隐式绑定的,以允许您访问消息中包含的数据。
有关更多信息,请参阅 Spring Integration 参考手册 Groovy 支持。
7.16. 亚马逊 S3 接收器
此接收器应用程序支持将对象传输到 Amazon S3 存储桶。
文件有效负载(和目录递归)将传输到remote
目录(S3 存储桶)到local
部署应用程序的目录。
此接收器接受的消息必须包含payload
如:
-
File
,包括递归上传的目录; -
InputStream
; -
byte[]
7.16.1. 选项
s3 接收器具有以下选项:
按前缀分组的属性:
s3.消费者
- 前交叉韧带
-
S3 对象访问控制列表。(CannedAccessControlList,默认:
<none>
,可能的值:private
,public-read
,public-read-write
,authenticated-read
,log-delivery-write
,bucket-owner-read
,bucket-owner-full-control
,aws-exec-read
) - ACL 表达
-
用于评估 S3 对象访问控制列表的表达式。(表达式,默认值:
<none>
) - 桶
-
用于存储目标文件的 AWS 存储桶。(字符串,默认:
<none>
) - 桶表达式
-
用于评估 AWS 存储桶名称的表达式。(表达式,默认值:
<none>
) - 键表达式
-
用于计算 S3 对象键的表达式。(表达式,默认值:
<none>
)
目标生成的应用程序基于AmazonS3SinkConfiguration
可以通过S3MessageHandler.UploadMetadataProvider
和/或S3ProgressListener
,这些被注入S3MessageHandler
豆。
有关更多详细信息,请参阅 Spring Integration AWS 支持。
7.16.2. Amazon AWS 通用选项
Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置 类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。
其中一些是关于 AWS 凭证的:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他适用于 AWSRegion
定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
对于 AWSStack
:
-
云.aws.stack.auto
-
cloud.aws.stack.name
7.17. SFTP 接收器
SFTP 接收器是一个简单的选项,用于将文件从传入邮件推送到 SFTP 服务器。
它使用sftp-outbound-adapter
,因此传入消息可以是java.io.File
对象,一个String
(文件内容)
或数组bytes
(文件内容也是如此)。
要使用此接收器,您需要一个用户名和一个密码才能登录。
默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator 如果未指定任何内容。DefaultFileNameGenerator 将确定文件名
基于file_name 标头(如果存在)在MessageHeaders ,或者如果Message 已经是一个java.io.File ,那么它会
使用该文件的原始名称。 |
配置sftp.factory.known-hosts-expression
选项,则评估的根对象是应用程序上下文,例如可能是sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
7.17.3. 选项
sftp 接收器具有以下选项:
按前缀分组的属性:
sftp.消费者
- 自动创建目录
-
是否创建远程目录。(布尔值,默认:
true
) - 文件名表达式
-
用于生成远程文件名的 SpEL 表达式。(字符串,默认:
<none>
) - 模式
-
如果远程文件已存在,则要执行的作。(FileExistsMode,默认:
<none>
,可能的值:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - 远程目录
-
远程 FTP 目录。(字符串,默认:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认:
/
) - 临时远程目录
-
如果 'isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认:
/
) - tmp-文件后缀
-
传输过程中要使用的后缀。(字符串,默认:
.tmp
) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认:
true
)
sftp.消费者.工厂
- 允许未知键
-
True 表示允许未知或更改的键。(布尔值,默认:
false
) - 缓存会话
-
缓存会话。(布尔值,默认:
<none>
) - 主机
-
服务器的主机名。(字符串,默认:
localhost
) - 已知主机表达式
-
解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:
<none>
) - 密码短语
-
用户私钥的密码。(字符串,默认:
<empty string>
) - 密码
-
用于连接到服务器的密码。(字符串,默认:
<none>
) - 端口
-
服务器的端口。(整数,默认:
22
) - 私钥
-
用户私钥的资源位置。(资源,默认值:
<none>
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认:
<none>
)
7.18. TCP接收器
此模块使用编码器将消息写入 TCP。
TCP 是一种流协议,需要某种机制来构建在线消息上。许多编码器是可用,默认值为“CRLF”。
7.20. Twitter 消息接收器
从身份验证用户向指定用户发送直接消息。
需要 JSON POST 正文和Content-Type
header 设置为application/json
.
当收到来自用户的消息时,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和 5 条分配的消息。 在 24 小时窗口内发送第 6 条消息或在 24 小时窗口之外发送消息将计入速率限制。 此行为仅适用于使用 POST direct_messages/events/new 端点时。 |
SpEL 表达式用于从输入消息中计算请求参数。
7.20.1. 选项
使用单引号 () 将' SpEL 表达式属性。
例如,要设置固定消息文本,请使用text='Fixed Text' .
对于固定目标 userId,请使用userId='666' . |
- 推特.message.update.media-id
-
要与消息关联的媒体 ID。直接消息只能引用单个媒体 ID。(表达式,默认值:
<none>
) - 推特.message.update.屏幕名称
-
向其发送直接消息的用户的屏幕名称。(表达式,默认值:
<none>
) - 推特.message.update.text
-
直接消息文本。根据需要对 URL 进行编码。最大长度为 10,000 个字符。(表达式,默认值:
payload
) - 推特.message.update.user-id
-
向其发送直接消息的用户的用户 ID。(表达式,默认值:
<none>
)
7.21. Twitter 更新接收器
更新身份验证用户的当前文本(例如推文)。
对于每次更新尝试,都会将更新文本与身份验证用户最近的推文进行比较。任何可能导致重复的尝试都将被阻止,从而导致 403 错误。用户不能连续提交两次相同的文本。 |
虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。标准 API 的更新限制为 3 小时窗口内 300 个。如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。
您可以在此处找到更新 API 的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 选项
按前缀分组的属性:
推特.update
- 附件网址
-
(SpEL表达式)为了使 URL 不计入扩展推文的文本正文中,请提供 URL 作为推文附件。此 URL 必须是推文永久链接或私信深层链接。任意非 Twitter URL 必须保留在文本文本中。传递给 attachment_url 参数的 URL 与推文永久链接或私信深度链接不匹配,则在创建推文时失败并导致异常。(表达式,默认值:
<none>
) - 显示坐标
-
(SpEL 表达式)是否在发送推文的确切坐标上放置图钉。(表达式,默认值:
<none>
) - 回复状态 ID
-
(SpEL 表达式)更新回复的现有文本的 ID。注意:除非文本文本中提到了此参数引用的推文的作者,否则此参数将被忽略。因此,您必须在更新中包含 @username,其中 username 是引用推文的作者。设置 inReplyToStatusId 后,也会自动设置auto_populate_reply_metadata。稍后确保从原始推文中查找前导@mentions,并从那里添加到新推文中。随着回复链的增长,这将@mentions附加到扩展推文的元数据中,直到达到@mentions限制。如果原始推文已被删除,则回复将失败。(表达式,默认值:
<none>
) - 媒体 ID
-
(SpEL表达式)要与推文关联的media_ids的逗号分隔列表。您最多可以在推文中添加 4 张照片或 1 个动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认值:
<none>
) - 地点 ID
-
(SpEL表达式)世界上的一个地方。(表达式,默认值:
<none>
) - 文本
-
(SpEL表达式)文本更新的文本。根据需要对 URL 进行编码。t.co 链接换行将影响字符数。默认为消息的有效负载(表达式,默认值:
payload
)
7.22. 波前汇
Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。
支持常见的 ETL 用例,其中必须清理、转换现有(历史)指标数据并将其存储在 Wavefront 中以进行进一步分析。
7.22.1. 选项
波前接收器具有以下选项:
- wavefront.api Tokens
-
Wavefront API 访问Tokens。(字符串,默认:
<none>
) - wavefront.metric-expression
-
计算结果为度量值的 SpEL 表达式。(表达式,默认值:
<none>
) - wavefront.metric-name
-
指标的名称。默认为应用程序名称。(字符串,默认:
<none>
) - 波前.proxy-uri
-
Wavefront 代理的 URL。(字符串,默认:
<none>
) - 波前.source
-
发出指标的唯一应用程序、主机、容器或实例。(字符串,默认:
<none>
) - wavefront.tag表达式
-
与指标关联的自定义元数据的集合。点标记不能为空。键的有效字符:字母数字、连字符 ('-')、下划线('_')、点 ('.')。对于值,允许使用任何字符,包括空格。要包含双引号,请用反斜杠转义它,反斜杠不能是标签值中的最后一个字符。点标记键和值组合的最大允许长度为 254 个字符(255 个字符,包括分隔键和值的 '=')。如果该值较长,则拒绝并记录该点(Map<String, Expression>,默认值:
<none>
) - wavefront.timestamp 表达式
-
计算结果为指标时间戳的 SpEL 表达式(可选)。(表达式,默认值:
<none>
) - 波前线.uri
-
Wavefront 环境的 URL。(字符串,默认:
<none>
)
7.23. Websocket 接收器
一个简单的 Websocket Sink 实现。
7.23.1. 选项
支持以下选项:
- websocket.consumer.log级
-
netty 通道的 logLevel 。默认值为 <tt>WARN</tt>(字符串,默认:
<none>
) - websocket.consumer.path
-
WebsocketSink 使用者需要连接的路径。默认值为 <tt>/websocket</tt>(字符串,默认:
/websocket
) - websocket.consumer.port
-
Netty 服务器侦听的端口。默认值为 <tt>9292</tt>(整数,默认:
9292
) - websocket.consumer.ssl 的
-
是否创建 {@link io.netty.handler.ssl.SslContext}。(布尔值,默认:
false
) - websocket.consumer.threads
-
Netty {@link io.netty.channel.EventLoopGroup} 的线程数。默认值为 <tt>1</tt>(整数,默认:
1
)
7.23.2. 示例
要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,您可以使用 遵循简单的端到端设置。
步骤 3:部署websocket-sink
最后在trace
模式,以便您看到由time-source
在日志中:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
有一个Endpoint
您可以使用它来访问最后一个n
发送和接收的消息。你必须
通过提供--endpoints.websocketconsumertrace.enabled=true
.默认情况下,它通过host:port/websocketconsumertrace
.下面是一个示例输出:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的 HTML 页面,您可以在其中看到文本区域中转发的消息。您可以访问
它直接通过host:port
在您的浏览器中。
7.24. ZeroMQ 接收器
“zeromq”接收器允许将消息发送到 ZeroMQ 套接字。
7.24.3. 选项
zeromq 接收器具有以下选项:
- zeromq.consumer.connect-url
-
用于连接到 ZeroMQ 套接字的连接 URL。(字符串,默认:
<none>
) - zeromq.consumer.socket 类型
-
连接应建立的套接字类型。(SocketType,默认:
<none>
,可能的值:PAIR
,PUB
,SUB
,REQ
,REP
,DEALER
,ROUTER
,PULL
,PUSH
,XPUB
,XSUB
,STREAM
) - zeromq.consumer.topic
-
一个 Topic SpEL 表达式,用于在向订阅者发送消息之前评估主题。(表达式,默认值:
<none>
)