应用
5. 来源
5.1. Debezium 源
基于 Debezium 引擎的变更数据捕获 (CDC) 源。
这Debezium Source
允许捕获数据库更改事件并通过不同的消息绑定器流式传输这些事件,例如Apache Kafka
,RabbitMQ
以及所有 Spring Cloud Stream 支持代理。
此源可以与任何 Spring Cloud Stream 消息绑定器一起使用。 它不受 Kafka Connect 框架的限制或依赖。尽管这种方法很灵活,但它有一定的局限性。 |
支持所有 Debezium 配置属性。
只需在任何 Debezium 属性之前使用debezium.properties.
前缀。
例如,要将 Debezium 的connector.class
属性使用debezium.properties.connector.class
source 属性。
5.1.1. 数据库支持
这Debezium Source
目前支持多个数据存储的 CDC:MySQL、PostgreSQL、MongoDB、Oracle、SQL Server、Db2、Vitess 和 Spanner 数据库。
5.1.2. 选项
事件扁平化配置
Debezium 提供了一种全面的消息格式,可以准确地详细说明有关系统中发生的更改的信息。
但是,有时这种格式可能不适合下游使用者,这可能需要对消息进行格式化,以便字段名称和值以简化的flattened
结构。
要简化 Debezium 连接器生成的事件记录的格式,您可以使用 Debezium 事件扁平化消息转换。 使用讨人喜欢的配置,您可以配置简单的消息格式,如下所示:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 偏移存储
当 Debezium 源代码运行时,它会从源代码读取信息并定期记录offsets
定义了它处理了多少信息。
如果源重新启动,它将使用最后记录的偏移量来了解它应该在源信息中恢复读取的位置。
开箱即用,提供了以下偏移存储配置选项:
-
内存中
Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
-
本地文件系统
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1) --debezium.properties.offset.flush.interval.ms=60000 (2)
1 要存储偏移的文件的路径。在以下情况下需要 offset.storage`
设置为FileOffsetBackingStore
.2 尝试提交偏移量的时间间隔。默认值为 1 分钟。 -
Kafka 主题
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1) --debezium.properties.offset.storage.partitions=2 (2) --debezium.properties.offset.storage.replication.factor=1 (3) --debezium.properties.offset.flush.interval.ms=60000 (4)
1 要存储偏移量的 Kafka 主题的名称。在以下情况下需要 offset.storage
设置为KafkaOffsetBackingStore
.2 创建偏移存储主题时使用的分区数。 3 创建偏移存储主题时使用的复制因子。 4 尝试提交偏移量的时间间隔。默认值为 1 分钟。
可以实现org.apache.kafka.connect.storage.OffsetBackingStore
接口,以提供绑定到自定义后端键值存储的偏移存储。
连接器属性
下表列出了每个连接器的所有可用 Debezium 属性。
这些属性可以通过在前缀debezium.properties.
前缀。
5.1.3. 示例和测试
debezium 集成测试使用在本地计算机上运行的数据库夹具。利用 Testcontainers 的帮助预构建 debezium docker 数据库镜像。
要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明说明介绍了如何从 Docker 镜像运行预配置的测试数据库。
MySQL
启动debezium/example-mysql
在 docker 中:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选)用mysql 客户端连接到数据库并创建debezium 具有所需凭据的用户: |
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
使用以下属性将 Debezium 源连接到 MySQL DB:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 | 将 Debezium 源配置为使用 MySqlConnector。 |
2 | 元数据用于识别和调度传入事件。 |
3 | 连接到运行在localhost:3306 如debezium 用户。 |
4 | 在ChangeEvent 消息。 |
5 | 启用更改事件展平。 |
6 | 在多个启动之间保留的源状态。 |
您还可以运行DebeziumDatabasesIntegrationTest#mysql()
使用此 MySQL 配置。
禁用 mysql GenericContainer 测试初始化代码。 |
PostgreSQL
从debezium/example-postgres:1.0
Docker 镜像:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将 Debezium 源连接到 PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | 配置Debezium Source 使用 PostgresConnector。 |
2 | 配置 Debezium 引擎以使用memory 商店。 |
3 | 元数据用于识别和调度传入事件。 |
4 | 连接到运行在localhost:5432 如postgres 用户。 |
5 | 在消息中包括“更改事件值”架构。 |
6 | 启用 Chage 事件展平。 |
您还可以运行DebeziumDatabasesIntegrationTest#postgres()
使用此 postgres 配置。
禁用 postgres GenericContainer 测试初始化代码。 |
Mongo数据库
从debezium/example-mongodb:2.3.3.Final
容器镜像:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
初始化库存集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在mongodb
终端输出,搜索像host: "3f95a8a6516e:27017"
:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
加127.0.0.1 3f95a8a6516e
进入您的/etc/hosts
使用以下属性将 Debezium 源连接到 MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | 配置Debezium Source 使用 MongoDB 连接器。 |
2 | 配置 Debezium 引擎以使用memory . |
3 | 连接到运行在localhost:27017 如debezium 用户。 |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | 在SourceRecord 事件。 |
6 | 启用 Chnage 事件展平化。 |
您还可以运行DebeziumDatabasesIntegrationTest#mongodb()
使用此 mongoDB 配置。
SQL 服务器
开始一个sqlserver
从debezium/example-postgres:1.0
Docker 镜像:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
使用示例数据表单填充 debezium SqlServer 教程:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 Debezium 源连接到 SQLServer:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 | 配置Debezium Source 以使用 SqlServerConnector。 |
2 | 配置 Debezium 引擎以使用memory 状态存储。 |
3 | 元数据用于识别和调度传入事件。 |
4 | 连接到运行在localhost:1433 如sa 用户。 |
您还可以运行DebeziumDatabasesIntegrationTest#sqlServer()
使用此 SqlServer 配置。
神谕
从 localhost 启动可访问的 Oracle,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置
使用 Debezium Oracle 教程填充示例数据:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询目录并将新文件或其内容发送到输出通道。默认情况下,文件源将 File 的内容作为字节数组提供。但是,可以使用 --file.supplier.mode 选项对其进行自定义:
-
ref 提供 java.io.File 引用
-
行 将逐行拆分文件并为每行发出一条新消息
-
内容默认值。将文件的内容作为字节数组提供
使用时--file.supplier.mode=lines
,您还可以提供附加选项--file.supplier.withMarkers=true
.
如果设置为 true,则基础 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker
.如果未显式设置,则选项 withMarkers 默认为 false。
5.3. FTP 源
此源应用程序支持使用 FTP 协议传输文件。
文件从remote
目录到local
部署应用的目录。
默认情况下,源发出的消息作为字节数组提供。但是,这可能是
使用--mode
选择:
-
裁判提供一个
java.io.File
参考 -
线将逐行拆分文件并为每一行发出一条新消息
-
内容默认值。将文件的内容作为字节数组提供
使用时--mode=lines
,您还可以提供附加选项--withMarkers=true
.
如果设置为true
,基础FileSplitter
将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker
.选项withMarkers
默认为false
如果未显式设置。
另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。
5.3.1. 输入
N/A(从 FTP 服务器获取文件)。
5.3.2. 输出
模式 = 内容
头:
-
Content-Type: application/octet-stream
-
file_originalFile: <java.io.File>
-
file_name: <file name>
有效载荷:
一个byte[]
填充文件内容。
模式 = 线
头:
-
Content-Type: text/plain
-
file_orginalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数)
有效载荷:
一个String
对于每行。
第一行前面有一条消息,其中有START
标记有效负载。最后一行后面是一条消息,其中有END
标记有效载荷。
标记的存在和格式由with-markers
和markers-json
性能。
模式 = ref
头:
没有。
有效载荷:
一个java.io.File
对象。
5.3.4. 示例
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
5.4. Http 源代码
侦听 HTTP 请求并将正文作为消息有效负载发出的源应用程序。
如果 Content-Type 匹配text/*
或application/json
,有效负载将是一个字符串,
否则,有效负载将是一个字节数组。
5.4.1. 有效载荷:
如果内容类型匹配text/*
或application/json
-
String
如果内容类型不匹配text/*
或application/json
-
byte array
5.5. JDBC 源代码
此源轮询来自 RDBMS 的数据。
此来源完全基于DataSourceAutoConfiguration
,因此请参阅 Spring Boot JDBC 支持 了解更多信息。
5.5.1. 有效载荷
-
Map<String, Object>
什么时候jdbc.split == true
(默认)和List<Map<String, Object>>
否则
5.5.2. 选项
jdbc 源具有以下选项:
另请参阅 Spring Boot 文档以获取补充DataSource
properties 和TriggerProperties
和MaxMessagesProperties
用于轮询选项。
5.7. Apache Kafka 源代码
此模块使用来自 Apache Kafka 的消息。
5.7.1. 选项
kafka 源代码具有以下选项:
(有关 Spring for Apache Kafka 配置属性,请参阅 Spring Boot 文档)
5.8. 负载发生器源
发送生成的数据并将其分派到流的源。
5.8.1. 选项
负载发生器源具有以下选项:
- load-generator.generate-timestamp
-
是否生成时间戳。(布尔值,默认:
false
) - 加载生成器.消息计数
-
消息计数。(整数,默认:
1000
) - load-generator.message-size
-
消息大小。(整数,默认:
1000
) - load-generator.producers
-
生产者数量。(整数,默认:
1
)
5.10. MongoDB 源代码
此源轮询来自 MongoDB 的数据。
此来源完全基于MongoDataAutoConfiguration
,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。
5.10.1. 选项
mongodb 源代码具有以下选项:
另请参阅 Spring Boot 文档以获取更多内容MongoProperties
性能。
See 和TriggerProperties
用于轮询选项。
5.12. RabbitMQ 源代码
“rabbit”源允许从 RabbitMQ 接收消息。
在部署流之前,队列必须存在;它们不会自动创建。 您可以使用 RabbitMQ Web UI 轻松创建队列。
5.12.1. 输入
不适用
5.12.2. 输出
有效载荷
-
byte[]
5.12.3. 选项
兔子源有以下选项:
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。
关于重试的说明
使用默认的 ackMode (AUTO) 和重新排队 (true) 选项,将重试失败的消息传递
无限期。
由于兔源没有太多的加工,因此源本身失败的风险很小,除非
下游Binder 由于某种原因未连接。
将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到死信
Exchange/Queue(如果代理已如此配置)。
enableRetry 选项允许配置重试参数,以便可以重试失败的消息传递,并且
当重试用尽时,最终被丢弃(或死信)。
在重试间隔期间,传递线程将挂起。
重试选项包括 enableRetry、maxAttempts、initialRetryInterval、retryMultipler 和 maxRetryInterval。
永远不会重试因 MessageConversionException 而失败的消息传递;假设如果消息
第一次尝试时无法转换,后续尝试也将失败。
此类消息将被丢弃(或死信)。 |
5.12.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:
$ ./mvnw clean package
5.12.5. 示例
java -jar rabbit-source.jar --rabbit.queues=
5.13. Amazon S3 源代码
此源应用程序支持使用 Amazon S3 协议传输文件。
文件从remote
目录(S3 存储桶)到local
部署应用程序的目录。
默认情况下,源发出的消息作为字节数组提供。但是,这可能是
使用--mode
选择:
-
裁判提供一个
java.io.File
参考 -
线将逐行拆分文件并为每一行发出一条新消息
-
内容默认值。将文件的内容作为字节数组提供
使用时--mode=lines
,您还可以提供附加选项--withMarkers=true
.
如果设置为true
,基础FileSplitter
将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker
.选项withMarkers
默认为false
如果未显式设置。
另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。
5.13.1. 模式 = 行
头:
-
Content-Type: text/plain
-
file_orginalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数)
有效载荷:
一个String
对于每行。
第一行前面有一条消息,其中有START
标记有效负载。最后一行后面是一条消息,其中有END
标记有效载荷。
标记的存在和格式由with-markers
和markers-json
性能。
5.13.2. 模式 = ref
头:
没有。
有效载荷:
一个java.io.File
对象。
5.13.4. Amazon AWS 通用选项
Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。
5.13.5. 示例
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP 源
此源应用程序支持使用 SFTP 协议传输文件。
文件从remote
目录到local
部署应用的目录。
默认情况下,源发出的消息作为字节数组提供。但是,这可能是
使用--mode
选择:
-
裁判提供一个
java.io.File
参考 -
线将逐行拆分文件并为每一行发出一条新消息
-
内容默认值。将文件的内容作为字节数组提供
使用时--mode=lines
,您还可以提供附加选项--withMarkers=true
.
如果设置为true
,基础FileSplitter
将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker
.选项withMarkers
默认为false
如果未显式设置。
看sftp-supplier
以获取高级配置选项。
另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。
5.14.1. 输入
N/A(从 SFTP 服务器获取文件)。
5.14.2. 输出
模式 = 内容
头:
-
Content-Type: application/octet-stream
-
file_name: <file name>
-
file_remoteFileInfo <file metadata>
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
sftp_selectedServer: <server-key>
(如果是多源)
有效载荷:
一个byte[]
填充文件内容。
模式 = 线
头:
-
Content-Type: text/plain
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数) -
file_marker : <file marker>
(如果启用了 with-markers)
有效载荷:
一个String
对于每行。
第一行前面有一条消息,其中有START
标记有效负载。最后一行后面是一条消息,其中有END
标记有效载荷。
标记的存在和格式由with-markers
和markers-json
性能。
模式 = ref
头:
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
file_originalFile: <absolute-path-of-local-file>
-
file_name <local-file-name>
-
file_relativePath
-
file_remoteFile: <remote-file-name>
-
sftp_selectedServer: <server-key>
(如果是多源)
有效载荷:
一个java.io.File
对象。
5.14.4. 示例
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.16. TCP的
这tcp
源充当服务器,允许远程方连接到它并通过原始 TCP 套接字提交数据。
TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多解码器是 可用,默认值为与 Telnet 兼容的“CRLF”。
TCP 源应用程序生成的消息具有byte[]
有效载荷。
5.16.1. 选项
5.16.2. 可用的解码器
- CRLF(默认)
-
以回车符 (0x0d) 结尾的文本,后跟换行符 (0x0a)
- 如果
-
以换行符结尾的文本 (0x0a)
- 零
-
以空字节 (0x00) 结尾的文本
- STXETX
-
前面是 STX (0x02) 并以 ETX (0x03) 结尾的文本
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1
-
前面有一个 1 字节(无符号)长度字段的数据(最多支持 255 字节)
- L2
-
数据前面有一个两个字节(无符号)长度字段(最多 216-1 字节)
- L4
-
数据前面有一个四字节(有符号)长度字段(最多 231-1 字节)
5.17. 时间源
时间源将只是每隔一段时间发出一个包含当前时间的字符串。
5.17.1. 选项
时间源具有以下选项:
spring.integration.轮询器
- 克罗恩
-
用于轮询的 Cron 表达式。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认:
<none>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:
<none>
) - 初始延迟
-
轮询初始延迟。申请 'fixedDelay' 和 'fixedRate';“cron”被忽略。(持续时间,默认:
<none>
) - 每次轮询的最大消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认:
<none>
) - 接收超时
-
轮询消息的等待时间。(持续时间,默认:
1s
)
5.18. Twitter 消息源
重复检索过去 30 天内的直接消息(已发送和已接收),按时间倒序排序。
释放的消息被缓存(在MetadataStore
cache)以防止重复。
默认情况下,内存中的SimpleMetadataStore
被使用。
这twitter.message.source.count
控制返回的消息数。
这spring.cloud.stream.poller
属性控制消息轮询间隔。必须与使用的 API 保持一致速率限制
5.18.1. 选项
5.19. Twitter 搜索源
Twitter 的标准搜索 API(搜索/推文)允许对最近或流行的推文的索引进行简单查询。 这Source
提供针对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。
返回与指定查询匹配的相关推文的集合。
使用spring.cloud.stream.poller
属性来控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 req / 10 秒)
这twitter.search
查询属性允许按关键字查询并按时间和地理位置过滤结果。
这twitter.search.count
和twitter.search.page
根据搜索 API 控制结果分页。
注意:Twitter 的搜索服务以及搜索 API 并不意味着成为推文的详尽来源。并非所有推文都会被索引或通过搜索界面提供。
5.19.1. 选项
5.20. Twitter 流源
-
这
Filter API
返回与一个或多个过滤器谓词匹配的公共状态。 多个参数允许使用与流式处理API的单个连接。 提示:这track
,follow
和locations
字段与 OR 运算符组合! 查询track=foo
和follow=1234
返回匹配的推文test
或由用户创建1234
. -
这
Sample API
返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此端点,它们将看到相同的推文。
默认访问级别允许多达 400 个跟踪关键字、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。
5.20.1. 选项
5.21. Websocket 源代码
这Websocket
通过 Web 套接字生成消息的源。
5.21.1. 选项
5.21.2. 示例
要验证 websocket-source 是否从 Websocket 客户端接收消息,您可以使用以下简单的端到端设置。
第 1 步:启动 kafka
第 2 步:部署websocket-source
在特定端口上,比如 8080
第 3 步:在端口 8080 路径“/websocket”上连接一个 websocket 客户端,并发送一些消息。
您可以启动 kafka 控制台使用者并在那里查看消息。
5.22. XMPP 源
“xmpp”源允许从 XMPP 服务器接收消息。
5.22.1. 输入
不适用
5.22.2. 输出
有效载荷
-
byte[]
5.22.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:
$ ./mvnw clean package
5.22.5. 示例
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ 源代码
“zeromq”源允许从 ZeroMQ 接收消息。
5.23.1. 输入
不适用
5.23.2. 输出
有效载荷
-
byte[]
5.23.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:
$ ./mvnw clean package
5.23.5. 示例
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. 处理器
6.1. 聚合处理器
聚合器处理器使应用程序能够将传入消息聚合到组中,并将它们释放到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。
6.1.1. 有效载荷
如果输入有效负载是byte[]
并且 content-type 标头是 JSON,则JsonBytesToMap
函数尝试将此有效负载反序列化为Map
以便在聚合器函数的输出上更好地表示数据。
此外,这样的Map
数据表示使从下面提到的 SpEL 表达式中轻松访问有效负载内容变得容易。
否则(包括反序列化错误),输入有效负载将保持原样 - 它是目标应用程序配置,用于将其转换为所需的形式。
6.1.2. 选项
6.3. 滤波处理器
筛选处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。
例如,如果传入有效负载的类型为String
如果您想过滤掉任何少于五个字符的内容,您可以运行过滤处理器,如下所示。
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。
6.3.1. 有效载荷
您可以将任何类型作为有效负载传递,然后对其应用 SpEL 表达式进行过滤。
如果传入类型为byte[]
并将内容类型设置为text/plain
或application/json
,则应用程序将byte[]
到String
.
6.3.2. 选项
6.4. Groovy 处理器
对消息应用 Groovy 脚本的处理器。
6.4.1. 选项
groovy-processor 处理器具有以下选项:
- groovy-processor.script
-
引用用于处理消息的脚本。(资源,默认值:
<none>
) - groovy-processor.变量
-
变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - groovy-processor.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<none>
)
6.5. 标头富集处理器
使用标头扩充器应用添加邮件标头。
标头以换行符分隔键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。 例如--headers='foo=payload.someProperty \n bar=payload.otherProperty'
.
6.6. Http 请求处理器
向 HTTP 资源发出请求并将响应正文作为消息有效负载发出的处理器应用程序。
6.6.1. 输入
头
任何必需的 HTTP 标头都必须通过headers
或headers-expression
财产。 请参阅下面的示例。标头值也可用于构造:
-
请求正文在
body-expression
财产。 -
HTTP 方法在
http-method-expression
财产。 -
在
url-expression
财产。
有效载荷
默认情况下,有效负载用作 POST 请求的请求正文,并且可以是任何 Java 类型。 它应该是 GET 请求的空字符串。 有效负载还可用于构造:
-
请求正文在
body-expression
财产。 -
HTTP 方法在
http-method-expression
财产。 -
在
url-expression
财产。
底层 WebClient 支持 Jackson JSON 序列化,以支持任何请求和响应类型(如有必要)。
这expected-response-type
属性String.class
默认情况下,可以设置为应用程序类路径中的任何类。
请注意,用户定义的有效负载类型需要将所需的依赖项添加到您的 pom 文件中。
6.6.2. 输出
头
没有 HTTP 消息头映射到出站消息。
有效载荷
原始输出对象是 ResponseEntity<?>其任何字段(例如body
,headers
) 或访问器方法 (statusCode
) 可以作为reply-expression
.
默认情况下,出站消息有效负载是响应正文。
请注意,ResponseEntity(由表达式#root
) 默认情况下不能被 Jackson 反序列化,但可以呈现为HashMap
.
6.6.3. 选项
http-request 处理器具有以下选项:
processors.adoc 中未解析的指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.。/../applications/processor/image-recognition-processor/README.adoc[tags=ref-doc]
processors.adoc 中未解析的指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.。/../applications/processor/object-detection-processor/README.adoc[tags=ref-doc]
processors.adoc 中未解析的指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.。/../applications/processor/semantic-segmentation-processor/README.adoc[tags=ref-doc]
6.7. 脚本处理器
使用脚本转换消息的处理器。脚本正文直接提供 作为属性值。可以指定脚本的语言(groovy/javascript/ruby/python)。
6.7.1. 选项
脚本处理器处理器具有以下选项:
- 脚本处理器.language
-
script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认:
<none>
) - 脚本处理器.script
-
脚本文本。(字符串,默认:
<none>
) - 脚本处理器.变量
-
变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - 脚本处理器.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<none>
)
6.8. 拆分处理器
拆分器应用程序建立在 Spring Integration 中同名的概念之上,并允许将单个消息拆分为多个不同的消息。
处理器使用一个函数,该函数接受Message<?>
作为输入,然后生成一个List<Message<?>
作为基于各种属性的输出(见下文)。
您可以使用 SpEL 表达式或分隔符来指定要如何拆分传入消息。
6.8.1. 有效载荷
-
传入有效载荷 -
Message<?
>
如果传入类型为byte[]
并将内容类型设置为text/plain
或application/json
,则应用程序将byte[]
到String
.
-
传出有效载荷 -
List<Message<?>
6.8.2. 选项
6.9. 变换处理器
Transformer 处理器允许您根据 SpEL 表达式转换消息有效负载结构。
下面是如何运行此应用程序的示例。
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression=payload.toUpperCase()
如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。
6.9.1. 有效载荷
传入消息可以包含任何类型的有效负载。
6.9.2. 选项
6.10. Twitter 趋势和趋势位置处理器
可以返回趋势主题或趋势主题的位置的处理器。
这twitter.trend.trend-query-type
属性允许来选择查询类型。
6.10.1. 检索某个位置的趋势主题(可选)
对于此模式集twitter.trend.trend-query-type
自trend
.
6.10.2. 检索趋势位置
对于此模式集twitter.trend.trend-query-type
自trendLocation
.
按位置检索热门主题的完整或附近位置列表。
如果latitude
,longitude
参数不提供处理器执行可用趋势 API 并返回 Twitter 具有趋势主题信息的位置。
如果latitude
,longitude
参数,则处理器执行 Trends Closest API 并返回 Twitter 具有最接近指定位置的趋势主题信息的位置。
Response 是locations
对该位置的 WOEID 和其他一些人类可读信息(例如该位置所属的规范名称和国家/地区)进行编码。
6.10.3. 选项
7. 水槽
7.1. 卡桑德拉水槽
此接收器应用程序将收到的每条消息的内容写入 Cassandra。
它需要 JSON 字符串的有效负载,并使用其属性映射到表列。
7.1.1. 有效载荷
表示要持久化的实体(或实体列表)的 JSON 字符串或字节数组。
7.2. 分析接收器
Sink 应用程序,构建在 Analytics 使用者之上,它根据输入消息计算分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开了 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标名称、值和标签。
分析接收器可以生成两种指标类型:
仪表(例如计数器或仪表)由其唯一标识name
和dimensions
(术语维度和标记可以互换使用)。维度允许对特定的命名量度进行切片,以向下钻取和推理数据。
作为指标,由其name 和dimensions ,您可以为每个指标分配多个标签(e.g. key/值对),但之后不能随机更改这些标签!如果同名指标具有不同的标签集,Prometheus 等监控系统会抱怨。 |
使用analytics.name
或analytics.name-expression
属性设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。
使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
,属性,用于向指标添加一个或多个标记。 这TAG_NAME
将在指标中显示为标记名称。TAG_VALUE是SpEL
从传入消息动态计算标记值的表达式。
这SpEL
表达式使用headers
和payload
关键字来访问邮件的标头和有效负载值。
您可以使用文字(例如'fixed value' ) 设置具有固定值的标签。 |
所有 Stream 应用程序都支持,盒子的 ouf,三个最流行的监控系统,Wavefront
,Prometheus
和InfluxDB
并且您可以以声明方式启用它们中的每一个。您只需将 micrometer meter-registry 依赖项添加到Analytics Sink
应用。
请访问 Spring Cloud Data Flow Stream Monitoring 以获取配置监控系统的详细说明。以下快速片段可以帮助您入门。
-
要启用 Prometheus 仪表注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用 Wavefront 仪表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 仪表注册表,请设置以下属性。
management.metrics.export.influx.enabled=true management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink 将重用提供的指标配置。 |
下图说明了如何Analytics Sink
帮助收集企业内部信息,用于证券交易所,实时管道。

7.2.1. 有效载荷
传入消息可以包含任何类型的有效负载。
7.2.2. 选项
7.3. Elasticsearch Sink
将文档索引到 Elasticsearch 中的 Sink。
此 Elasticsearch 接收器仅支持索引 JSON 文档。
它使用来自输入目标的数据,然后将其索引到 Elasticsearch。
输入数据可以是纯 json 字符串,也可以是java.util.Map
表示 JSON。
它还接受 Elasticsearch 提供的数据XContentBuilder
.
但是,这是一种罕见的情况,因为中间件不太可能将记录保留为XContentBuilder
.
这主要用于直接调用消费者。
7.3.2. 运行此接收器的示例
-
从文件夹
elasticsearch-sink
:./mvnw clean package
-
CD 应用程序
-
cd 到正确的 Binder 生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package
-
确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
如果中间件尚未运行,请启动中间件(Kafka 或 RabbitMQ)。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
将一些 JSON 数据发送到中间件目标。例如:
{"foo":"bar"}
-
验证数据是否已编制索引:
curl localhost:9200/testing/_search
7.5. FTP 接收器
FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。
它使用ftp-outbound-adapter
,因此传入消息可以是java.io.File
对象,一个String
(文件内容)
或数组bytes
(文件内容也是如此)。
要使用此接收器,您需要一个用户名和一个密码才能登录。
默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator 如果未指定任何内容。DefaultFileNameGenerator 将确定文件名
基于file_name 标头(如果存在)在MessageHeaders ,或者如果Message 已经是一个java.io.File ,那么它会
使用该文件的原始名称。 |
7.5.1. 标头
-
file_name
(见上文注释)
7.5.2. 有效载荷
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.5.3. 输出
N/A(写入 FTP 服务器)。
7.6. JDBC 接收器
JDBC 接收器允许您将传入的有效负载持久化到 RDBMS 数据库中。
这jdbc.consumer.columns
属性表示COLUMN_NAME[:EXPRESSION_FOR_VALUE]
哪里EXPRESSION_FOR_VALUE
(连同冒号)是可选的。
在这种情况下,值是通过生成的表达式计算的,例如payload.COLUMN_NAME
,因此这样我们就可以从对象属性直接映射到表列。
例如,我们有一个 JSON 有效负载,如下所示:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以将其插入到表格中name
,city
和street
结构使用配置:
--jdbc.consumer.columns=name,city:address.city,street:address.street
此接收器支持批量插入,只要基础 JDBC 驱动程序支持。
批次插入通过batch-size
和idle-timeout
性能:
传入消息将聚合到batch-size
消息存在,然后作为批处理插入。
如果idle-timeout
毫秒过去没有新消息,则插入聚合的批次,即使它小于batch-size
,限制最大延迟。
该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用。 |
7.6.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.6.2. 有效载荷
7.8. 原木汇
这log
Sink 使用应用程序记录器输出数据以供检查。
敬请谅解log
接收器使用无类型处理程序,这会影响实际日志记录的执行方式。
这意味着,如果内容类型是文本的,则原始有效负载字节将转换为 String,否则将记录原始字节。
请参阅用户指南中的更多信息。
7.9. MongoDB 接收器
此接收器应用程序将传入数据摄取到 MongoDB 中。
此应用程序完全基于MongoDataAutoConfiguration
,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。
7.9.1. 输入
有效载荷
-
任何 POJO
-
String
-
byte[]
7.11. Pgcopy 接收器
使用 PostgreSQL COPY 命令将其传入有效负载写入 RDBMS 的模块。
7.11.1. 输入
头
有效载荷
-
任何
列表达式将根据消息进行计算,并且表达式通常仅与一种类型(例如 Map 或 bean 等)兼容。
7.11.2. 输出
不适用
7.11.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:
$ ./mvnw clean package
要运行集成测试,请在本地主机上启动 PostgreSQL 数据库:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. 示例
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.14. 路由器接收器
此应用程序将消息路由到命名通道。
7.14.1. 选项
路由器接收器具有以下选项:
- router.default-输出绑定
-
在何处发送不可路由的消息。(字符串,默认:
<none>
) - router.destination-mappings
-
目标映射作为以名称-值对为分隔的新行字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - router.expression
-
要应用于消息以确定要路由到的通道的表达式。请注意,text、json 或 xml 等内容类型的有效负载线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容,请参阅文档。(表达式,默认值:
<none>
) - router.refresh-delay
-
检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认:
60000
) - 需要 router.resolution-required
-
是否需要通道分辨率。(布尔值,默认:
false
) - 路由器.脚本
-
返回通道或通道映射分辨率键的 groovy 脚本的位置。(资源,默认值:
<none>
) - router.变量
-
变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<none>
) - router.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<none>
)
此路由器接收器基于StreamBridge API 来自 Spring Cloud Stream,因此可以根据需要创建目标。
在这种情况下,一个defaultOutputBinding 只有当 key 未包含在destinationMappings .
这resolutionRequired = true 忽略defaultOutputBinding 如果没有映射并且尚未声明相应的绑定,则会抛出异常。 |
您可以使用spring.cloud.stream.dynamicDestinations
财产。 默认情况下,所有已解析的目标都将动态绑定;如果此属性具有以逗号分隔的目标名称列表,则仅绑定这些目标名称。解析到不在此列表中的目标的消息将被路由到defaultOutputBinding
,也必须出现在列表中。
这destinationMappings
用于将评估结果映射到实际的目标名称。
7.14.2. 基于 SpEL 的路由
该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。
有关更多信息,请参阅 Spring Integration 参考手册配置通用路由器部分中的“路由器和 Spring 表达式语言 (SpEL)”小节。
7.14.3. 基于 Groovy 的路由
除了 SpEL 表达式,还可以使用 Groovy 脚本。让我们在文件系统中的“file:/my/path/router.groovy”或“classpath:/my/path/router.groovy”创建一个 Groovy 脚本:
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果要将变量值传递给脚本,可以使用 variables 选项静态绑定值,或者选择使用 propertiesLocation 选项将路径传递到包含绑定的属性文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定变量和 propertiesLocation,在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,有效负载和标头是隐式绑定的,以允许您访问消息中包含的数据。
有关更多信息,请参阅 Spring Integration 参考手册 Groovy 支持。
7.16. 亚马逊 S3 接收器
此接收器应用程序支持将对象传输到 Amazon S3 存储桶。
文件有效负载(和目录递归)将传输到remote
目录(S3 存储桶)到local
部署应用程序的目录。
此接收器接受的消息必须包含payload
如:
-
File
,包括递归上传的目录; -
InputStream
; -
byte[]
7.16.1. 选项
s3 接收器具有以下选项:
目标生成的应用程序基于AmazonS3SinkConfiguration
可以通过S3MessageHandler.UploadMetadataProvider
和/或S3ProgressListener
,这些被注入S3MessageHandler
豆。
有关更多详细信息,请参阅 Spring Integration AWS 支持。
7.16.2. Amazon AWS 通用选项
Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置 类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。
其中一些是关于 AWS 凭证的:
-
spring.cloud.aws.credentials.accessKey
-
spring.cloud.aws.credentials.secretKey
-
spring.cloud.aws.credentials.instanceProfile
-
spring.cloud.aws.credentials.profileName
-
spring.cloud.aws.credentials.profile路径
其他适用于 AWSRegion
定义:
-
cloud.aws.region.auto
-
cloud.aws.region.static
例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar
7.17. SFTP 接收器
SFTP 接收器是一个简单的选项,用于将文件从传入邮件推送到 SFTP 服务器。
它使用sftp-outbound-adapter
,因此传入消息可以是java.io.File
对象,一个String
(文件内容)
或数组bytes
(文件内容也是如此)。
要使用此接收器,您需要一个用户名和一个密码才能登录。
默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator 如果未指定任何内容。DefaultFileNameGenerator 将确定文件名
基于file_name 标头(如果存在)在MessageHeaders ,或者如果Message 已经是一个java.io.File ,那么它会
使用该文件的原始名称。 |
配置sftp.factory.known-hosts-expression
选项,则评估的根对象是应用程序上下文,例如可能是sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
7.17.1. 输入
头
-
file_name
(见上文注释)
有效载荷
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.17.2. 输出
N/A(写入 SFTP 服务器)。
7.18. TCP接收器
此模块使用编码器将消息写入 TCP。
TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多编码器是 available,默认值为“CRLF”。
7.18.2. 可用的编码器
- CRLF(默认)
-
以回车符 (0x0d) 结尾的文本,后跟换行符 (0x0a)
- 如果
-
以换行符结尾的文本 (0x0a)
- 零
-
以空字节 (0x00) 结尾的文本
- STXETX
-
前面是 STX (0x02) 并以 ETX (0x03) 结尾的文本
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1
-
前面有一个 1 字节(无符号)长度字段的数据(最多支持 255 字节)
- L2
-
数据前面有一个两个字节(无符号)长度字段(最多 216-1 字节)
- L4
-
数据前面有一个四字节(有符号)长度字段(最多 231-1 字节)
7.19. 吞吐量接收器
接收器,它将按选定的时间间隔对消息进行计数并记录观察到的吞吐量。
7.19.1. 选项
吞吐量接收器具有以下选项:
- 吞吐量.report-every-ms
-
报告的频率。(整数,默认:
1000
)
7.20. Twitter 消息接收器
从身份验证用户向指定用户发送直接消息。
需要 JSON POST 正文和Content-Type
header 设置为application/json
.
当收到来自用户的消息时,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和 5 条分配的消息。 在 24 小时窗口内发送第 6 条消息或在 24 小时窗口之外发送消息将计入速率限制。 此行为仅适用于使用 POST direct_messages/events/new 端点时。 |
SpEL 表达式用于从输入消息中计算请求参数。
7.20.1. 选项
使用单引号 () 将' SpEL expression 属性。例如,要设置固定消息文本,请使用text='Fixed Text' . 对于固定目标 userId,请使用userId='666' . |
7.21. Twitter 更新接收器
更新身份验证用户的当前文本(例如推文)。
对于每次更新尝试,都会将更新文本与身份验证用户最近的推文进行比较。任何可能导致重复的尝试都将被阻止,从而导致 403 错误。用户不能连续提交两次相同的文本。 |
虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。标准 API 的更新限制为 3 小时窗口内 300 个。如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。
您可以在此处找到更新 API 的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 选项
7.22. 波前汇
Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。
支持常见的 ETL 用例,其中必须清理、转换现有(历史)指标数据并将其存储在 Wavefront 中以进行进一步分析。
7.23. Websocket 接收器
一个简单的 Websocket Sink 实现。
7.23.2. 示例
要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,您可以使用 遵循简单的端到端设置。
第 1 步:启动 Rabbitmq
第 2 步:部署time-source
步骤 3:部署websocket-sink
最后在trace
模式,以便您看到由time-source
在日志中:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
有一个Endpoint
您可以使用它来访问最后一个n
发送和接收的消息。你必须
通过提供--endpoints.websocketconsumertrace.enabled=true
.默认情况下,它通过host:port/websocketconsumertrace
.下面是一个示例输出:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的 HTML 页面,您可以在其中看到文本区域中转发的消息。您可以访问
它直接通过host:port
在您的浏览器中。
7.24. XMPP 接收器
“xmpp”接收器允许将消息发送到 XMPP 服务器。
7.24.1. 输入
-
byte[]
7.24.2. 输出
有效载荷
不适用
7.24.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:
$ ./mvnw clean package
7.25. ZeroMQ 接收器
“zeromq”接收器允许将消息发送到 ZeroMQ 套接字。
7.25.1. 输入
-
byte[]
7.25.2. 输出
有效载荷
不适用
7.25.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:
$ ./mvnw clean package
7.25.5. 示例
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=