本节将更详细地介绍如何创建流(Streams),即由Spring Cloud Stream应用程序组成的集合。内容涵盖创建和部署流等主题。spring-doc.cadn.net.cn

如果你刚刚开始使用 Spring Cloud Data Flow,建议在深入阅读本节之前,先阅读入门指南spring-doc.cadn.net.cn

17. 简介

流(Stream)是一组通过消息中间件相互通信的、长期运行的Spring Cloud Stream应用程序集合。 基于文本的DSL(领域特定语言)用于定义这些应用程序之间的配置和数据流。虽然系统提供了许多现成的应用程序以实现常见用例,但通常您需要创建一个自定义的 Spring Cloud Stream 应用程序来实现特定的业务逻辑。spring-doc.cadn.net.cn

流(Stream)的一般生命周期为:spring-doc.cadn.net.cn

  1. 注册应用程序。spring-doc.cadn.net.cn

  2. 创建一个流定义。spring-doc.cadn.net.cn

  3. 部署该流。spring-doc.cadn.net.cn

  4. 卸载或销毁该流(Stream)。spring-doc.cadn.net.cn

  5. 在流中升级或回滚应用程序。spring-doc.cadn.net.cn

为了部署流(Streams),Data Flow Server 必须配置为将部署任务委托给 Spring Cloud 生态系统中一个名为 Skipper 的新服务器。spring-doc.cadn.net.cn

此外,您可以将 Skipper 配置为将应用程序部署到一个或多个 Cloud Foundry 组织(org)和空间(space)、Kubernetes 集群中的一个或多个命名空间,或者本地机器上。 在 Data Flow 中部署流(stream)时,您可以在部署时指定要使用的平台。 Skipper 还为 Data Flow 提供了对已部署流执行更新的能力。 流中的应用程序可以通过多种方式进行更新,但最常见的示例之一是在保留现有源(source)和接收器(sink)应用程序不变的情况下,使用新的自定义业务逻辑升级处理器(processor)应用程序。spring-doc.cadn.net.cn

17.1. 流式管道 DSL

流(stream)通过使用受 Unix 启发的管道(Pipeline)语法来定义。 该语法使用竖线符号(称为“管道”)将多个命令连接起来。 在 Unix 中,命令 ls -l | grep key | lessls -l 进程的输出通过管道传递给 grep key 进程的输入。 而 grep 的输出又依次被发送到 less 进程的输入。 每个 | 符号都将左侧命令的标准输出连接到右侧命令的标准输入。 数据从左到右流经整个管道。spring-doc.cadn.net.cn

在 Data Flow 中,Unix 命令被 Spring Cloud Stream 应用程序所取代,每个管道符号表示通过消息中间件(例如 RabbitMQ 或 Apache Kafka)连接应用程序的输入和输出。spring-doc.cadn.net.cn

每个 Spring Cloud Stream 应用程序都以一个简单的名称进行注册。 注册过程指定了应用程序的获取位置(例如,在 Maven 仓库或 Docker 注册表中)。 在 Data Flow 中,我们将 Spring Cloud Stream 应用程序分为 Source(源)、Processor(处理器)或 Sink(接收器)。spring-doc.cadn.net.cn

作为一个简单的示例,考虑从 HTTP 源收集数据并写入文件接收器(File Sink)。 使用 DSL,该流的描述如下:spring-doc.cadn.net.cn

涉及某些处理的流将表示为:spring-doc.cadn.net.cn

http | filter | transform | filespring-doc.cadn.net.cn

流定义可以通过使用 Shell 的 stream create 命令来创建,如下例所示:spring-doc.cadn.net.cn

dataflow:> stream create --name httpIngest --definition "http | file"spring-doc.cadn.net.cn

流式 DSL 通过 --definition 命令选项传入。spring-doc.cadn.net.cn

流定义的部署通过 Shell 的 stream deploy 命令完成,如下所示:spring-doc.cadn.net.cn

dataflow:> stream deploy --name ticktockspring-doc.cadn.net.cn

入门指南部分向您展示了如何启动服务器,以及如何启动并使用 Spring Cloud Data Flow Shell。spring-doc.cadn.net.cn

请注意,Shell 调用 Data Flow Server 的 REST API。有关直接向服务器发送 HTTP 请求的更多信息,请参阅REST API 指南spring-doc.cadn.net.cn

在为流定义命名时,请注意,流中的每个应用程序在平台上创建时,其名称格式为 <stream name>-<app name>。因此,生成的应用程序名称总长度不能超过 58 个字符。

17.2. 流应用 DSL

您可以使用 Stream 应用 DSL 为每个 Spring Cloud Stream 应用定义自定义绑定属性。 更多信息请参阅微网站中的Stream 应用 DSL部分。spring-doc.cadn.net.cn

17.3. 应用程序属性

每个应用程序都通过属性来定制其行为。例如,http 源模块提供了一个 port 设置,允许将数据摄入端口从默认值进行更改:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http --port=8090 | log" --name myhttpstream

