流
本节将详细介绍如何创建 Stream,Streams 是 Spring Cloud Stream 应用程序的集合。它涵盖的主题包括 创建和部署 Streams。
如果您刚刚开始使用 Spring Cloud Data Flow,您可能应该在深入研究之前阅读入门指南 本节。
17. 引言
Stream 是长期存在的 Spring Cloud Stream 应用程序的集合,它们通过消息传递中间件相互通信。 基于文本的 DSL 定义了应用程序之间的配置和数据流。虽然提供了许多应用程序来实现常见用例,但您通常会创建一个自定义 Spring Cloud Stream 应用程序来实现自定义业务逻辑。
Stream 的一般生命周期为:
-
注册应用程序。
-
创建流定义。
-
部署流。
-
取消部署或销毁流。
-
升级或回滚 Stream 中的应用程序。
要部署 Streams,必须将 Data Flow Server 配置为将部署委托给 Spring Cloud 生态系统中名为 Skipper 的新服务器。
此外,您可以将 Skipper 配置为将应用程序部署到一个或多个 Cloud Foundry 组织和空间、Kubernetes 集群上的一个或多个命名空间或本地计算机。 在数据流中部署流时,您可以指定在部署时要使用的平台。 Skipper 还为数据流提供了对已部署流执行更新的功能。 可以通过多种方式更新流中的应用程序,但最常见的示例之一是使用新的自定义业务逻辑升级处理器应用程序,同时保留现有的源和接收器应用程序。
17.1. 流管线 DSL
流是使用受 Unix 启发的 Pipeline 语法定义的。
该语法使用垂直条(称为“管道”)来连接多个命令。
命令ls -l | grep key | less
在 Unix 中,获取ls -l
process 并将其通过管道传输到grep key
过程。
的输出grep
反过来,被发送到less
过程。
每|
符号将左侧命令的标准输出连接到右侧命令的标准输入。
数据从左到右流经管道。
在 Data Flow 中,Unix 命令被 Spring Cloud Stream 应用程序取代,每个管道符号表示通过消息传递中间件(例如 RabbitMQ 或 Apache Kafka)连接应用程序的输入和输出。
每个 Spring Cloud Stream 应用程序都以一个简单的名称注册。 注册过程指定可以在何处获取应用程序(例如,在Maven存储库或Docker注册表中)。您可以在本节中找到有关如何注册 Spring Cloud Stream 应用程序的更多信息。 在数据流中,我们将 Spring Cloud Stream 应用程序分类为源、处理器或接收器。
举个简单的例子,考虑从 HTTP 源收集数据并写入文件接收器。 使用 DSL,流描述为:
http | file
涉及某些处理的流将表示为:
http | filter | transform | file
可以使用 shell 的stream create
命令,如以下示例所示:
dataflow:> stream create --name httpIngest --definition "http | file"
流 DSL 被传递给--definition
command 选项。
流定义的部署是通过 Shell 的stream deploy
命令,如下所示:
dataflow:> stream deploy --name ticktock
入门部分向您展示了如何启动服务器以及如何启动和使用 Spring Cloud Data Flow shell。
请注意,shell 调用数据流服务器的 REST API。有关直接向服务器发出 HTTP 请求的更多信息,请参阅 REST API 指南。
命名流定义时,请记住,流中的每个应用程序都将在平台上创建,名称格式为<stream name>-<app name> .因此,生成的应用程序名称的总长度不能超过 58 个字符。 |
17.2. 流应用程序 DSL
您可以使用 Stream Application DSL 为每个 Spring Cloud Stream 应用程序定义自定义绑定属性。 有关更多信息,请参阅微型网站的流应用程序 DSL 部分。
考虑以下 Java 接口,它定义了一个输入方法和两个输出方法:
public interface Barista {
@Input
SubscribableChannel orders();
@Output
MessageChannel hotDrinks();
@Output
MessageChannel coldDrinks();
}
进一步考虑以下 Java 接口,这是创建 Kafka Streams 应用程序的典型接口:
interface KStreamKTableBinding {
@Input
KStream<?, ?> inputStream();
@Input
KTable<?, ?> inputTable();
}
在具有多个输入和输出绑定的这些情况下,数据流无法对从一个应用程序到另一个应用程序的数据流做出任何假设。
因此,需要设置绑定属性以“连接”应用程序。
流应用程序 DSL 使用“双管道”而不是“管道符号”来指示数据流不应配置应用程序的绑定属性。念||
意思是“平行”。
以下示例显示了这样的“并行”定义:
dataflow:> stream create --definition "orderGeneratorApp || baristaApp || hotDrinkDeliveryApp || coldDrinkDeliveryApp" --name myCafeStream
突破性变革!SCDF Local、Cloud Foundry 1.7.0 至 1.7.2 和 SCDF Kubernetes 1.7.0 至 1.7.1 版本使用comma 字符作为应用程序之间的分隔符。这导致传统 Stream DSL 发生重大变化。虽然不理想,但更改分隔符被认为是对现有用户影响最小的最佳解决方案。 |
此流有四个应用程序。baristaApp
有两个输出目的地,hotDrinks
和coldDrinks
,旨在由hotDrinkDeliveryApp
和coldDrinkDeliveryApp
分别。
部署此流时,您需要设置绑定属性,以便baristaApp
将热饮消息发送到hotDrinkDeliveryApp
destination 和 cold drink 消息发送到coldDrinkDeliveryApp
目的地。
以下列表这样做:
app.baristaApp.spring.cloud.stream.bindings.hotDrinks.destination=hotDrinksDest
app.baristaApp.spring.cloud.stream.bindings.coldDrinks.destination=coldDrinksDest
app.hotDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=hotDrinksDest
app.coldDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=coldDrinksDest
如果要使用消费者组,需要设置 Spring Cloud Stream 应用属性spring.cloud.stream.bindings.<channelName>.producer.requiredGroups
和spring.cloud.stream.bindings.<channelName>.group
,分别在生产者和消费者应用程序上。
Stream Application DSL 的另一个常见用例是部署一个 HTTP 网关应用程序,该应用程序向 Kafka 或 RabbitMQ 应用程序发送同步请求或回复消息。 在这种情况下,HTTP 网关应用程序和 Kafka 或 RabbitMQ 应用程序都可以是不使用 Spring Cloud Stream 库的 Spring Integration 应用程序。
也可以使用 Stream 应用程序 DSL 仅部署单个应用程序。
17.3. 应用程序属性
每个应用程序都采用属性来自定义其行为。例如,http
source 模块公开了一个port
允许将数据摄取端口从默认值更改的设置:
dataflow:> stream create --definition "http --port=8090 | log" --name myhttpstream
这port
属性实际上与标准 Spring Boot 相同server.port
财产。
数据流添加了使用速记表单的功能port
而不是server.port
.
您还可以指定手写版本:
dataflow:> stream create --definition "http --server.port=8000 | log" --name myhttpstream
流应用程序属性部分详细讨论了此速记行为。
如果已注册应用程序属性元数据,则可以在键入后在 shell 中使用制表符补全来获取候选属性名称的列表。--
shell 为应用程序属性提供制表符补全。这app info --name <appName> --type <appType>
shell 命令为所有受支持的属性提供了其他文档。
支持的流<appType> 可能性是:source ,processor 和sink . |
18. 流生命周期
流的生命周期经历以下阶段:
Skipper 是一个服务器,可让您发现 Spring Boot 应用程序并在多个云平台上管理它们的生命周期。
Skipper 中的应用程序捆绑为包含应用程序的资源位置、应用程序属性和部署属性的包。
您可以将 Skipper 包视为类似于工具中的包,例如apt-get
或brew
.
当数据流部署流时,它会生成一个包并将其上传到 Skipper,该包表示流中的应用程序。 用于升级或回滚流中应用程序的后续命令将传递给 Skipper。 此外,Stream 定义是从包中逆向工程的,Stream 的状态也委托给 Skipper。
18.1. 注册流应用程序
您可以使用app register
命令。必须提供唯一名称、应用程序类型和可解析为应用程序项目的 URI。
对于类型,请指定source
,processor
或sink
.版本从 URI 解析。这里有几个例子:
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │> mysource-0.0.1 <│ │ │ ║
║ │mysource-0.0.2 │ │ │ ║
║ │mysource-0.0.3 │ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar
dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar
应用程序 URI 应符合以下架构格式之一:
-
Maven 模式:
maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
-
HTTP 模式:
http://<web-path>/<artifactName>-<version>.jar
-
文件架构:
file:///<local-path>/<artifactName>-<version>.jar
-
Docker 模式:
docker:<docker-image-path>/<imageName>:<version>
The URI<version> 部分对于版本化流应用程序是强制性的。
Skipper 使用多版本化流应用程序,允许在运行时使用部署属性升级或回滚这些应用程序。 |
如果您想注册http
和log
使用 RabbitMQ 绑定器构建的应用程序,您可以执行以下作:
dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
如果您想一次注册多个应用程序,您可以将它们存储在属性文件中,
其中键的格式为<type>.<name>
值是 URI。
例如,要注册http
和log
使用 RabbitMQ 绑定器构建的应用程序,您可以在属性文件中包含以下内容(例如,stream-apps.properties
):
source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
然后,要批量导入应用程序,请使用app import
命令,并使用--uri
开关,如下所示:
dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties
使用--type app
与注册source
,processor
或sink
.
该类型的应用app
只能在流应用程序 DSL(使用双管道||
而不是单管|
)并指示 Data Flow 不要配置应用程序的 Spring Cloud Stream 绑定属性。
使用--type app
不必是 Spring Cloud Stream 应用程序。它可以是任何 Spring Boot 应用程序。
有关使用此应用程序类型的更多信息,请参阅流应用程序 DSL 简介。
您可以注册同一应用程序的多个版本(例如,相同的名称和类型),但只能将一个设置为默认版本。 默认版本用于部署 Streams。
首次注册应用程序时,它被标记为默认。默认应用程序版本可以使用app default
命令:
dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │mysource-0.0.1 │ │ │ ║
║ │> mysource-0.0.2 <│ │ │ ║
║ │mysource-0.0.3 │ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
这app list --id <type:name>
命令列出给定流应用程序的所有版本。
这app unregister
命令有一个可选的--version
参数来指定要注销的应用程序版本:
dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │> mysource-0.0.2 <│ │ │ ║
║ │mysource-0.0.3 │ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
如果--version
未指定,则默认版本为未注册。
流中的所有应用程序都应为要部署的流设置默认版本。
否则,它们在部署期间将被视为未注册的应用程序。
使用 |
app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │mysource-0.0.2 │ │ │ ║
║ │> mysource-0.0.3 <│ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
这stream deploy
需要设置默认应用程序版本。
这stream update
和stream rollback
但是,命令可以使用所有(默认和非默认)已注册的应用程序版本。
以下命令创建一个使用默认 mysource 版本 (0.0.3) 的流:
dataflow:>stream create foo --definition "mysource | log"
然后我们可以将版本更新到 0.0.2:
dataflow:>stream update foo --properties version.mysource=0.0.2
只有预先注册的应用程序才能用于deploy ,update 或rollback 一个流。 |
尝试更新mysource
到版本0.0.1
(未注册)失败。
18.1.1. 注册支持的应用程序和任务
为方便起见,我们提供了带有应用程序 URI(适用于 Maven 和 Docker)的静态文件 适用于所有现成的流和任务或批处理应用Starters。您可以指向此文件并导入 批量的所有应用程序 URI。否则,如前所述,您可以单独注册它们或拥有自己的 自定义属性文件,其中仅包含所需的应用程序 URI。但是,我们建议有一个“专注” 自定义属性文件中所需应用程序 URI 的列表。
Spring Cloud Stream 应用入门
下表包括dataflow.spring.io
指向基于 Spring Cloud Stream 2.1.x 的可用 Stream Application Starters 的链接
和 Spring Boot 2.1.x:
工件类型 | 稳定版本 | SNAPSHOT 发布 |
---|---|---|
RabbitMQ + Maven |
dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-maven |
|
RabbitMQ + Docker |
dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-docker |
|
Apache Kafka + Maven |
dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-maven |
|
Apache Kafka + Docker |
dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-docker |
默认情况下,App Starter 执行器端点是安全的。您可以通过使用app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration 财产。
在 Kubernetes 上,请参阅活跃度和就绪情况探测部分,了解如何配置
执行器端点的安全性。 |
从 Spring Cloud Stream 2.1 GA 版本开始,我们现在与 Spring Cloud Function 具有强大的互作性 编程模型。在此基础上,使用 Einstein 发布列车,现在可以选择一些 Stream 应用程序 Starters,并使用函数式编程模型将它们组合到单个应用程序中。查看“组合函数支持 Spring Cloud Data Flow“博客,通过示例了解有关开发人员和编排体验的更多信息。 |
Spring Cloud Task 应用程序入门
下表包括基于 Spring Cloud Task 2.1.x 和 Spring Boot 2.1.x 的可用 Task Application Starters:
工件类型 | 稳定版本 | SNAPSHOT 发布 |
---|---|---|
专家 |
dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-maven |
|
Jetty工人 |
dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-docker |
您可以在任务应用Starters项目页面中找到有关可用任务Starters的详细信息,并且 相关参考文档。有关可用流Starters的详细信息,请参阅 Stream App Starters 项目页面和相关参考文档。
例如,如果您想批量注册使用 Kafka 绑定器构建的所有开箱即用的流应用程序,您可以使用以下命令:
$ dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest
或者,您可以使用 Rabbit 绑定器注册所有流应用程序,如下所示:
$ dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
您还可以将--local
选项(即true
默认情况下),以指示
属性文件位置应在 shell 进程本身中解析。如果位置应
从数据流服务器进程中解析,请指定--local false
.
当您使用任一 但请注意,下载后,应用程序可能会根据资源在本地缓存在数据流服务器上
位置。如果资源位置没有改变(即使实际资源字节可能不同),它
不会重新下载。使用时 此外,如果流已经部署并使用已注册应用程序的某个版本,则(强制)重新注册 在再次部署流之前,其他应用程序不起作用。 |
在某些情况下,资源在服务器端解析。在其他情况下, URI 被传递到运行时容器实例,并在其中进行解析。看 有关更多详细信息,请参阅每个数据流服务器的特定文档。 |
18.1.2. 创建自定义应用程序
虽然数据流包括源、处理器、接收器应用程序,但您可以扩展这些应用程序或编写自定义 Spring Cloud Stream 应用程序。
Spring Cloud Stream 文档中详细介绍了使用 Spring Initializr 创建 Spring Cloud Stream 应用程序的过程。 您可以将多个活页夹包含在一个应用程序中。 如果这样做,请参阅 [passing_producer_consumer_properties] 中的说明,了解如何配置它们。
为了支持允许属性,在 Spring Cloud Data Flow 中运行的 Spring Cloud Stream 应用程序可以包括 Spring Bootconfiguration-processor
作为可选依赖项,如以下示例所示:
<dependencies>
<!-- other dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
注意:确保spring-boot-maven-plugin
包含在 POM 中。
该插件是创建向 Spring Cloud Data Flow 注册的可执行 jar 所必需的。
Spring Initialzr 在生成的 POM 中包含该插件。
创建自定义应用程序后,可以注册它,如注册流应用程序中所述。
18.2. 创建流
Spring Cloud Data Flow Server 公开了一个完整的 RESTful API 来管理流定义的生命周期,但最简单的使用方法是通过 Spring Cloud Data Flow shell。“入门”部分介绍了如何启动 shell。
在流定义的帮助下创建新流。这些定义是从简单的 DSL 构建的。例如,考虑如果我们运行以下 shell 命令会发生什么:
dataflow:> stream create --definition "time | log" --name ticktock
这定义了一个名为ticktock
基于 DSL 表达式time | log
.DSL 使用“管道”符号 (|
),将源连接到接收器。
这stream info
命令显示有关流的有用信息,如以下示例所示(及其输出):
dataflow:>stream info ticktock
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│ Status ║
╠═══════════╪═════════════════╪══════════╣
║ticktock │time | log │undeployed║
╚═══════════╧═════════════════╧══════════╝
18.2.1. 流应用程序属性
应用程序属性是与流中的每个应用程序关联的属性。部署应用程序时,应用程序属性将通过 命令行参数或环境变量,具体取决于底层部署实现。
以可以在创建流时定义应用程序属性:
dataflow:> stream create --definition "time | log" --name ticktock
这app info --name <appName> --type <appType>
shell 命令显示应用程序公开的应用程序属性。
有关公开属性的详细信息,请参阅应用程序元数据。
以下列表显示了time
应用:
dataflow:> app info --name time --type source
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║ Option Name │ Description │ Default │ Type ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit │The TimeUnit to apply to delay│<none> │java.util.concurrent.TimeUnit ║
║ │values. │ │ ║
║trigger.fixed-delay │Fixed delay for periodic │1 │java.lang.Integer ║
║ │triggers. │ │ ║
║trigger.cron │Cron expression value for the │<none> │java.lang.String ║
║ │Cron Trigger. │ │ ║
║trigger.initial-delay │Initial delay for periodic │0 │java.lang.Integer ║
║ │triggers. │ │ ║
║trigger.max-messages │Maximum messages per poll, -1 │1 │java.lang.Long ║
║ │means infinity. │ │ ║
║trigger.date-format │Format for the date value. │<none> │java.lang.String ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝
以下列表显示了log
应用:
dataflow:> app info --name log --type sink
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║ Option Name │ Description │ Default │ Type ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name │The name of the logger to use.│<none> │java.lang.String ║
║log.level │The level at which to log │<none> │org.springframework.integratio║
║ │messages. │ │n.handler.LoggingHandler$Level║
║log.expression │A SpEL expression (against the│payload │java.lang.String ║
║ │incoming message) to evaluate │ │ ║
║ │as the logged message. │ │ ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝
您可以为time
和log
应用程序时的应用程序stream
创建,如下所示:
dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock
请注意,在前面的示例中,fixed-delay
和level
为time
和log
应用程序是 shell 补全提供的“简短格式”属性名称。
这些“简短格式”属性名称仅适用于公开的属性。在所有其他情况下,应仅使用完全限定的属性名称。
18.2.2. 常见应用程序属性
除了通过 DSL 进行配置外,Spring Cloud Data Flow 还提供了一种将公共属性设置为所有
它启动的流式处理应用程序。
这可以通过添加以spring.cloud.dataflow.applicationProperties.stream
启动时
服务器。
这样做时,服务器会将所有属性(不带前缀)传递给它启动的实例。
例如,所有已启动的应用程序都可以通过启动 数据流服务器,具有以下选项:
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181
这样做会导致spring.cloud.stream.kafka.binder.brokers
和spring.cloud.stream.kafka.binder.zkNodes
性能
传递给所有已启动的应用程序。
使用此机制配置的属性的优先级低于流部署属性。
如果在流部署时指定了具有相同键的属性(例如,app.http.spring.cloud.stream.kafka.binder.brokers 覆盖公共属性)。 |
18.3. 部署流
本节介绍当 Spring Cloud Data Flow 服务器负责部署流时,如何部署流。它涵盖了使用 Skipper 服务部署和升级 Streams。如何设置部署属性的说明适用于流部署的两种方法。
考虑一下ticktock
流定义:
dataflow:> stream create --definition "time | log" --name ticktock
若要部署流,请使用以下 shell 命令:
dataflow:> stream deploy --name ticktock
数据流服务器将time
和log
应用。
这stream info
命令显示有关流的有用信息,包括部署属性:
dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│ Status ║
╠═══════════╪═════════════════╪═════════╣
║ticktock │time | log │deploying║
╚═══════════╧═════════════════╧═════════╝
Stream Deployment properties: {
"log" : {
"resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
"spring.cloud.deployer.group" : "ticktock",
"version" : "2.0.1.RELEASE"
},
"time" : {
"resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
"spring.cloud.deployer.group" : "ticktock",
"version" : "2.0.1.RELEASE"
}
}
有一个重要的可选命令参数(称为--platformName
) 设置为stream deploy
命令。
Skipper 可以配置为部署到多个平台。
Skipper 预配置了一个名为default
,它将应用程序部署到运行 Skipper 的本地计算机。
默认值--platformName
命令行参数为default
.
如果您通常部署到一个平台,则在安装 Skipper 时,您可以覆盖default
平台。
否则,请指定platformName
设置为stream platform-list
命令。
在前面的示例中,时间源每秒将当前时间作为消息发送一次,日志接收器使用日志框架输出。
您可以跟踪stdout
log(其中有一个<instance>
后缀)。日志文件位于数据流服务器的日志输出中显示的目录中,如以下列表所示:
$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:11
2016-06-01 09:45:12.250 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:12
2016-06-01 09:45:13.251 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:13
您还可以通过将--deploy
标志,如下所示:
dataflow:> stream create --definition "time | log" --name ticktock --deploy
但是,在实际用例中,一步创建和部署流并不常见。
原因是当您使用stream deploy
命令,您可以传入定义如何将应用程序映射到平台的属性(例如,要使用的容器的内存大小、要运行的每个应用程序的数量以及是否启用数据分区功能)。
属性还可以替代创建流时设置的应用程序属性。
接下来的部分将详细介绍此功能。
18.3.1. 部署属性
部署流时,可以指定可以控制应用程序的部署和配置方式的属性。有关详细信息,请参阅微型站点的“部署属性”部分。
18.4. 销毁流
您可以通过发出stream destroy
命令,如下所示:
dataflow:> stream destroy --name ticktock
如果流已部署,则在删除流定义之前取消部署该流。
18.5. 取消部署流
通常,您希望停止流,但保留名称和定义以供将来使用。在这种情况下,您可以undeploy
按名称排列的流:
dataflow:> stream undeploy --name ticktock
dataflow:> stream deploy --name ticktock
您可以向deploy
命令以重新启动它:
dataflow:> stream deploy --name ticktock
18.6. 验证流
有时,流定义中包含的应用程序在其注册中包含无效的 URI。
这可能是由于在应用程序注册时输入了无效的 URI,或者从要从中提取它的存储库中删除了应用程序。
要验证流中包含的所有应用程序是否可解析,用户可以使用validate
命令:
dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock │time | log ║
╚═══════════╧═════════════════╝
ticktock is a valid stream.
╔═══════════╤═════════════════╗
║ App Name │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid ║
║sink:log │valid ║
╚═══════════╧═════════════════╝
在前面的示例中,用户验证了他们的滴答流。两个source:time
和sink:log
有效。
现在我们可以看到,如果我们的流定义具有无效的注册应用程序会发生什么:
dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log ║
╚════════════╧═════════════════╝
bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
║ App Name │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid ║
║sink:log │valid ║
╚═══════════════╧═════════════════╝
在这种情况下,Spring Cloud Data Flow 指出流无效,因为source:bad-time
具有无效的 URI。
18.7. 更新流
要更新流,请使用stream update
命令,它采用--properties
或--propertiesFile
作为命令参数。
Skipper 有一个重要的新顶级前缀:version
.
以下命令部署http | log
stream(以及log
部署时注册的1.1.0.RELEASE
):
dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║ Name │ DSL │ Status ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest │http --server.port=9000 | log │deploying ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝
Stream Deployment properties: {
"log" : {
"spring.cloud.deployer.indexed" : "true",
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.1.0.RELEASE"
},
"http" : {
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
}
}
然后,以下命令更新流以使用1.2.0.RELEASE
日志应用程序的版本。
在使用应用程序的特定版本更新流之前,我们需要确保应用程序已注册到该版本:
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.0.RELEASE
Successfully registered application 'sink:log'
然后我们可以更新应用程序:
dataflow:>stream update --name httptest --properties version.log=1.2.0.RELEASE
您只能使用预注册的应用程序版本来deploy ,update 或rollback 一条溪流。 |
为了验证部署属性和更新版本,我们可以使用stream info
,如以下示例所示(及其输出):
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║ Name │ DSL │ Status ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest │http --server.port=9000 | log │deploying ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝
Stream Deployment properties: {
"log" : {
"spring.cloud.deployer.indexed" : "true",
"spring.cloud.deployer.count" : "1",
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.2.0.RELEASE"
},
"http" : {
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
}
}
18.8. 强制更新流
升级流时,您可以使用--force
选项来部署当前已部署应用程序的新实例,即使没有更改应用程序或部署属性。
当应用程序本身在启动时获取配置信息时(例如,从 Spring Cloud Config Server 获取配置信息时,需要此行为。
您可以使用--app-names
选择。
如果未指定任何应用程序名称,则强制升级所有应用程序。
您可以指定--force
和--app-names
选项以及--properties
或--propertiesFile
选项。
18.9. 流版本
Skipper 保留已部署流的历史记录。
更新流后,流会有第二个版本。
您可以使用stream history --name <name-of-stream>
命令:
dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│ Last updated │ Status │Package Name│Package Version│ Description ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2 │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest │1.0.0 │Upgrade complete║
║1 │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest │1.0.0 │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝
18.10. 流清单
在替换所有值后,Skipper 会保留所有应用程序、其应用程序属性及其部署属性的“清单”。 这表示部署到平台的内容的最终状态。 可以使用以下命令查看任何版本的 Stream 的清单:
stream manifest --name <name-of-stream> --releaseVersion <optional-version>
如果--releaseVersion
未指定,则返回最后一个版本的清单。
以下示例演示了清单的用法:
dataflow:>stream manifest --name httptest
使用该命令将产生以下输出:
# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
name: log
spec:
resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
version: 1.2.0.RELEASE
applicationProperties:
spring.cloud.dataflow.stream.app.label: log
spring.cloud.stream.bindings.input.group: httptest
spring.cloud.dataflow.stream.name: httptest
spring.cloud.dataflow.stream.app.type: sink
spring.cloud.stream.bindings.input.destination: httptest.http
deploymentProperties:
spring.cloud.deployer.indexed: true
spring.cloud.deployer.group: httptest
spring.cloud.deployer.count: 1
---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
name: http
spec:
resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
version: 1.2.0.RELEASE
applicationProperties:
spring.cloud.dataflow.stream.app.label: http
spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
server.port: 9000
spring.cloud.stream.bindings.output.destination: httptest.http
spring.cloud.dataflow.stream.name: httptest
spring.cloud.dataflow.stream.app.type: source
deploymentProperties:
spring.cloud.deployer.group: httptest
大多数部署和应用程序属性都是由数据流设置的,以使应用程序能够相互通信并发送带有标识标签的应用程序指标。
18.11. 回滚流
您可以使用stream rollback
命令:
dataflow:>stream rollback --name httptest
可选的--releaseVersion
command 参数添加流的版本。
如果未指定,则回滚作将转到以前的流版本。
18.12. 应用程序计数
应用程序计数是系统的动态属性,用于指定应用程序的实例数。有关更多信息,请参阅微型网站的“应用程序计数”部分。
18.13. 船长的升级策略
Skipper 有一个简单的“红/黑”升级策略。它使用与当前正在运行的版本一样多的实例来部署新版本的应用程序,并检查/health
应用程序的端点。
如果新应用程序的运行状况良好,则取消部署以前的应用程序。
如果新应用程序的运行状况不佳,则取消部署所有新应用程序,并认为升级不成功。
升级策略不是滚动升级,因此,如果应用程序的五个实例正在运行,那么在晴天的情况下,在取消部署旧版本之前,五个新应用程序也在运行。
19. 流式 DSL
本部分介绍 Stream DSL 简介中未涵盖的 Stream DSL 的其他功能。
19.1. 点击流
可以在流中的各个生产者端点创建分流器。有关更多信息,请参阅微型站点的 Tapping a Stream 部分。
19.2. 在流中使用标签
当流由多个具有相同名称的应用程序组成时,必须使用标签对它们进行限定。 有关更多信息,请参阅微型网站的“标签应用”部分。
19.3. 命名目标
您可以使用命名目标,而不是引用源或接收器应用程序。 有关更多信息,请参阅微型站点的命名目标部分。
19.4. 扇入和扇出
通过使用命名目标,您可以支持扇入和扇出用例。 有关更多信息,请参阅微型网站的扇入和扇出部分。
20. 流式传输 Java DSL
您可以使用spring-cloud-dataflow-rest-client
模块。
有关更多信息,请参阅微型网站的 Java DSL 部分。
21. 使用多个活页夹配置流式传输应用程序
在某些情况下,当需要连接到不同的消息传递时,流可以将其应用程序绑定到多个 spring cloud 流绑定器 中间件配置。在这些情况下,应确保使用其活页夹正确配置应用程序 配置。例如,同时支持 Kafka 和 Rabbit 绑定器的多绑定转换器是以中的处理器:
http | multibindertransform --expression=payload.toUpperCase() | log
在前面的示例中,您将编写自己的multibindertransform 应用。 |
在此流中,每个应用程序都通过以下方式连接到消息传递中间件:
-
HTTP 源向 RabbitMQ (
rabbit1
). -
Multi-Binder Transform 处理器从 RabbitMQ (
rabbit1
) 并将处理后的事件发送到 Kafka (kafka1
). -
日志接收器从 Kafka (
kafka1
).
这里rabbit1
和kafka1
是 Spring Cloud Stream 应用程序属性中给出的绑定器名称。
基于此设置,应用程序在其类路径中具有以下具有适当配置的绑定器:
-
HTTP:兔活页夹
-
转换:Kafka 和 Rabbit 绑定器
-
日志:Kafka 绑定器
这spring-cloud-stream
binder
配置属性可以在应用程序本身中设置。
如果没有,它们可以通过deployment
部署流时的属性:
dataflow:>stream create --definition "http | multibindertransform --expression=payload.toUpperCase() | log" --name mystream
dataflow:>stream deploy mystream --properties "app.http.spring.cloud.stream.bindings.output.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.input.binder=rabbit1,
app.multibindertransform.spring.cloud.stream.bindings.output.binder=kafka1,app.log.spring.cloud.stream.bindings.input.binder=kafka1"
您可以通过部署属性指定任何 Binder 配置属性来覆盖它们。
22. 功能组成
函数组合允许您将函数逻辑动态附加到现有事件流应用程序。有关更多详细信息,请参阅微型网站的函数组成部分。
23. 功能应用
24. 示例
本章包括以下示例:
您可以在“示例”一章中找到更多示例的链接。
24.1. 简单的流处理
作为简单处理步骤的示例,我们可以使用以定义将 HTTP 发布数据的有效负载转换为大写:
http | transform --expression=payload.toUpperCase() | log
要创建此流,请在 shell 中输入以下命令:
dataflow:> stream create --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log" --name mystream --deploy
以下示例使用 shell 命令发布一些数据:
dataflow:> http post --target http://localhost:9000 --data "hello"
前面的示例导致大写HELLO
在日志中,如下所示:
2016-06-01 09:54:37.749 INFO 80083 --- [ kafka-binder-] log.sink : HELLO
24.2. 有状态流处理
为了演示数据分区功能,以下列表部署了一个以 Kafka 作为绑定器的流:
dataflow:>stream create --name words --definition "http --server.port=9900 | splitter --expression=payload.split(' ') | log"
Created new stream 'words'
dataflow:>stream deploy words --properties "app.splitter.producer.partitionKeyExpression=payload,deployer.log.count=2"
Deployed stream 'words'
dataflow:>http post --target http://localhost:9900 --data "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
> POST (text/plain;Charset=UTF-8) http://localhost:9900 How much wood would a woodchuck chuck if a woodchuck could chuck wood
> 202 ACCEPTED
dataflow:>runtime apps
╔════════════════════╤═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║App Id / Instance Id│Unit Status│ No. of Instances / Attributes ║
╠════════════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║words.log-v1 │ deployed │ 2 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 24166 ║
║ │ │ pid = 33097 ║
║ │ │ port = 24166 ║
║words.log-v1-0 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_0.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_0.log ║
║ │ │ url = https://192.168.0.102:24166 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 41269 ║
║ │ │ pid = 33098 ║
║ │ │ port = 41269 ║
║words.log-v1-1 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_1.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_1.log ║
║ │ │ url = https://192.168.0.102:41269 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.http-v1 │ deployed │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 9900 ║
║ │ │ pid = 33094 ║
║ │ │ port = 9900 ║
║words.http-v1-0 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stderr_0.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stdout_0.log ║
║ │ │ url = https://192.168.0.102:9900 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.splitter-v1 │ deployed │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 33963 ║
║ │ │ pid = 33093 ║
║ │ │ port = 33963 ║
║words.splitter-v1-0 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stderr_0.log║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stdout_0.log║
║ │ │ url = https://192.168.0.102:33963 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1 ║
╚════════════════════╧═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
当您查看words.log-v1-0
日志,您应该会看到以下内容:
2016-06-05 18:35:47.047 INFO 58638 --- [ kafka-binder-] log.sink : How
2016-06-05 18:35:47.066 INFO 58638 --- [ kafka-binder-] log.sink : chuck
2016-06-05 18:35:47.066 INFO 58638 --- [ kafka-binder-] log.sink : chuck
当您查看words.log-v1-1
日志,您应该会看到以下内容:
2016-06-05 18:35:47.047 INFO 58639 --- [ kafka-binder-] log.sink : much
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : wood
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : would
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : a
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : woodchuck
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : if
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : a
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : woodchuck
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : could
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : wood
此示例表明,包含相同单词的有效负载拆分将路由到同一应用程序实例。
24.3. 其他源和接收器应用程序类型
这个例子展示了一些更复杂的东西:将time
其他东西的来源。另一种支持的源类型是http
,它接受通过 HTTP POST 请求摄取的数据。请注意,http
source 接受与数据流服务器不同的端口上的数据(默认为 8080)。默认情况下,端口是随机分配的。
要创建使用http
source 但仍然使用相同的log
sink,我们将简单流处理示例中的原始命令更改为以下内容:
dataflow:> stream create --definition "http | log" --name myhttpstream --deploy
请注意,这一次,在我们实际发布一些数据(通过使用 shell 命令)之前,我们看不到任何其他输出。要查看随机分配的端口,其中http
source 正在监听,则运行以下命令:
dataflow:>runtime apps
╔══════════════════════╤═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│ No. of Instances / Attributes ║
╠══════════════════════╪═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║myhttpstream.log-v1 │ deploying │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 39628 ║
║ │ │ pid = 34403 ║
║ │ │ port = 39628 ║
║myhttpstream.log-v1-0 │ deploying │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stderr_0.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stdout_0.log ║
║ │ │ url = https://192.168.0.102:39628 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1 ║
╟──────────────────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║myhttpstream.http-v1 │ deploying │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 52143 ║
║ │ │ pid = 34401 ║
║ │ │ port = 52143 ║
║myhttpstream.http-v1-0│ deploying │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stderr_0.log║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stdout_0.log║
║ │ │ url = https://192.168.0.102:52143 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1 ║
╚══════════════════════╧═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
您应该看到相应的http
source 有一个url
包含它正在侦听的主机和端口信息的属性。现在,您已准备好发布到该 URL,如以下示例所示:
dataflow:> http post --target http://localhost:1234 --data "hello"
dataflow:> http post --target http://localhost:1234 --data "goodbye"
然后,流从http
source 到由log
sink,产生类似于以下内容的输出:
2016-06-01 09:50:22.121 INFO 79654 --- [ kafka-binder-] log.sink : hello
2016-06-01 09:50:26.810 INFO 79654 --- [ kafka-binder-] log.sink : goodbye
我们还可以更改接收器的实现。您可以将输出通过管道传输到文件 (file
),到 Hadoop (hdfs
),或任何其他可用的接收器应用程序。您还可以定义自己的应用程序。