|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
编程模型
使用 Kafka Streams 绑定器提供的编程模型时,可以使用高级 Streams DSL 以及高层和低层 Processor-API 的混合选项。
当混合高层和低层 API 时,通常通过调用来实现变换或过程API 方法在KStream.
功能风格
从春云溪开始3.0.0Kafka Streams 绑定器允许使用 Java 8 中可用的函数式编程风格设计和开发应用程序。
这意味着应用程序可以简明地表示为类型的λ表达式java.util.function.函数或java.util.function.Consumer.
我们来举一个非常基础的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但它是一个完整的独立 Spring Boot 应用程序,利用 Kafka Streams 进行流处理。
这是一个消费者应用,没有出站绑定,只有一个入站绑定。
应用程序消耗数据,并简单地记录来自KStream标准输出的键和值。
该应用程序包含SpringBootApplication注释和标记为豆.
豆子法属于java.util.function.Consumer参数化为KStream.
然后在实现中,我们返回一个本质上是λ表达式的消费者对象。
在λ表达式中,提供了处理数据的代码。
在该应用中,有一个类型的输入绑定KStream.
该绑定器为应用程序创建了带有名称的绑定process-in-0,即函数豆名后跟一个破折号字符()和字面-在再接一个破折号,最后是参数的序数位置。
你可以用这个绑定名来设置其他属性,比如目的地。
例如Spring.cloud.stream.bindings.process-in-0.destination=my-topic.
| 如果目标属性未在绑定上设置,则创建与绑定同名的主题(如果应用权限足够),或者该主题应已可用。 |
一旦被构建成超级罐子(例如,kstream-consumer-app.jar),你可以按照以下方式运行上述示例。
如果应用程序选择使用 Spring 来定义功能豆子元件注释,Binder 也支持该模型。
上述功能豆可以重写如下。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这里还有另一个例子,它是一个包含输入和输出绑定的完整处理器。 这是经典的字数示例,应用程序接收主题数据后,在一个滚动时间窗口内计算每个词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
这里同样是一个完整的 Spring Boot 应用。这里与第一个应用的不同之处在于豆子法属于java.util.function.函数.
第一个参数化类型功能是输入KStream第二个是输出。
在方法本体中,提供了一个类型为 的 λ 表达式功能作为实现,实际的业务逻辑也被给出。
类似于之前讨论的基于消费者的应用,这里的输入绑定命名为process-in-0默认。对于输出,绑定名也会自动设置为process-out-0.
一旦被构建成超级罐子(例如,wordcount-processor.jar),你可以按照以下方式运行上述示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
该应用程序将接收来自卡夫卡主题的消息的话计算结果会被发布到输出中
主题计数.
Spring Cloud Stream 将确保来自进出主题的消息自动绑定为 KStream对象。作为开发者,你可以专注于代码的业务层面,也就是编写逻辑 处理器中必须如此。设置Kafka Streams基础设施所需的特定配置 由框架自动处理。
我们上面看到的两个例子是单一的KStream输入绑定。在这两种情况下,装订者都接收了单一主题的记录。
如果你想把多个主题合并成一个KStream有约束力的话题,你可以在下方提供逗号分隔的卡夫卡主题作为目的地。
Spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果你想将主题与常规内容匹配,也可以提供主题模式作为目的地。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多输入绑定
许多非平凡的 Kafka Streams 应用程序经常通过多个绑定消耗多个主题的数据。
例如,一个主题被消费为克斯特里姆还有一个为KTable(英国可爱的)音乐或全球可爱.
应用程序希望以表类型接收数据的原因有很多。
想象一个用例:底层主题通过数据库中的变更数据捕获(CDC)机制填充,或者应用程序只关心最新更新以便下游处理。
如果应用程序指定数据需要绑定为KTable(英国可爱的)音乐或全球可爱那么Kafka Streams的绑定器就能正确绑定目标到一个KTable(英国可爱的)音乐或全球可爱并让这些信息可供应用程序运行。
我们将探讨 Kafka Streams 绑定器中多输入绑定的几种不同场景。
BiFunction in Kafka Streams Binder
这里有一个例子,我们有两个输入和一个输出。在这种情况下,应用可以利用java.util.function.BiFunction.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
这里的基本主题与前述例子相同,但这里有两个输入。
爪哇的双功能支持用于将输入绑定到目标。
绑定器为输入生成的默认绑定名称为process-in-0和process-in-1分别。默认输出绑定为process-out-0.
在这个例子中,第一个参数双功能被束缚为KStream对于第一个输入,第二个参数被绑定为KTable(英国可爱的)音乐对于第二个输入。
BiConsumer in Kafka Streams Binder
如果有两个输入但没有输出,那么我们可以使用java.util.function.BiConsumer如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果你有超过两个输入怎么办? 有些情况下你需要超过两个输入。在这种情况下,绑定器允许你串联部分函数。 在函数式编程术语中,这种技术通常被称为currying。 随着Java 8新增函数式编程支持,Java现在可以编写curri函数。 Spring Cloud Stream Kafka Streams 绑定器可以利用此功能实现多输入绑定。
让我们举个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们来看看上面介绍的绑定模型的细节。
在这个模型中,我们有3个部分应用的函数在入站线上。我们称它们为f(x),f(y)和f(z).
如果我们将这些函数展开为真正的数学函数,它会呈现如下:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>.
这x变量代表KStream<Long,Order>这y变量代表GlobalKTable<Long,客户>以及z变量代表GlobalKTable<Long,产品部>.
第一个功能f(x)具有应用的第一个输入绑定(KStream<Long,Order>)其输出为函数f(y)。
函数f(y)具有应用的第二个输入绑定(GlobalKTable<Long,客户>)而其输出又是另一个函数,f(z).
函数的输入f(z)是应用的第三个输入(GlobalKTable<Long,产品部>)其输出为Kstream<Long,EnrichedOrder>这是应用程序的最终输出绑定。
来自三个偏函数的输入,分别是KStream,全球可爱,全球可爱这些内容分别在实现业务逻辑作为 lambda 表达式一部分的方法体中可供你使用。
输入绑定被命名为enrichOrder-in-0,enrichOrder-in-1和enrichOrder-in-2分别。输出绑定命名为enrichOrder-out-0.
有了库里函数,你几乎可以有任意数量的输入。不过要记住,超过较少的输入和部分应用函数,就像上面 Java 里那样,可能会导致代码无法读取。 因此,如果你的 Kafka Streams 应用需要的输入绑定数量超过相对较少,并且你想使用这个功能模型,那么你可能需要重新考虑设计,并适当地分解应用程序。
输出绑定
Kafka Streams 活页夹支持以下两种类型KStream或KTable(英国可爱的)音乐作为输出绑定。
在幕后,活页夹使用了自方法KStream将结果记录发送到输出主题。
如果申请提供KTable(英国可爱的)音乐作为函数的输出,绑定器仍然通过委派给自方法KStream.
例如,下面的两个功能都适用:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多输出绑定
Kafka Streams 允许将出站数据写入多个主题。这一特征被称为卡夫卡溪流中的分支。
使用多个输出绑定时,你需要提供 KStream 数组 (KStream[])作为出站返回类型。
这里有一个例子:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
编程模型保持不变,但出站参数类型为KStream[].
默认的输出绑定名称为process-out-0,流程出局1,流程出局二分别对上述函数。
结合器产生三个输出绑定的原因是它检测返回的长度KStream阵列为三。
注意在这个例子中,我们提供了noDefaultBranch();如果我们使用defaultBranch()相反,这需要额外的输出绑定,本质上返回一个KStream长度为四的数组。
Kafka 流函数式编程风格总结
总之,下表展示了功能范式中可用的各种选项。
| 输入数量 | 输出数量 | 组件 |
|---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1......n |
java.util.function.函数 |
2 |
1......n |
java.util.function.BiFunction |
>= 3 |
0......n |
使用curried函数 |
-
当表中输出多于一个时,类型简单变为
KStream[].
Function composition in Kafka Streams binder
Kafka Streams 绑针器支持线性拓扑的最小函数复合形式。
利用 Java 函数式 API 支持,你可以编写多个函数,然后自己用然后方法。
例如,假设你有以下两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使没有活页夹中的功能性作文支持,你也可以像下面这样组合这两个功能。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然后你可以给出 形式的定义spring.cloud.function.definition=foo;酒吧;由.
有了装订器中的函数组合支持,你就不需要写那个你在做显式函数组合的第三个函数。
你可以直接这样做:
spring.cloud.function.definition=foo|bar
你甚至可以这样做:
spring.cloud.function.definition=foo|bar;foo;bar
该组合函数的默认绑定名称在此示例中变为福巴尔-0和福巴尔出局-0.
Kafka Streams 结合器中功能组成的局限性
当你有java.util.function.函数豆子,可以与其他函数或多个函数组合。
同一函数豆可以由java.util.function.Consumer也。在这种情况下,消费者是最后一个组成的组成部分。
一个函数可以由多个函数组合而成,然后以java.util.function.Consumer豆豆也是。
在合成字模时java.util.function.BiFunction这双功能必须是定义中的第一个函数。
组成的实体必须是类型java.util.function.函数或java.util.function.Consumer.
换句话说,你不能取双功能然后再和另一个人作曲双功能.
你无法用双消费者或定义消费者是第一个分量。
你也不能用输出为数组的函数来组合 (KStream[]对于分支)除非这是定义中的最后一个分量。
第一位功能之双功能在函数定义中,也可以使用curried形式。
例如,以下情况是可能的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函数定义可以为咖喱Foo|bar.
在幕后,绑定器会为curreed函数创建两个输入绑定,以及基于定义中最终函数的输出绑定。
在这种情况下,默认的输入绑定将是咖喱福巴尔-0和咖喱Foobar-in-1.
本例的默认输出绑定为咖喱福巴尔出局-0.
关于使用KTable(英国可爱的)音乐作为函数合成中的输出
假设你有以下两个功能。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
你可以将它们写成foo|bar,但请记住第二个函数(酒吧此时)必须有KTable(英国可爱的)音乐作为输入,因为第一个函数(福) 具有KTable(英国可爱的)音乐作为输出。