这个 port 属性实际上与标准的 Spring Boot server.port 属性相同。 Data Flow 增加了使用简写形式 port 而非 server.port 的能力。 你也可以指定完整形式:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http --server.port=8000 | log" --name myhttpstream

这一简写行为在流应用属性一节中有更详细的讨论。 如果你已经注册了应用属性元数据,那么在 Shell 中输入 -- 后可以使用 Tab 键自动补全,以获取候选属性名称列表。spring-doc.cadn.net.cn

Shell 为应用程序属性提供 Tab 自动补全功能。app info --name <appName> --type <appType> Shell 命令为所有支持的属性提供额外的文档说明。spring-doc.cadn.net.cn

支持的流 <appType> 取值包括:sourceprocessorsink

18. 流的生命周期

流的生命周期会经历以下阶段:spring-doc.cadn.net.cn

  1. 注册流定义spring-doc.cadn.net.cn

  2. 使用定义创建流spring-doc.cadn.net.cn

  3. 部署流spring-doc.cadn.net.cn

  4. 销毁或卸载流spring-doc.cadn.net.cn

  5. 在流中升级或回滚应用spring-doc.cadn.net.cn

Skipper 是一个服务器,可让你在多个云平台上发现 Spring Boot 应用程序并管理其生命周期。spring-doc.cadn.net.cn

Skipper 中的应用程序被打包成包含应用程序资源位置、应用程序属性和部署属性的包。 你可以将 Skipper 包类比为 apt-getbrew 等工具中的软件包。spring-doc.cadn.net.cn

当 Data Flow 部署一个流(Stream)时,它会生成一个包并将其上传到 Skipper,该包代表了该流中的应用程序。 后续用于升级或回滚流中应用程序的命令都会传递给 Skipper。 此外,流的定义会从该包中反向工程得到,流的状态也委托给 Skipper 管理。spring-doc.cadn.net.cn

18.1. 注册一个流应用

你可以使用 app register 命令注册一个带版本的流应用程序。你必须提供一个唯一的名称、应用程序类型以及一个可解析到应用程序构件(artifact)的 URI。 对于类型,请指定 sourceprocessorsink。版本将从 URI 中解析得出。以下是几个示例:spring-doc.cadn.net.cn

dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3

dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.1 <│         │    │    ║
║   │mysource-0.0.2    │         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar

dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar

应用程序 URI 应符合以下任一模式格式:spring-doc.cadn.net.cn

URI 中的 <version> 部分对于带版本的流应用程序是必需的。 Skipper 利用多版本流应用程序,通过部署属性在运行时实现这些应用程序的升级或回滚。

如果你想注册使用 RabbitMQ binder 构建的 httplog 应用程序的快照版本,可以执行以下操作:spring-doc.cadn.net.cn

dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:3.2.1
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1

如果您希望一次性注册多个应用程序,可以将它们存储在一个属性文件中,其中键的格式为 <type>.<name>,值为对应的 URI。spring-doc.cadn.net.cn

例如,要注册使用 RabbitMQ binder 构建的 httplog 应用程序的快照版本,您可以在属性文件中包含以下内容(例如,stream-apps.properties):spring-doc.cadn.net.cn

source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:3.2.1
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1

然后,要批量导入应用程序,请使用 app import 命令,并通过 --uri 参数指定属性文件的位置,如下所示:spring-doc.cadn.net.cn

dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties

使用 --type app 注册应用程序与注册 sourceprocessorsink 是相同的。 类型为 app 的应用程序仅可在流式应用 DSL(其中使用双管道 || 而非单管道 |)中使用,并指示 Data Flow 不要配置该应用程序的 Spring Cloud Stream 绑定属性。 使用 --type app 注册的应用程序不必是 Spring Cloud Stream 应用程序;它可以是任何 Spring Boot 应用程序。 有关使用此应用程序类型的更多信息,请参阅 流式应用 DSL 介绍spring-doc.cadn.net.cn

您可以注册同一应用程序的多个版本(例如,相同的名称和类型),但只能将其中一个设置为默认版本。 默认版本用于部署流(Streams)。spring-doc.cadn.net.cn

应用程序首次注册时,会被标记为默认版本。可以使用 app default 命令更改默认的应用程序版本:spring-doc.cadn.net.cn

dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.1    │         │    │    ║
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

app list --id <type:name> 命令列出给定流应用程序的所有版本。spring-doc.cadn.net.cn

app unregister 命令有一个可选的 --version 参数,用于指定要注销的应用程序版本:spring-doc.cadn.net.cn

dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

如果未指定 --version,则默认版本为未注册版本。spring-doc.cadn.net.cn

流中的所有应用程序都应设置默认版本,以便该流能够被部署。 否则,在部署过程中它们将被视为未注册的应用程序。 请使用 app default 命令来设置默认版本。spring-doc.cadn.net.cn

app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.2    │         │    │    ║
║   │> mysource-0.0.3 <│         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

stream deploy 需要设置默认的应用程序版本。 而 stream updatestream rollback 命令则可以使用所有已注册的应用程序版本(包括默认和非默认版本)。spring-doc.cadn.net.cn

