应用

5. 来源

5.1. Debezium 源

基于 Debezium 引擎变更数据捕获 (CDC) 源。 这Debezium Source允许捕获数据库更改事件并通过不同的消息绑定器流式传输这些事件,例如Apache Kafka,RabbitMQ以及所有 Spring Cloud Stream 支持代理。spring-doc.cadn.net.cn

此源可以与任何 Spring Cloud Stream 消息绑定器一起使用。 它不受 Kafka Connect 框架的限制或依赖。尽管这种方法很灵活,但它有一定的局限性

支持所有 Debezium 配置属性。 只需在任何 Debezium 属性之前使用debezium.properties.前缀。 例如,要将 Debezium 的connector.class属性使用debezium.properties.connector.classsource 属性。spring-doc.cadn.net.cn

5.1.1. 数据库支持

Debezium Source目前支持多个数据存储的 CDC:MySQLPostgreSQLMongoDBOracleSQL ServerDb2、Vitess Spanner 数据库。spring-doc.cadn.net.cn

5.1.2. 选项

事件扁平化配置

Debezium 提供了一种全面的消息格式,可以准确地详细说明有关系统中发生的更改的信息。 但是,有时这种格式可能不适合下游使用者,这可能需要对消息进行格式化,以便字段名称和值以简化的flattened结构。spring-doc.cadn.net.cn

要简化 Debezium 连接器生成的事件记录的格式,您可以使用 Debezium 事件扁平化消息转换。 使用讨人喜欢的配置,您可以配置简单的消息格式,如下所示:spring-doc.cadn.net.cn

--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 偏移存储

