|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
使用 @SendTo 进行 Forwarding 监听器结果转发
从 2.0 版本开始,如果你同时用一个带有 @SendTo 注解的注解标注一个 @KafkaListener,并且方法调用返回结果,该结果将转发到由 @SendTo 指定的主题。
The @SendTo 值可以有多种形式:
-
@SendTo("someTopic")路由到字面量主题。 -
@SendTo("#{someExpression}")在应用上下文初始化期间评估表达式一次,然后路由到由该评估确定的主题。 -
@SendTo("!{someExpression}")路由到通过运行时评估表达式确定的主题。#root对象用于评估,具有三个属性:-
request: The inboundConsumerRecord(orConsumerRecordsobject for a batch listener). -
source: Theorg.springframework.messaging.Message<?>转换自request. -
result: 该方法的返回结果。
-
-
@SendTo(无属性): 此处视为!{source.headers['kafka_replyTopic']}(自 2.1.3 版本起)。
从 2.1.11 版本和 2.2.1 版本开始,property placeholders 会在 @SendTo 值中解析。
表达式求值的结果必须是一个 String,该值代表主题名称。
以下示例展示了使用 @SendTo 的各种方法:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持 @SendTo,监听器容器工厂必须通过其 replyTemplate 属性提供一个 KafkaTemplate,用于发送回复。
这应该是一个 KafkaTemplate,而不是用于客户端请求/回复处理的 ReplyingKafkaTemplate。
当使用 Spring Boot 时,它会自动将模板配置到工厂中;当配置自己的工厂时,必须按照示例所示进行设置。 |
从 2.2 版本开始,您可以向监听器容器工厂添加一个 ReplyHeadersConfigurer。
这将用于确定要在回复消息中设置哪些头信息。
以下示例显示了如何添加一个 ReplyHeadersConfigurer:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
您也可以根据需要添加更多头信息。 以下示例展示了如何操作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用 @SendTo 时,必须在 ConcurrentKafkaListenerContainerFactory 的 replyTemplate 属性中配置 KafkaTemplate 才能执行发送。
Spring Boot 会自动将 auto-configured 模板(或在存在单个实例时任何模板)进行 wired。
除非使用请求/回复语义,只会使用简单的send(topic, value)方法,因此可能希望创建一个子类来生成分区或键。
以下示例展示了如何实现: |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<String, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
|
如果监听器方法返回
|
使用请求/回复语义时,发送方可以请求目标分区。
|
您可以使用
有关详细信息,请参阅处理异常。 |
如果监听器方法返回Iterable,则默认情况下会为每个元素发送一个记录作为值。从版本2.3.5开始,在 这需要在回复模板的生产者配置中有一个合适的序列化程序。 但是,如果回复是 |