以下命令创建一个使用默认 mysource 版本(0.0.3)的流:spring-doc.cadn.net.cn

dataflow:>stream create foo --definition "mysource | log"

然后我们可以将版本更新为 0.0.2:spring-doc.cadn.net.cn

dataflow:>stream update foo --properties version.mysource=0.0.2
只有预先注册的应用程序才能用于deploy(部署)、update(更新)或rollback(回滚)一个 Stream。

尝试将 mysource 更新到版本 0.0.1(未注册)失败。spring-doc.cadn.net.cn

18.1.1. 注册开箱即用的应用程序和任务

为了方便起见,我们为所有开箱即用的流应用和任务应用提供了包含应用程序 URI(适用于 Maven 和 Docker)的静态文件。 您可以指向该文件,一次性批量导入所有应用程序 URI。 或者,如前所述,您也可以单独注册这些应用,或者创建自己的自定义属性文件,仅包含所需的应用程序 URI。 不过,我们建议在自定义属性文件中维护一份“精简”的、仅包含所需应用程序 URI 的列表。spring-doc.cadn.net.cn

开箱即用的流应用程序

下表包含了基于 Spring Cloud Stream dataflow.spring.io 和 Spring Boot 3.2.x 的流应用程序的 2.7.x 链接。spring-doc.cadn.net.cn

构件类型 稳定版本 快照版本

RabbitMQ + Mavenspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-maven-latestspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-maven-latest-snapshotspring-doc.cadn.net.cn

RabbitMQ + Dockerspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-docker-latestspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-docker-latest-snapshotspring-doc.cadn.net.cn

Apache Kafka + Mavenspring-doc.cadn.net.cn

dataflow.spring.io/kafka-maven-latestspring-doc.cadn.net.cn

dataflow.spring.io/kafka-maven-latest-snapshotspring-doc.cadn.net.cn

Apache Kafka + Dockerspring-doc.cadn.net.cn

dataflow.spring.io/kafka-docker-latestspring-doc.cadn.net.cn

dataflow.spring.io/kafka-docker-latest-snapshotspring-doc.cadn.net.cn

默认情况下,开箱即用的应用程序的 Actuator 端点是受保护的。您可以通过设置以下属性来禁用安全性,从而部署流应用:app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration

在 Kubernetes 上,请参阅存活探针和就绪探针部分,了解如何为 Actuator 端点配置安全性。spring-doc.cadn.net.cn

开箱即用的任务应用程序

下表包含了基于 Spring Cloud Task dataflow.spring.io 和 Spring Boot 2.4.x 的任务应用程序的 2.7.x 链接。spring-doc.cadn.net.cn

构件类型 稳定版本 快照版本

Mavenspring-doc.cadn.net.cn

dataflow.spring.io/task-maven-latestspring-doc.cadn.net.cn

dataflow.spring.io/task-maven-latest-snapshotspring-doc.cadn.net.cn

Dockerspring-doc.cadn.net.cn

dataflow.spring.io/task-docker-latestspring-doc.cadn.net.cn

dataflow.spring.io/task-docker-latest-snapshotspring-doc.cadn.net.cn

有关现成可用的流应用程序的更多信息,请参见Spring Cloud Stream 应用程序项目页面。spring-doc.cadn.net.cn

有关现成可用的任务应用程序的更多信息,请参阅 timestamp-tasktimestamp-batch 文档。spring-doc.cadn.net.cn

例如,如果你想批量注册所有使用 Kafka binder 构建的开箱即用的流应用程序,可以使用以下命令:spring-doc.cadn.net.cn

$ dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest

或者,您可以按如下方式将所有流应用程序注册到 Rabbit binder:spring-doc.cadn.net.cn

$ dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest

你也可以传递 --local 选项(默认值为 true),以指示是否应在 Shell 进程内部解析属性文件的位置。如果应从 Data Flow Server 进程中解析该位置,请指定 --local falsespring-doc.cadn.net.cn

当你使用 app registerapp import 时,如果已存在具有所提供名称、类型和版本的应用程序,默认情况下不会覆盖它。如果你想覆盖已存在的应用程序的 urimetadata-uri 坐标,请包含 --force 选项。spring-doc.cadn.net.cn

然而,请注意,一旦下载完成,应用程序可能会根据资源位置在 Data Flow 服务器上本地缓存。如果资源位置未发生变化(即使实际的资源字节可能已不同),也不会重新下载。另一方面,在使用 maven:// 资源时,即使使用固定的位置,仍可能绕过缓存(例如使用 -SNAPSHOT 版本时)。spring-doc.cadn.net.cn

此外,如果一个流已经部署并使用了某个已注册应用的特定版本,那么(强制)重新注册另一个不同的应用将不会产生任何效果,直到该流被再次部署。spring-doc.cadn.net.cn

在某些情况下,资源在服务器端进行解析。在其他情况下,URI 会被传递给运行时容器实例,在其中进行解析。更多详细信息,请参阅各个 Data Flow Server 的具体文档。