当 Debezium 源代码运行时,它会从源代码读取信息并定期记录offsets定义了它处理了多少信息。 如果源重新启动,它将使用最后记录的偏移量来了解它应该在源信息中恢复读取的位置。 开箱即用,提供了以下偏移存储配置选项:spring-doc.cadn.net.cn

  • 内存中spring-doc.cadn.net.cn

    Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
  • 本地文件系统spring-doc.cadn.net.cn

    Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
    --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1)
    --debezium.properties.offset.flush.interval.ms=60000 (2)
    1 要存储偏移的文件的路径。在以下情况下需要offset.storage`设置为FileOffsetBackingStore.
    2 尝试提交偏移量的时间间隔。默认值为 1 分钟。
  • Kafka 主题spring-doc.cadn.net.cn

    Uses a Kafka topic to store offset data.
    --debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
    --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1)
    --debezium.properties.offset.storage.partitions=2 (2)
    --debezium.properties.offset.storage.replication.factor=1 (3)
    --debezium.properties.offset.flush.interval.ms=60000 (4)
    1 要存储偏移量的 Kafka 主题的名称。在以下情况下需要offset.storage设置为KafkaOffsetBackingStore.
    2 创建偏移存储主题时使用的分区数。
    3 创建偏移存储主题时使用的复制因子。
    4 尝试提交偏移量的时间间隔。默认值为 1 分钟。

可以实现org.apache.kafka.connect.storage.OffsetBackingStore接口,以提供绑定到自定义后端键值存储的偏移存储。spring-doc.cadn.net.cn

5.1.3. 示例和测试

debezium 集成测试使用在本地计算机上运行的数据库夹具。利用 Testcontainers 的帮助预构建 debezium docker 数据库镜像。spring-doc.cadn.net.cn

要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明说明介绍了如何从 Docker 镜像运行预配置的测试数据库。spring-doc.cadn.net.cn

MySQL

启动debezium/example-mysql在 docker 中:spring-doc.cadn.net.cn

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选)用mysql客户端连接到数据库并创建debezium具有所需凭据的用户:
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

使用以下属性将 Debezium 源连接到 MySQL DB:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)

debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)


debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)

debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)

debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 将 Debezium 源配置为使用 MySqlConnector
2 元数据用于识别和调度传入事件。
3 连接到运行在localhost:3306debezium用户。
4 ChangeEvent消息。
5 启用更改事件展平
6 在多个启动之间保留的源状态。

您还可以运行DebeziumDatabasesIntegrationTest#mysql()使用此 MySQL 配置。spring-doc.cadn.net.cn

禁用 mysql GenericContainer 测试初始化代码。
PostgreSQL

debezium/example-postgres:1.0Docker 镜像:spring-doc.cadn.net.cn

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final

您可以像这样连接到此服务器:spring-doc.cadn.net.cn

psql -U postgres -h localhost -p 5432

使用以下属性将 Debezium 源连接到 PostgreSQL:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=postgres  (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 配置Debezium Source使用 PostgresConnector
2 配置 Debezium 引擎以使用memory商店。
3 元数据用于识别和调度传入事件。
4 连接到运行在localhost:5432postgres用户。
5 在消息中包括“更改事件值”架构。
6 启用 Chage 事件展平

您还可以运行DebeziumDatabasesIntegrationTest#postgres()使用此 postgres 配置。spring-doc.cadn.net.cn

禁用 postgres GenericContainer 测试初始化代码。
Mongo数据库

debezium/example-mongodb:2.3.3.Final容器镜像:spring-doc.cadn.net.cn

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:2.3.3.Final

初始化库存集合spring-doc.cadn.net.cn

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

mongodb终端输出,搜索像host: "3f95a8a6516e:27017":spring-doc.cadn.net.cn

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

127.0.0.1 3f95a8a6516e进入您的/etc/hostsspring-doc.cadn.net.cn

使用以下属性将 Debezium 源连接到 MongoDB:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)

debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)

debezium.properties.tasks.max=1 (4)

debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)

debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 配置Debezium Source使用 MongoDB 连接器
2 配置 Debezium 引擎以使用memory.
3 连接到运行在localhost:27017debezium用户。
4 debezium.io/docs/connectors/mongodb/#tasks
5 SourceRecord事件。
6 启用 Chnage 事件展平化

您还可以运行DebeziumDatabasesIntegrationTest#mongodb()使用此 mongoDB 配置。spring-doc.cadn.net.cn

SQL 服务器

开始一个sqlserverdebezium/example-postgres:1.0Docker 镜像:spring-doc.cadn.net.cn

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

使用示例数据表单填充 debezium SqlServer 教程:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

使用以下属性将 Debezium 源连接到 SQLServer:spring-doc.cadn.net.cn

debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)

debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)

debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)

debezium.properties.database.user=sa  (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 配置Debezium Source以使用 SqlServerConnector
2 配置 Debezium 引擎以使用memory状态存储。
3 元数据用于识别和调度传入事件。
4 连接到运行在localhost:1433sa用户。

您还可以运行DebeziumDatabasesIntegrationTest#sqlServer()使用此 SqlServer 配置。spring-doc.cadn.net.cn

神谕

从 localhost 启动可访问的 Oracle,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置spring-doc.cadn.net.cn

使用 Debezium Oracle 教程填充示例数据:spring-doc.cadn.net.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. 文件源

此应用程序轮询目录并将新文件或其内容发送到输出通道。默认情况下,文件源将 File 的内容作为字节数组提供。但是,可以使用 --file.supplier.mode 选项对其进行自定义:spring-doc.cadn.net.cn

使用时--file.supplier.mode=lines,您还可以提供附加选项--file.supplier.withMarkers=true. 如果设置为 true,则基础 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.如果未显式设置,则选项 withMarkers 默认为 false。spring-doc.cadn.net.cn

5.3. FTP 源

此源应用程序支持使用 FTP 协议传输文件。 文件从remote目录到local部署应用的目录。 默认情况下,源发出的消息作为字节数组提供。但是,这可能是 使用--mode选择:spring-doc.cadn.net.cn

使用时--mode=lines,您还可以提供附加选项--withMarkers=true. 如果设置为true,基础FileSplitter将在实际数据之前和之后发出额外的文件开始文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.选项withMarkers默认为false如果未显式设置。spring-doc.cadn.net.cn

另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。spring-doc.cadn.net.cn

5.3.1. 输入

N/A(从 FTP 服务器获取文件)。spring-doc.cadn.net.cn

5.3.2. 输出

模式 = 内容
头:
有效载荷:

一个byte[]填充文件内容。spring-doc.cadn.net.cn

模式 = 线
头:
有效载荷:

一个String对于每行。spring-doc.cadn.net.cn

第一行前面有一条消息,其中有START标记有效负载。最后一行后面是一条消息,其中有END标记有效载荷。spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json性能。spring-doc.cadn.net.cn

模式 = ref
头:
有效载荷:

一个java.io.File对象。spring-doc.cadn.net.cn

5.3.4. 示例

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

5.4. Http 源代码

侦听 HTTP 请求并将正文作为消息有效负载发出的源应用程序。 如果 Content-Type 匹配text/*application/json,有效负载将是一个字符串, 否则,有效负载将是一个字节数组。spring-doc.cadn.net.cn

5.4.1. 有效载荷:

如果内容类型匹配text/*application/jsonspring-doc.cadn.net.cn

如果内容类型不匹配text/*application/jsonspring-doc.cadn.net.cn

5.5. JDBC 源代码

此源轮询来自 RDBMS 的数据。 此来源完全基于DataSourceAutoConfiguration,因此请参阅 Spring Boot JDBC 支持 了解更多信息。spring-doc.cadn.net.cn

5.5.1. 有效载荷

  • Map<String, Object>什么时候jdbc.split == true(默认)和List<Map<String, Object>>否则spring-doc.cadn.net.cn

5.5.2. 选项

jdbc 源具有以下选项:spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档以获取补充DataSourceproperties 和TriggerPropertiesMaxMessagesProperties用于轮询选项。spring-doc.cadn.net.cn

5.6. JMS 源代码

JMS 源允许从 JMS 接收消息。spring-doc.cadn.net.cn

5.7. Apache Kafka 源代码

此模块使用来自 Apache Kafka 的消息。spring-doc.cadn.net.cn

5.7.1. 选项

kafka 源代码具有以下选项:spring-doc.cadn.net.cn

(有关 Spring for Apache Kafka 配置属性,请参阅 Spring Boot 文档)spring-doc.cadn.net.cn

5.8. 负载发生器源

发送生成的数据并将其分派到流的源。spring-doc.cadn.net.cn

5.8.1. 选项

负载发生器源具有以下选项:spring-doc.cadn.net.cn

load-generator.generate-timestamp

是否生成时间戳。(布尔值,默认:false)spring-doc.cadn.net.cn

加载生成器.消息计数

消息计数。(整数,默认:1000)spring-doc.cadn.net.cn

load-generator.message-size

消息大小。(整数,默认:1000)spring-doc.cadn.net.cn

load-generator.producers

生产者数量。(整数,默认:1)spring-doc.cadn.net.cn

5.9. 邮件源

侦听电子邮件并将邮件正文作为邮件有效负载发出的源应用程序。spring-doc.cadn.net.cn

5.10. MongoDB 源代码

此源轮询来自 MongoDB 的数据。 此来源完全基于MongoDataAutoConfiguration,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。spring-doc.cadn.net.cn

5.10.1. 选项

mongodb 源代码具有以下选项:spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档以获取更多内容MongoProperties性能。 See 和TriggerProperties用于轮询选项。spring-doc.cadn.net.cn

5.11. MQTT 源代码

启用从 MQTT 接收消息的源。spring-doc.cadn.net.cn

5.11.1. 有效载荷:

5.12. RabbitMQ 源代码

“rabbit”源允许从 RabbitMQ 接收消息。spring-doc.cadn.net.cn

在部署流之前,队列必须存在;它们不会自动创建。 您可以使用 RabbitMQ Web UI 轻松创建队列。spring-doc.cadn.net.cn

5.12.1. 输入

5.12.2. 输出

有效载荷

5.12.3. 选项

子源有以下选项:spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。spring-doc.cadn.net.cn

关于重试的说明
使用默认的 ackModeAUTO) 和重新排队true) 选项,将重试失败的消息传递 无限期。 由于兔源没有太多的加工,因此源本身失败的风险很小,除非 下游Binder由于某种原因未连接。 将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到死信 Exchange/Queue(如果代理已如此配置)。 enableRetry 选项允许配置重试参数,以便可以重试失败的消息传递,并且 当重试用尽时,最终被丢弃(或死信)。 在重试间隔期间,传递线程将挂起。 重试选项包括 enableRetrymaxAttemptsinitialRetryIntervalretryMultiplermaxRetryInterval。 永远不会重试因 MessageConversionException 而失败的消息传递;假设如果消息 第一次尝试时无法转换,后续尝试也将失败。 此类消息将被丢弃(或死信)。

5.12.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.12.5. 示例

java -jar rabbit-source.jar --rabbit.queues=

5.13. Amazon S3 源代码

此源应用程序支持使用 Amazon S3 协议传输文件。 文件从remote目录(S3 存储桶)到local部署应用程序的目录。spring-doc.cadn.net.cn

默认情况下,源发出的消息作为字节数组提供。但是,这可能是 使用--mode选择:spring-doc.cadn.net.cn

使用时--mode=lines,您还可以提供附加选项--withMarkers=true. 如果设置为true,基础FileSplitter将在实际数据之前和之后发出额外的文件开始文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.选项withMarkers默认为false如果未显式设置。spring-doc.cadn.net.cn

另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。spring-doc.cadn.net.cn

5.13.1. 模式 = 行

头:
有效载荷:

一个String对于每行。spring-doc.cadn.net.cn

第一行前面有一条消息,其中有START标记有效负载。最后一行后面是一条消息,其中有END标记有效载荷。spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json性能。spring-doc.cadn.net.cn

5.13.2. 模式 = ref

头:
有效载荷:

一个java.io.File对象。spring-doc.cadn.net.cn

5.13.4. Amazon AWS 通用选项

Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。spring-doc.cadn.net.cn

5.13.5. 示例

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines

5.14. SFTP 源

此源应用程序支持使用 SFTP 协议传输文件。 文件从remote目录到local部署应用的目录。 默认情况下,源发出的消息作为字节数组提供。但是,这可能是 使用--mode选择:spring-doc.cadn.net.cn

使用时--mode=lines,您还可以提供附加选项--withMarkers=true. 如果设置为true,基础FileSplitter将在实际数据之前和之后发出额外的文件开始文件结束标记消息。 这 2 个附加标记消息的有效负载类型为FileSplitter.FileMarker.选项withMarkers默认为false如果未显式设置。spring-doc.cadn.net.cn

sftp-supplier以获取高级配置选项。spring-doc.cadn.net.cn

另请参阅 MetadataStore 选项,了解可能的共享持久存储配置,用于防止重启时出现重复消息。spring-doc.cadn.net.cn

5.14.1. 输入

N/A(从 SFTP 服务器获取文件)。spring-doc.cadn.net.cn

5.14.2. 输出

模式 = 内容
头:
有效载荷:

一个byte[]填充文件内容。spring-doc.cadn.net.cn

模式 = 线
头:
有效载荷:

一个String对于每行。spring-doc.cadn.net.cn

第一行前面有一条消息,其中有START标记有效负载。最后一行后面是一条消息,其中有END标记有效载荷。spring-doc.cadn.net.cn

标记的存在和格式由with-markersmarkers-json性能。spring-doc.cadn.net.cn

模式 = ref
头:
有效载荷:

一个java.io.File对象。spring-doc.cadn.net.cn

5.14.4. 示例

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.15. 系统日志

系统日志源通过 UDP、TCP 或两者接收 SYSLOG 数据包。支持RFC3164 (BSD) 和 RFC5424 格式。spring-doc.cadn.net.cn

5.16. TCP的

tcp源充当服务器,允许远程方连接到它并通过原始 TCP 套接字提交数据。spring-doc.cadn.net.cn

TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多解码器是 可用,默认值为与 Telnet 兼容的“CRLF”。spring-doc.cadn.net.cn

TCP 源应用程序生成的消息具有byte[]有效载荷。spring-doc.cadn.net.cn

5.16.2. 可用的解码器

文本数据
CRLF(默认)

以回车符 (0x0d) 结尾的文本,后跟换行符 (0x0a)spring-doc.cadn.net.cn

如果

以换行符结尾的文本 (0x0a)spring-doc.cadn.net.cn

以空字节 (0x00) 结尾的文本spring-doc.cadn.net.cn

STXETX

前面是 STX (0x02) 并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据

no structure - 客户端通过关闭套接字来指示完整的消息spring-doc.cadn.net.cn

L1

前面有一个 1 字节(无符号)长度字段的数据(最多支持 255 字节)spring-doc.cadn.net.cn

L2

数据前面有一个两个字节(无符号)长度字段(最多 216-1 字节)spring-doc.cadn.net.cn

L4

数据前面有一个四字节(有符号)长度字段(最多 231-1 字节)spring-doc.cadn.net.cn

5.17. 时间源

时间源将只是每隔一段时间发出一个包含当前时间的字符串。spring-doc.cadn.net.cn

5.17.1. 选项

时间源具有以下选项:spring-doc.cadn.net.cn

spring.integration.轮询器
克罗恩

用于轮询的 Cron 表达式。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认:<none>)spring-doc.cadn.net.cn

固定延迟

轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

固定利率

轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认:<none>)spring-doc.cadn.net.cn

初始延迟

轮询初始延迟。申请 'fixedDelay' 和 'fixedRate';“cron”被忽略。(持续时间,默认:<none>)spring-doc.cadn.net.cn

每次轮询的最大消息数

每个轮询周期要轮询的最大消息数。(整数,默认:<none>)spring-doc.cadn.net.cn

接收超时

轮询消息的等待时间。(持续时间,默认:1s)spring-doc.cadn.net.cn

5.18. Twitter 消息源

重复检索过去 30 天内的直接消息(已发送和已接收),按时间倒序排序。 释放的消息被缓存(在MetadataStorecache)以防止重复。 默认情况下,内存中的SimpleMetadataStore被使用。spring-doc.cadn.net.cn

twitter.message.source.count控制返回的消息数。spring-doc.cadn.net.cn

spring.cloud.stream.poller属性控制消息轮询间隔。必须与使用的 API 保持一致速率限制spring-doc.cadn.net.cn

5.19. Twitter 搜索源

Twitter 的标准搜索 API(搜索/推文)允许对最近或流行的推文的索引进行简单查询。 这Source提供针对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。spring-doc.cadn.net.cn

返回与指定查询匹配的相关推文的集合。spring-doc.cadn.net.cn

使用spring.cloud.stream.poller属性来控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 req / 10 秒)spring-doc.cadn.net.cn

twitter.search查询属性允许按关键字查询并按时间和地理位置过滤结果。spring-doc.cadn.net.cn

twitter.search.counttwitter.search.page根据搜索 API 控制结果分页。spring-doc.cadn.net.cn

注意:Twitter 的搜索服务以及搜索 API 并不意味着成为推文的详尽来源。并非所有推文都会被索引或通过搜索界面提供。spring-doc.cadn.net.cn

5.20. Twitter 流源

实时推文流筛选器示例 API 支持。spring-doc.cadn.net.cn

  • Filter API返回与一个或多个过滤器谓词匹配的公共状态。 多个参数允许使用与流式处理API的单个连接。 提示:这track,followlocations字段与 OR 运算符组合! 查询track=foofollow=1234返回匹配的推文test 由用户创建1234.spring-doc.cadn.net.cn

  • Sample API返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此端点,它们将看到相同的推文。spring-doc.cadn.net.cn

默认访问级别允许多达 400 个跟踪关键字、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。spring-doc.cadn.net.cn

5.21. Websocket 源代码

Websocket通过 Web 套接字生成消息的源。spring-doc.cadn.net.cn

5.21.2. 示例

要验证 websocket-source 是否从 Websocket 客户端接收消息,您可以使用以下简单的端到端设置。spring-doc.cadn.net.cn

第 1 步:启动 kafka
第 2 步:部署websocket-source在特定端口上,比如 8080
第 3 步:在端口 8080 路径“/websocket”上连接一个 websocket 客户端,并发送一些消息。

您可以启动 kafka 控制台使用者并在那里查看消息。spring-doc.cadn.net.cn

5.22. XMPP 源

“xmpp”源允许从 XMPP 服务器接收消息。spring-doc.cadn.net.cn

5.22.1. 输入

5.22.2. 输出

有效载荷

5.22.3. 选项

xmpp 源具有以下选项:spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。spring-doc.cadn.net.cn

5.22.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.22.5. 示例

java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost

5.23. ZeroMQ 源代码

“zeromq”源允许从 ZeroMQ 接收消息。spring-doc.cadn.net.cn

5.23.1. 输入

5.23.2. 输出

有效载荷

5.23.3. 选项

zeromq 源具有以下选项:spring-doc.cadn.net.cn

另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的附加属性。spring-doc.cadn.net.cn

5.23.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

5.23.5. 示例

java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=

6. 处理器

6.1. 聚合处理器

聚合器处理器使应用程序能够将传入消息聚合到组中,并将它们释放到输出目标中。spring-doc.cadn.net.cn

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbcspring-doc.cadn.net.cn

如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。spring-doc.cadn.net.cn

6.1.1. 有效载荷

如果输入有效负载是byte[]并且 content-type 标头是 JSON,则JsonBytesToMap函数尝试将此有效负载反序列化为Map以便在聚合器函数的输出上更好地表示数据。 此外,这样的Map数据表示使从下面提到的 SpEL 表达式中轻松访问有效负载内容变得容易。 否则(包括反序列化错误),输入有效负载将保持原样 - 它是目标应用程序配置,用于将其转换为所需的形式。spring-doc.cadn.net.cn

6.2. 桥接处理器

一种处理器,只需将传入有效负载传递给出站,即可桥接输入和输出。spring-doc.cadn.net.cn

6.2.1. 有效载荷

6.3. 滤波处理器

筛选处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。 例如,如果传入有效负载的类型为String如果您想过滤掉任何少于五个字符的内容,您可以运行过滤处理器,如下所示。spring-doc.cadn.net.cn

java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4spring-doc.cadn.net.cn

如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。spring-doc.cadn.net.cn

6.3.1. 有效载荷

您可以将任何类型作为有效负载传递,然后对其应用 SpEL 表达式进行过滤。 如果传入类型为byte[]并将内容类型设置为text/plainapplication/json,则应用程序将byte[]String.spring-doc.cadn.net.cn

6.4. Groovy 处理器

对消息应用 Groovy 脚本的处理器。spring-doc.cadn.net.cn

6.4.1. 选项

groovy-processor 处理器具有以下选项:spring-doc.cadn.net.cn

groovy-processor.script

引用用于处理消息的脚本。(资源,默认值:<none>)spring-doc.cadn.net.cn

groovy-processor.变量

变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

groovy-processor.变量位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

6.5. 标头富集处理器

使用标头扩充器应用添加邮件标头。spring-doc.cadn.net.cn

标头以换行符分隔键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。 例如--headers='foo=payload.someProperty \n bar=payload.otherProperty'.spring-doc.cadn.net.cn

6.6. Http 请求处理器

向 HTTP 资源发出请求并将响应正文作为消息有效负载发出的处理器应用程序。spring-doc.cadn.net.cn

6.6.1. 输入

任何必需的 HTTP 标头都必须通过headersheaders-expression财产。 请参阅下面的示例。标头值也可用于构造:spring-doc.cadn.net.cn

有效载荷

默认情况下,有效负载用作 POST 请求的请求正文,并且可以是任何 Java 类型。 它应该是 GET 请求的空字符串。 有效负载还可用于构造:spring-doc.cadn.net.cn

底层 WebClient 支持 Jackson JSON 序列化,以支持任何请求和响应类型(如有必要)。 这expected-response-type属性String.class默认情况下,可以设置为应用程序类路径中的任何类。 请注意,用户定义的有效负载类型需要将所需的依赖项添加到您的 pom 文件中。spring-doc.cadn.net.cn

6.6.2. 输出

没有 HTTP 消息头映射到出站消息。spring-doc.cadn.net.cn

有效载荷

原始输出对象是 ResponseEntity<?>其任何字段(例如body,headers) 或访问器方法 (statusCode) 可以作为reply-expression. 默认情况下,出站消息有效负载是响应正文。 请注意,ResponseEntity(由表达式#root) 默认情况下不能被 Jackson 反序列化,但可以呈现为HashMap.spring-doc.cadn.net.cn

6.6.3. 选项

http-request 处理器具有以下选项:spring-doc.cadn.net.cn

processors.adoc 中未解析的指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.。/../applications/processor/image-recognition-processor/README.adoc[tags=ref-doc]spring-doc.cadn.net.cn

processors.adoc 中未解析的指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.。/../applications/processor/object-detection-processor/README.adoc[tags=ref-doc]spring-doc.cadn.net.cn

processors.adoc 中未解析的指令 - 包括:/home/runner/work/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/.。/../applications/processor/semantic-segmentation-processor/README.adoc[tags=ref-doc]spring-doc.cadn.net.cn

6.7. 脚本处理器

使用脚本转换消息的处理器。脚本正文直接提供 作为属性值。可以指定脚本的语言(groovy/javascript/ruby/python)。spring-doc.cadn.net.cn

6.7.1. 选项

脚本处理器处理器具有以下选项:spring-doc.cadn.net.cn

脚本处理器.language

script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认:<none>)spring-doc.cadn.net.cn

脚本处理器.script

脚本文本。(字符串,默认:<none>)spring-doc.cadn.net.cn

脚本处理器.变量

变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

脚本处理器.变量位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

6.8. 拆分处理器

拆分器应用程序建立在 Spring Integration 中同名的概念之上,并允许将单个消息拆分为多个不同的消息。 处理器使用一个函数,该函数接受Message<?>作为输入,然后生成一个List<Message<?>作为基于各种属性的输出(见下文)。 您可以使用 SpEL 表达式或分隔符来指定要如何拆分传入消息。spring-doc.cadn.net.cn

6.8.1. 有效载荷

如果传入类型为byte[]并将内容类型设置为text/plainapplication/json,则应用程序将byte[]String.spring-doc.cadn.net.cn

6.9. 变换处理器

Transformer 处理器允许您根据 SpEL 表达式转换消息有效负载结构。spring-doc.cadn.net.cn

下面是如何运行此应用程序的示例。spring-doc.cadn.net.cn

java -jar transform-processor-kafka-<version>.jar \
    --spel.function.expression=payload.toUpperCase()

如果要针对 RabbitMQ 运行 kafka,请将其更改为 rabbit。spring-doc.cadn.net.cn

6.9.1. 有效载荷

传入消息可以包含任何类型的有效负载。spring-doc.cadn.net.cn

6.10. Twitter 趋势和趋势位置处理器

可以返回趋势主题或趋势主题的位置的处理器。 这twitter.trend.trend-query-type属性允许来选择查询类型。spring-doc.cadn.net.cn

6.10.1. 检索某个位置的趋势主题(可选)

对于此模式集twitter.trend.trend-query-typetrend.spring-doc.cadn.net.cn

基于趋势 API 的处理器。 返回特定纬度、经度位置附近的趋势主题spring-doc.cadn.net.cn

6.10.2. 检索趋势位置

对于此模式集twitter.trend.trend-query-typetrendLocation.spring-doc.cadn.net.cn

按位置检索热门主题的完整或附近位置列表。spring-doc.cadn.net.cn

如果latitude,longitude参数不提供处理器执行可用趋势 API 并返回 Twitter 具有趋势主题信息的位置。spring-doc.cadn.net.cn

如果latitude,longitude参数,则处理器执行 Trends Closest API 并返回 Twitter 具有最接近指定位置的趋势主题信息的位置。spring-doc.cadn.net.cn

Response 是locations对该位置的 WOEID 和其他一些人类可读信息(例如该位置所属的规范名称和国家/地区)进行编码。spring-doc.cadn.net.cn

7. 水槽

7.1. 卡桑德拉水槽

此接收器应用程序将收到的每条消息的内容写入 Cassandra。spring-doc.cadn.net.cn

它需要 JSON 字符串的有效负载,并使用其属性映射到表列。spring-doc.cadn.net.cn

7.1.1. 有效载荷

表示要持久化的实体(或实体列表)的 JSON 字符串或字节数组。spring-doc.cadn.net.cn

7.2. 分析接收器

Sink 应用程序,构建在 Analytics 使用者之上,它根据输入消息计算分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开了 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标名称、值和标签。spring-doc.cadn.net.cn

分析接收器可以生成两种指标类型:spring-doc.cadn.net.cn

  • 计数器 - 报告单个指标,即计数,该指标按固定的正量递增。计数器可用于计算数据随时间变化的速率。spring-doc.cadn.net.cn

  • 仪表 - 报告当前值。仪表的典型示例是集合或映射的大小或处于运行状态的线程数。spring-doc.cadn.net.cn

仪表(例如计数器或仪表)由其唯一标识namedimensions(术语维度和标记可以互换使用)。维度允许对特定的命名量度进行切片,以向下钻取和推理数据。spring-doc.cadn.net.cn

作为指标,由其namedimensions,您可以为每个指标分配多个标签(e.g. key/值对),但之后不能随机更改这些标签!如果同名指标具有不同的标签集,Prometheus 等监控系统会抱怨。

使用analytics.nameanalytics.name-expression属性设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。spring-doc.cadn.net.cn

使用analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>,属性,用于向指标添加一个或多个标记。 这TAG_NAME将在指标中显示为标记名称。TAG_VALUE是SpEL从传入消息动态计算标记值的表达式。spring-doc.cadn.net.cn

SpEL表达式使用headerspayload关键字来访问邮件的标头和有效负载值。spring-doc.cadn.net.cn

您可以使用文字(例如'fixed value') 设置具有固定值的标签。

所有 Stream 应用程序都支持,盒子的 ouf,三个最流行的监控系统,Wavefront,PrometheusInfluxDB并且您可以以声明方式启用它们中的每一个。您只需将 micrometer meter-registry 依赖项添加到Analytics Sink应用。spring-doc.cadn.net.cn

请访问 Spring Cloud Data Flow Stream Monitoring 以获取配置监控系统的详细说明。以下快速片段可以帮助您入门。spring-doc.cadn.net.cn

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了数据流服务器监控,则Analytics Sink将重用提供的指标配置。

下图说明了如何Analytics Sink帮助收集企业内部信息,用于证券交易所,实时管道。spring-doc.cadn.net.cn

Analytics Architecture

7.2.1. 有效载荷

传入消息可以包含任何类型的有效负载。spring-doc.cadn.net.cn

7.3. Elasticsearch Sink

将文档索引到 Elasticsearch 中的 Sink。spring-doc.cadn.net.cn

此 Elasticsearch 接收器仅支持索引 JSON 文档。 它使用来自输入目标的数据,然后将其索引到 Elasticsearch。 输入数据可以是纯 json 字符串,也可以是java.util.Map表示 JSON。 它还接受 Elasticsearch 提供的数据XContentBuilder. 但是,这是一种罕见的情况,因为中间件不太可能将记录保留为XContentBuilder. 这主要用于直接调用消费者。spring-doc.cadn.net.cn

7.3.2. 运行此接收器的示例

  1. 从文件夹elasticsearch-sink:./mvnw clean packagespring-doc.cadn.net.cn

  2. CD 应用程序spring-doc.cadn.net.cn

  3. cd 到正确的 Binder 生成的应用程序(Kafka 或 RabbitMQ)spring-doc.cadn.net.cn

  4. ./mvnw clean packagespring-doc.cadn.net.cn

  5. 确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2spring-doc.cadn.net.cn

  6. 如果中间件尚未运行,请启动中间件(Kafka 或 RabbitMQ)。spring-doc.cadn.net.cn

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testingspring-doc.cadn.net.cn

  8. 将一些 JSON 数据发送到中间件目标。例如:{"foo":"bar"}spring-doc.cadn.net.cn

  9. 验证数据是否已编制索引:curl localhost:9200/testing/_searchspring-doc.cadn.net.cn

7.4. 文件接收器

文件接收器应用将其收到的每条消息写入文件。spring-doc.cadn.net.cn

7.4.1. 有效载荷

7.5. FTP 接收器

FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。spring-doc.cadn.net.cn

它使用ftp-outbound-adapter,因此传入消息可以是java.io.File对象,一个String(文件内容) 或数组bytes(文件内容也是如此)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和一个密码才能登录。spring-doc.cadn.net.cn

默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator如果未指定任何内容。DefaultFileNameGenerator将确定文件名 基于file_name标头(如果存在)在MessageHeaders,或者如果Message已经是一个java.io.File,那么它会 使用该文件的原始名称。

7.5.1. 标头

7.5.2. 有效载荷

7.5.3. 输出

N/A(写入 FTP 服务器)。spring-doc.cadn.net.cn

7.6. JDBC 接收器

JDBC 接收器允许您将传入的有效负载持久化到 RDBMS 数据库中。spring-doc.cadn.net.cn

jdbc.consumer.columns属性表示COLUMN_NAME[:EXPRESSION_FOR_VALUE]哪里EXPRESSION_FOR_VALUE(连同冒号)是可选的。 在这种情况下,值是通过生成的表达式计算的,例如payload.COLUMN_NAME,因此这样我们就可以从对象属性直接映射到表列。 例如,我们有一个 JSON 有效负载,如下所示:spring-doc.cadn.net.cn

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

因此,我们可以将其插入到表格中name,citystreet结构使用配置:spring-doc.cadn.net.cn

--jdbc.consumer.columns=name,city:address.city,street:address.street

此接收器支持批量插入,只要基础 JDBC 驱动程序支持。 批次插入通过batch-sizeidle-timeout性能: 传入消息将聚合到batch-size消息存在,然后作为批处理插入。 如果idle-timeout毫秒过去没有新消息,则插入聚合的批次,即使它小于batch-size,限制最大延迟。spring-doc.cadn.net.cn

该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用

7.6.1. 示例

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
            --jdbc.consumer.columns=name \
            --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
            --spring.datasource.url='jdbc:mysql://localhost:3306/test

7.6.2. 有效载荷

7.7. Apache Kafka 接收器

此模块将消息发布到 Apache Kafka。spring-doc.cadn.net.cn

7.7.1. 选项

kafka 接收器具有以下选项:spring-doc.cadn.net.cn

(有关 Spring for Apache Kafka 配置属性,请参阅 Spring Boot 文档)spring-doc.cadn.net.cn

7.8. 原木汇

logSink 使用应用程序记录器输出数据以供检查。spring-doc.cadn.net.cn

敬请谅解log接收器使用无类型处理程序,这会影响实际日志记录的执行方式。 这意味着,如果内容类型是文本的,则原始有效负载字节将转换为 String,否则将记录原始字节。 请参阅用户指南中的更多信息。spring-doc.cadn.net.cn

7.9. MongoDB 接收器

此接收器应用程序将传入数据摄取到 MongoDB 中。 此应用程序完全基于MongoDataAutoConfiguration,因此请参阅 Spring Boot MongoDB 支持 了解更多信息。spring-doc.cadn.net.cn

7.9.1. 输入

7.10. MQTT 接收器

该模块向 MQTT 发送消息。spring-doc.cadn.net.cn

7.10.1. 有效载荷:

7.11. Pgcopy 接收器

使用 PostgreSQL COPY 命令将其传入有效负载写入 RDBMS 的模块。spring-doc.cadn.net.cn

7.11.1. 输入

有效载荷

列表达式将根据消息进行计算,并且表达式通常仅与一种类型(例如 Map 或 bean 等)兼容。spring-doc.cadn.net.cn

7.11.2. 输出

7.11.3. 选项

jdbc 接收器具有以下选项:spring-doc.cadn.net.cn

该模块还使用 Spring Boot 的 DataSource 支持来配置数据库连接,因此像spring.datasource.url 等适用

7.11.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

要运行集成测试,请在本地主机上启动 PostgreSQL 数据库:spring-doc.cadn.net.cn

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.11.5. 示例

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test

7.12. RabbitMQ Sink

该模块向 RabbitMQ 发送消息。spring-doc.cadn.net.cn

7.12.1. 选项

子水槽有以下选项:spring-doc.cadn.net.cn

(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)spring-doc.cadn.net.cn

7.13. Redis 接收器

向 Redis 发送消息。spring-doc.cadn.net.cn

7.14. 路由器接收器

此应用程序将消息路由到命名通道。spring-doc.cadn.net.cn

7.14.1. 选项

路由器接收器具有以下选项:spring-doc.cadn.net.cn

router.default-输出绑定

在何处发送不可路由的消息。(字符串,默认:<none>)spring-doc.cadn.net.cn

router.destination-mappings

目标映射作为以名称-值对为分隔的新行字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

router.expression

要应用于消息以确定要路由到的通道的表达式。请注意,text、json 或 xml 等内容类型的有效负载线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容,请参阅文档。(表达式,默认值:<none>)spring-doc.cadn.net.cn

router.refresh-delay

检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认:60000)spring-doc.cadn.net.cn

需要 router.resolution-required

是否需要通道分辨率。(布尔值,默认:false)spring-doc.cadn.net.cn

路由器.脚本

返回通道或通道映射分辨率键的 groovy 脚本的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

router.变量

变量绑定为换行符,以名称-值对为分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<none>)spring-doc.cadn.net.cn

router.变量位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<none>)spring-doc.cadn.net.cn

此路由器接收器基于StreamBridgeAPI 来自 Spring Cloud Stream,因此可以根据需要创建目标。 在这种情况下,一个defaultOutputBinding只有当 key 未包含在destinationMappings. 这resolutionRequired = true忽略defaultOutputBinding如果没有映射并且尚未声明相应的绑定,则会抛出异常。

您可以使用spring.cloud.stream.dynamicDestinations财产。 默认情况下,所有已解析的目标都将动态绑定;如果此属性具有以逗号分隔的目标名称列表,则仅绑定这些目标名称。解析到不在此列表中的目标的消息将被路由到defaultOutputBinding,也必须出现在列表中。spring-doc.cadn.net.cn

destinationMappings用于将评估结果映射到实际的目标名称。spring-doc.cadn.net.cn

7.14.2. 基于 SpEL 的路由

该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。spring-doc.cadn.net.cn

有关更多信息,请参阅 Spring Integration 参考手册配置通用路由器部分中的“路由器和 Spring 表达式语言 (SpEL)”小节。spring-doc.cadn.net.cn

7.14.3. 基于 Groovy 的路由

除了 SpEL 表达式,还可以使用 Groovy 脚本。让我们在文件系统中的“file:/my/path/router.groovy”或“classpath:/my/path/router.groovy”创建一个 Groovy 脚本:spring-doc.cadn.net.cn

println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

如果要将变量值传递给脚本,可以使用 variables 选项静态绑定值,或者选择使用 propertiesLocation 选项将路径传递到包含绑定的属性文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定变量和 propertiesLocation在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,有效负载标头是隐式绑定的,以允许您访问消息中包含的数据。spring-doc.cadn.net.cn

有关更多信息,请参阅 Spring Integration 参考手册 Groovy 支持spring-doc.cadn.net.cn

7.15. RSocket 水槽

RSocket 接收器,使用 RSocket 协议的触发并忘记策略发送数据。spring-doc.cadn.net.cn

7.16. 亚马逊 S3 接收器

此接收器应用程序支持将对象传输到 Amazon S3 存储桶。 文件有效负载(和目录递归)将传输到remote目录(S3 存储桶)到local部署应用程序的目录。spring-doc.cadn.net.cn

此接收器接受的消息必须包含payload如:spring-doc.cadn.net.cn

7.16.1. 选项

s3 接收器具有以下选项:spring-doc.cadn.net.cn

目标生成的应用程序基于AmazonS3SinkConfiguration可以通过S3MessageHandler.UploadMetadataProvider和/或S3ProgressListener,这些被注入S3MessageHandler豆。 有关更多详细信息,请参阅 Spring Integration AWS 支持。spring-doc.cadn.net.cn

7.16.2. Amazon AWS 通用选项

Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置 类由 Spring Boot 自动使用。 请参阅他们的文档,了解所需和有用的自动配置属性。spring-doc.cadn.net.cn

其中一些是关于 AWS 凭证的:spring-doc.cadn.net.cn

其他适用于 AWSRegion定义:spring-doc.cadn.net.cn

例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar

7.17. SFTP 接收器

SFTP 接收器是一个简单的选项,用于将文件从传入邮件推送到 SFTP 服务器。spring-doc.cadn.net.cn

它使用sftp-outbound-adapter,因此传入消息可以是java.io.File对象,一个String(文件内容) 或数组bytes(文件内容也是如此)。spring-doc.cadn.net.cn

要使用此接收器,您需要一个用户名和一个密码才能登录。spring-doc.cadn.net.cn

默认情况下,Spring Integration 将使用o.s.i.file.DefaultFileNameGenerator如果未指定任何内容。DefaultFileNameGenerator将确定文件名 基于file_name标头(如果存在)在MessageHeaders,或者如果Message已经是一个java.io.File,那么它会 使用该文件的原始名称。

配置sftp.factory.known-hosts-expression选项,则评估的根对象是应用程序上下文,例如可能是sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.spring-doc.cadn.net.cn

7.17.1. 输入

有效载荷

7.17.2. 输出

N/A(写入 SFTP 服务器)。spring-doc.cadn.net.cn

7.18. TCP接收器

此模块使用编码器将消息写入 TCP。spring-doc.cadn.net.cn

TCP 是一种流式处理协议,需要某种机制来构建在线路上的消息。许多编码器是 available,默认值为“CRLF”。spring-doc.cadn.net.cn

7.18.2. 可用的编码器

文本数据
CRLF(默认)

以回车符 (0x0d) 结尾的文本,后跟换行符 (0x0a)spring-doc.cadn.net.cn

如果

以换行符结尾的文本 (0x0a)spring-doc.cadn.net.cn

以空字节 (0x00) 结尾的文本spring-doc.cadn.net.cn

STXETX

前面是 STX (0x02) 并以 ETX (0x03) 结尾的文本spring-doc.cadn.net.cn

文本和二进制数据

no structure - 客户端通过关闭套接字来指示完整的消息spring-doc.cadn.net.cn

L1

前面有一个 1 字节(无符号)长度字段的数据(最多支持 255 字节)spring-doc.cadn.net.cn

L2

数据前面有一个两个字节(无符号)长度字段(最多 216-1 字节)spring-doc.cadn.net.cn

L4

数据前面有一个四字节(有符号)长度字段(最多 231-1 字节)spring-doc.cadn.net.cn

7.19. 吞吐量接收器

接收器,它将按选定的时间间隔对消息进行计数并记录观察到的吞吐量。spring-doc.cadn.net.cn

7.19.1. 选项

吞吐量接收器具有以下选项:spring-doc.cadn.net.cn

吞吐量.report-every-ms

报告的频率。(整数,默认:1000)spring-doc.cadn.net.cn

7.20. Twitter 消息接收器

从身份验证用户向指定用户发送直接消息。 需要 JSON POST 正文和Content-Typeheader 设置为application/json.spring-doc.cadn.net.cn

当收到来自用户的消息时,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和 5 条分配的消息。 在 24 小时窗口内发送第 6 条消息或在 24 小时窗口之外发送消息将计入速率限制。 此行为仅适用于使用 POST direct_messages/events/new 端点时。

SpEL 表达式用于从输入消息中计算请求参数。spring-doc.cadn.net.cn

7.20.1. 选项

使用单引号 () 将'SpELexpression 属性。例如,要设置固定消息文本,请使用text='Fixed Text'. 对于固定目标 userId,请使用userId='666'.

7.21. Twitter 更新接收器

更新身份验证用户的当前文本(例如推文)。spring-doc.cadn.net.cn

对于每次更新尝试,都会将更新文本与身份验证用户最近的推文进行比较。任何可能导致重复的尝试都将被阻止,从而导致 403 错误。用户不能连续提交两次相同的文本。

虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。标准 API 的更新限制为 3 小时窗口内 300 个。如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。spring-doc.cadn.net.cn

7.22. 波前汇

Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。spring-doc.cadn.net.cn

支持常见的 ETL 用例,其中必须清理、转换现有(历史)指标数据并将其存储在 Wavefront 中以进行进一步分析。spring-doc.cadn.net.cn

7.23. Websocket 接收器

一个简单的 Websocket Sink 实现。spring-doc.cadn.net.cn

7.23.2. 示例

要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,您可以使用 遵循简单的端到端设置。spring-doc.cadn.net.cn

第 1 步:启动 Rabbitmq
第 2 步:部署time-source
步骤 3:部署websocket-sink

最后在trace模式,以便您看到由time-source在日志中:spring-doc.cadn.net.cn

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:spring-doc.cadn.net.cn

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.23.3. 执行器

有一个Endpoint您可以使用它来访问最后一个n发送和接收的消息。你必须 通过提供--endpoints.websocketconsumertrace.enabled=true.默认情况下,它通过host:port/websocketconsumertrace.下面是一个示例输出:spring-doc.cadn.net.cn

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

还有一个简单的 HTML 页面,您可以在其中看到文本区域中转发的消息。您可以访问 它直接通过host:port在您的浏览器中。spring-doc.cadn.net.cn

7.24. XMPP 接收器

“xmpp”接收器允许将消息发送到 XMPP 服务器。spring-doc.cadn.net.cn

7.24.1. 输入

7.24.2. 输出

有效载荷

7.24.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

7.25. ZeroMQ 接收器

“zeromq”接收器允许将消息发送到 ZeroMQ 套接字。spring-doc.cadn.net.cn

7.25.1. 输入

7.25.2. 输出

有效载荷

7.25.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于活页夹的项目。 然后,您可以 cd 到其中一个文件夹中并构建它:spring-doc.cadn.net.cn

$ ./mvnw clean package

7.25.5. 示例

java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=