应用程序
5. 源码
5.1. CDC 源
变更数据捕获(CDC)source可以捕获并从各种数据库中流式传输更改事件。
目前,它支持MySQL、PostgreSQL、MongoDB、Oracle和SQL Server数据库。
基于Debezium Embedded Connector,CDC Source允许捕获和流式传输数据库更改,通过不同的消息绑定器如Apache Kafka、RabbitMQ以及所有支持Spring Cloud Stream的代理服务器。
它支持所有 Debezium 配置属性。只需将cdc.config.前缀添加到现有的 Debezium 属性中即可。例如,要设置 Debezium 的 connector.class 属性,请使用 cdc.config.connector.class 源属性。
我们提供了最常用的 Debezium 属性的便捷快捷方式。例如,您可以使用我们的 cdc.connector=mysql 快捷方式代替长格式的cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector Debezium 属性。下表列出了所有可用的快捷方式及其所代表的 Debezium 属性。
Debezium 属性(如cdc.config.XXX)始终优先于快捷方式!
CDC Source 引入了一个新的默认 BackingOffsetStore 配置,该配置基于 MetadataStore 服务。后续还提供了各种微服务友好的方式来存储偏移元数据。
5.1.1. 选项
按前缀分组的属性:
cdc
- 配置
-
Spring 德贝兹伊姆配置属性的传递包装器。所有以 'cdc.config.' 前缀开头的属性都是德贝兹伊姆原生属性。前缀被移除,将其转换为 Debezium io.debezium.config.Configuration。(Map<String, String>, 默认值:
<none>) - 连接器
-
cdc.config.connector.class 属性的快捷方式。只要它们不相互矛盾,就可以使用其中任何一个。(ConnectorType,默认值:
<none>,可能的取值:mysql,postgres,mongodb,oracle,sqlserver) - 姓名
-
此 sourceConnector 实例的唯一名称。(字符串,默认:
<none>) - 架构
-
将模式作为出站消息的一部分。(布尔值,默认:
false)
cdc.flattening
- add-fields
-
以逗号分隔的元数据字段列表,将添加到展平的消息中。这些字段将根据结构规范以“__”或“__[<]struct]__”为前缀。(字符串,默认:
<none>) - add-headers
-
用逗号分隔的列表,指定要添加到展平消息报头中的元数据字段。这些字段将以"__"或"__[struct]__"为前缀。(字符串,默认:
<none>) - delete-handling-mode
-
处理已删除记录的选项:(1) 不进行任何操作 - 将记录传递通过,(2) 删除 - 移除记录以及 (3) 重写 - 给记录添加一个 '__deleted' 字段。 (DeleteHandlingMode,默认值:
<none>,可选值:drop,rewrite,none) - drop-tombstones
-
默认情况下,Debezium会生成墓碑记录,以便对删除的记录进行Kafka压缩。dropTombstones可以抑制墓碑记录。(布尔值,默认:
true) - 启用
-
启用展平源记录事件(https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认:
true)
cdc.offset
- commit-timeout
-
在取消过程并恢复要提交的偏移量数据以便在未来尝试中再次提交之前,等待记录刷新并将分区偏移量数据提交到偏移存储的最大毫秒数。(持续时间,默认:
5000ms) - flush-interval
-
尝试提交偏移量的时间间隔。默认为1分钟。(持续时间,默认:
60000ms) - policy
-
偏移量存储提交策略。(OffsetPolicy,默认:
<none>) - 存储
-
Kafka 连接器会跟踪已处理记录的数量,并定期将计数(作为“偏移量”)存储在预先配置的元数据存储中。重启时,连接器从上次记录的数据源偏移量处恢复读取。(OffsetStorageType,默认值:
<none>,可选值:memory、file、kafka、metadata)
cdc.stream.header
- convert-connect-headers
-
当为 true 时,{@link org.apache.kafka.connect.header.Header} 将转换为消息头,其中 {@link org.apache.kafka.connect.header.Header#key()} 作为名称,{@link org.apache Kafka.connect.header.Header#value()} 作为值。(布尔类型,默认:
true) - 偏移量
-
将源记录的偏移量元数据序列化到输出消息头中的 cdc.offset。 (布尔值,默认:
false)
metadata.store.dynamo-db
- create-delay
-
创建表重试之间的延迟。(整数,默认:
1) - create-retries
-
创建表请求的重试次数。(整数,默认:
25) - read-capacity
-
表上的读取容量。(长整型,默认:
1) - 表格
-
用于元数据的表名。(字符串,默认:
<none>) - time-to-live
-
表项的TTL。 (整数,默认:
<none>) - write-capacity
-
表上的写入容量。(长整型,默认:
1)
metadata.store
- 类型
-
指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值:
<none>, 可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
metadata.store.zookeeper
- connect-string
-
Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:
127.0.0.1:2181) - 编码
-
存储数据到Zookeeper时使用的编码。(字符集,默认:
UTF-8) - retry-interval
-
Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:
1000) - 根目录
-
根节点 - 存储条目是此节点的子节点。(字符串,默认:
/SpringIntegration-MetadataStore)
Debezium 属性 快捷映射
下表列出了所有可用的快捷键及其所代表的 Debezium 属性。
| 快捷方式 | 原始 | 描述 |
|---|---|---|
cdc.connector |
cdc.config.connector.class |
|
cdc.name |
cdc.config.name |
|
cdc.offset.flush-interval |
cdc.config.offset.flush.interval.ms |
|
cdc.offset.commit-timeout |
cdc.config.offset.flush.timeout.ms |
|
cdc.offset.policy |
cdc.config.offset.commit.policy |
|
cdc.offset.storage |
cdc.config.offset.storage |
|
cdc.flattening.drop-tombstones |
cdc.config.drop.tombstones |
|
cdc.flattening.delete-handling-mode |
cdc.config.delete.handling.mode |
|
5.1.2. 数据库支持
该 CDC Source 使用 Debezium 实用工具,目前支持五种数据存储的 CDC:MySQL、PostgreSQL、MongoDB、Oracle 和 SQL Server 数据库。
5.1.3. 示例和测试
集成测试 [CdcSourceIntegrationTest](), [CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 使用在本地计算机上运行的数据库固定数据。
我们使用预先构建好的 debezium docker 数据库镜像。
Maven 构建借助 docker-maven-plugin 创建测试数据库固定数据。
要在您的 IDE 中运行和调试测试,需要从命令行部署所需的数据库映像。下面的说明解释了如何从 Docker 映像运行预配置的测试数据库。
MySQL
在 docker 中启动 debezium/example-mysql:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
|
(可选) 使用
|
使用以下属性连接 CDC 源到 MySQL 数据库:
cdc.connector=mysql (1)
cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)
cdc.config.database.user=debezium (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)
cdc.schema=true (4)
cdc.flattening.enabled=true (5)
| 1 | 配置 CDC 源以使用MySqlConnector。(相当于设置cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector)。 |
| 2 | 用于识别和分派传入事件的元数据。 |
| 3 | 连接到运行在localhost:3306上的MySQL服务器,作为debezium用户。 |
| 4 | 在SourceRecord事件中包含更改事件值模式。 |
| 5 | 启用CDC事件扁平化。 |
您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcMysqlTests。
PostgreSQL
从debezium/example-postgres:1.0 Docker镜像启动一个预配置的postgres服务器:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性连接 CDC 源与 PostgreSQL:
cdc.connector=postgres (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=postgres (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
| 1 | 配置CDC Source使用PostgresConnector。相当于设置cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector。 |
| 2 | 配置Debezium引擎使用memory(例如`cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore`)作为后端偏移存储。 |
| 3 | 用于识别和分派传入事件的元数据。 |
| 4 | 连接到运行在localhost:5432上的PostgreSQL服务器,作为postgres用户。 |
| 5 | 在SourceRecord事件中包含更改事件值模式。 |
| 6 | 启用CDC事件扁平化。 |
您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcPostgresTests。
MongoDB
从debezium/example-mongodb:0.10Docker镜像启动一个预配置的mongodb:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
初始化库存集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在 mongodb 终端输出中,搜索类似于 host: "3f95a8a6516e:27017" 的日志条目:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
在你的/etc/hosts中添加127.0.0.1 3f95a8a6516e条目
使用以下属性连接 CDC 源到 MongoDB:<br>
cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)
cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)
cdc.config.tasks.max=1 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
| 1 | 配置CDC Source以使用MongoDB连接器。这会映射到cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector。 |
| 2 | 配置Debezium引擎使用memory(例如`cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore`)作为后端偏移存储。 |
| 3 | 连接到运行在localhost:27017上的MongoDB,作为debezium用户。 |
| 4 | debezium.io/docs/connectors/mongodb/#tasks |
| 5 | 在SourceRecord事件中包含更改事件值模式。 |
| 6 | 启用CDC事件扁平化。 |
您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcPostgresTests。
SQL Server
从 debezium/example-postgres:1.0 Docker 镜像启动一个 sqlserver:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
使用 debezium 的 sqlserver 教程中的示例数据填充表单:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 CDC 源连接到 SQL Server:
cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=sa (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
| 1 | 配置 CDC Source 使用 SqlServerConnector。相当于设置 cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector。 |
| 2 | 配置Debezium引擎使用memory(例如`cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore`)作为后端偏移存储。 |
| 3 | 用于识别和分派传入事件的元数据。 |
| 4 | 连接到在localhost:1433上运行的SQL Server,作为sa用户。 |
您也可以使用此mysql配置运行CdcSourceIntegrationTests#CdcSqlServerTests。
Oracle
启动Oracle并从localhost访问,按照Debezium Vagrant设置中描述的配置、用户和授权进行设置
使用 Debezium 的 Oracle 教程中的示例数据填充表单:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询一个目录,并将新文件或其内容发送到输出通道。<br/>默认情况下,文件源会提供文件的内容作为字节数组。<br/>但是,可以使用 --file.supplier.mode 选项进行自定义:
-
ref 提供一个 java.io.File 引用
-
按行分割文件,并为每一行发出一条新消息
-
内容 默认。提供文件的内容作为字节数组
在使用 --file.supplier.mode=lines 时,还可以提供附加选项 --file.supplier.withMarkers=true。如果将其设置为 true,则底层的 FileSplitter 在实际数据之前和之后会发出额外的文件开始和结束标记消息。这两个额外标记消息的有效载荷类型为 FileSplitter.FileMarker。如果未明确设置,则 withMarkers 选项默认为 false。
5.2.1. 选项
文件源具有以下选项:file
按前缀分组的属性:
file.consumer
- markers-json
-
当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:
true) - 模式
-
用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:
<none>,可能的值:ref,lines,contents) - with-markers
-
设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:
<none>)
5.3. FTP源
此源应用程序支持使用FTP协议传输文件。
文件从remote目录传输到部署应用的local目录中。
默认情况下,源发出的消息以字节数组形式提供。但是,可以使用--mode选项进行自定义:
-
ref 提供一个
java.io.File引用 -
lines 将按行拆分文件,并为每一行发出新消息
-
contents 默认值。提供文件的内容作为字节数组
使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始 和 文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarker。withMarkers 的默认值是 false,除非明确设置。
另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
5.3.2. 输出
5.3.3. 选项
该ftp源具有以下选项:
按前缀分组的属性:
file.consumer
- markers-json
-
当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:
true) - 模式
-
用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:
<none>,可能的值:ref,lines,contents) - with-markers
-
设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:
<none>)
ftp.factory
- cache-sessions
-
缓存会话。 (布尔值,默认:
<none>) - client-mode
-
FTP会话使用的客户端模式。(ClientMode,默认:
<none>,可选值:ACTIVE、PASSIVE) - 主机
-
服务器的主机名。(字符串,默认:
localhost) - 密码
-
连接到服务器时使用的密码。(字符串,默认:
<none>) - 端口
-
服务器的端口。(整数,默认:
21) - 用户名
-
连接服务器时要使用的用户名。(字符串,默认值:
<none>)
ftp.supplier
- auto-create-local-dir
-
如果不存在,则设置为 true 以创建本地目录。(Boolean,默认:
true) - delay-when-empty
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1s) - delete-remote-files
-
设置为true表示在成功传输后删除远程文件。(布尔类型,默认:
false) - filename-pattern
-
一个过滤器模式,用于匹配要传输的文件名。(字符串,默认:
<none>) - filename-regex
-
用于匹配要传输的文件名的过滤正则表达式模式。(Pattern,默认值:
<none>) - local-dir
-
用于文件传输的本地目录。(文件,默认:
<none>) - preserve-timestamp
-
设置为 true 可保留原始时间戳。(布尔类型,默认值:
true) - remote-dir
-
远程 FTP 目录。(字符串,默认:
/) - remote-file-separator
-
远程文件分隔符。(字符串,默认:
/) - tmp-file-suffix
-
传输进行期间使用的后缀。(字符串,默认:
.tmp)
metadata.store.dynamo-db
- create-delay
-
创建表重试之间的延迟。(整数,默认:
1) - create-retries
-
创建表请求的重试次数。(整数,默认:
25) - read-capacity
-
表上的读取容量。(长整型,默认:
1) - 表格
-
用于元数据的表名。(字符串,默认:
<none>) - time-to-live
-
表项的TTL。 (整数,默认:
<none>) - write-capacity
-
表上的写入容量。(长整型,默认:
1)
5.4. Geode 源
Geode 源将从 Apache Geode EntryEvents 或 CqEvents 中提取对象并生成流。
5.4.1. 选项
Geode 源具有以下选项:geode
按前缀分组的属性:
geode.pool
- connect-type
-
指定连接类型:
server或locator。(ConnectType,默认值:<none>,可能的取值:locator、server) - host-addresses
-
指定一个或多个Gemfire定位器或服务器地址,格式为[主机]:[端口]。(InetSocketAddress[],默认:
<none>) - subscription-enabled
-
设置为 true 可以启用客户端池的订阅。这是同步更新到客户端缓存所必需的。(Boolean,默认:
false)
geode.security.ssl
- 密码
-
为安全套接字连接配置使用的 SSL 密码,作为有效密码名称的数组。(字符串,默认:
any) - keystore-type
-
标识用于SSL通信的Keystore类型(例如:JKS,PKCS11等)。 (String,默认值:
JKS) - keystore-uri
-
用于连接到 Geode 集群的预创建 Keystore URI。 (Resource, 默认值:
<none>) - ssl-keystore-password
-
访问密钥库 truststore 的密码。 (String,默认值:
<none>) - ssl-truststore-password
-
访问信任存储库的密码。 (String, 默认值:
<none>) - truststore-type
-
标识用于SSL通信的信任存储类型(例如 JKS,PKCS11 等)。 (String,默认值:
JKS) - truststore-uri
-
用于连接到 Geode 集群的预创建信任库 URI 的位置。 (资源, 默认:
<none>) - user-home-directory
-
本地缓存从truststoreUri和keystoreUri位置下载的truststore和keystore文件的目录。 (String, 默认值:
user.home)
5.5. HTTP 源
一个监听HTTP请求并将其主体作为消息负载发出的应用程序。
如果Content-Type匹配text/*或application/json,负载将是一个字符串,
否则负载将是一个字节数组。
负载:
If content type matches text/* 或 application/json
-
String
如果内容类型不匹配 text/* 或 application/json
-
byte array
5.5.2. 选项
The http source 支持以下配置属性:
按前缀分组的属性:
http.cors
- allow-credentials
-
是否应包含与被注解请求的域关联的任何cookie。 (布尔值,默认:
<none>) - allowed-headers
-
实际请求中可以使用的请求头列表。 (String[], 默认:
<none>) - allowed-origins
-
允许的 origins 列表,例如 https://domain1.com。 (String[], 默认值:
<none>)
5.6. JDBC 源
此源从关系型数据库管理系统(RDBMS)中轮询数据。
此源完全基于 DataSourceAutoConfiguration,有关更多信息,请参阅 Spring Boot JDBC 支持。
5.6.2. 选项
按前缀分组的属性:
jdbc.supplier
- max-rows
-
Max numbers of rows to process for query。 (Integer, 默认值:
0) - 查询
-
用于选择数据的查询。 (String,默认值:
<none>) - 分割
-
是否将SQL查询结果拆分为单独的消息。 (布尔值, 默认:
true) - 更新
-
用于将轮询的消息标记为“已查看”的SQL更新语句。(字符串,默认:
<none>)
spring.cloud.stream.poller
- 定时任务
-
Cron表达式值用于Cron触发器。(字符串,默认:
<none>) - fixed-delay
-
默认轮询程序的固定延迟时间。(Long,默认值:
1000) - initial-delay
-
周期性触发器的初始延迟。(整数,默认值:
0) - max-messages-per-poll
-
默认轮询器每次轮询的最大消息数。(Long,默认:
1) - time-unit
-
应用于延迟值的时间单位。(TimeUnit,默认:
<none>,有效值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
spring.datasource
- 数据
-
数据(DML)脚本资源引用。(List<String>,默认:
<none>) - driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:
<none>) - initialization-mode
-
用于确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化的模式。(DataSourceInitializationMode,默认值:
embedded,可选值:ALWAYS、EMBEDDED、NEVER) - 密码
-
数据库登录密码。(字符串,默认:
<none>) - 架构
-
Schema (DDL) script resource references. (List<String>, default:
<none>) - url
-
数据库的 JDBC URL。(字符串,默认:
<none>) - 用户名
-
数据库登录用户名。(字符串,默认:
<none>)
也请参见 Spring Boot 文档
对于额外的 DataSource 属性和 TriggerProperties 以及 MaxMessagesProperties 的轮询选项。
5.7. JMS 源
JMS 源可接收来自 JMS 的消息。
5.7.1. 选项
JMS 的源有以下选项:
按前缀分组的属性:
jms.supplier
- client-id
-
用于持久订阅的客户端 ID。(字符串,默认:
<none>) - 目的地
-
要从中接收消息的目标(队列或主题)。(字符串,默认:
<none>) - message-selector
-
消息的选择器。(字符串,默认:
<none>) - session-transacted
-
启用事务并选择 DefaultMessageListenerContainer,不启用则选择 SimpleMessageListenerContainer。 (布尔类型,默认:
true) - subscription-durable
-
持久订阅为真。 (布尔类型,默认:
<none>) - subscription-name
-
持久或共享订阅的名称。(字符串,默认:
<none>) - subscription-shared
-
共享订阅为真。(布尔值,默认:
<none>)
spring.jms
- jndi-name
-
连接工厂JNDI名称。设置后,优先于其他连接工厂自动配置。(字符串,默认:
<none>) - pub-sub-domain
-
默认的目标类型是否为主题。(布尔值,默认:
false)
spring.jms.listener
- acknowledge-mode
-
容器的确认模式。默认情况下,监听器是事务性的,并使用自动确认。(AcknowledgeMode,默认值:
<none>,可选值:AUTO、CLIENT、DUPS_OK) - auto-startup
-
启动时自动启动容器。(布尔类型,默认值:
true) - 并发
-
最少并发消费者数量。(整数,默认:
<none>) - max-concurrency
-
最大并发消费者数。(整型,默认:
<none>) - receive-timeout
-
接收调用时使用的超时时间。使用-1表示无等待接收,0表示不设置超时。如果不在事务管理器内运行,则仅可能设置为0,并且通常不建议这样做,因为它会阻止干净关闭。(持续时间,默认:
1s)
5.9. 邮件发送源
一个监听邮件并将其消息正文作为消息负载发出的应用程序源代码。
5.9.1. 选项
邮件 源 有以下选项:
- mail.supplier.charset
-
字节数组邮件到字符串转换的字符集。 (字符串,默认:
UTF-8) - mail.supplier.delete
-
设置为 true 表示下载邮件后将其删除。(布尔类型,默认:
false) - mail.supplier.expression
-
配置一个 SpEL 表达式来选择消息。(字符串,默认:
true) - mail.supplier.idle-imap
-
设置为 true 以使用 IdleImap 配置。(布尔类型,默认:
false) - mail.supplier.java-mail-properties
-
JavaMail 属性作为由换行符分隔的 name-value 对字符串,例如 'foo=bar baz=car'。 (Properties, 默认:
<none>) - mail.supplier.mark-as-read
-
设置为 true 可标记邮件为已读。(布尔类型,默认:
false) - mail.supplier.url
-
邮件服务器连接的URL,例如 'imaps://username:[[email protected]:993/Inbox'。 (URLName,默认:
<none>) - mail.supplier.user-flag
-
当服务器不支持 ecent时标记消息的标志。(字符串,默认:
<none>)
5.10. MongoDB 数据源
此源从MongoDB获取数据。
此源完全基于MongoDataAutoConfiguration,请参阅
Spring Boot MongoDB支持
以获取更多信息。
5.10.1. 选项
The mongodb source 有以下选项:
按前缀分组的属性:
mongodb.supplier
- 集合
-
要查询的MongoDB集合。(字符串,默认:
<none>) - 查询
-
MongoDB 查询。 (字符串,默认:
{ }) - query-expression
-
MongoDB 查询 DSL 样式的 SpEL 表达式。(表达式,默认:
<none>) - 分割
-
是否将查询结果拆分为单个消息。(布尔类型,默认:
true) - update-expression
-
MongoDB 更新 DSL 样式的 SpEL 表达式。(表达式,默认:
<none>)
spring.data.mongodb
- authentication-database
-
认证数据库名称。(字符串,默认:
<none>) - auto-index-creation
-
是否启用自动索引创建。(布尔类型,默认:
<none>) - 数据库
-
数据库名称。 (字符串,默认:
<none>) - field-naming-strategy
-
要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:
<none>) - grid-fs-database
-
<文档缺失> (字符串,默认:
<none>) - 主机
-
Mongo服务器主机。不能与URI一起设置。(字符串,默认:
<none>) - 密码
-
登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:
<none>) - 端口
-
Mongo服务器端口。不能与URI一起设置。(Integer,默认:
<none>) - replica-set-name
-
群集所需的副本集名称。不能与URI一起设置。(字符串,默认:
<none>) - uri
-
Mongo 数据库连接字符串。不能与主机、端口、凭据和副本集名称一起设置。(字符串,默认:
mongodb://localhost/test) - 用户名
-
登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:
<none>) - uuid-representation
-
转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:
java-legacy,可能的值:UNSPECIFIED、STANDARD、C_SHARP_LEGACY、JAVA_LEGACY、PYTHON_LEGACY)
也请参见 Spring Boot 文档 以了解额外的 MongoProperties 属性。
查看并 TriggerProperties 了解轮询选项。
5.11. MQTT 源
源代码可用于接收来自 MQTT 的消息。
5.11.2. 选项
MQTT 源具有以下选项:
按前缀分组的属性:
mqtt
- clean-session
-
客户端和服务器是否应记住重启和重新连接之间的状态。(布尔值,默认:
true) - connection-timeout
-
连接超时时间(秒)。 (Integer, 默认值:
30) - keep-alive-interval
-
ping间隔,单位为秒。(整数,默认值:
60) - 密码
-
连接到代理时要使用的密码。(字符串,默认:
guest) - 持久化
-
'memory' 或 'file'。 (字符串,默认:
memory) - persistence-directory
-
持久化目录。 (字符串,默认:
/tmp/paho) - url
-
MQTT代理的位置(以逗号分隔的列表)。(字符串数组,默认:
[tcp://localhost:1883]) - 用户名
-
连接到代理时使用的用户名。(字符串,默认:
guest)
5.12. RabbitMQ 源
The "rabbit" 源启用接收来自RabbitMQ的消息。
The queue(s) 必须在流部署之前存在;它们不会自动创建。 您可以轻松地使用 RabbitMQ 网页界面创建一个队列。
5.12.3. 选项
Rabbit 源代码有以下选项:
按前缀分组的属性:
rabbit.supplier
- enable-retry
-
启用重试的布尔值。(Boolean,默认值:
false) - initial-retry-interval
-
启用重试时的初始重试间隔。(整数,默认值:
1000) - mapped-request-headers
-
要映射的标头。 (String[], 默认值:
[STANDARD_REQUEST_HEADERS]) - max-attempts
-
重试启用时的最大交付尝试次数。(整数,默认:
3) - max-retry-interval
-
启用重试时的最大重试间隔。(整数,默认:
30000) - own-connection
-
为 true 时,使用基于引导属性的单独连接。(布尔类型,默认值:
false) - 队列
-
源将监听消息的队列。(字符串数组,默认:
<none>) - 重新排队
-
拒绝的消息是否应重新排队。(布尔值,默认:
true) - retry-multiplier
-
启用重试时的重试退避倍数。(Double,默认:
2) - 事务性
-
渠道是否已交易。(布尔型,默认:
false)
spring.rabbitmq
- address-shuffle-mode
-
用于打乱配置地址的模式。(AddressShuffleMode,默认值:
none,可选值:NONE、RANDOM、INORDER) - 地址
-
客户端应连接的地址列表,用逗号分隔。设置后,主机和端口将被忽略。(字符串,默认:
<none>) - channel-rpc-timeout
-
通道中RPC调用的继续超时时间。将其设置为零以无限期等待。(持续时间,默认:
10m) - connection-timeout
-
连接超时时间。将其设置为零以无限期等待。(持续时间,默认:
<none>) - 主机
-
RabbitMQ主机。如果设置了地址,则忽略此设置。(字符串,默认:
localhost) - 密码
-
登录以向代理进行身份验证。(字符串,默认:
guest) - 端口
-
RabbitMQ 端口。如果设置了地址,则忽略此设置。默认为 5672,如果启用了 SSL,则默认为 5671。(Integer,默认:
<none>) - publisher-confirm-type
-
发布者类型确认要使用的。(ConfirmType,默认值:
<none>,可能的取值:SIMPLE,CORRELATED,NONE) - publisher-returns
-
是否启用发布商返还。(布尔值,默认:
false) - requested-channel-max
-
客户端请求的每个连接的通道数。使用0表示无限制。(整数,默认:
2047) - requested-heartbeat
-
请求的心跳超时时间;零表示无。如果未指定持续时间后缀,则使用秒。(持续时间,默认:
<none>) - 用户名
-
登录用户以对经纪人进行身份验证。(字符串,默认:
guest) - virtual-host
-
连接到代理时使用的虚拟主机。(字符串,默认:
<none>)
也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。
一个关于重试的注意事项
在默认的 ackMode (AUTO)和 requeue (true)选项下,失败的消息投递将无限次重试。由于Rabbit源本身处理的内容不多,因此源本身出现故障的风险较小,除非下游的Binder因某种原因未连接。将 重试 设置为 false 将导致消息在首次尝试时被拒绝(如果代理配置了死信交换/队列,则可能发送到该位置)。enableRetry 选项允许配置重试参数,以便在消息传递失败时进行重试,并在重试次数用尽后最终丢弃(或死信)。在重试间隔期间,交付线程被挂起。重试选项包括enableRetry、maxAttempts、initialRetryInterval、retryMultiplier和maxRetryInterval。使用 MessageConversionException 导致的消息传递失败永远不会重试;假设如果消息在第一次尝试时无法转换,后续尝试也会失败。此类消息将被丢弃(或转入死信队列)。
|
5.13. 亚马逊S3源
此源应用程序支持使用 Amazon S3 协议传输文件。 文件从 remote 目录(S3 存储桶)传输到部署应用程序的 local 目录。
源发出的消息默认以字节数组提供。但是,可以使用--mode选项进行自定义:
-
ref 提供一个
java.io.File引用 -
lines 将按行拆分文件,并为每一行发出新消息
-
contents 默认值。提供文件的内容作为字节数组
使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始 和 文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarker。withMarkers 的默认值是 false,除非明确设置。
另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
模式 = 行
5.13.3. 选项
该 s3 源具有以下选项:
按前缀分组的属性:
file.consumer
- markers-json
-
当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:
true) - 模式
-
用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:
<none>,可能的值:ref,lines,contents) - with-markers
-
设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:
<none>)
metadata.store.dynamo-db
- create-delay
-
创建表重试之间的延迟。(整数,默认:
1) - create-retries
-
创建表请求的重试次数。(整数,默认:
25) - read-capacity
-
表上的读取容量。(长整型,默认:
1) - 表格
-
用于元数据的表名。(字符串,默认:
<none>) - time-to-live
-
表项的TTL。 (整数,默认:
<none>) - write-capacity
-
表上的写入容量。(长整型,默认:
1)
metadata.store
- 类型
-
指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值:
<none>, 可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
metadata.store.zookeeper
- connect-string
-
Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:
127.0.0.1:2181) - 编码
-
存储数据到Zookeeper时使用的编码。(字符集,默认:
UTF-8) - retry-interval
-
Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:
1000) - 根目录
-
根节点 - 存储条目是此节点的子节点。(字符串,默认:
/SpringIntegration-MetadataStore)
s3.common
- endpoint-url
-
连接到与 S3 兼容的存储的可选端点 URL。(字符串,默认:
<none>) - path-style-access
-
使用路径样式访问。(Boolean,默认值:
false)
s3.supplier
- auto-create-local-dir
-
是否创建本地目录。(布尔类型,默认:
true) - delete-remote-files
-
处理完成后是否删除远程文件。(布尔值,默认:
false) - filename-pattern
-
用于过滤远程文件的模式。(字符串,默认:
<none>) - filename-regex
-
用于筛选远程文件的正则表达式。(模式,默认:
<none>) - list-only
-
设置为 true 可以返回 s3 对象元数据而不需将文件复制到本地目录。(Boolean,默认:
false) - local-dir
-
本地目录,用于存储文件。(File,默认:
<none>) - preserve-timestamp
-
是否将远程文件的时间戳传递给本地文件。(布尔类型,默认:
true) - remote-dir
-
AWS S3存储桶资源。(字符串,默认:
bucket) - remote-file-separator
-
远程文件分隔符。 (字符串,默认:
/) - tmp-file-suffix
-
临时文件后缀。(字符串,默认:
.tmp)
5.13.4. Amazon AWS 常用选项
Amazon S3 源(与其他所有 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,并且其自动配置类由 Spring Boot 自动使用。有关所需和有用的自动配置属性,请参阅他们的文档。
其中一些是关于AWS凭据的:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他用于 AWS Region 定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
对于 AWS Stack:
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
5.14. SFTP 源
此源应用程序支持使用SFTP协议传输文件。文件从remote目录传输到部署应用的local目录。默认情况下,源发出的消息为字节数组。但是,可以使用--mode选项进行自定义:
-
ref 提供一个
java.io.File引用 -
lines 将按行拆分文件,并为每一行发出新消息
-
contents 默认值。提供文件的内容作为字节数组
使用 --mode=lines 时,还可以提供附加选项 --withMarkers=true。如果设置为 true,底层的 FileSplitter 将在实际数据之前和之后发出额外的 文件开始 和 文件结束 标记消息。这两个附加标记消息的有效载荷类型为 FileSplitter.FileMarker。withMarkers 的默认值是 false,除非明确设置。
有关高级配置选项,请参阅sftp-supplier。
另请参阅MetadataStore选项,了解用于防止重启时重复消息的共享持久存储配置。
5.14.2. 输出
模式 = 内容
模式 = 行
5.14.3. 选项
该ftp源具有以下选项:
按前缀分组的属性:
file.consumer
- markers-json
-
当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:
true) - 模式
-
用于文件读取源的文件读取模式。值为 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 内容作为字节。(FileReadingMode,默认:
<none>,可能的值:ref,lines,contents) - with-markers
-
设置为 true 可在数据之前/之后发出文件开始/结束标记消息。仅当 FileReadingMode 设置为 'lines' 时有效。(Boolean,默认值:
<none>)
metadata.store.dynamo-db
- create-delay
-
创建表重试之间的延迟。(整数,默认:
1) - create-retries
-
创建表请求的重试次数。(整数,默认:
25) - read-capacity
-
表上的读取容量。(长整型,默认:
1) - 表格
-
用于元数据的表名。(字符串,默认:
<none>) - time-to-live
-
表项的TTL。 (整数,默认:
<none>) - write-capacity
-
表上的写入容量。(长整型,默认:
1)
metadata.store
- 类型
-
指示要配置的元数据存储类型(默认为“memory”)。若要使用持久化存储,您必须包含相应的Spring Integration依赖。(StoreType, 默认值:
<none>, 可能的值:mongodb,gemfire,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
metadata.store.zookeeper
- connect-string
-
Zookeeper连接字符串格式为HOST:PORT。(字符串,默认:
127.0.0.1:2181) - 编码
-
存储数据到Zookeeper时使用的编码。(字符集,默认:
UTF-8) - retry-interval
-
Zookeeper操作的重试间隔,单位为毫秒。(Integer,默认:
1000) - 根目录
-
根节点 - 存储条目是此节点的子节点。(字符串,默认:
/SpringIntegration-MetadataStore)
sftp.supplier
- auto-create-local-dir
-
如果不存在,则设置为 true 以创建本地目录。(Boolean,默认:
true) - delay-when-empty
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1s) - delete-remote-files
-
设置为true表示在成功传输后删除远程文件。(布尔类型,默认:
false) - 目录
-
工厂“name.directory”对的列表。(String[],默认:
<none>) - 工厂
-
一个工厂名称到工厂的映射。(Map<String, Factory>, 默认值:
<none>) - 公平
-
多个服务器/目录公平轮询。默认为false,因此如果源有多个条目,则在访问其他源之前会先接收这些条目。(布尔型,默认:
false) - filename-pattern
-
一个过滤器模式,用于匹配要传输的文件名。(字符串,默认:
<none>) - filename-regex
-
用于匹配要传输的文件名的过滤正则表达式模式。(Pattern,默认值:
<none>) - list-only
-
设置为 true 可以在不包含整个有效负载的情况下返回文件元数据。(布尔值,默认:
false) - local-dir
-
用于文件传输的本地目录。(文件,默认:
<none>) - max-fetch
-
每次轮询要获取的远程文件的最大数量;默认为无限制。在列出文件或构建任务启动请求时无效。(整数,默认:
<none>) - preserve-timestamp
-
设置为 true 可保留原始时间戳。(布尔类型,默认值:
true) - remote-dir
-
远程 FTP 目录。(字符串,默认:
/) - remote-file-separator
-
远程文件分隔符。(字符串,默认:
/) - rename-remote-files-to
-
一个 SpEL 表达式,解析为新名称,在远程文件成功传输后必须重命名。(表达式,默认:
<none>) - 流
-
设置为 true 可以流式传输文件,而不是复制到本地目录。(Boolean,默认:
false) - tmp-file-suffix
-
传输进行期间使用的后缀。(字符串,默认:
.tmp)
sftp.supplier.factory
- allow-unknown-keys
-
允许未知或已更改的密钥。 (Boolean,默认:
false) - 主机
-
服务器的主机名。(字符串,默认:
localhost) - known-hosts-expression
-
一个解析为已知主机文件位置的 SpEL 表达式。(表达式,默认:
<none>) - pass-phrase
-
用户私钥的密码。(字符串,默认:
<empty string>) - 密码
-
连接到服务器时使用的密码。(字符串,默认:
<none>) - 端口
-
服务器的端口。(整数,默认:
22) - private-key
-
用户私钥的位置。 (资源,默认:
<none>) - 用户名
-
连接服务器时要使用的用户名。(字符串,默认值:
<none>)
5.15. 系统日志
syslog源通过UDP、TCP或两者同时接收SYSLOG数据包。支持RFC3164(BSD)和RFC5424格式。
5.15.1. 选项
- syslog.supplier.buffer-size
-
解码消息时使用的缓冲区大小;较大的消息将被拒绝。(Integer,默认:
2048) - syslog.supplier.nio
-
是否使用NIO(在支持大量连接时)。(布尔值,默认:
false) - syslog.supplier.port
-
要监听的端口。(整数,默认:
1514) - syslog.supplier.protocol
-
用于 SYSLOG 的协议(tcp 或 udp)。(协议,缺省:
<none>,有效值:tcp、udp、both) - syslog.supplier.reverse-lookup
-
是否对传入的套接字执行反向查找。(布尔值,默认:
false) - syslog.supplier.rfc
-
"5424" 或 "3164" - 根据 RFC 的 syslog 格式;3164 又称作 "BSD" 格式。 (字符串,默认值:
3164) - syslog.supplier.socket-timeout
-
套接字超时时间。(整型,默认:
0)
5.16. TCP
源代码tcp用作服务器,允许远程方连接到它并通过原始tcp套接字提交数据。
TCP是一种流式协议,需要某种机制来在传输线上对消息进行定界。提供了多种解码器,默认的是'CRLF',与Telnet兼容。
由TCP源应用程序生成的消息的byte[]有效负载。
5.17. 时间源
时间源将每隔一段时间简单地发出包含当前时间的字符串。
5.17.1. 可选项
时间源具有以下选项:time
- spring.cloud.stream.poller.cron
-
Cron表达式值用于Cron触发器。(字符串,默认:
<none>) - spring.cloud.stream.poller.fixed-delay
-
默认轮询程序的固定延迟时间。(Long,默认值:
1000) - spring.cloud.stream.poller.initial-delay
-
周期性触发器的初始延迟。(整数,默认值:
0) - spring.cloud.stream.poller.max-messages-per-poll
-
默认轮询器每次轮询的最大消息数。(Long,默认:
1) - spring.cloud.stream.poller.time-unit
-
应用于延迟值的时间单位。(TimeUnit,默认:
<none>,有效值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS) - time.date-format
-
日期值的格式。(字符串,默认:
MM/dd/yy HH:mm:ss)
5.18. Twitter 消息源
重复获取最近30天内的直接消息(包括已发送和接收的消息),并按时间倒序排列。
缓解后的消息被缓存(在MetadataStore缓存中)以防止重复。
默认使用内存中的SimpleMetadataStore。
该twitter.message.source.count控制返回消息的数量。
属性spring.cloud.stream.poller控制 the message poll interval。
必须与使用的API速率限制一致
5.18.1. 选项
按前缀分组的属性:
spring.cloud.stream.poller
- 定时任务
-
Cron表达式值用于Cron触发器。(字符串,默认:
<none>) - fixed-delay
-
默认轮询程序的固定延迟时间。(Long,默认值:
1000) - initial-delay
-
周期性触发器的初始延迟。(整数,默认值:
0) - max-messages-per-poll
-
默认轮询器每次轮询的最大消息数。(Long,默认:
1) - time-unit
-
应用于延迟值的时间单位。(TimeUnit,默认:
<none>,有效值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
twitter.connection
- access-token
-
您的 Twitter Tokens。 (字符串,默认值:
<none>) - access-token-secret
-
您的 Twitter Tokens密钥。(字符串,默认:
<none>) - consumer-key
-
您的 Twitter 密钥。(字符串,默认:
<none>) - consumer-secret
-
您的 Twitter 密钥。 (字符串,默认值:
<none>) - debug-enabled
-
启用 Twitter4J 调试模式。(布尔值,默认:
false) - raw-json
-
启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:
true)
5.19. Twitter搜索源
Twitter 的 标准搜索 API(search/tweets)允许对最近或热门推文的索引进行简单查询。这个 Source 提供了针对过去 7 天内发布的新近推文抽样的连续搜索。是 'public' API 集合的一部分。
返回与指定查询匹配的相关推文集合。
使用spring.cloud.stream.poller属性来控制连续搜索请求之间的间隔。速率限制-每30分钟窗口内180个请求(例如,约6次/分钟,约1次/10秒)
该twitter.search查询属性允许通过关键字进行查询,并根据时间和地理位置过滤结果。
代码twitter.search.count和twitter.search.page控制搜索API的结果分页。
注意:Twitter 的搜索服务以及由此扩展的搜索 API 并不打算作为推文的全面来源。并非所有的推文都会被索引或通过搜索界面提供。
5.19.1. 选项
按前缀分组的属性:
spring.cloud.stream.poller
- 定时任务
-
Cron表达式值用于Cron触发器。(字符串,默认:
<none>) - fixed-delay
-
默认轮询程序的固定延迟时间。(Long,默认值:
1000) - initial-delay
-
周期性触发器的初始延迟。(整数,默认值:
0) - max-messages-per-poll
-
默认轮询器每次轮询的最大消息数。(Long,默认:
1) - time-unit
-
应用于延迟值的时间单位。(TimeUnit,默认:
<none>,有效值:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
twitter.connection
- access-token
-
您的 Twitter Tokens。 (字符串,默认值:
<none>) - access-token-secret
-
您的 Twitter Tokens密钥。(字符串,默认:
<none>) - consumer-key
-
您的 Twitter 密钥。(字符串,默认:
<none>) - consumer-secret
-
您的 Twitter 密钥。 (字符串,默认值:
<none>) - debug-enabled
-
启用 Twitter4J 调试模式。(布尔值,默认:
false) - raw-json
-
启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:
true)
twitter.search
- 计数
-
每页返回的推文数量(例如,每次单一请求),最多为100个。(整数,默认:
100) - 语言
-
将搜索到的推文限制为指定语言,由 http://en.wikipedia.org/wiki/ISO_639-1 定义。(字符串,默认:
<none>) - 页面
-
从最近的推文开始搜索之前,要向后搜索的页面数量(例如请求),以在最近年份的推文中再次启动搜索。总共向后搜索的推文数为(页码 * 数量)(整数,默认:
3) - 查询
-
通过搜索查询字符串搜索推文。(字符串,默认:
<none>) - restart-from-most-recent-on-empty-response
-
在空响应时,从最近的推文重新开始搜索。仅在第一次重启后应用(例如,当 since_id != UNBOUNDED 时)(布尔值,默认:
false) - result-type
-
指定您偏好的搜索结果类型。当前默认值为“混合”。有效值包括:混合:在响应中同时包含热门和实时结果。最近:仅在响应中返回最新的结果。热门:仅在响应中返回最热门的结果。(ResultType,默认:
<none>,可能值:popular,mixed,recent) - Since
-
如果指定,返回自给定日期以来的推文。日期应按 YYYY-MM-DD 格式设置。(字符串,默认:
<none>)
5.20. Twitter 流源
-
The
Filter API返回与一个或多个过滤器谓词匹配的公共状态。 使用多个参数可以使用单个连接到流式API。 提示:track、follow和locations字段用或运算符组合! 包含track=foo和follow=1234的查询返回匹配test或由用户1234创建的推文。 -
该
Sample API返回所有公共状态的小随机样本。
默认访问级别的推文返回的结果相同,因此如果两个不同的客户端连接到此端点,则它们会看到相同的推文。
默认访问级别允许最多400个跟踪关键字、5,000个关注用户ID以及25个0.1-360度的位置框。
5.20.1. 可选项
按前缀分组的属性:
twitter.connection
- access-token
-
您的 Twitter Tokens。 (字符串,默认值:
<none>) - access-token-secret
-
您的 Twitter Tokens密钥。(字符串,默认:
<none>) - consumer-key
-
您的 Twitter 密钥。(字符串,默认:
<none>) - consumer-secret
-
您的 Twitter 密钥。 (字符串,默认值:
<none>) - debug-enabled
-
启用 Twitter4J 调试模式。(布尔值,默认:
false) - raw-json
-
启用缓存由 Twitter API 返回的原始(未经处理)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 JSON 表示形式。当设置为 True 时,结果将使用原始的 Twitter API JSON 表示形式。(布尔值,默认:
true)
twitter.stream.filter
- 计数
-
指示在转换到实时流之前要流式传输的先前状态的数量。(整数,默认:
0) - filter-level
-
过滤器级别限制流中显示的推文,仅包括具有最小filterLevel属性值的推文。可以是none、low或medium之一。(FilterLevel,默认:
<none>) - 关注
-
指定要接收公共推文的用户,按ID。(List<Long>, 默认:
<none>) - 语言
-
指定流的推文语言。(List<String>, 默认:
<none>) - 位置
-
要跟踪的位置。内部表示为二维数组。边界框无效:52.38,4.90,51.51,-0.12。第一对必须是框的西南角(List<BoundingBox>,默认:
<none>) - 跟踪
-
指定要跟踪的关键词。(List<String>, 默认:
<none>)
5.22. ZeroMQ源
“zeromq”源启用从ZeroMQ接收消息。
5.22.3. 选项
Java开发Spring框架的英文网站有以下选项:zeromq
- zeromq.supplier.bind-port
-
创建 ZeroMQ 套接字时绑定的端口;0 表示选择一个随机端口。(整数,默认:
0) - zeromq.supplier.connect-url
-
连接到 ZeroMQ 套接字的 URL。(字符串,默认:
<none>) - zeromq.supplier.consume-delay
-
当 ZeroMQ Socket 没有接收到数据时,消费的延迟。(持续时间,默认:
1s) - zeromq.supplier.socket-type
-
连接应建立的套接字类型。(SocketType,默认:
<none>,可能值:PAIR,PUB,SUB,REQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM) - zeromq.supplier.topics
-
要订阅的主题。(字符串数组,默认:
[])
也请参见 Spring Boot 文档 对于 broker 连接和监听器属性的额外属性。
6. 处理器
6.1. 聚合处理器
聚合处理器使应用程序能够将传入的消息分组并释放到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。
6.1.2. 选项
按前缀分组的属性:
聚合器
- 聚合
-
用于聚合策略的SpEL表达式。默认为负载集合。(表达式,默认:
<none>) - 相关性
-
用于关联键的 SpEL 表达式。默认为 correlationId 标头。(表达式,默认:
<none>) - group-timeout
-
SpEL 表达式用于设置超时未完成组的过期时间。(表达式,默认:
<none>) - message-store-entity
-
持久化消息存储实体:关系型数据库中的表前缀,MongoDb 中的集合名称等。(字符串,默认:
<none>) - message-store-type
-
消息存储类型。 (字符串,默认:
<none>) - 发布
-
发布策略的 SpEL 表达式。默认情况下基于 sequenceSize 请求头。(表达式,默认:
<none>)
spring.data.mongodb
- authentication-database
-
认证数据库名称。(字符串,默认:
<none>) - auto-index-creation
-
是否启用自动索引创建。(布尔类型,默认:
<none>) - 数据库
-
数据库名称。 (字符串,默认:
<none>) - field-naming-strategy
-
要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:
<none>) - grid-fs-database
-
<文档缺失> (字符串,默认:
<none>) - 主机
-
Mongo服务器主机。不能与URI一起设置。(字符串,默认:
<none>) - 密码
-
登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:
<none>) - 端口
-
Mongo服务器端口。不能与URI一起设置。(Integer,默认:
<none>) - replica-set-name
-
群集所需的副本集名称。不能与URI一起设置。(字符串,默认:
<none>) - uri
-
Mongo 数据库连接字符串。不能与主机、端口、凭据和副本集名称一起设置。(字符串,默认:
mongodb://localhost/test) - 用户名
-
登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:
<none>) - uuid-representation
-
转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:
java-legacy,可能的值:UNSPECIFIED、STANDARD、C_SHARP_LEGACY、JAVA_LEGACY、PYTHON_LEGACY)
spring.datasource
- continue-on-error
-
初始化数据库时如果发生错误是否停止。(布尔类型,默认:
false) - 数据
-
数据(DML)脚本资源引用。(List<String>,默认:
<none>) - data-password
-
执行DML脚本时的数据库密码(如果不同)。(字符串,默认:
<none>) - data-username
-
要执行DML脚本的数据库用户名(如果不同)。(字符串,默认:
<none>) - driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:
<none>) - embedded-database-connection
-
嵌入式数据库的连接详情。如果类路径中存在可用的最合适的嵌入式数据库,则默认使用该数据库。(EmbeddedDatabaseConnection,默认值:
<none>,可能的值:NONE,H2,DERBY,HSQL,HSQLDB) - generate-unique-name
-
是否生成随机数据源名称。(布尔型,默认:
true) - initialization-mode
-
用于确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化的模式。(DataSourceInitializationMode,默认值:
embedded,可选值:ALWAYS、EMBEDDED、NEVER) - jndi-name
-
JNDI位置的数据源。设置后,类、URL、用户名和密码将被忽略。(字符串,默认:
<none>) - 姓名
-
如果 "generate-unique-name" 为 false,则要使用的数据源名称。当使用嵌入式数据库时,默认值为 "testdb",否则为 null。(String,默认:
<none>) - 密码
-
数据库登录密码。(字符串,默认:
<none>) - 平台
-
DDL 或 DML 脚本中使用的平台(例如 schema-${platform}.sql 或 data-${platform}.sql)。(字符串,默认:
all) - 架构
-
Schema (DDL) script resource references. (List<String>, default:
<none>) - schema-password
-
执行 DDL 脚本的数据库密码(如果不同)。(字符串,默认:
<none>) - schema-username
-
要执行DDL脚本的数据库的用户名(如果不同的话)。(字符串,默认:
<none>) - 分隔符
-
SQL初始化脚本中的语句分隔符。(字符串,默认:
;) - sql-script-encoding
-
SQL 脚本编码。 (字符集,默认:
<none>) - 类型
-
要使用的连接池实现的完全限定名。默认情况下,它会自动从类路径中检测。(Class<DataSource>,默认:
<none>) - url
-
数据库的 JDBC URL。(字符串,默认:
<none>) - 用户名
-
数据库登录用户名。(字符串,默认:
<none>)
spring.mongodb.embedded
- 功能
-
要启用的功能的逗号分隔列表。默认情况下使用配置版本的默认值。(Set<Feature>, 默认:
[sync_delay]) - 版本
-
要使用的 Mongo 版本。(字符串,默认:
3.5.5)
spring.redis
- client-name
-
连接上要设置的客户端名称,通过 CLIENT SETNAME 设置。 (字符串,默认:
<none>) - client-type
-
要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认值:
<none>,可能的取值:LETTUCE,JEDIS) - connect-timeout
-
连接超时。(持续时间,默认:
<none>) - 数据库
-
连接工厂使用的数据库索引。(整数,默认:
0) - 主机
-
Redis服务器主机。(字符串,默认:
localhost) - 密码
-
Redis服务器的登录密码。(字符串,默认:
<none>) - 端口
-
Redis服务器端口。(整数,默认:
6379) - SSL
-
是否启用SSL支持。(布尔类型,默认:
false) - timeout
-
读取超时时间。(持续时间,默认:
<none>) - url
-
连接URL。覆盖主机、端口和密码。用户被忽略。例如:redis://user:[email protected]:6379 (字符串,默认:
<none>) - 用户名
-
登录 Redis 服务器的用户名。(字符串,默认值:
<none>)
6.3 过滤器处理器
过滤器处理器允许应用程序检查传入的有效载荷,然后对其应用谓词以决定是否需要继续记录。 例如,如果传入的有效载荷类型为 String 并且您想筛选掉少于五个字符的内容,则可以按如下方式运行过滤器处理器。
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。
6.5. Header Enricher Processor
使用 header-enricher 应用添加消息头。
标题以换行符分隔的键值对形式提供,其中键是标题名称,值为SpEL表达式。
例如 --headers='foo=payload.someProperty \n bar=payload.otherProperty'。
6.6. Http 请求处理器
一个处理器应用程序,该程序向HTTP资源发出请求,并将响应正文作为消息有效负载进行发布。
6.6.1. 输入
headers
任何必需的 HTTP 头部必须通过 headers 或 headers-expression 属性显式设置。请参见下面的示例。
头部值也可用于构造:
-
当在
body-expression属性中引用时,请求正文。 -
当在
http-method-expression属性中引用时,HTTP方法。 -
在
url-expression属性中引用时的URL。
负载
默认情况下,负载用作 POST 请求的请求体,并且可以是任何 Java 类型。对于 GET 请求,它应该是一个空字符串。负载还可以用于构建:
-
当在
body-expression属性中引用时,请求正文。 -
当在
http-method-expression属性中引用时,HTTP方法。 -
在
url-expression属性中引用时的URL。
底层的WebClient支持Jackson JSON序列化,以支持任何请求和响应类型。expected-response-type属性,默认为String.class,可以设置为您应用程序类路径中的任何类。请注意,用户定义的有效负载类型需要向您的pom文件添加所需的依赖项。
6.6.2. 输出
负载
原始输出对象是ResponseEntity<?>,其任何字段(例如:body、headers)或访问器方法(statusCode)都可以作为reply-expression的一部分引用。默认情况下,传出消息的有效负载是响应体。请注意,Jackson 默认无法反序列化 ResponseEntity(由表达式 #root 引用),但可以将其渲染为HashMap。
6.6.4. 选项
- http.request.body-expression
-
从传入消息中推导请求体的 SpEL 表达式。(表达式,默认:
<none>) - http.request.expected-response-type
-
用于解释响应的类型。(Class<?>,默认:
<none>) - http.request.headers-expression
-
用于推导要使用的 HTTP 头部映射的 SpEL 表达式。 (表达式,默认:
<none>) - http.request.http-method-expression
-
从传入的消息推导请求方法的 SpEL 表达式。(表达式,默认:
<none>) - http.request.maximum-buffer-size
-
输入流缓冲区分配的最大字节数。默认值为256k。如需发布或获取大二进制内容,请适当增加。(Integer,缺省:
0) - http.request.reply-expression
-
用于计算最终结果的SpEL表达式,应用于整个http {@link org.springframework.http.ResponseEntity}。 (表达式,默认:
<none>) - http.request.timeout
-
请求超时时间(毫秒)。(Long,默认:
30000) - http.request.url-expression
-
对传入消息进行SpEL表达式计算以确定要使用的URL。(表达式,默认:
<none>)
6.7. 图像识别处理器
一个使用Inception模型对实时图像进行分类的处理器,可以将图像分为不同的类别(例如标签)。
模型的输入是二进制数组形式的图像。
输出为以下格式的JSON消息:
{
"labels" : [
{"giant panda":0.98649305}
]
}
Result 包含识别类别的名称(例如 label)以及该图像代表此类别的置信度(例如 confidence)。
如果将response-seize设置为高于1的值,则结果将包括最可能的前response-seize个标签。例如,response-size=3将返回:
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
负载
如果传入类型为 byte[] 且内容类型设置为 application/octet-stream,则应用程序将输入的 byte[] 图像处理并输出增强后的 byte[] 图像负载和 JSON 标头。
6.7.2. 选项
- image.recognition.cache-model
-
缓存预训练的 TensorFlow 模型。(布尔类型,默认:
true) - image.recognition.debug-output
-
<文档缺失> (布尔值,默认:
false) - image.recognition.debug-output-path
-
<文档缺失> (字符串,默认:
image-recognition-result.png) - image.recognition.model
-
预训练的TensorFlow图像识别模型。请注意,该模型必须与所选的模型类型匹配!(字符串,默认:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb) - image.recognition.model-type
-
支持三种不同的预训练 tensorflow 图像识别模型:inception、mobilenetv1 和 mobilenetv2。1. inception图使用“input”作为输入,“output”作为输出。2. mobilenetv2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 标准化图像尺寸始终为正方形(例如 h=w)- 图使用“input”作为输入和“mobilenetv2/predictions/reshape_1”作为输出。3. mobilenetv1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图使用“input”作为输入和“mobilenetv1/predictions/reshape_1”作为输出。(modeltype,默认:
<none>,可能值:inception,mobilenetv1,mobilenetv2) - image.recognition.normalized-image-size
-
归一化图像大小。(整数,默认:
224) - image.recognition.response-size
-
已识别图片的数量。(整数,默认值:
5)
6.8. 对象检测处理器
对象检测处理器提供开箱即用的 TensorFlow 对象检测 API 支持。它允许在单个图像或图像流中实时定位和识别多个对象。对象检测处理器是基于 对象检测功能 构建的。
这里是一些合理的配置默认值:
-
object.detection.model:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb -
object.detection.labels:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt -
object.detection.with-masks:false
下图显示了一个 Spring Cloud 数据流,用于实时预测输入图像流中的对象类型。
处理器的输入是一个图像字节数组,输出是增强后的图像和一个标题,称为detected_objects,它提供了检测到的对象的文字描述:
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
0 头格式是:
-
对象名称:置信度 - 检测到的对象的人类可读名称(例如,标签)及其置信度,置信度为[0-1]之间的浮点数
-
x1、y1、x2、y2 - 响应还提供了检测到的对象的边界框,表示为
(x1, y1, x2, y2)。坐标相对于图像大小进行相对定位。 -
cid-分类标识符,如所提供的标签配置文件中定义。
负载
传入的类型是 byte[],内容类型是 application/octet-stream。处理器处理输入的 byte[] 图像,并输出一个增强的 byte[] 图像负载和一个 JSON 头部(detected_objects)。
6.8.2. 选项
- object.detection.cache-model
-
<文档缺失> (布尔值,默认:
true) - object.detection.confidence
-
<文档缺失> (浮点数,默认:
0.4) - object.detection.debug-output
-
<文档缺失> (布尔值,默认:
false) - object.detection.debug-output-path
-
<文档缺失> (字符串,默认:
object-detection-result.png) - object.detection.labels
-
标签 URI。 (字符串,默认:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt) - object.detection.model
-
预训练的 TensorFlow 对象检测模型。(字符串,默认:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb) - object.detection.response-size
-
<documentation missing> (Integer,默认:
<none>) - object.detection.with-masks
-
<文档缺失> (布尔值,默认:
false)
6.9. 语义分割处理器
基于最先进的DeepLab TensorFlow模型的图像语义分割。
图像分割是将图像中的每个像素与一个类别标签(如花朵、人、道路、天空、海洋或汽车)关联起来的过程。Semantic Segmentation不同于Instance Segmentation,后者生成实例感知的区域掩码,而Semantic Segmentation则生成类别感知的掩码。要实现Instance Segmentation,请参阅目标检测服务。
该 Semantic Segmentation Processor 使用 语义分割功能 库和 TensorFlow 服务。
负载
传入的类型是 byte[],内容类型是 application/octet-stream。处理器处理输入的 byte[] 图像,并输出增强的 byte[] 图像负载和 json 头。
处理器的输入是图像字节数组,输出是增强后的图像字节数组,并且包含一个JSON标头semantic_segmentation,格式如下:
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ]
...
]
输出头的json格式表示从输入图像计算出的颜色像素图。
6.9.2. 选项
- semantic.segmentation.color-map-uri
-
每个预训练模型都基于某些对象颜色映射。预定义的选项有:- classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (字符串,默认:
classpath:/colormap/citymap_colormap.json) - semantic.segmentation.debug-output
-
将输出图像保存在本地 debugOutputPath 路径下。(布尔类型,默认:
false) - semantic.segmentation.debug-output-path
-
<文档缺失> (字符串,默认:
semantic-segmentation-result.png) - semantic.segmentation.mask-transparency
-
计算分割掩码图像的alpha颜色。(浮点数,默认:
0.45) - semantic.segmentation.model
-
预训练的TensorFlow语义分割模型。(字符串,默认:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb) - semantic.segmentation.output-type
-
指定输出图像类型。您可以返回带有计算掩码叠加的输入图像,或者仅返回掩码。(OutputType,默认值:
<none>,可选值:blended,mask)
6.10. 脚本处理器
使用脚本转换消息的处理器。脚本正文作为属性值直接提供。可以指定脚本语言(groovy/javascript/ruby/python)。
6.10.1. 选项
脚本处理器有以下选项:script-processor
- script-processor.language
-
脚本属性中文本的语言。支持的类型为:groovy、javascript、ruby、python。(字符串,默认值:
<none>) - script-processor.script
-
脚本文本。(字符串,默认:
<none>) - script-processor.variables
-
变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:
<none>) - script-processor.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:
<none>)
6.11. 分割处理器
拆分器应用程序建立在Spring Integration中的同名概念之上,允许将单个消息拆分为多个不同的消息。
处理器使用一个函数作为输入Message<?>,然后根据各种属性产生输出List<Message<?>(见下文)。
您可以使用SpEL表达式或定界符来指定如何拆分传入的消息。
负载
-
传入的有效负载 -
Message<?>
如果传入的类型是 byte[] 并且内容类型设置为 text/plain 或 application/json,则应用程序会将 byte[] 转换为 String。
-
传出负载 -
List<Message<?>
6.11.2. 选项
- splitter.apply-sequence
-
在标头中添加关联/序列信息,以方便稍后的聚合。(布尔值,默认:
true) - splitter.charset
-
在将基于文本的文件中的字节转换为字符串时要使用的字符集。(String,默认:
<none>) - splitter.delimiters
-
当表达式为 null 时,用于对
{@link String}负载进行标记化的分隔符。 (字符串,默认值:<none>) - splitter.expression
-
用于拆分有效负载的 SpEL 表达式。(字符串,默认值:
<none>) - splitter.file-markers
-
设置为 true 或 false,以使用一个 {@code FileSplitter}(按行拆分基于文本的文件)来包含(或不包含)文件开头/结尾标记。(布尔型,默认:
<none>) - splitter.markers-json
-
当 'fileMarkers == true' 时,指定它们是否应作为 FileSplitter.FileMarker 对象或 JSON 生成。(布尔值,默认:
true)
6.12. 转换处理器
转换器处理器允许您基于SpEL表达式来转换消息负载结构。
以下是运行此应用程序的示例。
java -jar filter-processor-kafka-<version>.jar --spel.function.expression=toUpperCase()
如果你想将其运行在 RabbitMQ 上,请将 Kafka 更改为 Rabbit。
6.13. Twitter趋势和趋势位置处理器
能够返回热门话题或热门话题位置的处理器。twitter.trend.trend-query-type属性允许选择查询类型。
6.13.2. 获取趋势位置
在此模式下,将twitter.trend.trend-query-type设置为trendLocation。
通过位置检索热门话题的完整列表或附近的位置列表。
如果未提供latitude、longitude参数,则处理器执行可用趋势API,并返回Twitter具有话题趋势信息的位置。
如果提供了latitude、longitude参数,处理器会执行趋势最近API并返回Twitter具有趋势话题信息的位置,这些位置最接近指定的位置。
响应是一个 locations 数组,用于编码位置的 WOEID 和其他人类可读信息,例如规范名称和该位置所属国家。
7. 消息通道
7.1. Cassandra Sink(Cassandra 接收器)
此接收应用程序将接收到的每个消息的内容写入Cassandra。
它期望一个JSON字符串作为负载,并使用其属性来映射到表列。
7.1.2. 选项
该cassandra接收器具有以下选项:
- spring.data.cassandra.compression
-
通过Cassandra二进制协议支持的压缩。(压缩,默认:
none,可能值:LZ4,SNAPPY,NONE) - spring.data.cassandra.config
-
要使用的配置文件的位置。(资源,默认:
<none>) - spring.data.cassandra.contact-points
-
集群节点地址,格式为“host:port”,或者仅使用配置端口的简单“host”。(List<String>, 默认:
[127.0.0.1:9042]) - spring.data.cassandra.keyspace-name
-
要使用的 Keyspace 名称。(字符串,默认:
<none>) - spring.data.cassandra.local-datacenter
-
被视为“本地”的数据中心。联系点应来自此数据中心。(字符串,默认:
<none>) - spring.data.cassandra.password
-
服务器的登录密码。(字符串,默认:
<none>) - spring.data.cassandra.port
-
如果联系点未指定,则使用的端口。(整数,默认:
9042) - spring.data.cassandra.schema-action
-
启动时要采取的架构操作。(字符串,默认:
none) - spring.data.cassandra.session-name
-
命名 Cassandra 会话。(字符串,默认值:
<none>) - spring.data.cassandra.ssl
-
启用 SSL 支持。(布尔类型,默认:
false) - spring.data.cassandra.username
-
服务器登录用户。(字符串,默认:
<none>)
7.2. 分析接收器
基于分析消费者构建的接收应用程序,该程序从输入消息中计算分析,并将分析结果作为指标发布到各种监控系统。它利用micrometer库在最流行的监控系统上提供统一的编程体验,并公开Spring表达式语言(SpEL)属性,用于定义如何从输入数据中计算度量名称、值和标签。
分析接收器可以生成两种指标类型:
一个 计量器(例如计数器或仪表)由其唯一的 name 和 dimensions 确定(术语维度和标签可以互换使用)。维度允许对特定命名指标进行切片,以便深入分析和推理数据。
由于指标由其name和dimensions唯一标识,因此您可以为每个指标分配多个标签(例如键/值对),但之后不能随意更改这些标签!如果具有相同名称的指标具有不同的标签集,则监控系统(如Prometheus)会发出警告。 |
使用analytics.name或analytics.name-expression属性设置输出分析指标的名称。如果没有设置,指标名称默认为应用程序的名称。
使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>属性可以向您的指标添加一个或多个标签。TAG_NAME在属性定义中将作为标签名称出现在指标中。TAG_VALUE是SpEL表达式,该表达式从传入的消息动态计算出标签值。
表达式使用SpEL和headers关键字来访问消息的头和有效载荷值。
您可以使用字面量(例如 'fixed value')来设置具有固定值的标签。 |
所有流应用程序开箱即用地支持三种最流行的监控系统,Wavefront、Prometheus 和 InfluxDB,您可以通过声明方式启用每种系统。通过只需向 Analytics Sink 应用程序添加它们的 Micrometer Meter-Registry 依赖项即可添加对其他监控系统的支持。
请访问 Spring Cloud Data Flow 流监控 获取配置监控系统的详细说明。以下快速片段可帮助您开始。
-
要启用 Prometheus 计量注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用Wavefront计数器注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 计量注册表,请设置以下属性。
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink将重用提供的度量配置。 |
下图说明了Analytics Sink如何帮助收集股票交易所和实时数据管道的业务内部信息。
7.2.2. 选项
按前缀分组的属性:
分析
- amount-expression
-
用于计算输出指标值(例如金额)的 SpEL 表达式。默认值为 1.0 (表达式,默认:
<none>) - meter-type
-
用于向后端报告指标的 Micrometer 计量类型。(MeterType,默认值:
<none>,可能的取值:counter、gauge) - 姓名
-
输出指标的名称。'name' 和 'nameExpression' 相互排斥,只能设置其中一个。(字符串,默认:
<none>) - name-expression
-
用于从输入消息计算输出指标名称的SpEL表达式。'name' 和 'nameExpression' 是互斥的,只能设置其中一个。(表达式,默认:
<none>)
analytics.tag
- 表达式
-
从 SpEL 表达式计算标签。单个 SpEL 表达式可以生成值数组,这意味着不同的名称/值标签。每个名称/值标签都会产生单独的计量器增量。标签表达式格式是:analytics.tag.expression.[tag-name]=[SpEL 表达式] (Map<String, Expression>, 默认:
<none>) - 固定
-
已弃用:请使用 analytics.tag.expression 并带有字面量 SpEL 表达式。自定义固定标签。这些标签具有常数值,创建一次后会随每次发布的指标一起发送。定义固定标签的约定是:<code>analytics.tag.fixed.[tag-name]=[tag-value]</code>(Map<String, String>, 默认值:
<none>)
7.3. Elasticsearch 写入器
将文档索引到Elasticsearch中的接收器。<br>
此Elasticsearch接收器仅支持索引JSON文档。
它从输入目标获取数据,然后将其索引到Elasticsearch中。
输入数据可以是普通的json字符串,或者表示JSON的java.util.Map。
它还接受作为Elasticsearch提供的XContentBuilder的数据。
然而,在中间件不将记录保存为XContentBuilder的情况下,这种情况很少见。
这主要是为了直接调用消费者而提供的。
7.3.1. 选项
Spring 框架的 Elasticsearch 接收器具有以下选项:
按前缀分组的属性:
elasticsearch.consumer
- 异步
-
指示索引操作是异步还是同步。默认情况下,索引操作是同步进行的。(布尔值,默认:
false) - batch-size
-
每次请求要索引的项目数量。默认值为1。如果设置大于1,则会使用批量索引API。(整数,默认:
1) - group-timeout
-
批量索引激活时,在超时毫秒后将消息组刷新。默认值为-1,表示不会自动刷新空闲的消息组。(Long,默认:
-1) - id
-
要索引的文档的id。如果设置了,则每条消息的INDEX_ID报头值将覆盖此属性。(表达式,默认:
<none>) - 索引
-
索引名称。如果设置,则每个消息的 INDEX_NAME 头值将覆盖此属性。(字符串,默认:
<none>) - 路由
-
指示要路由到的分片。如果没有提供,Elasticsearch 将默认使用文档 ID 的哈希。(字符串,默认:
<none>) - timeout-seconds
-
分片可用的超时时间。如果未设置,则由Elasticsearch客户端默认设为1分钟。(Long,默认:
0)
7.3.2. 运行此接收器的示例
-
从文件夹
elasticsearch-sink:./mvnw clean package -
cd apps
-
切换到正确的绑定器生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package -
请确保您已运行Elasticsearch。例如,您可以使用以下命令将其作为docker容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2 -
如果中间件(Kafka 或 RabbitMQ)尚未运行,请先启动它。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing -
将一些JSON数据发送到中间件目的地。例如:
"foo":"bar"} -
验证数据是否已索引:
curl localhost:9200/testing/_search
7.4. 文件接收器
文件接收应用程序将接收到的每个消息写入到一个文件中。
7.4.2. 选项
代码file-sink有以下选项:
- file.consumer.binary
-
一个标志,用于指示是否应抑制在写入后添加换行符。(布尔值,默认:
false) - file.consumer.charset
-
写入文本内容时使用的字符集。(字符串,默认:
UTF-8) - file.consumer.directory
-
目标文件的父目录。(File,缺省:
<none>) - file.consumer.directory-expression
-
要评估的目标文件的父目录表达式。(字符串,默认:
<none>) - file.consumer.mode
-
如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认值:
<none>,可选值:APPEND、APPEND_NO_FLUSH、FAIL、IGNORE、REPLACE、REPLACE_IF_MODIFIED) - file.consumer.name
-
目标文件的名称。(字符串,默认:
file-consumer) - file.consumer.name-expression
-
要计算的目标文件名的表达式。(字符串,默认值:
<none>) - file.consumer.suffix
-
文件名后缀。 (字符串,默认:
<empty string>)
7.5. FTP 接收器
FTP接收器是从传入的消息推送到FTP服务器的简单选项。
它使用ftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。
要使用此接收器,您需要一个用户名和密码来登录。
默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会使用该文件的原始名称。 |
7.5.4. 可选项
FTP 接收器 具有以下选项:
按前缀分组的属性:
ftp.consumer
- auto-create-dir
-
是否创建远程目录。(布尔值,默认:
true) - filename-expression
-
用于生成远程文件名的 SpEL 表达式。(字符串,默认:
<none>) - 模式
-
远程文件已存在时要采取的操作。(FileExistsMode,默认:
<none>,可选值:APPEND、APPEND_NO_FLUSH、FAIL、IGNORE、REPLACE、REPLACE_IF_MODIFIED) - remote-dir
-
远程 FTP 目录。(字符串,默认:
/) - remote-file-separator
-
远程文件分隔符。(字符串,默认:
/) - temporary-remote-dir
-
如果 '#isUseTemporaryFilename()' 为 true,则文件将被写入的临时目录。(字符串,默认:
/) - tmp-file-suffix
-
传输进行期间使用的后缀。(字符串,默认:
.tmp) - use-temporary-filename
-
是否写入临时文件并重命名。(布尔类型,默认:
true)
7.6. Geode接收器
Geode 接收器将消息内容写入 Geode 区域。
7.6.1. 可选项
该 geode 接收器有以下选项:
按前缀分组的属性:
geode.consumer
- JSON
-
指示 Geode 区域是否将 JSON 对象存储为 PdxInstance。(布尔值,默认:
false) - key-expression
-
作为缓存键使用的 SpEL 表达式。(字符串,默认:
<none>)
geode.pool
- connect-type
-
指定连接类型:
server或locator。(ConnectType,默认值:<none>,可能的取值:locator、server) - host-addresses
-
指定一个或多个Gemfire定位器或服务器地址,格式为[主机]:[端口]。(InetSocketAddress[],默认:
<none>) - subscription-enabled
-
设置为 true 可以启用客户端池的订阅。这是同步更新到客户端缓存所必需的。(Boolean,默认:
false)
geode.security.ssl
- 密码
-
为安全套接字连接配置使用的 SSL 密码,作为有效密码名称的数组。(字符串,默认:
any) - keystore-type
-
标识用于SSL通信的Keystore类型(例如:JKS,PKCS11等)。 (String,默认值:
JKS) - keystore-uri
-
用于连接到 Geode 集群的预创建 Keystore URI。 (Resource, 默认值:
<none>) - ssl-keystore-password
-
访问密钥库 truststore 的密码。 (String,默认值:
<none>) - ssl-truststore-password
-
访问信任存储库的密码。 (String, 默认值:
<none>) - truststore-type
-
标识用于SSL通信的信任存储类型(例如 JKS,PKCS11 等)。 (String,默认值:
JKS) - truststore-uri
-
用于连接到 Geode 集群的预创建信任库 URI 的位置。 (资源, 默认:
<none>) - user-home-directory
-
本地缓存从truststoreUri和keystoreUri位置下载的truststore和keystore文件的目录。 (String, 默认值:
user.home)
7.7. JDBC 处理程序
JDBC接收器允许您将传入的有效负载持久保存到关系型数据库管理系统(RDBMS)数据库中。
jdbc.consumer.columns 属性表示 COLUMN_NAME[:EXPRESSION_FOR_VALUE] 的键值对,其中 EXPRESSION_FOR_VALUE(连同冒号)是可选的。 在这种情况下,该值通过生成的表达式(如 payload.COLUMN_NAME)进行评估,因此我们可以通过这种方式实现从对象属性到表列的直接映射。 例如,我们有一个如下所示的 JSON 负载:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以使用以下配置将数据插入到具有 name、city 和 street 结构的表中:
--jdbc.consumer.columns=name,city:address.city,street:address.street
此接收器支持批量插入,具体取决于底层的 JDBC 驱动程序。 通过 batch-size 和 idle-timeout 属性配置批量插入: 当接收到的消息达到 batch-size 条时进行聚合,并作为一批次插入。 如果在没有新消息的情况下经过了 idle-timeout 毫秒,则即使批次大小小于 batch-size,也会插入已聚合的数据,以此来限制最大延迟。
该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。 |
7.7.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.7.2. 选项
jdbc接收器有以下选项:jdbc
按前缀分组的属性:
jdbc.consumer
- batch-size
-
当消息数量达到阈值时,数据将被刷新到数据库表中。(整数,默认:
1) - 列
-
逗号分隔的基于冒号的列名和 SpEL 表达式对,用于插入/更新值。在初始化时使用这些名称来发布 DDL。(字符串,默认:
payload:payload.toString()) - idle-timeout
-
数据自动刷新到数据库表时的空闲超时时间(毫秒)。(Long,默认:
-1) - 初始化
-
'true'、'false' 或表格的自定义初始化脚本的位置。(字符串,默认:
false) - table-name
-
要写入的表名。(字符串,默认:
messages)
spring.datasource
- 数据
-
数据(DML)脚本资源引用。(List<String>,默认:
<none>) - driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:
<none>) - initialization-mode
-
用于确定是否应使用可用的 DDL 和 DML 脚本执行数据源初始化的模式。(DataSourceInitializationMode,默认值:
embedded,可选值:ALWAYS、EMBEDDED、NEVER) - 密码
-
数据库登录密码。(字符串,默认:
<none>) - 架构
-
Schema (DDL) script resource references. (List<String>, default:
<none>) - url
-
数据库的 JDBC URL。(字符串,默认:
<none>) - 用户名
-
数据库登录用户名。(字符串,默认:
<none>)
7.8. 日志接收器
该 log 接收器使用应用程序记录器输出数据以供检查。
请理解,log 接收器使用无类型的处理器,这会影响实际的日志记录方式。这意味着如果内容类型为文本,则原始负载字节将被转换为字符串;否则将记录原始字节。用户指南中包含更多信息。
7.9. MongoDB Sink
此接收应用程序将传入数据存入MongoDB。
该应用程序完全基于MongoDataAutoConfiguration,因此请参阅Spring Boot MongoDB支持获取更多信息。
7.9.2. 选项
MongoDB 接收器具有以下选项:
按前缀分组的属性:
mongodb.consumer
- 集合
-
存储数据的 MongoDB 集合。(字符串,默认:
<none>) - collection-expression
-
要评估 MongoDB 集合的 SpEL 表达式。(表达式,默认:
<none>)
spring.data.mongodb
- authentication-database
-
认证数据库名称。(字符串,默认:
<none>) - auto-index-creation
-
是否启用自动索引创建。(布尔类型,默认:
<none>) - 数据库
-
数据库名称。 (字符串,默认:
<none>) - field-naming-strategy
-
要使用的 FieldNamingStrategy 的完全限定名。(Class<?>, 默认:
<none>) - grid-fs-database
-
<文档缺失> (字符串,默认:
<none>) - 主机
-
Mongo服务器主机。不能与URI一起设置。(字符串,默认:
<none>) - 密码
-
登录 MongoDB 服务器的密码。不能与 URI 一起设置。(字符数组,默认:
<none>) - 端口
-
Mongo服务器端口。不能与URI一起设置。(Integer,默认:
<none>) - replica-set-name
-
群集所需的副本集名称。不能与URI一起设置。(字符串,默认:
<none>) - uri
-
Mongo 数据库连接字符串。不能与主机、端口、凭据和副本集名称一起设置。(字符串,默认:
mongodb://localhost/test) - 用户名
-
登录 MongoDB 服务器的用户。不能与 URI 同时设置。(字符串,默认:
<none>) - uuid-representation
-
转换 UUID 为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认:
java-legacy,可能的值:UNSPECIFIED、STANDARD、C_SHARP_LEGACY、JAVA_LEGACY、PYTHON_LEGACY)
7.10. MQTT接收器
此模块用于向MQTT发送消息。
7.10.2. 选项
MQTT接收器具有以下选项:mqtt
按前缀分组的属性:
mqtt
- clean-session
-
客户端和服务器是否应记住重启和重新连接之间的状态。(布尔值,默认:
true) - connection-timeout
-
连接超时时间(秒)。 (Integer, 默认值:
30) - keep-alive-interval
-
ping间隔,单位为秒。(整数,默认值:
60) - 密码
-
连接到代理时要使用的密码。(字符串,默认:
guest) - 持久化
-
'memory' 或 'file'。 (字符串,默认:
memory) - persistence-directory
-
持久化目录。 (字符串,默认:
/tmp/paho) - url
-
MQTT代理的位置(以逗号分隔的列表)。(字符串数组,默认:
[tcp://localhost:1883]) - 用户名
-
连接到代理时使用的用户名。(字符串,默认:
guest)
7.11. Pgcopy 填充器
一个模块,使用PostgreSQL COPY命令将传入的有效负载写入RDBMS。
7.11.3. 选项
jdbc接收器有以下选项:jdbc
- spring.datasource.driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下根据 URL 自动检测。(字符串,默认:
<none>) - spring.datasource.password
-
数据库登录密码。(字符串,默认:
<none>) - spring.datasource.url
-
数据库的 JDBC URL。(字符串,默认:
<none>) - spring.datasource.username
-
数据库登录用户名。(字符串,默认:
<none>)
该模块还使用了 Spring Boot 的 DataSource 支持 来配置数据库连接,因此像 spring.datasource.url 等属性也适用。 |
7.12. RabbitMQ Sink
此模块向RabbitMQ发送消息。
7.12.1. 选项
该 rabbit 接收器有以下选项:
(请参阅Spring Boot文档中的RabbitMQ连接属性)
按前缀分组的属性:
兔子
- converter-bean-name
-
自定义消息转换器的bean名称;如果省略,则使用SimpleMessageConverter。如果为'jsonConverter',将为您创建一个Jackson2JsonMessageConverter bean。(字符串,默认:
<none>) - 交换
-
交换名称 - 如果提供了 exchangeNameExpression,则会被覆盖。(字符串,默认:
<empty string>) - exchange-expression
-
一个计算结果为交换名称的 SpEL 表达式。(表达式,默认:
<none>) - headers-mapped-last
-
在映射传出消息的标头时,确定是在转换消息之前还是之后进行标头映射。(布尔型,默认:
true) - mapped-request-headers
-
要映射的标头。 (String[], 默认值:
[*]) - own-connection
-
为 true 时,使用基于引导属性的单独连接。(布尔类型,默认值:
false) - persistent-delivery-mode
-
当没有 'amqp_deliveryMode' 标头时的默认传递模式,PERSISTENT 表示为 true。(Boolean,默认:
false) - routing-key
-
路由键 - 如果提供了 routingKeyExpression,则会被覆盖。(字符串,默认:
<none>) - routing-key-expression
-
一个计算路由键的SpEL表达式。(表达式,默认:
<none>)
spring.rabbitmq
- address-shuffle-mode
-
用于打乱配置地址的模式。(AddressShuffleMode,默认值:
none,可选值:NONE、RANDOM、INORDER) - 地址
-
客户端应连接的地址列表,用逗号分隔。设置后,主机和端口将被忽略。(字符串,默认:
<none>) - channel-rpc-timeout
-
通道中RPC调用的继续超时时间。将其设置为零以无限期等待。(持续时间,默认:
10m) - connection-timeout
-
连接超时时间。将其设置为零以无限期等待。(持续时间,默认:
<none>) - 主机
-
RabbitMQ主机。如果设置了地址,则忽略此设置。(字符串,默认:
localhost) - 密码
-
登录以向代理进行身份验证。(字符串,默认:
guest) - 端口
-
RabbitMQ 端口。如果设置了地址,则忽略此设置。默认为 5672,如果启用了 SSL,则默认为 5671。(Integer,默认:
<none>) - publisher-confirm-type
-
发布者类型确认要使用的。(ConfirmType,默认值:
<none>,可能的取值:SIMPLE,CORRELATED,NONE) - publisher-returns
-
是否启用发布商返还。(布尔值,默认:
false) - requested-channel-max
-
客户端请求的每个连接的通道数。使用0表示无限制。(整数,默认:
2047) - requested-heartbeat
-
请求的心跳超时时间;零表示无。如果未指定持续时间后缀,则使用秒。(持续时间,默认:
<none>) - 用户名
-
登录用户以对经纪人进行身份验证。(字符串,默认:
guest) - virtual-host
-
连接到代理时使用的虚拟主机。(字符串,默认:
<none>)
7.13. Redis存储
向Redis发送消息。
7.13.1. 选项
Redis接收器具有以下选项:redis
按前缀分组的属性:
redis.consumer
- 键
-
存储密钥时使用的字面量键名。(字符串,默认:
<none>) - key-expression
-
A SpEL 表达式,用于存储到键中。(字符串,默认:
<none>) - 队列
-
存储到队列时要使用的队列名称字面量。(字符串,默认:
<none>) - queue-expression
-
用于队列的 SpEL 表达式。(字符串,默认值:
<none>) - 主题
-
发布到主题时使用的文字主题名称。(字符串,默认:
<none>) - topic-expression
-
用于主题的 SpEL 表达式。(字符串,默认:
<none>)
spring.redis
- client-name
-
连接上要设置的客户端名称,通过 CLIENT SETNAME 设置。 (字符串,默认:
<none>) - client-type
-
要使用的客户端类型。默认情况下,根据类路径自动检测。(ClientType,默认值:
<none>,可能的取值:LETTUCE,JEDIS) - connect-timeout
-
连接超时。(持续时间,默认:
<none>) - 数据库
-
连接工厂使用的数据库索引。(整数,默认:
0) - 主机
-
Redis服务器主机。(字符串,默认:
localhost) - 密码
-
Redis服务器的登录密码。(字符串,默认:
<none>) - 端口
-
Redis服务器端口。(整数,默认:
6379) - SSL
-
是否启用SSL支持。(布尔类型,默认:
false) - timeout
-
读取超时时间。(持续时间,默认:
<none>) - url
-
连接 URL。覆盖主机、端口和密码。用户被忽略。示例:redis://user:[email protected]:6379 (字符串,默认:
<none>) - 用户名
-
登录 Redis 服务器的用户名。(字符串,默认值:
<none>)
spring.redis.jedis.pool
- max-active
-
连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:
8) - max-idle
-
连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:
8) - max-wait
-
连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:
-1ms) - min-idle
-
池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:
0) - time-between-eviction-runs
-
空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:
<none>)
spring.redis.lettuce.pool
- max-active
-
连接池在某一时间可以分配的最大连接数。使用负值表示无限制。(Integer,默认:
8) - max-idle
-
连接池中"空闲"连接的最大数量。使用负值表示空闲连接数没有上限。(Integer,默认:
8) - max-wait
-
连接池耗尽时,分配连接应阻塞的最大时间(以抛出异常)。使用负值表示无限期阻塞。(持续时间,默认:
-1ms) - min-idle
-
池中要维持的最少空闲连接数的目标。只有当此设置和驱逐运行之间的间隔都为正时,才会产生效果。(整数,默认:
0) - time-between-eviction-runs
-
空闲对象驱逐线程运行的时间间隔。如果为正数,则启动空闲对象驱逐线程,否则不执行任何空闲对象清除操作。(持续时间,默认:
<none>)
7.14. 路由器接收器
此应用程序将消息路由到命名通道。
7.14.1. 选项
路由器接收器具有以下选项:router
- router.default-output-channel
-
无法路由的消息发送到何处。(字符串,默认:
nullChannel) - router.destination-mappings
-
目的地映射作为以换行符分隔的名称-值对字符串,例如 'foo=bar\ baz=car'。 (属性,默认:
<none>) - router.expression
-
应用于消息以确定路由到的通道的表达式。请注意,对于文本、JSON 或 XML 等内容类型,有效负载的字节格式为 byte[] 而不是 String!请查阅文档了解如何处理字节数组有效负载内容。(表达式,默认:
<none>) - router.refresh-delay
-
检查脚本更改的时间间隔(毫秒);如果为负数,则不刷新。(Integer,默认值:
60000) - router.resolution-required
-
是否需要通道解析。(布尔值,默认:
false) - router.script
-
返回通道或通道映射解析键的 Groovy 脚本的位置。(资源,默认:
<none>) - router.variables
-
变量绑定作为以换行符分隔的名称-值对字符串,例如 'foo=bar baz=car'。(属性,默认:
<none>) - router.variables-location
-
包含自定义脚本变量绑定的属性文件的位置。(资源,缺省:
<none>)
由于这是一个动态路由器,目的地是按需创建的;因此,默认情况下,只有当Binder无法绑定到目标时,才会使用defaultOutputChannel和resolutionRequired。 |
您可以使用spring.cloud.stream.dynamicDestinations属性限制动态绑定的创建。默认情况下,所有解析的目标都将被动态绑定;如果此属性具有逗号分隔的目标名称列表,则只有这些目标会被绑定。解析到不在该列表中的目标的消息将路由到defaultOutputChannel,它也必须出现在列表中。
destinationMappings 用于将评估结果映射到实际的目标名称。
7.14.2. 基于 SpEL 的路由
该表达式针对消息进行求值,并返回一个频道名称,或者是一个频道名称映射表的键。
有关更多详细信息,请参阅Spring集成参考手册中的“路由器和Spring表达式语言(SpEL)”小节。配置通用路由器部分。
从 Spring Cloud Stream 2.0 开始,json、text 和 xml 内容类型的消息线格式为 byte[] 而不是 String!
这是与 SCSt 1.x 的变更,后者将这些类型视为字符串!
根据内容类型的不同,处理 byte[] 负载有不同的技术。对于普通的 text
内容类型,可以使用 new String(payload) SpEL 表达式将八位字节负载转换成字符串。
对于 json
类型,jsonPath() SpEL 工具
已经支持字符串和字节数组内容的互换。
同样的规则适用于 xml 内容类型和
#xpath() SpEL 工具。 |
例如对于 text 内容类型,应使用:
new String(payload).contains('a')
并且对于 json 内容类型,SpEL 表达式如下:
#jsonPath(payload, '$.person.name')
7.14.3. 基于Groovy的路由
除了使用 SpEL 表达式,还可以使用 Groovy 脚本。让我们在文件系统中创建一个 Groovy 脚本,路径为“file:/my/path/router.groovy”,或者“classpath:/my/path/router.groovy”:
println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果您想向脚本传递变量值,可以使用variables选项静态绑定值,或者选择性地通过propertiesLocation选项传递包含绑定属性文件的路径。文件中的所有属性都将作为变量提供给脚本。您可以同时指定variables和propertiesLocation,在这种情况下,作为variables提供的任何重复值将覆盖在propertiesLocation中提供的值。
请注意,payload和headers是隐式绑定的,以便您能够访问消息中包含的数据。
有关更多信息,请参阅 Spring Integration 参考手册Groovy 支持
7.16. Amazon S3 接收器
此接收应用程序支持将对象传输到Amazon S3存储桶。
文件负载(以及递归目录)被传输到remote目录(S3存储桶),然后传输到应用程序部署所在的local目录。
此接收器接受的消息必须包含以下之一:payload
-
File,包括用于递归上传的目录; -
InputStream; -
byte[]
7.16.1. 选项
该s3接收器具有以下选项:
按前缀分组的属性:
s3.common
- endpoint-url
-
连接到与 S3 兼容的存储的可选端点 URL。(字符串,默认:
<none>) - path-style-access
-
使用路径样式访问。(Boolean,默认值:
false)
s3.consumer
- 访问控制列表
-
S3 对象访问控制列表。(预定义访问控制列表,缺省值:
<none>,可取值:private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read) - acl-expression
-
要评估的 S3 对象访问控制列表表达式。(表达式,默认:
<none>) - 桶
-
目标文件存储的 AWS 存储桶。(字符串,默认:
<none>) - bucket-expression
-
要计算的AWS存储桶名称表达式。(表达式,默认:
<none>) - key-expression
-
要评估的 S3 对象键表达式。(表达式,默认:
<none>)
基于AmazonS3SinkConfiguration生成的目标应用程序可以使用S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener进行增强,这两个组件被注入到S3MessageHandler Bean中。有关详细信息,请参阅Spring集成AWS支持。
7.16.2. 亚马逊AWS通用选项
Amazon S3 连接器(如同所有其他 Amazon AWS 应用程序)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。请查阅相关文档了解所需和有用的自动配置属性。
其中一些是关于AWS凭据的:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他用于 AWS Region 定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
对于 AWS Stack:
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
7.17. SFTP接收器
SFTP接收器是从传入消息向SFTP服务器推送文件的简单选项。
它使用sftp-outbound-adapter,因此传入的消息可以是java.io.File对象、String(文件内容)或bytes数组(文件内容也是这样)。
要使用此接收器,您需要一个用户名和密码来登录。
默认情况下,如果未指定,则 Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。如果DefaultFileNameGenerator基于file_name标头(如果存在)的值来确定文件名,在MessageHeaders中,或者如果Message的有效载荷已经是java.io.File,则它会使用该文件的原始名称。 |
配置 sftp.factory.known-hosts-expression 选项时,评估的根对象是应用程序上下文,例如sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'。
7.17.3. 选项
该 sftp 接收器具有以下选项:
按前缀分组的属性:
sftp.consumer
- auto-create-dir
-
是否创建远程目录。(布尔值,默认:
true) - filename-expression
-
用于生成远程文件名的 SpEL 表达式。(字符串,默认:
<none>) - 模式
-
远程文件已存在时要采取的操作。(FileExistsMode,默认:
<none>,可选值:APPEND、APPEND_NO_FLUSH、FAIL、IGNORE、REPLACE、REPLACE_IF_MODIFIED) - remote-dir
-
远程 FTP 目录。(字符串,默认:
/) - remote-file-separator
-
远程文件分隔符。(字符串,默认:
/) - temporary-remote-dir
-
如果
isUseTemporaryFilename()为 true,则文件将被写入的临时目录。(字符串,默认:/) - tmp-file-suffix
-
传输进行期间使用的后缀。(字符串,默认:
.tmp) - use-temporary-filename
-
是否写入临时文件并重命名。(布尔类型,默认:
true)
sftp.consumer.factory
- allow-unknown-keys
-
允许未知或已更改的密钥。 (Boolean,默认:
false) - cache-sessions
-
缓存会话。 (布尔值,默认:
<none>) - 主机
-
服务器的主机名。(字符串,默认:
localhost) - known-hosts-expression
-
一个解析为已知主机文件位置的 SpEL 表达式。(表达式,默认:
<none>) - pass-phrase
-
用户私钥的密码。(字符串,默认:
<empty string>) - 密码
-
连接到服务器时使用的密码。(字符串,默认:
<none>) - 端口
-
服务器的端口。(整数,默认:
22) - private-key
-
用户私钥的位置。 (资源,默认:
<none>) - 用户名
-
连接服务器时要使用的用户名。(字符串,默认值:
<none>)
7.18. TCP接收器
此模块使用编码器通过TCP写入消息。
TCP是一个流式协议,需要一些机制来在传输线上封装消息。提供了多种编码器,默认的是'CRLF'。
7.20. Twitter 消息接收器
从认证用户向指定用户发送直接消息。需要JSON POST请求体,并将Content-Type头部设置为application/json。
| 当您收到用户的消息时,可以在24小时内回复最多5条消息。每收到一条消息都会重置24小时的时间窗口和5条消息的配额。在24小时内发送第6条消息或在24小时时间窗口外发送消息会被计入速率限制。此行为仅适用于使用POST direct_messages/events/new端点时。 |
Spring Expression Language (SpEL) 表达式用于从输入消息计算请求参数。
7.20.1. 选项
使用单引号 (') 来包裹 SpEL 表达式属性的字面值。 例如,要设置固定的消息文本,请使用 text='Fixed Text'。 对于固定的目标 userId,请使用 userId='666'。 |
- twitter.message.update.media-id
-
与消息关联的媒体ID。直接消息只能引用单个媒体ID。(表达式,默认:
<none>) - twitter.message.update.screen-name
-
要发送直聊的用户的屏幕名称。(表达式,默认:
<none>) - twitter.message.update.text
-
直接消息文本。如有必要,请进行URL编码。最大长度为10,000个字符。(表达式,默认:
payload) - twitter.message.update.user-id
-
要发送直聊的消息的用户 ID。 (表达式,默认:
<none>)
7.21. 推特更新接收器
更新认证用户的当前文本(例如发布推文)。
| 每次更新尝试时,都会将更新文本与认证用户的近期推文进行比较。任何会导致重复的尝试都将被阻止,并导致返回403错误。用户不能连续两次提交相同的文本。 |
虽然API本身没有进行速率限制,但用户在一次时间内可以创建的推文数量是有限制的。标准API的更新限制为每3小时窗口内最多300条。如果该用户的更新次数达到当前允许的上限,此方法将返回HTTP 403错误。
您可以在此处找到更新API的详细信息:developer.twitter.com/zh-CN/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 选项
按前缀分组的属性:
twitter.update
- attachment-url
-
(SpEL 表达式) 要使 URL 不计入扩展推文的文本正文中,请提供作为推文附件的 URL。该 URL 必须是推文永久链接或直接消息深度链接。任意非 Twitter 的 URL 必须保留在文本中。传递给 attachment_url 参数且不符合任何推文永久链接或直接消息深度链接的 URL 将在创建推文时失败并引发异常。(表达式,默认:
<none>) - display-coordinates
-
(SpEL表达式) 是否在发送推文的确切坐标上放置一个标记。 (表达式,默认:
<none>) - in-reply-to-status-id
-
(SpEL 表达式) 已存在的文本ID,更新将是对此文本的回复。注意:除非在此参数引用的推文作者在文本中被提及,否则将忽略此参数。因此,您必须包含@username,其中username是所引用推文的作者,在更新中。当设置inReplyToStatusId时,auto_populate_reply_metadata也会自动设置。这确保从原始推文中查找前导@mentions,并将其添加到新推文中。这将在扩展推文的元数据中追加@mentions,直到达到@mentions的限制。如果原始推文已被删除,则回复将会失败。(表达式,默认:
<none>) - media-ids
-
(SpEL 表达式) 与推文关联的媒体 ID 列表,用逗号分隔。您可以在一条推文中包含最多 4 张照片或 1 个动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认:
<none>) - place-id
-
(SpEL表达式)世界上的一个地方。(表达式,默认:
<none>) - text
-
(SpEL 表达式) 文本更新的文本。如有需要,请进行 URL 编码。t.co 链接包裹将影响字符计数。默认为消息的有效负载(表达式,默认:
payload)
7.22. 波前接收器
Wavefront 接收器消耗 Message<?>,将其转换为波形数据格式中的指标,然后直接发送到 Wavefront 或者 Wavefront 代理。Wavefront 数据格式
支持常见的ETL用例,其中现有的(历史)指标数据必须经过清洗、转换并存储在Wavefront中以供进一步分析。
7.22.1. 选项
波形图接收器具有以下选项:Wavefront
- wavefront.api-token
-
Wavefront API 访问Tokens。(字符串,默认:
<none>) - wavefront.metric-expression
-
一个SpEL表达式,用于计算指标值。(表达式,默认:
<none>) - wavefront.metric-name
-
指标的名称,默认为应用程序名称。(字符串,默认值:
<none>) - wavefront.proxy-uri
-
波形代理的URL。(字符串,默认:
<none>) - wavefront.source
-
发出指标的唯一应用程序、主机、容器或实例。(字符串,默认:
<none>) - wavefront.tag-expression
-
与指标点标记关联的自定义元数据集合不能为空。键的有效字符为字母数字、连字符('-')、下划线('_')、点('.')。对于值,任何字符都允许,包括空格。要包含双引号,请用反斜杠转义,反斜杠不能是标记值中的最后一个字符。点标记键和值组合的最大长度为254个字符(包括分隔键和值的 '=' 号,共255个字符)。如果值较长,则拒绝该点并记录日志(Map<String, Expression>, 默认:
<none>) - wavefront.timestamp-expression
-
一个 SpEL 表达式,用于计算指标的时间戳(可选)。(表达式,默认:
<none>) - wavefront.uri
-
Wavefront 环境的 URL。(字符串,默认:
<none>)
7.23. WebSocket接收器
一个简单的 Websocket Sink 实现。
7.23.1. 选项
支持以下选项:<br/>
- websocket.consumer.log-level
-
netty通道的日志级别。默认为<tt>WARN</tt>(字符串,默认:
<none>) - websocket.consumer.path
-
WebSocketSink 消费者需要连接的路径。默认值为 <tt>/websocket</tt> (字符串,默认:
/websocket) - websocket.consumer.port
-
Netty服务器监听的端口。默认为9292(整数,默认:
9292) - websocket.consumer.ssl
-
是否创建一个 {@link io.netty.handler.ssl.SslContext}。 (Boolean,缺省值:
false) - websocket.consumer.threads
-
Netty {@link io.netty.channel.EventLoopGroup} 的线程数。默认是 <tt>1</tt> (Integer,默认:
1)
7.23.2. 示例
为了验证 websocket-sink 是否从其他 spring-cloud-stream 应用接收消息,您可以使用以下简单的端到端设置。
步骤 3:部署<br>websocket-sink
最后,以trace模式启动一个websocket接收器,以便在日志中看到由time-source产生的消息:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
你应该开始在启动WebsocketSink的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
您可以使用Endpoint来访问最后发送和接收的n条消息。您必须通过提供--endpoints.websocketconsumertrace.enabled=true来启用它。默认情况下,它会通过host:port/websocketconsumertrace显示最后100条消息。以下是样本输出:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的HTML页面,您可以在文本区域中看到转发的消息。您可以直接通过浏览器中的host:port访问它。
7.24. ZeroMQ接收器
"zeromq"接收器启用向ZeroMQ套接字发送消息。
7.24.3. 选项
该 zeromq 接收器有以下选项:
- zeromq.consumer.connect-url
-
连接到 ZeroMQ 套接字的连接 URL。(字符串,默认值:
<none>) - zeromq.consumer.socket-type
-
连接应建立的套接字类型。(SocketType,默认:
<none>,可能值:PAIR,PUB,SUB,REQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM) - zeromq.consumer.topic
-
发送消息给订阅者之前,用于评估主题的表达式。 (表达式,默认:
<none>)