18.1.2. 注册自定义应用程序

虽然 Data Flow 包含了源(source)、处理器(processor)和接收器(sink)应用程序,但您可以扩展这些应用程序,或者编写自定义的 Spring Cloud Stream 应用程序。 您可以按照 Microsite 上的 流开发(Stream Development) 指南来创建您自己的自定义应用程序。 创建自定义应用程序后,您可以按照 注册流应用程序(Register a Stream Application) 中所述进行注册。spring-doc.cadn.net.cn

18.2. 创建流

Spring Cloud Data Flow 服务器提供了一个完整的 RESTful API,用于管理流定义的生命周期,但最简单的使用方式是通过 Spring Cloud Data Flow Shell。入门指南部分介绍了如何启动该 Shell。spring-doc.cadn.net.cn

新流(streams)是借助流定义(stream definitions)创建的。这些定义基于一种简单的 DSL(领域特定语言)构建而成。例如,考虑一下如果我们执行以下 shell 命令会发生什么:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock

这定义了一个名为 ticktock 的流,它基于 DSL 表达式 time | log。该 DSL 使用“管道”符号(|)将一个源(source)连接到一个接收器(sink)。spring-doc.cadn.net.cn

stream info 命令显示有关流的有用信息,如下例所示(包含其输出):spring-doc.cadn.net.cn

dataflow:>stream info ticktock
╔═══════════╤═════════════════╤═══════════╤══════════╗
║Stream Name│Stream Definition│Description│  Status  ║
╠═══════════╪═════════════════╪═══════════╪══════════╣
║ticktock   │time | log       │           │undeployed║
╚═══════════╧═════════════════╧═══════════╧══════════╝

18.2.1. 流式应用属性

应用程序属性是与流中每个应用程序相关联的属性。当应用程序部署时,这些应用程序属性会根据底层部署实现,通过命令行参数或环境变量应用到应用程序中。spring-doc.cadn.net.cn

以下流在创建时可以定义应用程序属性:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock

app info --name <appName> --type <appType> shell 命令用于显示该应用程序所暴露的应用属性。 有关暴露属性的更多信息,请参阅应用程序元数据spring-doc.cadn.net.cn

以下列表展示了 time 应用程序所暴露的属性:spring-doc.cadn.net.cn

dataflow:> app info --name time --type source
Information about source application 'time':
Version: '3.2.1':
Default application version: 'true':
Resource URI: maven://org.springframework.cloud.stream.app:time-source-rabbit:3.2.1
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║spring.integration.poller.max-│Maximum number of messages to │<none>                        │java.lang.Integer             ║
║messages-per-poll             │poll per polling cycle.       │                              │                              ║
║spring.integration.poller.fixe│Polling rate period. Mutually │<none>                        │java.time.Duration            ║
║d-rate                        │exclusive with 'fixedDelay'   │                              │                              ║
║                              │and 'cron'.                   │                              │                              ║
║spring.integration.poller.fixe│Polling delay period. Mutually│<none>                        │java.time.Duration            ║
║d-delay                       │exclusive with 'cron' and     │                              │                              ║
║                              │'fixedRate'.                  │                              │                              ║
║spring.integration.poller.rece│How long to wait for messages │1s                            │java.time.Duration            ║
║ive-timeout                   │on poll.                      │                              │                              ║
║spring.integration.poller.cron│Cron expression for polling.  │<none>                        │java.lang.String              ║
║                              │Mutually exclusive with       │                              │                              ║
║                              │'fixedDelay' and 'fixedRate'. │                              │                              ║
║spring.integration.poller.init│Polling initial delay. Applied│<none>                        │java.time.Duration            ║
║ial-delay                     │for 'fixedDelay' and          │                              │                              ║
║                              │'fixedRate'; ignored for      │                              │                              ║
║                              │'cron'.                       │                              │                              ║
║time.date-format              │Format for the date value.    │MM/dd/yy HH:mm:ss             │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

以下列表展示了 log 应用程序所暴露的属性:spring-doc.cadn.net.cn

dataflow:> app info --name log --type sink
Information about sink application 'log':
Version: '3.2.1':
Default application version: 'true':
Resource URI: maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

你可以在创建 time 时指定 logstream 应用的属性,如下所示:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

请注意,在前面的示例中,为 fixed-delaylevel 应用程序定义的 timelog 属性是由 Shell 自动补全功能提供的“简写形式”属性名称。 这些“简写形式”的属性名称仅适用于已暴露的属性。在所有其他情况下,您应仅使用完全限定的属性名称。spring-doc.cadn.net.cn

18.2.2. 常见应用程序属性

除了通过 DSL 进行配置外,Spring Cloud Data Flow 还提供了一种机制,用于为所有由它启动的流式应用程序设置通用属性。 在启动服务器时,可以通过添加以 spring.cloud.dataflow.applicationProperties.stream 为前缀的属性来实现这一点。 这样做时,服务器会将所有这些属性(去掉前缀后)传递给它所启动的应用实例。spring-doc.cadn.net.cn

例如,可以通过使用以下选项启动 Data Flow 服务器,将所有已启动的应用程序配置为使用特定的 Kafka 代理:spring-doc.cadn.net.cn

--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181

这样做会导致 spring.cloud.stream.kafka.binder.brokersspring.cloud.stream.kafka.binder.zkNodes 属性被传递给所有启动的应用程序。spring-doc.cadn.net.cn

通过此机制配置的属性优先级低于流部署属性。 如果在流部署时指定了相同键的属性,则会覆盖这些属性(例如, app.http.spring.cloud.stream.kafka.binder.brokers 会覆盖通用属性)。

18.3. 部署一个流

本节介绍当 Spring Cloud Data Flow 服务器负责部署流(Stream)时,如何部署一个流。它涵盖了使用 Skipper 服务进行流的部署和升级。关于如何设置部署属性的说明适用于流部署的这两种方式。spring-doc.cadn.net.cn

考虑 ticktock 流定义:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock

要部署该流,请使用以下 shell 命令:spring-doc.cadn.net.cn

dataflow:> stream deploy --name ticktock

数据流服务器将 timelog 应用程序的解析与部署委托给 Skipper。spring-doc.cadn.net.cn

stream info 命令显示有关流的有用信息,包括部署属性:spring-doc.cadn.net.cn

dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│  Status ║
╠═══════════╪═════════════════╪═════════╣
║ticktock   │time | log       │deploying║
╚═══════════╧═════════════════╧═════════╝

Stream Deployment properties: {
  "log" : {
    "resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  },
  "time" : {
    "resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  }
}

--platformName 命令有一个重要的可选命令参数(称为 stream deploy)。 Skipper 可以配置为部署到多个平台。 Skipper 预先配置了一个名为 default 的平台,该平台将应用程序部署到运行 Skipper 的本地机器上。 --platformName 命令行参数的默认值为 default。 如果您通常将应用部署到某一个特定平台,那么在安装 Skipper 时,可以覆盖 default 平台的配置。 否则,请将 platformName 指定为 stream platform-list 命令返回的某个值。spring-doc.cadn.net.cn

在前面的示例中,时间源每秒将当前时间作为消息发送一次,而日志接收器则通过日志框架将其输出。 您可以跟踪带有 stdout 后缀的 <instance> 日志。日志文件位于 Data Flow Server 日志输出中显示的目录内,如下列所示:spring-doc.cadn.net.cn

$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:11
2016-06-01 09:45:12.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:12
2016-06-01 09:45:13.251  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:13

你也可以在创建流时通过传入 --deploy 标志,一步完成流的创建和部署,如下所示:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock --deploy

然而,在实际应用场景中,通常不会在一步操作中同时创建和部署流。 原因是,当你使用 stream deploy 命令时,可以传入一些属性来定义如何将应用程序映射到平台(例如,容器应使用多大的内存、每个应用程序运行多少个实例,以及是否启用数据分区功能)。 这些属性还可以覆盖在创建流时所设置的应用程序属性。 接下来的章节将详细介绍这一功能。spring-doc.cadn.net.cn

18.3.1. 部署属性

在部署流时,您可以指定用于控制应用程序部署和配置方式的属性。有关更多信息,请参阅微网站中的部署属性部分。spring-doc.cadn.net.cn

18.4. 销毁流

你可以通过在 shell 中执行 stream destroy 命令来删除一个流,如下所示:spring-doc.cadn.net.cn

dataflow:> stream destroy --name ticktock

如果该流已部署,则在删除流定义之前会先将其取消部署。spring-doc.cadn.net.cn

18.5. 卸载流

通常,你可能希望停止一个流,但保留其名称和定义以供将来使用。在这种情况下,你可以通过名称undeploy(取消部署)该流:spring-doc.cadn.net.cn

dataflow:> stream undeploy --name ticktock
dataflow:> stream deploy --name ticktock

你可以在稍后执行 deploy 命令来重新启动它:spring-doc.cadn.net.cn

dataflow:> stream deploy --name ticktock

18.6. 验证流

有时,流定义中包含的应用程序在其注册信息中包含一个无效的 URI。 这可能是由于在应用程序注册时输入了无效的 URI,或者是因为该应用程序已从原本应从中获取的仓库中被移除。 为了验证流中包含的所有应用程序是否都能被正确解析,用户可以使用 validate 命令:spring-doc.cadn.net.cn

dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock   │time | log       ║
╚═══════════╧═════════════════╝


ticktock is a valid stream.
╔═══════════╤═════════════════╗
║ App Name  │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid            ║
║sink:log   │valid            ║
╚═══════════╧═════════════════╝

在前面的示例中,用户验证了他们的 ticktock 流。source:timesink:log 均为有效。 现在我们可以看看,如果流定义中包含一个注册应用但其 URI 无效时会发生什么情况:spring-doc.cadn.net.cn

dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log   ║
╚════════════╧═════════════════╝


bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
║   App Name    │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid          ║
║sink:log       │valid            ║
╚═══════════════╧═════════════════╝

在这种情况下,Spring Cloud Data Flow 指出该流无效,因为 source:bad-time 具有无效的 URI。spring-doc.cadn.net.cn

18.7. 更新流

要更新流,请使用 stream update 命令,该命令接受 --properties--propertiesFile 作为命令参数。 Skipper 引入了一个重要的新顶级前缀:version。 以下命令部署了 http | log 流(部署时注册的 log 版本为 3.2.0):spring-doc.cadn.net.cn

dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "3.2.0"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "3.2.0"
  }
}

然后,以下命令将更新流以使用日志应用程序的 3.2.1 版本。 在使用特定版本的应用程序更新流之前,我们需要确保该版本的应用程序已注册:spring-doc.cadn.net.cn

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1
Successfully registered application 'sink:log'

然后我们可以更新应用程序:spring-doc.cadn.net.cn

dataflow:>stream update --name httptest --properties version.log=3.2.1
您只能使用已预先注册的应用程序版本来deployupdaterollback一个流。

要验证部署属性和更新后的版本,我们可以使用 stream info,如下例所示(包含其输出):spring-doc.cadn.net.cn

dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.count" : "1",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "3.2.1"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "3.2.1"
  }
}

18.8. 强制更新流

在升级流(stream)时,您可以使用 --force 选项来部署当前已部署应用程序的新实例,即使应用程序或部署属性未发生任何更改也是如此。 当应用程序在启动时自行获取配置信息时(例如从 Spring Cloud Config Server 获取),就需要这种行为。 您可以使用 --app-names 选项指定要强制升级的应用程序。 如果您未指定任何应用程序名称,则所有应用程序都将被强制升级。 您可以将 --force--app-names 选项与 --properties--propertiesFile 选项一起使用。spring-doc.cadn.net.cn

18.9. 流版本

Skipper 会保留已部署流的历史记录。 在更新一个流之后,该流会产生第二个版本。 你可以使用 stream history --name <name-of-stream> 命令来查询该流的版本历史:spring-doc.cadn.net.cn

dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated        │ Status │Package Name│Package Version│  Description   ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2      │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest    │1.0.0          │Upgrade complete║
║1      │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest    │1.0.0          │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

18.10. 流式清单

Skipper 会保留一份所有应用程序、其应用程序属性以及部署属性的“清单”,其中所有值都已替换完毕。 这代表了部署到平台上的最终状态。 您可以使用以下命令查看某个 Stream 任意版本的清单:spring-doc.cadn.net.cn

stream manifest --name <name-of-stream> --releaseVersion <optional-version>

如果未指定 --releaseVersion,则返回上一个版本的清单。spring-doc.cadn.net.cn

以下示例展示了清单(manifest)的使用:spring-doc.cadn.net.cn

dataflow:>stream manifest --name httptest

使用该命令会产生以下输出:spring-doc.cadn.net.cn

# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: log
spec:
  resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
  version: 3.2.0
  applicationProperties:
    spring.cloud.dataflow.stream.app.label: log
    spring.cloud.stream.bindings.input.group: httptest
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: sink
    spring.cloud.stream.bindings.input.destination: httptest.http
  deploymentProperties:
    spring.cloud.deployer.indexed: true
    spring.cloud.deployer.group: httptest
    spring.cloud.deployer.count: 1

---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: http
spec:
  resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
  version: 3.2.0
  applicationProperties:
    spring.cloud.dataflow.stream.app.label: http
    spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
    server.port: 9000
    spring.cloud.stream.bindings.output.destination: httptest.http
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: source
  deploymentProperties:
    spring.cloud.deployer.group: httptest

大部分部署和应用程序属性均由 Data Flow 设置,以使应用程序能够相互通信,并发送带有标识标签的应用程序指标。spring-doc.cadn.net.cn

18.11. 回滚流

你可以使用 stream rollback 命令将流回滚到之前的版本:spring-doc.cadn.net.cn

dataflow:>stream rollback --name httptest

可选的 --releaseVersion 命令参数用于指定流的版本。 如果未指定,回滚操作将回到上一个流版本。spring-doc.cadn.net.cn

18.12. 应用计数

应用程序数量是系统的一个动态属性,用于指定应用程序实例的数量。有关更多信息,请参阅微网站中的应用程序数量部分。spring-doc.cadn.net.cn

18.13. Skipper 的升级策略

Skipper 采用一种简单的“红/黑”升级策略。它会部署应用程序的新版本,实例数量与当前正在运行的版本相同,并检查应用程序的 /health 端点。 如果新应用程序的健康状况良好,则卸载先前的应用程序。 如果新应用程序的健康状况不佳,则卸载所有新部署的应用程序,并认为此次升级未成功。spring-doc.cadn.net.cn

升级策略并非滚动升级,因此,如果当前有五个应用程序实例正在运行,那么在理想情况下,会在旧版本被卸载之前,先启动五个新版本的应用程序实例。spring-doc.cadn.net.cn

19. 流式 DSL

本节介绍Stream DSL 简介中未涵盖的 Stream DSL 的其他功能。spring-doc.cadn.net.cn

19.1. 轻触一个流

可以在流中的各种生产者端点创建监听点(Taps)。更多信息请参见微网站中的监听流(Tapping a Stream)部分。spring-doc.cadn.net.cn

19.2. 在流中使用标签

当一个流由多个具有相同名称的应用程序组成时,必须使用标签加以区分。 有关更多信息,请参阅微网站中的应用程序标签部分。spring-doc.cadn.net.cn

19.3. 命名目的地

你可以使用命名目的地,而不必引用源应用或接收器应用。 有关更多信息,请参阅微网站中的命名目的地部分。spring-doc.cadn.net.cn

19.4. 扇入与扇出

通过使用命名目的地,您可以支持汇聚(fan-in)和分发(fan-out)的使用场景。 更多信息请参见微网站中的汇聚与分发部分。spring-doc.cadn.net.cn

20. Stream Java DSL

与其使用命令行来创建和部署流,不如使用 spring-cloud-dataflow-rest-client 模块提供的基于 Java 的 DSL。 有关更多信息,请参阅微站中的Java DSL部分。spring-doc.cadn.net.cn

21. 具有多个绑定器配置的流应用程序

在某些情况下,当需要连接到不同的消息中间件配置时,一个流(stream)可以将其应用程序绑定到多个 Spring Cloud Stream Binder。在这种情况下,您应确保应用程序已通过其 Binder 配置进行了适当的设置。例如,以下流中的处理器(processor)就是一个支持 Kafka 和 Rabbit 两种 Binder 的多 Binder 转换器:spring-doc.cadn.net.cn

http | multibindertransform --expression=payload.toUpperCase() | log
在前面的示例中,您需要编写自己的 multibindertransform 应用程序。

在此流中,每个应用程序都通过以下方式连接到消息中间件:spring-doc.cadn.net.cn

  1. HTTP 源将事件发送到 RabbitMQ(rabbit1)。spring-doc.cadn.net.cn

  2. 多绑定器转换处理器从 RabbitMQ(rabbit1)接收事件,并将处理后的事件发送到 Kafka(kafka1)。spring-doc.cadn.net.cn

  3. 日志接收器从 Kafka(kafka1)接收事件。spring-doc.cadn.net.cn

此处,rabbit1kafka1 是在 Spring Cloud Stream 应用程序属性中指定的绑定器名称。 根据此配置,应用程序的类路径中包含以下绑定器,并配有相应的配置:spring-doc.cadn.net.cn

spring-cloud-streambinder 配置属性可以在应用程序内部进行设置。 如果未设置,也可以在流部署时通过 deployment 属性传入:spring-doc.cadn.net.cn

dataflow:>stream create --definition "http | multibindertransform --expression=payload.toUpperCase() | log" --name mystream

dataflow:>stream deploy mystream --properties "app.http.spring.cloud.stream.bindings.output.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.input.binder=rabbit1,
app.multibindertransform.spring.cloud.stream.bindings.output.binder=kafka1,app.log.spring.cloud.stream.bindings.input.binder=kafka1"

您可以通过部署属性来指定并覆盖任何绑定器配置属性。spring-doc.cadn.net.cn

22. 函数组合

函数组合允许您将功能性逻辑动态地附加到现有的事件流应用程序中。更多详情,请参阅微站中的函数组合部分。spring-doc.cadn.net.cn

23. 函数式应用程序

随着 Spring Cloud Stream 3.x 增加了函数式支持,您只需分别实现 Java Util 中的 SourceSinkProcessor 接口,即可构建 SupplierConsumerFunction 应用程序。 有关此功能的更多内容,请参阅 SCDF 网站上的函数式应用程序指南spring-doc.cadn.net.cn

24. 示例

本章包含以下示例:spring-doc.cadn.net.cn

24.1. 简单的流处理

作为一个简单的处理步骤示例,我们可以通过以下流定义将通过 HTTP POST 发送的数据负载转换为大写:spring-doc.cadn.net.cn

http | transform --expression=payload.toUpperCase() | log

要创建此流,请在 shell 中输入以下命令:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log" --name mystream --deploy

以下示例使用 shell 命令来提交一些数据:spring-doc.cadn.net.cn

dataflow:> http post --target http://localhost:9000 --data "hello"

前面的示例会在日志中生成大写的HELLO,如下所示:spring-doc.cadn.net.cn

2016-06-01 09:54:37.749  INFO 80083 --- [  kafka-binder-] log.sink    : HELLO

24.2. 有状态的流处理

为演示数据分区功能,以下清单使用 Kafka 作为绑定器部署了一个流:spring-doc.cadn.net.cn

dataflow:>stream create --name words --definition "http --server.port=9900 | splitter --expression=payload.split(' ') | log"
Created new stream 'words'

dataflow:>stream deploy words --properties "app.splitter.producer.partitionKeyExpression=payload,deployer.log.count=2"
Deployed stream 'words'

dataflow:>http post --target http://localhost:9900 --data "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
> POST (text/plain;Charset=UTF-8) http://localhost:9900 How much wood would a woodchuck chuck if a woodchuck could chuck wood
> 202 ACCEPTED


dataflow:>runtime apps
╔════════════════════╤═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║App Id / Instance Id│Unit Status│                                                               No. of Instances / Attributes                                                               ║
╠════════════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║words.log-v1        │ deployed  │                                                                             2                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 24166                                                                                                                                        ║
║                    │           │        pid = 33097                                                                                                                                        ║
║                    │           │       port = 24166                                                                                                                                        ║
║words.log-v1-0      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_0.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_0.log     ║
║                    │           │        url = https://192.168.0.102:24166                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 41269                                                                                                                                        ║
║                    │           │        pid = 33098                                                                                                                                        ║
║                    │           │       port = 41269                                                                                                                                        ║
║words.log-v1-1      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_1.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_1.log     ║
║                    │           │        url = https://192.168.0.102:41269                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.http-v1       │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 9900                                                                                                                                         ║
║                    │           │        pid = 33094                                                                                                                                        ║
║                    │           │       port = 9900                                                                                                                                         ║
║words.http-v1-0     │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stderr_0.log    ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stdout_0.log    ║
║                    │           │        url = https://192.168.0.102:9900                                                                                                                    ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1                 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.splitter-v1   │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 33963                                                                                                                                        ║
║                    │           │        pid = 33093                                                                                                                                        ║
║                    │           │       port = 33963                                                                                                                                        ║
║words.splitter-v1-0 │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stderr_0.log║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stdout_0.log║
║                    │           │        url = https://192.168.0.102:33963                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1             ║
╚════════════════════╧═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

当你查看 words.log-v1-0 日志时,你应该会看到以下内容:spring-doc.cadn.net.cn

2016-06-05 18:35:47.047  INFO 58638 --- [  kafka-binder-] log.sink                                 : How
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck

当你查看 words.log-v1-1 日志时,你应该会看到以下内容:spring-doc.cadn.net.cn

2016-06-05 18:35:47.047  INFO 58639 --- [  kafka-binder-] log.sink                                 : much
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : would
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : if
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : could
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood

此示例表明,包含相同单词的有效载荷拆分会被路由到同一个应用程序实例。spring-doc.cadn.net.cn

24.3. 其他源(Source)和接收器(Sink)应用程序类型

此示例展示了一个稍微复杂一些的场景:将 time 源替换为其他内容。另一种受支持的源类型是 http,它通过 HTTP POST 请求接收待摄入的数据。请注意,http 源使用的端口与 Data Flow Server(默认为 8080)不同,默认情况下该端口是随机分配的。spring-doc.cadn.net.cn

要创建一个使用 http 源但仍使用相同 log 接收器的流,我们需要将简单流处理示例中的原始命令修改为以下内容:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http | log" --name myhttpstream --deploy

请注意,这一次,在我们实际发送一些数据(通过使用 shell 命令)之前,不会看到任何其他输出。要查看 http 源监听的随机分配端口,请运行以下命令:spring-doc.cadn.net.cn

dataflow:>runtime apps

╔══════════════════════╤═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│                                                                    No. of Instances / Attributes                                                                    ║
╠══════════════════════╪═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║myhttpstream.log-v1   │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 39628                                                                                                                                                  ║
║                      │           │        pid = 34403                                                                                                                                                  ║
║                      │           │       port = 39628                                                                                                                                                  ║
║myhttpstream.log-v1-0 │ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stderr_0.log ║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stdout_0.log ║
║                      │           │        url = https://192.168.0.102:39628                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1              ║
╟──────────────────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║myhttpstream.http-v1  │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 52143                                                                                                                                                  ║
║                      │           │        pid = 34401                                                                                                                                                  ║
║                      │           │       port = 52143                                                                                                                                                  ║
║myhttpstream.http-v1-0│ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stderr_0.log║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stdout_0.log║
║                      │           │        url = https://192.168.0.102:52143                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1             ║
╚══════════════════════╧═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

你应该看到对应的 http 源具有一个 url 属性,其中包含它所监听的主机和端口信息。你现在就可以向该 URL 发送 POST 请求了,如下例所示:spring-doc.cadn.net.cn

dataflow:> http post --target http://localhost:1234 --data "hello"
dataflow:> http post --target http://localhost:1234 --data "goodbye"

然后,该流将来自 http 源的数据传输到由 log 接收器实现的输出日志中,生成类似于以下内容的输出:spring-doc.cadn.net.cn

2016-06-01 09:50:22.121  INFO 79654 --- [  kafka-binder-] log.sink    : hello
2016-06-01 09:50:26.810  INFO 79654 --- [  kafka-binder-] log.sink    : goodbye

我们也可以更改接收器(sink)的实现。你可以将输出导向文件(file)、Hadoop(hdfs),或者任何其他可用的接收器应用程序。你还可以定义自己的应用程序。spring-doc.cadn.net.cn