4. 参考资料
参考文档的这一部分详细介绍了构成 Spring for Apache Kafka 的各种组件。 主要章节介绍了使用 Spring 开发 Kafka 应用程序的核心类。
4.1. 将 Spring 用于 Apache Kafka
本节详细说明了影响使用 Spring for Apache Kafka 的各种问题。有关快速但不太详细的介绍,请参阅快速导览。
4.1.1. 连接到 Kafka
从 2.5 版开始,其中每一个都扩展了KafkaResourceFactory
.
这允许在运行时通过添加Supplier<String>
到他们的配置:setBootstrapServersSupplier(() → …)
.
将为所有新连接调用此命令以获取服务器列表。
消费者和生产者通常是长寿的。
要关闭现有生产者,请调用reset()
在DefaultKafkaProducerFactory
.
要关闭现有使用者,请调用stop()
(然后start()
) 在KafkaListenerEndpointRegistry
和/或stop()
和start()
在任何其他侦听器容器 Bean 上。
为方便起见,该框架还提供了一个ABSwitchCluster
支持两组引导服务器;其中一个随时处于活动状态。
配置ABSwitchCluster
并将其添加到生产者和消费者工厂,然后KafkaAdmin
,通过调用setBootstrapServersSupplier()
.
当你想切换时,请致电primary()
或secondary()
并调用reset()
在生产工厂建立新的连接;对于消费者来说,stop()
和start()
所有侦听器容器。
使用时@KafkaListener
s,stop()
和start()
这KafkaListenerEndpointRegistry
豆。
有关更多信息,请参阅 Javadocs。
工厂监听器
从 2.5 版开始,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
可以配置为Listener
在创建或关闭生产者或使用者时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
是通过将client-id
属性(从metrics()
创建后)到工厂beanName
属性,分隔为.
.
例如,这些侦听器可用于创建和绑定千分尺KafkaClientMetrics
实例,当创建新客户端时(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;请参阅 Micrometer Native Metrics。
4.1.2. 配置主题
如果定义KafkaAdmin
bean 中,它可以自动将主题添加到代理中。
为此,您可以添加一个NewTopic
@Bean
对于应用程序上下文中的每个主题。
2.3 版引入了一个新类TopicBuilder
使此类豆的创建更加方便。
以下示例显示了如何执行此作:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从 2.6 版开始,您可以省略.partitions()
和/或replicas()
并且代理默认值将应用于这些属性。
代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464。
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从 2.7 版开始,您可以声明多个NewTopic
s 在单个KafkaAdmin.NewTopics
豆子定义:
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用 Spring Boot 时,一个KafkaAdmin bean 会自动注册,因此您只需要NewTopic (和/或NewTopics ) @Bean s. |
默认情况下,如果代理不可用,则会记录一条消息,但上下文会继续加载。
可以以编程方式调用管理员的initialize()
稍后重试的方法。
如果您希望将此情况视为致命,请将管理员的fatalIfBrokerNotAvailable
属性设置为true
.
然后上下文无法初始化。
如果代理支持它(1.0.0 或更高版本),如果发现现有主题的分区少于NewTopic.numPartitions . |
从 2.7 版开始,KafkaAdmin
提供了在运行时创建和检查主题的方法。
-
createOrModifyTopics
-
describeTopics
对于更高级的功能,您可以使用AdminClient
径直。
以下示例显示了如何执行此作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
4.1.3. 发送消息
本节介绍如何发送消息。
用KafkaTemplate
本节介绍如何使用KafkaTemplate
发送消息。
概述
这KafkaTemplate
包装生产者,并提供向 Kafka 主题发送数据的便捷方法。
以下列表显示了来自KafkaTemplate
:
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
有关更多详细信息,请参阅 Javadoc。
这sendDefault
API 要求已向模板提供默认主题。
API 会接收timestamp
作为参数,并将此时间戳存储在记录中。
用户提供的时间戳的存储方式取决于在 Kafka 主题上配置的时间戳类型。
如果主题配置为CREATE_TIME
,则记录用户指定的时间戳(如果未指定,则生成)。
如果主题配置为LOG_APPEND_TIME
,则忽略用户指定的时间戳,代理会添加本地代理时间。
要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它。 以下示例显示了如何执行此作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从 2.5 版开始,您现在可以覆盖工厂的ProducerConfig
属性,以从同一工厂创建具有不同生产者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,类型为ProducerFactory<?, ?>
(例如由 Spring Boot 自动配置的那个)可以使用不同的缩小泛型类型进行引用。
您还可以使用标准配置模板<bean/>
定义。
然后,要使用该模板,您可以调用其方法之一。
当您将方法Message<?>
参数,则主题、分区和键信息在消息头中提供,其中包含以下项目:
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION_ID
-
KafkaHeaders.MESSAGE_KEY
-
KafkaHeaders.TIMESTAMP
消息有效负载是数据。
或者,您可以配置KafkaTemplate
使用ProducerListener
获取具有发送结果(成功或失败)的异步回调,而不是等待Future
完成。
以下列表显示了ProducerListener
接口:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板配置了LoggingProducerListener
,它会记录错误,并且在发送成功时不执行任何作。
为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。
请注意,send 方法返回一个ListenableFuture<SendResult>
.
您可以向监听器注册回调,以异步接收发送结果。
以下示例显示了如何执行此作:
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
SendResult
有两个属性,一个ProducerRecord
和RecordMetadata
.
有关这些对象的信息,请参阅 Kafka API 文档。
这Throwable
在onFailure
可以转换为KafkaProducerException
;其failedProducerRecord
属性包含失败的记录。
从 2.5 版开始,您可以使用KafkaSendCallback
而不是ListenableFutureCallback
,更容易提取失败的ProducerRecord
,避免了需要Throwable
:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
}
});
您还可以使用一对 lambda:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
如果您希望阻止发送线程以等待结果,您可以调用未来的get()
方法;建议使用带超时的方法。
您可能希望调用flush()
在等待之前,或者,为了方便起见,模板有一个构造函数,其中有一个autoFlush
导致模板flush()
每次发送。
仅当您已将linger.ms
producer 属性,并希望立即发送部分批次。
例子
本节展示了向 Kafka 发送消息的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(KafkaProducerException ex) {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,原因ExecutionException
是KafkaProducerException
使用failedProducerRecord
财产。
用RoutingKafkaTemplate
从 2.5 版开始,您可以使用RoutingKafkaTemplate
在运行时根据目标选择生产者topic
名字。
路由模板不支持事务,execute ,flush 或metrics 作,因为这些作的主题是未知的。 |
该模板需要java.util.regex.Pattern
自ProducerFactory<Object, Object>
实例。
此映射应按顺序排列(例如LinkedHashMap
)因为它是按顺序遍历的;您应该在开始时添加更具体的模式。
以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
相应的@KafkaListener
s 显示在注释属性中。
有关实现类似结果的另一种技术,但具有将不同类型发送到同一主题的附加功能,请参阅委托序列化器和反序列化器。
用DefaultKafkaProducerFactory
如用KafkaTemplate
一个ProducerFactory
用于创建生产者。
当不使用 Transactions 时,默认情况下,DefaultKafkaProducerFactory
创建所有客户端使用的单例生产者,如KafkaProducer
javadocs。
但是,如果您调用flush()
在模板上,这可能会导致使用同一生产者的其他线程出现延迟。
从 2.3 版本开始,DefaultKafkaProducerFactory
有一个新属性producerPerThread
.
当设置为true
,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。
什么时候producerPerThread 是true ,用户代码必须调用closeThreadBoundProducer() 当不再需要生产者时,在工厂。
这将物理关闭生产者并将其从ThreadLocal .
叫reset() 或destroy() 不会清理这些生产者。 |
创建DefaultKafkaProducerFactory
、键和/或值Serializer
可以通过调用仅接受属性 Map 的构造函数从配置中获取类(请参阅中的示例用KafkaTemplate
),或Serializer
实例可以传递给DefaultKafkaProducerFactory
构造函数(在这种情况下,所有Producer
s 共享相同的实例)。
或者,您可以提供Supplier<Serializer>
s(从 2.3 版开始),将用于获取单独的Serializer
每个实例Producer
:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,您现在可以在创建工厂后更新生产者属性。
例如,如果必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。
这些更改不会影响现有的生产者实例;叫reset()
关闭任何现有生产者,以便使用新属性创建新生产者。
注意:您不能将事务性生产者工厂更改为非事务性工厂,反之亦然。
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从版本 2.8 开始,如果您将序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用configure()
使用配置属性配置它们的方法。
用ReplyingKafkaTemplate
2.1.3 版本引入了一个子类KafkaTemplate
提供请求/回复语义。
该类名为ReplyingKafkaTemplate
并且有两种附加方法;下面显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另请参阅请求/回复Message<?>
s).
结果是ListenableFuture
异步填充结果(或超时的异常)。
结果还有一个sendFuture
属性,这是调用KafkaTemplate.send()
.
您可以使用此未来来确定发送作的结果。
如果使用第一种方法,或者replyTimeout
参数是null
,模板的defaultReplyTimeout
属性(默认为 5 秒)。
从 2.8.8 版本开始,模板有一个新方法waitForAssignment
. 如果回复容器配置了auto.offset.reset=latest
以避免在容器初始化之前发送请求和回复。
使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的pollTimeout 属性,因为在第一次轮询完成之前不会发送通知。 |
以下 Spring Boot 应用程序显示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。
如果将重要的反序列化器用于回复,请考虑使用ErrorHandlingDeserializer
委托给配置的反序列化程序。
如此配置后,RequestReplyFuture
将异常完成,您可以捕获ExecutionException
,替换为DeserializationException
在其cause
财产。
从 2.6.7 版本开始,除了检测DeserializationException
s,模板将调用replyErrorChecker
函数(如果提供)。
如果返回异常,则未来将异常完成。
这是一个例子:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
该模板设置一个标头(名为KafkaHeaders.CORRELATION_ID
默认情况下),必须由服务器端回显。
在这种情况下,以下内容@KafkaListener
应用程序响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
这@KafkaListener
infrastructure 回显关联 ID 并确定回复主题。
看转发侦听器结果@SendTo
了解有关发送回复的更多信息。
模板使用默认标头KafKaHeaders.REPLY_TOPIC
以指示回复所指向的主题。
从 2.2 版开始,模板会尝试从配置的回复容器中检测回复主题或分区。
如果容器配置为侦听单个主题或单个TopicPartitionOffset
,用于设置回复标头。
如果容器以其他方式配置,则用户必须设置回复标头。
在这种情况下,一个INFO
日志消息在初始化期间写入。
以下示例使用KafkaHeaders.REPLY_TOPIC
:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
当您配置单个回复时TopicPartitionOffset
,您可以对多个模板使用相同的回复主题,只要每个实例在不同的分区上监听即可。
使用单个回复主题进行配置时,每个实例必须使用不同的group.id
.
在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关 ID。
这对于自动缩放可能很有用,但会产生额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。
使用此设置时,建议将模板的sharedReplyTopic
自true
,这降低了对 DEBUG 的意外回复的日志记录级别,而不是默认的 ERROR。
以下是配置回复容器以使用同一共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例,并且未按照上一段中所述配置它们,则每个实例都需要一个专用的回复主题。
另一种方法是将KafkaHeaders.REPLY_PARTITION 并为每个实例使用专用分区。
这Header 包含一个四字节的 int(大端)。
服务器必须使用此标头将回复路由到正确的分区 (@KafkaListener 这样做)。
但是,在这种情况下,回复容器不得使用 Kafka 的组管理功能,并且必须配置为在固定分区上监听(通过使用TopicPartitionOffset 在其ContainerProperties 构造函数)。 |
这DefaultKafkaHeaderMapper 要求 Jackson 在类路径上(对于@KafkaListener ).
如果它不可用,则消息转换器没有标头映射器,因此您必须配置MessagingMessageConverter 使用SimpleKafkaHeaderMapper ,如前所述。 |
默认情况下,使用 3 个标头:
-
KafkaHeaders.CORRELATION_ID
- 用于将回复与请求相关联 -
KafkaHeaders.REPLY_TOPIC
- 用于告诉服务器在哪里回复 -
KafkaHeaders.REPLY_PARTITION
- (可选)用于告诉服务器要回复哪个分区
这些标头名称由@KafkaListener
用于路由回复的基础结构。
从 2.3 版开始,您可以自定义标头名称 - 模板有 3 个属性correlationHeaderName
,replyTopicHeaderName
和replyPartitionHeaderName
.
如果您的服务器不是 Spring 应用程序(或不使用@KafkaListener
).
请求/回复Message<?>
s
2.7 版将方法 添加到ReplyingKafkaTemplate
发送和接收spring-messaging
的Message<?>
抽象化:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认值replyTimeout
,还有一些重载版本可能会在方法调用中超时。
如果消费者的Deserializer
或模板的MessageConverter
可以在没有任何其他信息的情况下转换有效负载,无论是通过配置还是回复消息中的类型元数据。
如果需要为返回类型提供类型信息,请使用第二种方法来协助消息转换器。 这也允许同一个模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回复类型 Message<?>
当@KafkaListener
返回一个Message<?>
,对于 2.5 之前的版本,有必要填充回复主题和相关 ID 标头。
在此示例中,我们使用请求中的回复主题标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这也显示了如何在回复记录上设置键。
从 2.5 版开始,框架将检测这些标头是否丢失,并使用主题填充它们 - 从@SendTo
值或传入的KafkaHeaders.REPLY_TOPIC
标头(如果存在)。
它还将呼应即将到来的KafkaHeaders.CORRELATION_ID
和KafkaHeaders.REPLY_PARTITION
,如果存在。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.build();
}
聚合多个回复
中的模板用ReplyingKafkaTemplate
严格用于单个请求/回复场景。
对于单个消息的多个接收者返回回复的情况,您可以使用AggregatingReplyingKafkaTemplate
.
这是 Scatter-Gather 企业集成模式的客户端实现。
像ReplyingKafkaTemplate
这AggregatingReplyingKafkaTemplate
构造函数采用生产者工厂和监听器容器来接收回复;它有第三个参数BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
每次收到回复时都会查阅;当谓词返回时true
、集合ConsumerRecord
s 用于完成Future
由sendAndReceive
方法。
还有一个额外的属性returnPartialOnTimeout
(默认为 false)。
当将其设置为true
,而不是用KafkaReplyTimeoutException
,部分结果正常完成未来(只要至少收到一条回复记录)。
从版本 2.3.5 开始,谓词也会在超时后调用(如果returnPartialOnTimeout
是true
).
第一个参数是当前记录列表;二是true
如果此调用是由于超时造成的。
谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是ConsumerRecord
其值是ConsumerRecord
s.
“外层”ConsumerRecord
不是“真实”记录,它由模板合成,作为请求收到的实际回复记录的持有者。
当发生正常发布(发布策略返回 true)时,主题设置为aggregatedResults
;如果returnPartialOnTimeout
为 true,并且发生超时(并且至少已收到一条回复记录),则该主题设置为partialResultsAfterTimeout
.
该模板为这些“主题”名称提供常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
真实的ConsumerRecord
s 在Collection
包含接收回复的实际主题。
回复的侦听器容器必须配置为AckMode.MANUAL 或AckMode.MANUAL_IMMEDIATE ;消费者属性enable.auto.commit 必须是false (自 2.3 版以来的默认值)。
为了避免丢失消息的可能性,模板仅在未完成的请求为零时提交偏移量,即当发布策略释放最后一个未完成的请求时。
重新平衡后,可能会重复回复传递;对于任何机上请求,这些请求将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。 |
如果您使用ErrorHandlingDeserializer 使用此聚合模板,框架不会自动检测DeserializationException s.
相反,记录(带有null value) 将完整返回,标头中存在反序列化异常。
建议应用程序调用实用工具方法ReplyingKafkaTemplate.checkDeserialization() 方法来确定是否发生反序列化异常。
有关更多信息,请参阅其 javadocs。
这replyErrorChecker 也没有调用此聚合模板;您应该对回复的每个元素执行检查。 |
4.1.4. 接收消息
您可以通过配置MessageListenerContainer
并提供消息侦听器或使用@KafkaListener
注解。
消息侦听器
使用消息侦听器容器时,必须提供侦听器来接收数据。 目前有八个支持消息侦听器的接口。 以下列表显示了这些接口:
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
1 | 使用此接口处理单个ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用自动提交或容器管理的提交方法之一时的作。 |
2 | 使用此接口处理单个ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用手动提交方法之一时的作。 |
3 | 使用此接口处理单个ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用自动提交或容器管理的提交方法之一时的作。
访问Consumer 对象。 |
4 | 使用此接口处理单个ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用手动提交方法之一时的作。
访问Consumer 对象。 |
5 | 使用此接口处理所有ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用自动提交或容器管理的提交方法之一时的作。AckMode.RECORD 使用此接口时不支持,因为侦听器被赋予了完整的批处理。 |
6 | 使用此接口处理所有ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用手动提交方法之一时的作。 |
7 | 使用此接口处理所有ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用自动提交或容器管理的提交方法之一时的作。AckMode.RECORD 使用此接口时不支持,因为侦听器被赋予了完整的批处理。
访问Consumer 对象。 |
8 | 使用此接口处理所有ConsumerRecord 从 Kafka 消费者收到的实例poll() 使用手动提交方法之一时的作。
访问Consumer 对象。 |
这Consumer 对象不是线程安全的。
只能在调用侦听器的线程上调用其方法。 |
您不应执行任何Consumer<?, ?> 影响消费者在侦听器中的位置和/或提交的偏移量的方法;容器需要管理此类信息。 |
消息侦听器容器
二MessageListenerContainer
提供实现:
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
这KafkaMessageListenerContainer
接收来自单个线程上所有主题或分区的所有消息。
这ConcurrentMessageListenerContainer
一个或多个委托KafkaMessageListenerContainer
实例来提供多线程消费。
从 2.2.7 版本开始,您可以添加RecordInterceptor
到侦听器容器;在调用侦听器之前,将调用它,允许检查或修改记录。
如果拦截器返回 null,则不会调用侦听器。
从 2.7 版开始,它有额外的方法,这些方法在侦听器退出后调用(通常,或通过抛出异常)。
此外,从 2.7 版本开始,现在有一个BatchInterceptor
,为批处理侦听器提供类似的功能。
此外,ConsumerAwareRecordInterceptor
(和BatchInterceptor
) 提供对Consumer<?, ?>
.
例如,这可用于访问拦截器中的使用者指标。
您不应执行任何影响这些拦截器中消费者位置和/或已提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器更改记录(通过创建新记录),则topic ,partition 和offset 必须保持不变,以避免意外副作用,例如记录丢失。 |
这CompositeRecordInterceptor
和CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从 2.8 版开始,在使用事务时,在事务开始之前调用拦截器。
您可以设置侦听器容器的interceptBeforeTx
属性设置为false
在事务开始后调用拦截器。
从版本 2.3.8、2.4.6 开始,ConcurrentMessageListenerContainer
现在支持并发大于 1 时的静态成员身份。
这group.instance.id
后缀为-n
跟n
起价1
.
这,加上增加的session.timeout.ms
,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。
用KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它收到一个ConsumerFactory
以及有关主题和分区以及其他配置的信息,在ContainerProperties
对象。ContainerProperties
具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数采用TopicPartitionOffset
参数来显式指示容器使用哪些分区(使用 consumerassign()
方法),并具有可选的初始偏移量。
默认情况下,正值是绝对偏移量。
默认情况下,负值相对于分区内的当前最后一个偏移量。
的构造函数TopicPartitionOffset
这需要额外的boolean
参数。
如果这是true
,初始偏移量(正或负)相对于此使用者的当前位置。
偏移量在容器启动时应用。
第二个采用主题数组,Kafka 根据group.id
property — 在组中分配分区。第三个使用正则表达式Pattern
以选择主题。
要分配一个MessageListener
到容器,您可以使用ContainerProps.setMessageListener
方法。以下示例显示了如何执行此作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,在创建DefaultKafkaConsumerFactory
,使用仅采用上述属性的构造函数意味着键和值Deserializer
类是从配置中选取的。 或者Deserializer
实例可以传递给DefaultKafkaConsumerFactory
构造函数,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>
s(从 2.3 版开始),将用于获取单独的Deserializer
每个实例Consumer
:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请参阅 JavadocContainerProperties
有关您可以设置的各种属性的更多信息。
从 2.1.1 版本开始,一个名为logContainerConfig
可用。
什么时候true
和INFO
启用日志记录时,每个侦听器容器都会写入一条日志消息,汇总其配置属性。
默认情况下,主题偏移量提交的日志记录在DEBUG
日志记录级别。
从 2.1.2 版开始,中的属性ContainerProperties
叫commitLogLevel
允许您指定这些消息的日志级别。
例如,要将日志级别更改为INFO
,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
.
从 2.2 版开始,一个名为missingTopicsFatal
已添加(默认:false
自 2.3.4 起)。
如果代理上不存在任何配置的主题,这可以防止容器启动。
如果容器配置为侦听主题模式 (regex),则不适用。
以前,容器线程在consumer.poll()
方法等待主题出现,同时记录许多消息。
除了日志之外,没有迹象表明存在问题。
从 2.8 版本开始,新的容器属性authExceptionRetryInterval
已被引入。
这会导致容器在获取任何消息后重试获取消息AuthenticationException
或AuthorizationException
从KafkaConsumer
.
例如,当配置的用户被拒绝访问读取某个主题或凭据不正确时,就会发生这种情况。
定义authExceptionRetryInterval
允许容器在授予适当的权限时恢复。
默认情况下,不配置任何间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。 |
从 2.8 版开始,在创建消费者工厂时,如果您将反序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用configure()
使用配置属性配置它们的方法。
用ConcurrentMessageListenerContainer
单个构造函数类似于KafkaListenerContainer
构造 函数。
以下列表显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个concurrency
财产。
例如container.setConcurrency(3)
创建三个KafkaMessageListenerContainer
实例。
对于第一个构造函数,Kafka 使用其组管理功能将分区分布在使用者之间。
当监听多个主题时,默认的分区分布可能不是你所期望的。
例如,如果您有三个主题,每个主题有五个分区,并且您想使用 使用 Spring Boot 时,您可以分配设置策略,如下所示:
|
当容器属性配置为TopicPartitionOffset
s,则ConcurrentMessageListenerContainer
分发TopicPartitionOffset
跨委托的实例KafkaMessageListenerContainer
实例。
如果,比如说,六个TopicPartitionOffset
实例,并且concurrency
是3
;每个容器都有两个分区。
五人份TopicPartitionOffset
实例中,两个容器获得两个分区,第三个容器获得一个分区。
如果concurrency
大于TopicPartitions
这concurrency
向下调整,使每个容器都有一个分区。
这client.id 属性(如果设置)附加为-n 哪里n 是与并发相对应的使用者实例。
启用 JMX 时,这是为 MBean 提供唯一名称所必需的。 |
从 1.3 版本开始,MessageListenerContainer
提供对基础指标的访问KafkaConsumer
.
在以下情况下ConcurrentMessageListenerContainer
这metrics()
方法返回所有目标的指标KafkaMessageListenerContainer
实例。
这些指标被分组到Map<MetricName, ? extends Metric>
通过client-id
为标的KafkaConsumer
.
从 2.3 版本开始,ContainerProperties
提供idleBetweenPolls
选项,让监听器容器中的主循环在KafkaConsumer.poll()
调用。
从提供的选项中选择实际睡眠间隔作为最小值,并且max.poll.interval.ms
消费者配置和当前记录的批处理时间。
提交偏移量
提供了几个用于提交偏移的选项。
如果enable.auto.commit
消费者属性是true
,Kafka 会根据其配置自动提交偏移量。
如果是false
,容器支持多个AckMode
设置(在下一个列表中描述)。
默认值AckMode
是BATCH
.
从 2.3 版开始,框架集enable.auto.commit
自false
除非在配置中显式设置。
以前,Kafka 默认值 (true
如果未设置属性,则使用 )。
消费者poll()
方法返回一个或多个ConsumerRecords
.
这MessageListener
为每条记录调用。
以下列表描述了容器对每个AckMode
(当事务未被使用时):
-
RECORD
:当侦听器处理记录后返回时提交偏移量。 -
BATCH
:当poll()
已处理。 -
TIME
:当poll()
已处理,只要ackTime
因为已超过上次提交。 -
COUNT
:当poll()
已处理,只要ackCount
自上次提交以来已收到记录。 -
COUNT_TIME
:似TIME
和COUNT
,但如果任一条件为true
. -
MANUAL
:消息监听器负责acknowledge()
这Acknowledgment
. 之后,与BATCH
被应用。 -
MANUAL_IMMEDIATE
:当Acknowledgment.acknowledge()
方法被侦听器调用。
使用事务时,偏移量被发送到事务,语义等同于RECORD
或BATCH
,具体取决于侦听器类型(记录或批处理)。
MANUAL 和MANUAL_IMMEDIATE 要求监听器是AcknowledgingMessageListener 或BatchAcknowledgingMessageListener .
请参阅消息侦听器。 |
根据syncCommits
container 属性,则commitSync()
或commitAsync()
方法。syncCommits
是true
默认情况下;另请参阅setSyncCommitTimeout
.
看setCommitCallback
获取异步提交的结果;默认回调是LoggingCommitCallback
它记录错误(并在调试级别成功)。
因为侦听器容器有自己的提交偏移量机制,所以它更喜欢 KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
成为false
.
从 2.3 版开始,它无条件地将其设置为 false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。
这Acknowledgment
有以下方法:
public interface Acknowledgment {
void acknowledge();
}
此方法使侦听器能够控制何时提交偏移量。
从 2.3 版本开始,Acknowledgment
接口有两个附加方法nack(long sleep)
和nack(int index, long sleep)
.
第一个用于记录侦听器,第二个用于批处理侦听器。
为侦听器类型调用错误的方法将抛出IllegalStateException
.
如果要提交部分批处理,请使用nack() ,使用事务时,将AckMode 自MANUAL ;调用nack() 将成功处理的记录的偏移量发送到交易。 |
nack() 只能在调用侦听器的使用者线程上调用。 |
nack() 使用无序提交时不允许。 |
使用记录侦听器时,当nack()
调用时,将提交任何挂起的偏移量,丢弃上次轮询中的剩余记录,并在其分区上执行搜索,以便在下一个分区重新传递失败的记录和未处理的记录poll()
. 消费者可以在重新交付之前暂停,方法是将sleep
论点。 这类似于在容器配置了DefaultErrorHandler
.
使用批处理侦听器时,可以在发生故障的批处理中指定索引。 什么时候nack()
调用时,将在索引之前为记录提交偏移量,并在失败和丢弃的记录的分区上执行搜索,以便在下一个记录时重新传递它们poll()
.
有关更多信息,请参阅容器错误处理程序。
消费者在睡眠期间暂停,以便我们继续轮询代理以保持消费者处于活动状态。实际睡眠时间及其分辨率取决于容器的pollTimeout 默认为 5 秒。最短睡眠时间等于pollTimeout 并且所有睡眠时间都将是它的倍数。对于较短的睡眠时间,或者为了提高其准确性,请考虑减少容器的pollTimeout . |
手动提交偏移
通常,当使用AckMode.MANUAL
或AckMode.MANUAL_IMMEDIATE
,则必须按顺序确认,因为 Kafka 不维护每条记录的状态,仅维护每个组/分区的已提交偏移量。
从 2.8 版开始,您现在可以设置 container 属性asyncAcks
,这允许按任何顺序确认轮询返回的记录的确认。
侦听器容器将推迟无序提交,直到收到缺少的确认。
使用者将暂停(不传递新记录),直到提交上一个轮询的所有偏移量。
虽然此功能允许应用程序异步处理记录,但应理解,它增加了失败后重复传递的可能性。 |
@KafkaListener
注解
这@KafkaListener
Comments 用于将 Bean 方法指定为侦听器容器的侦听器。
豆子被包裹在一个MessagingMessageListenerAdapter
配置了各种功能,例如转换器,以便在必要时转换数据以匹配方法参数。
您可以使用 SpEL 配置注释上的大多数属性#{…}
或属性占位符 (${…}
).
有关更多信息,请参阅 Javadoc。
录制听众
这@KafkaListener
注解为简单的 POJO 侦听器提供了一种机制。
以下示例演示如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
此机制需要@EnableKafka
注释@Configuration
类和监听器容器工厂,用于配置底层ConcurrentMessageListenerContainer
.
默认情况下,名称为kafkaListenerContainerFactory
是意料之中的。
以下示例演示如何使用ConcurrentMessageListenerContainer
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
请注意,要设置容器属性,必须使用getContainerProperties()
工厂的方法。
它用作注入容器的实际属性的模板。
从 2.1.1 版本开始,您现在可以将client.id
属性。 这clientIdPrefix
后缀为-n
哪里n
是表示使用并发时容器编号的整数。
从版本 2.2 开始,您现在可以覆盖容器工厂的concurrency
和autoStartup
属性,方法是在注释本身上使用属性。属性可以是简单值、属性占位符或 SpEL 表达式。以下示例显示了如何执行此作:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
您还可以使用显式主题和分区(以及可选的初始偏移量)配置 POJO 侦听器。 以下示例显示了如何执行此作:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在partitions
或partitionOffsets
属性,但不能同时使用两者。
与大多数注释属性一样,您可以使用 SpEL 表达式;有关如何生成大型分区列表的示例,请参阅手动分配所有分区。
从 2.5.5 版开始,您可以将初始偏移量应用于所有分配的分区:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
通配符表示*
partitions
属性。
必须只有一个@PartitionOffset
每个@TopicPartition
.
此外,当监听器实现ConsumerSeekAware
,onPartitionsAssigned
现在调用,即使在使用手动分配时也是如此。
例如,这允许当时进行任何任意寻道作。
从版本 2.6.4 开始,您可以指定以逗号分隔的分区列表或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
范围是包容的;上面的示例将分配分区0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
.
指定初始偏移量时可以使用相同的技术:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量将应用于所有 6 个分区。
手动确认
使用手动时AckMode
,您还可以为侦听器提供Acknowledgment
.
以下示例还演示了如何使用不同的容器工厂。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,有关记录的元数据可从消息标头获得。 可以使用以下标头名称来检索邮件的标头:
-
KafkaHeaders.OFFSET
-
KafkaHeaders.RECEIVED_MESSAGE_KEY
-
KafkaHeaders.RECEIVED_TOPIC
-
KafkaHeaders.RECEIVED_PARTITION_ID
-
KafkaHeaders.RECEIVED_TIMESTAMP
-
KafkaHeaders.TIMESTAMP_TYPE
从 2.5 版本开始,RECEIVED_MESSAGE_KEY
如果传入记录具有null
钥匙;以前,标头填充了null
价值。
这一更改是为了使框架与spring-messaging
约定,其中null
值标头不存在。
以下示例演示如何使用标头:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
从 2.5 版开始,您可以在ConsumerRecordMetadata
参数。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含来自ConsumerRecord
除了键和值。
批处理侦听器
从 1.1 版开始,您可以配置@KafkaListener
方法来接收从消费者轮询收到的整批消费者记录。要将侦听器容器工厂配置为创建批量侦听器,您可以将batchListener
财产。 以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
从 2.8 版开始,您可以覆盖工厂的batchListener propery 使用batch 属性@KafkaListener 注解。 这与对容器错误处理程序的更改一起,允许将同一工厂用于记录和批处理侦听器。 |
以下示例演示如何接收有效负载列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主题、分区、偏移量等在并行有效负载的标头中可用。以下示例演示如何使用标头:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您可以收到List
之Message<?>
对象,每个消息中的每个偏移量和其他详细信息,但它必须是唯一的参数(除了可选的Acknowledgment
,使用手动提交时,和/或Consumer<?, ?>
parameters)定义。以下示例显示了如何执行此作:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不会对有效负载执行转换。
如果BatchMessagingMessageConverter
配置了RecordMessageConverter
,您还可以将泛型类型添加到Message
参数,并转换有效负载。有关更多信息,请参阅使用批处理侦听器的有效负载转换。
您还可以收到以下列表ConsumerRecord<?, ?>
对象,但它必须是唯一的参数(除了可选的Acknowledgment
,当使用手动提交时,以及Consumer<?, ?>
parameters)定义。以下示例显示了如何执行此作:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从 2.2 版本开始,侦听器可以接收完整的ConsumerRecords<?, ?>
对象返回poll()
方法,允许侦听器访问其他方法,例如partitions()
(返回TopicPartition
列表中的实例)和records(TopicPartition)
(获取选择性记录)。
同样,这必须是唯一的参数(除了可选的Acknowledgment
,当使用手动提交或Consumer<?, ?>
parameters) 的方法。
以下示例显示了如何执行此作:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂有RecordFilterStrategy 已配置,则忽略ConsumerRecords<?, ?> listeners,带有WARN 发出的日志消息。
只有在以下情况下,才能使用批处理侦听器过滤记录<List<?>> 使用监听器的形式。
默认情况下,一次过滤一条记录;从 2.8 版开始,您可以覆盖filterBatch 在一次调用中筛选整个批处理。 |
注释属性
从 2.0 版开始,id
属性(如果存在)用作 Kafka 消费者group.id
属性,覆盖使用者工厂中配置的属性(如果存在)。
您还可以设置groupId
显式或设置idIsGroup
设置为 false 以恢复之前使用消费者工厂的行为group.id
.
您可以在大多数注释属性中使用属性占位符或 SpEL 表达式,如以下示例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从 2.1.2 版开始,SpEL 表达式支持一个特殊的标记:__listener
. 它是一个伪 Bean 名称,表示存在此注释的当前 Bean 实例。
请考虑以下示例:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
给定上一个示例中的 bean,然后我们可以使用以下内容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果,万一你有一个名为__listener
,您可以通过使用beanRef
属性。 以下示例显示了如何执行此作:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
从 2.2.4 版开始,您可以直接在 Comments 上指定 Kafka 消费者属性,这些属性将覆盖消费者工厂中配置的具有相同名称的任何属性。您不能指定group.id
和client.id
属性;它们将被忽略;使用groupId
和clientIdPrefix
注释属性。
这些属性被指定为具有普通 Java 的单个字符串Properties
文件格式:foo:bar
,foo=bar
或foo bar
.
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
获取消费者group.id
当在多个容器中运行相同的侦听器代码时,能够确定哪个容器(由其group.id
消费者属性),记录来自。
您可以调用KafkaUtils.getConsumerGroupId()
在侦听器线程上执行此作。或者,您可以在方法参数中访问组 ID。
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
这在接收List<?> 的记录。
它在接收ConsumerRecords<?, ?> 论点。
使用KafkaUtils 在这种情况下的机制。 |
容器线程命名
侦听器容器当前使用两个任务执行器,一个用于调用消费者,另一个用于在 kafka 消费者属性enable.auto.commit
是false
.
您可以通过设置consumerExecutor
和listenerExecutor
容器的属性ContainerProperties
.
使用池执行器时,请确保有足够的线程可用于处理使用它们的所有容器的并发性。
使用ConcurrentMessageListenerContainer
,每个使用者 (concurrency
).
如果您不提供消费者执行程序,则SimpleAsyncTaskExecutor
被使用。
此执行器创建名称类似于<beanName>-C-1
(消费者线程)。
对于ConcurrentMessageListenerContainer
这<beanName>
线程名称的一部分变为<beanName>-m
哪里m
表示使用者实例。n
每次启动容器时递增。
因此,bean 名称为container
,则此容器中的线程将被命名为container-0-C-1
,container-1-C-1
等,容器第一次启动后;container-0-C-2
,container-1-C-2
等等,在停止和随后启动之后。
@KafkaListener
作为元注释
从 2.2 版开始,您现在可以使用@KafkaListener
作为元注释。
以下示例显示了如何执行此作:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
您必须至少对以下一个topics
,topicPattern
或topicPartitions
(而且,通常,id
或groupId
除非您指定了group.id
在消费者工厂配置中)。
以下示例显示了如何执行此作:
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
@KafkaListener
在类上
当您使用@KafkaListener
在类级别,必须指定@KafkaHandler
在方法层面。
传递消息时,转换后的消息有效负载类型用于确定要调用的方法。
以下示例显示了如何执行此作:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从 2.1.3 版开始,您可以指定@KafkaHandler
method 作为默认方法,如果其他方法没有匹配项,则调用该方法。
最多只能指定一种方法。
使用时@KafkaHandler
方法,有效负载必须已经转换为域对象(以便可以执行匹配)。
使用自定义解序列化器,JsonDeserializer
或JsonMessageConverter
与其TypePrecedence
设置为TYPE_ID
.
有关详细信息,请参阅序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认的@KafkaHandler 无法接收离散标头;它必须使用ConsumerRecordMetadata 如消费者记录元数据中所述。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是String
;这topic
参数还将获得对object
.
如果需要默认方法中有关记录的元数据,请使用以下命令:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
@KafkaListener
属性修改
从 V2.7.2 开始,您现在可以在创建容器之前以编程方式修改注释属性。
为此,请添加一个或多个KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer
到应用程序上下文。AnnotationEnhancer
是一个BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>
并且必须返回属性映射。
属性值可以包含 SpEL 和/或属性占位符;在执行任何解析之前调用增强器。
如果存在多个增强器,并且它们实现Ordered
,它们将按顺序被调用。
AnnotationEnhancer 必须声明 bean 定义static 因为它们在应用程序上下文的生命周期中非常早期就需要。 |
示例如下:
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
@KafkaListener
生命周期管理
为@KafkaListener
注解不是应用程序上下文中的 bean。
相反,它们使用KafkaListenerEndpointRegistry
.
该 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何具有autoStartup
设置为true
.
所有容器工厂创建的所有容器必须位于同一容器中phase
.
有关更多信息,请参阅侦听器容器自动启动。
可以使用注册表以编程方式管理生命周期。
启动或停止注册表将启动或停止所有已注册的容器。
或者,您可以使用其id
属性。
您可以设置autoStartup
,这将覆盖配置到容器工厂中的默认设置。
您可以从应用程序上下文(例如自动连接)获取对 bean 的引用,以管理其已注册的容器。
以下示例显示了如何执行此作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表仅维护其管理的容器的生命周期;声明为 Bean 的容器不受注册表管理,可以从应用程序上下文中获取。
可以通过调用注册表的getListenerContainers()
方法。
2.2.5 版本添加了一个方便的方法getAllListenerContainers()
,它返回所有容器的集合,包括由注册表管理的容器和声明为 bean 的容器。
返回的集合将包括任何已初始化的原型 Bean,但它不会初始化任何惰性 Bean 声明。
刷新应用程序上下文后注册的终结点将立即启动,无论其autoStartup 属性,以符合SmartLifecycle contract,其中autoStartup 仅在应用程序上下文初始化期间考虑。
延迟注册的一个示例是具有@KafkaListener 在原型范围内,在初始化上下文后创建实例。
从 2.8.7 版开始,您可以将注册表的alwaysStartAfterRefresh 属性设置为false 然后是容器的autoStartup 属性将定义容器是否启动。 |
@KafkaListener
@Payload
验证
从 2.2 版开始,现在可以更轻松地添加Validator
验证@KafkaListener
@Payload
参数。
以前,您必须配置自定义DefaultMessageHandlerMethodFactory
并将其添加到注册商。
现在,您可以将验证器添加到注册商本身。
以下代码显示了如何执行此作:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
当您将 Spring Boot 与验证Starters一起使用时,一个LocalValidatorFactoryBean 是自动配置的,如以下示例所示: |
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
以下示例演示如何验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
从版本 2.5.11 开始,验证现在适用于@KafkaHandler
类级监听器中的方法。
看@KafkaListener
在类上.
重新平衡监听器
ContainerProperties
有一个名为consumerRebalanceListener
,它采用 Kafka 客户端的ConsumerRebalanceListener
接口。
如果未提供此属性,则容器将配置一个日志记录侦听器,该侦听器在INFO
水平。
该框架还添加了一个子接口ConsumerAwareRebalanceListener
.
以下列表显示了ConsumerAwareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,撤销分区时有两个回调。 第一个立即调用。 第二个在提交任何挂起的偏移量后调用。 如果您希望在某个外部存储库中维护偏移量,这很有用,如以下示例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从 2.4 版本开始,一种新的方法onPartitionsLost() 已添加(类似于ConsumerRebalanceLister ).
默认实现ConsumerRebalanceLister 简单调用onPartionsRevoked .
默认实现ConsumerAwareRebalanceListener 什么都不做。
当为侦听器容器提供自定义侦听器(任一类型)时,重要的是您的实现不要调用onPartitionsRevoked 从onPartitionsLost .
如果实现ConsumerRebalanceListener 您应该覆盖默认方法。
这是因为侦听器容器将调用自己的onPartitionsRevoked 从其实施onPartitionsLost 在实现上调用该方法之后。
如果实现委托给默认行为,onPartitionsRevoked 每次调用Consumer 在容器的侦听器上调用该方法。 |
转发侦听器结果@SendTo
从 2.0 版开始,如果您还注释了@KafkaListener
使用@SendTo
注释,方法调用返回结果,则结果将转发到@SendTo
.
这@SendTo
value 可以有多种形式:
-
@SendTo("someTopic")
到文字主题的路由 -
@SendTo("#{someExpression}")
路由到主题,通过在应用程序上下文初始化期间评估一次表达式来确定。 -
@SendTo("!{someExpression}")
路由到通过在运行时计算表达式来确定的主题。 这#root
对象具有三个属性:-
request
:入站ConsumerRecord
(或ConsumerRecords
对象)) -
source
:这org.springframework.messaging.Message<?>
从request
. -
result
:方法返回结果。
-
-
@SendTo
(无属性):这被视为!{source.headers['kafka_replyTopic']}
(自 2.1.3 版起)。
从版本 2.1.11 和 2.2.1 开始,属性占位符在@SendTo
值。
表达式求值的结果必须是String
表示主题名称。
以下示例显示了各种使用@SendTo
:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
为了支持@SendTo ,则必须为侦听器容器工厂提供KafkaTemplate (在其replyTemplate 属性),用于发送回复。
这应该是一个KafkaTemplate 而不是ReplyingKafkaTemplate 用于客户端的请求/回复处理。
使用 Spring Boot 时,boot 会自动将模板配置到出厂;配置自己的工厂时,必须按照以下示例所示进行设置。 |
从 2.2 版开始,您可以添加ReplyHeadersConfigurer
到侦听器容器工厂。
参考此选项以确定要在回复消息中设置的标头。
以下示例演示如何添加ReplyHeadersConfigurer
:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果您愿意,您还可以添加更多标题。 以下示例显示了如何执行此作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用@SendTo
,则必须配置ConcurrentKafkaListenerContainerFactory
使用KafkaTemplate
在其replyTemplate
属性来执行发送。
除非你使用请求/回复语义,否则只有简单的send(topic, value) 方法,因此您可能希望创建一个子类来生成分区或键。
以下示例显示了如何执行此作: |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果侦听器方法返回
|
使用请求/回复语义时,发送方可以请求目标分区。
您可以注释
有关详细信息,请参阅处理异常。 |
如果侦听器方法返回Iterable ,默认情况下,每个元素的记录在发送值时。
从 2.3.5 版本开始,将splitIterables 属性@KafkaListener 自false 整个结果将作为单个ProducerRecord .
这需要在回复模板的生产者配置中有一个合适的序列化器。
但是,如果回复是Iterable<Message<?>> 该属性将被忽略,并且每个消息都单独发送。 |
过滤消息
Spring for Apache Kafka 项目还通过FilteringMessageListenerAdapter
类,它可以将MessageListener
.
此类采用RecordFilterStrategy
在其中实现filter
方法来表示消息是重复的,应该丢弃。
这有一个名为ackDiscarded
,指示适配器是否应确认已丢弃的记录。
是的false
默认情况下。
当您使用@KafkaListener
,将RecordFilterStrategy
(并且可选ackDiscarded
) ,以便侦听器被包装在适当的过滤适配器中。
此外,一个FilteringBatchMessageListenerAdapter
,用于使用批处理消息侦听器时。
这FilteringBatchMessageListenerAdapter 如果您的@KafkaListener 收到一个ConsumerRecords<?, ?> 而不是List<ConsumerRecord<?, ?>> 因为ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,您可以覆盖侦听器容器工厂的默认值RecordFilterStrategy
通过使用filter
属性。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
重试交付
请参阅DefaultErrorHandler
在处理异常中。
开始@KafkaListener
s 在序列中
一个常见的用例是在另一个侦听器使用了主题中的所有记录后启动侦听器。
例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。
从 2.7.3 版本开始,一个新组件ContainerGroupSequencer
已被引入。
它使用@KafkaListener
containerGroup
属性将容器分组在一起,并在当前组中的所有容器都空闲时启动下一组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在这里,我们有 4 个听众,分为两组,g1
和g2
.
在应用程序上下文初始化期间,排序器将autoStartup
属性设置为false
.
它还设置了idleEventInterval
对于任何容器(尚未设置一个容器)到提供的值(在本例中为 5000 毫秒)。
然后,当应用程序上下文启动排序器时,将启动第一组中的容器。
如ListenerContainerIdleEvent
收到 s,则每个容器中的每个单独的子容器都将停止。
当ConcurrentMessageListenerContainer
停止时,父容器将停止。
当一个组中的所有容器都已停止时,将启动下一个组中的容器。
组中的组或容器数没有限制。
默认情况下,最后一组中的容器 (g2
以上)在空闲时不会停止。
要修改该行为,请将stopLastGroupWhenIdle
自true
在音序器上。
顺便说一句;以前,每个组中的容器都添加到类型为Collection<MessageListenerContainer>
其中 bean 名称是containerGroup
.
这些集合现在已被弃用,取而代之的是类型为ContainerGroup
替换为 bean 名称,该名称是组名称,后缀为.group
;在上面的示例中,将有 2 个 beang1.group
和g2.group
.
这Collection
beans 将在将来的版本中删除。
用KafkaTemplate
接收
本节介绍如何使用KafkaTemplate
接收消息。
从 2.8 版开始,该模板有四个receive()
方法:
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
如您所见,您需要知道需要检索的记录的分区和偏移量;一个新的Consumer
为每个作创建(和关闭)。
使用最后两种方法,将单独检索每条记录,并将结果组装成ConsumerRecords
对象。
创建TopicPartitionOffset
s 对于请求,则仅支持正的绝对偏移量。
4.1.5. 侦听器容器属性
属性 | 默认值 | 描述 |
---|---|---|
1 |
提交待处理偏移量之前的记录数,当 |
|
|
一条链 |
|
Batch |
控制偏移量的提交频率 - 请参阅提交偏移量。 |
|
5000 |
在 |
|
LATEST_ONLY _NO_TX |
是否承诺分配的初始职位;默认情况下,只有在 |
|
|
如果不是空,则 |
|
(空字符串) |
的前缀 |
|
false |
设置为 |
|
false |
设置为 |
|
|
当存在时和 |
|
|
的提供程序 |
|
调试 |
与提交偏移量相关的日志的日志记录级别。 |
|
|
重新平衡的听众;请参阅重新平衡侦听器。 |
|
30多岁 |
在记录错误之前等待使用者启动的时间;例如,如果您使用线程不足的任务执行器,则可能会发生这种情况。 |
|
|
用于运行使用者线程的任务执行器。
默认执行器创建名为 |
|
|
请参阅 Delivery Attempts 标头。 |
|
|
恰好一次语义模式;请参阅 Exactly Once 语义。 |
|
|
当使用事务生产者生成的记录时,并且使用者位于分区的末尾,由于用于指示事务提交/回滚的伪记录,并且可能存在回滚记录,因此滞后可能会错误地报告为大于零。
这在功能上不会影响消费者,但一些用户表示担心“滞后”不是零。
将此属性设置为 |
|
|
覆盖消费者 |
|
5.0 |
乘数 |
|
0 |
用于通过在轮询之间休眠线程来减慢传递速度。
处理一批记录的时间加上此值必须小于 |
|
|
设置后,启用发布 |
|
|
设置后,启用发布 |
|
没有 |
用于覆盖在消费者工厂上配置的任何任意消费者属性。 |
|
|
设置为 true 以在 INFO 级别记录所有容器属性。 |
|
|
消息侦听器。 |
|
|
是否为使用者线程维护千分尺计时器。 |
|
|
如果代理上不存在确认的主题,则为 true 将阻止容器启动。 |
|
30多岁 |
检查使用者线程状态的频率 |
|
3.0 |
乘以 |
|
|
设置为 false 以记录完整的使用者记录(错误、调试日志等),而不仅仅是 |
|
5000 |
超时传递到 |
|
|
用于运行使用者监视任务的调度程序。 |
|
10000 |
阻止 |
|
|
如果出现以下情况,请停止侦听器容器 |
|
|
当容器停止时,在当前记录之后停止处理,而不是在处理上一次轮询中的所有记录之后停止处理。 |
|
参见描述。 |
使用批处理侦听器时,如果这是 |
|
|
在以下情况下使用的超时 |
|
|
是否使用同步或异步提交进行偏移;看 |
|
不适用 |
配置的主题、主题模式或显式分配的主题/分区。
互斥;必须至少提供一个;强制执行者 |
|
|
请参阅事务。 |
属性 | 默认值 | 描述 |
---|---|---|
|
一 |
|
应用程序上下文 |
事件发布者。 |
|
参见描述。 |
已弃用 - 请参阅 |
|
|
设置一个 |
|
豆子名称 |
容器的 bean 名称;后缀为 |
|
参见描述。 |
|
|
|
容器属性实例。 |
|
参见描述。 |
已弃用 - 请参阅 |
|
参见描述。 |
已弃用 - 请参阅 |
|
参见描述。 |
这 |
|
|
确定 |
|
参见描述。 |
用户配置容器的 Bean 名称或 |
|
零 |
要填充的值 |
|
(只读) |
如果已请求使用者暂停,则为 True。 |
|
|
设置一个 |
|
30多岁 |
当 |
属性 | 默认值 | 描述 |
---|---|---|
(只读) |
当前分配给此容器的分区(显式或非显式)。 |
|
(只读) |
当前分配给此容器的分区(显式或非显式)。 |
|
|
由并发容器用于为每个子容器的消费者提供唯一的 |
|
不适用 |
如果已请求暂停并且使用者实际上已暂停,则为 True。 |
属性 | 默认值 | 描述 |
---|---|---|
|
设置为 false 以禁止向 |
|
(只读) |
当前分配给此容器子级的分区聚合 |
|
(只读) |
当前分配给此容器子级的分区 |
|
1 |
子数 |
|
不适用 |
如果已请求暂停并且所有子容器的使用者实际上都已暂停,则为 true。 |
|
不适用 |
对所有子项的引用 |
4.1.6. 动态创建容器
有几种技术可用于在运行时创建侦听器容器。 本节探讨其中的一些技术。
MessageListener 实现
如果您直接实现自己的监听器,则只需使用容器工厂为该监听器创建一个原始容器:
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
class MyListener : MessageListener<String?, String?> {
override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}
}
private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
原型豆
用@KafkaListener
可以通过将 bean 声明为原型来动态创建:
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
fun listen(`in`: String?) {
println(`in`)
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
侦听器必须具有唯一的 ID。
从 2.8.9 版本开始,KafkaListenerEndpointRegistry 有了新方法unregisterListenerContainer(String id) 以允许您重复使用 ID。
取消注册容器不会stop() 容器,你必须自己做。 |
4.1.7. 应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
-
ConsumerStartingEvent
- 在使用者线程首次启动时发布,在开始轮询之前。 -
ConsumerStartedEvent
- 当使用者即将开始轮询时发布。 -
ConsumerFailedToStartEvent
- 如果没有,则已发布ConsumerStartingEvent
在consumerStartTimeout
container 属性。 此事件可能表示配置的任务执行器没有足够的线程来支持使用它的容器及其并发性。 发生这种情况时,还会记录一条错误消息。 -
ListenerContainerIdleEvent
:在未收到任何消息时发布idleInterval
(如果已配置)。 -
ListenerContainerNoLongerIdleEvent
:在之前发布记录后使用记录时发布ListenerContainerIdleEvent
. -
ListenerContainerPartitionIdleEvent
:当未从该分区收到消息时发布idlePartitionEventInterval
(如果已配置)。 -
ListenerContainerPartitionNoLongerIdleEvent
:当从先前发布过ListenerContainerPartitionIdleEvent
. -
NonResponsiveConsumerEvent
:当消费者似乎在poll
方法。 -
ConsumerPartitionPausedEvent
:在分区暂停时由每个使用者发布。 -
ConsumerPartitionResumedEvent
:由每个使用者在恢复分区时发布。 -
ConsumerPausedEvent
:在容器暂停时由每个使用者发布。 -
ConsumerResumedEvent
:在容器恢复时由每个使用者发布。 -
ConsumerStoppingEvent
:由每个消费者在停止前发布。 -
ConsumerStoppedEvent
:在使用者关闭后发布。 请参阅线程安全。 -
ContainerStoppedEvent
:当所有消费者都停止时发布。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。
如果将多播程序更改为使用异步执行器,则不得调用任何Consumer 当事件包含对消费者的引用时的方法。 |
这ListenerContainerIdleEvent
具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时容器处于空闲状态的时间。 -
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对卡夫卡的引用Consumer
对象。 例如,如果消费者的pause()
方法之前调用过,它可以resume()
收到事件时。 -
paused
:容器当前是否已暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerNoLongerIdleEvent
具有相同的属性,但idleTime
和paused
.
这ListenerContainerPartitionIdleEvent
具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时,时间分区消耗处于空闲状态。 -
topicPartition
:触发事件的主题和分区。 -
consumer
:对卡夫卡的引用Consumer
对象。 例如,如果消费者的pause()
方法之前调用过,它可以resume()
收到事件时。 -
paused
:当前是否为该使用者暂停了该分区消耗。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerPartitionNoLongerIdleEvent
具有相同的属性,但idleTime
和paused
.
这NonResponsiveConsumerEvent
具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll
:容器上次调用之前的时间poll()
. -
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对卡夫卡的引用Consumer
对象。 例如,如果消费者的pause()
方法之前调用过,它可以resume()
收到事件时。 -
paused
:容器当前是否已暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ConsumerPausedEvent
,ConsumerResumedEvent
和ConsumerStopping
事件具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partitions
:这TopicPartition
涉及的实例。
这ConsumerPartitionPausedEvent
,ConsumerPartitionResumedEvent
事件具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partition
:这TopicPartition
涉及的实例。
这ConsumerStartingEvent
,ConsumerStartingEvent
,ConsumerFailedToStartEvent
,ConsumerStoppedEvent
和ContainerStoppedEvent
事件具有以下属性:
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。
所有容器(无论是子容器还是父容器)发布ContainerStoppedEvent
.
对于父容器,源和容器属性相同。
此外,ConsumerStoppedEvent
具有以下附加属性:
-
reason
-
NORMAL
- 消费者正常停止(容器已停止)。 -
ERROR
-一个java.lang.Error
被扔了。 -
FENCED
- 事务性生产者被隔离,并且stopContainerWhenFenced
container 属性为true
. -
AUTH
-一AuthenticationException
或AuthorizationException
被抛出,并且authExceptionRetryInterval
未配置。 -
NO_OFFSET
- 分区没有偏移量,并且auto.offset.reset
政策是none
.
-
您可以使用此事件在出现此类情况后重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者
异步使用者虽然高效,但一个问题是检测它们何时处于空闲状态。 如果一段时间内没有邮件到达,您可能需要采取一些措施。
您可以将侦听器容器配置为发布ListenerContainerIdleEvent
当一段时间过去没有消息传递时。
当容器处于空余状态时,每隔一次就会发布一个事件idleEventInterval
毫秒。
要配置此功能,请将idleEventInterval
在容器上。
以下示例显示了如何执行此作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
return container;
}
以下示例演示如何设置idleEventInterval
对于一个@KafkaListener
:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果由于某种原因,消费者poll()
方法不退出,没有收到任何消息,也无法生成空闲事件(这是早期版本的kafka-clients
当无法联系到经纪人时)。
在这种情况下,容器会发布一个NonResponsiveConsumerEvent
如果轮询未在3x
这pollTimeout
财产。
默认情况下,每个容器中每 30 秒执行一次此检查。
您可以通过将monitorInterval
(默认 30 秒)和noPollThreshold
(默认 3.0)属性ContainerProperties
配置侦听器容器时。
这noPollThreshold
应大于1.0
以避免由于竞争条件而获得虚假事件。
接收此类事件可以停止容器,从而唤醒使用者以便它可以停止。
从 2.6.2 版开始,如果容器发布了ListenerContainerIdleEvent
,它将发布一个ListenerContainerNoLongerIdleEvent
随后收到记录时。
事件消耗
您可以通过实现ApplicationListener
— 要么是普通侦听器,要么是缩小为仅接收此特定事件的侦听器。
您还可以使用@EventListener
,在 Spring Framework 4.2 中引入。
下一个示例将@KafkaListener
和@EventListener
到单个类中。
您应该了解,应用程序侦听器会获取所有容器的事件,因此,如果要根据空闲的容器执行特定作,则可能需要检查侦听器 ID。
您还可以使用@EventListener
condition
为此目的。
有关事件属性的信息,请参阅应用程序事件。
该事件通常发布在消费者线程上,因此可以安全地与Consumer
对象。
以下示例同时使用@KafkaListener
和@EventListener
:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器可以看到所有容器的事件。因此,在前面的示例中,我们根据侦听器 ID 缩小了接收到的事件范围。由于为@KafkaListener 支持并发,实际容器命名为id-n 其中n 是每个实例的唯一值,以支持并发。这就是我们使用startsWith 在条件下。 |
如果您希望使用 idle 事件来停止列表器容器,则不应调用container.stop() 在调用侦听器的线程上。这样做会导致延迟和不必要的日志消息。相反,您应该将事件移交给可以停止容器的其他线程。此外,您不应该stop() 容器实例(如果它是子容器)。
您应该停止并发容器。 |
空闲时的当前位置
请注意,您可以通过实现ConsumerSeekAware
在您的听众中。
看onIdleContainer()
在寻求特定的抵消。
4.1.8. 主题/分区初始偏移量
有几种方法可以设置分区的初始偏移量。
手动分配分区时,您可以在配置的TopicPartitionOffset
参数(参见消息侦听器容器)。
您还可以随时查找特定的偏移量。
当您使用组管理时,代理分配分区:
-
对于新的
group.id
,则初始偏移量由auto.offset.reset
消费者属性 (earliest
或latest
). -
对于现有组 ID,初始偏移量是该组 ID 的当前偏移量。 但是,可以在初始化期间(或之后的任何时间)查找特定偏移量。
4.1.9. 寻求特定偏移量
为了寻求,您的监听器必须实现ConsumerSeekAware
,它有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
这registerSeekCallback
在启动容器时以及每当分配分区时调用。
在初始化后的某个任意时间进行搜索时,应使用此回调。
您应该保存对回调的引用。
如果您在多个容器(或在ConcurrentMessageListenerContainer
),您应该将回调存储在ThreadLocal
或由侦听器键控的其他结构Thread
.
使用组管理时,onPartitionsAssigned
在分配分区时调用。
例如,您可以使用此方法通过调用回调来设置分区的初始偏移量。
您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅下面的示例)。
您必须使用回调参数,而不是传递给registerSeekCallback
. 从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。
onPartitionsRevoked
在容器停止或 Kafka 撤销分配时调用。您应该丢弃此线程的回调并删除与已撤销分区的任何关联。
回调具有以下方法:
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection=<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection=<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seekRelative
在 2.3 版中添加了,以执行相对搜索。
-
offset
negative 和toCurrent
false
- 相对于分区末尾进行搜索。 -
offset
positive 和toCurrent
false
- 相对于分区的开头进行搜索。 -
offset
negative 和toCurrent
true
- 相对于当前位置进行搜索(倒带)。 -
offset
positive 和toCurrent
true
- 相对于当前位置进行搜索(快进)。
这seekToTimestamp
2.3 版中还添加了方法。
在onIdleContainer 或onPartitionsAssigned 方法,第二种方法是首选,因为在对消费者的offsetsForTimes 方法。
从其他位置调用时,容器将收集所有时间戳查找请求,并对offsetsForTimes . |
您还可以从onIdleContainer()
检测到空闲容器时。
有关如何启用空闲容器检测,请参阅检测空闲和无响应的使用者。
这seekToBeginning 方法很有用,例如,在处理压缩主题并且您希望每次启动应用程序时都搜索到开头: |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意查找,请使用registerSeekCallback
对于适当的线程。
这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;打<Enter>
导致所有分区从头开始搜索。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了让事情变得简单,2.3 版本添加了AbstractConsumerSeekAware
类,它跟踪要用于主题/分区的回调。
以下示例演示如何在每次容器空闲时查找每个分区中处理的最后一条记录。
它还具有允许任意外部调用按一条记录倒带分区的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
2.6 版为抽象类添加了便利方法:
-
seekToBeginning()
- 查找所有分配到开头的分区 -
seekToEnd()
- 查找所有分配的分区到最后 -
seekToTimestamp(long time)
- 查找所有分配给该时间戳表示的偏移量的分区。
例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
}
}
4.1.10. 集装箱工厂
如@KafkaListener
注解一个ConcurrentKafkaListenerContainerFactory
用于为带注释的方法创建容器。
从 2.2 版开始,您可以使用相同的工厂创建任何ConcurrentMessageListenerContainer
.
如果您想创建多个具有相似属性的容器,或者您希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能很有用。
创建容器后,您可以进一步修改其属性,其中许多属性是通过使用container.getContainerProperties()
.
以下示例配置ConcurrentMessageListenerContainer
:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以这种方式创建的容器不会添加到终结点注册表中。
它们应该创建为@Bean 定义,以便它们向应用程序上下文注册。 |
从 2.3.4 版本开始,您可以添加ContainerCustomizer
到工厂,以便在创建和配置每个容器后进一步配置它。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
4.1.11. 线程安全
使用并发消息侦听器容器时,将在所有使用者线程上调用单个侦听器实例。 因此,侦听器需要是线程安全的,并且最好使用无状态侦听器。 如果无法使侦听器成为线程安全的,或者添加同步会显着降低添加并发的好处,则可以使用以下几种技术之一:
-
用
n
容器与concurrency=1
使用范围MessageListener
bean,以便每个容器都有自己的实例(当使用@KafkaListener
). -
保持状态
ThreadLocal<?>
实例。 -
让单例侦听器委托给在
SimpleThreadScope
(或类似的范围)。
为了便于清理线程状态(对于前面列表中的第二项和第三项),从 2.2 版开始,侦听器容器发布ConsumerStoppedEvent
当每个线程退出时。您可以使用ApplicationListener
或@EventListener
删除方法ThreadLocal<?>
instances 或remove()
线程作用域的 bean。请注意,SimpleThreadScope
不会销毁具有销毁接口(例如DisposableBean
),所以你应该destroy()
实例自己。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。如果将多播器更改为使用异步执行器,则线程清理无效。 |
4.1.12. 监控
监控侦听器性能
从 2.3 版本开始,监听器容器将自动创建和更新 MicrometerTimer
s 表示侦听器,如果Micrometer
在类路径上检测到单个MeterRegistry
存在于应用程序上下文中。可以通过设置ContainerProperty
micrometerEnabled
自false
.
维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。
计时器被命名为spring.kafka.listener
并具有以下标签:
-
name
:(容器 bean 名称) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
您可以使用ContainerProperties
micrometerTags
财产。
使用并发容器,为每个线程创建计时器,并且name 标签后缀为-n 其中 n 是0 自concurrency-1 . |
监控 KafkaTemplate 性能
从 2.5 版开始,模板将自动创建和更新 MicrometerTimer
s 用于发送作,如果Micrometer
在类路径上检测到单个MeterRegistry
存在于应用程序上下文中。
可以通过设置模板的micrometerEnabled
属性设置为false
.
维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。
计时器被命名为spring.kafka.template
并具有以下标签:
-
name
:(模板 bean 名称) -
result
:success
或failure
-
exception
:none
或失败的异常类名称
您可以使用模板的micrometerTags
财产。
微米原生指标
从 2.5 版本开始,该框架提供了工厂监听器来管理千分尺KafkaClientMetrics
每当创建和关闭生产者和消费者时,实例。
要启用此功能,只需将监听器添加到生产者和消费者工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
消费者/生产者id
传递给侦听器的标记将添加到带有标记名称的仪表的标记中spring.id
.
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count()
为StreamsBuilderFactoryBean
- 请参阅 KafkaStreams 千分尺支持。
4.1.13. 事务
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加了支持:
-
KafkaTransactionManager
:与正常的 Spring 事务支持一起使用(@Transactional
,TransactionTemplate
等等)。 -
事务
KafkaMessageListenerContainer
-
本地交易
KafkaTemplate
-
与其他事务管理器的事务同步
通过提供DefaultKafkaProducerFactory
使用transactionIdPrefix
.
在这种情况下,而不是管理单个共享Producer
,工厂维护事务生产者的缓存。
当用户调用close()
在生产者上,它被返回到缓存中以供重用,而不是实际关闭。
这transactional.id
每个生产者的属性是transactionIdPrefix
+ n
哪里n
开头为0
并且会为每个新生产者递增,除非事务是由具有基于记录的侦听器的侦听器容器启动的。
在这种情况下,transactional.id
是<transactionIdPrefix>.<group.id>.<topic>.<partition>
.
这是为了正确支持击剑僵尸,如此处所述。
此新行为已在版本 1.3.7、2.0.6、2.1.10 和 2.2.0 中添加。
如果要恢复到以前的行为,可以将producerPerConsumerPartition
属性DefaultKafkaProducerFactory
自false
.
虽然批处理侦听器支持事务,但默认情况下,不支持僵尸隔离,因为批处理可能包含来自多个主题或分区的记录。
但是,从版本 2.3.2 开始,如果将 container 属性设置为subBatchPerPartition 设置为 true。
在这种情况下,从上次轮询接收的每个分区都会调用一次批处理侦听器,就好像每个轮询只返回单个分区的记录一样。
这是true 默认情况下,从 2.5 版开始,当事务启用时EOSMode.ALPHA ;将其设置为false 如果您正在使用事务但不关心僵尸围栏。
另请参阅 Exactly Once 语义。 |
另请参阅transactionIdPrefix
.
使用 Spring Boot,只需将spring.kafka.producer.transaction-id-prefix
property - Boot 将自动配置KafkaTransactionManager
bean 并将其连接到侦听器容器中。
从 2.5.8 版开始,您现在可以配置maxAge 生产者工厂的财产。
当使用可能为代理的transactional.id.expiration.ms .
与电流kafka-clients ,这可能会导致ProducerFencedException 无需重新平衡。
通过设置maxAge 小于transactional.id.expiration.ms ,如果生产者超过最大使用年限,工厂将刷新生产者。 |
用KafkaTransactionManager
这KafkaTransactionManager
是 Spring Framework 的PlatformTransactionManager
.
在其构造函数中提供了对生产者工厂的引用。
如果您提供自定义生产者工厂,则它必须支持事务。
看ProducerFactory.transactionCapable()
.
您可以使用KafkaTransactionManager
具有正常的 Spring 事务支持(@Transactional
,TransactionTemplate
等)。
如果事务处于活动状态,则任何KafkaTemplate
在事务范围内执行的作使用事务的Producer
.
管理器根据成功或失败提交或回滚事务。
您必须配置KafkaTemplate
使用相同的ProducerFactory
作为事务管理器。
事务同步
本节涉及仅生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用使用者发起的事务。
如果你想将记录发送到 kafka 并执行一些数据库更新,你可以使用普通的 Spring 事务管理,比如说,一个DataSourceTransactionManager
.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
的拦截器@Transactional
注释启动事务,并且KafkaTemplate
将事务与该事务管理器同步;每次发送都会参与该交易。
当该方法退出时,数据库事务将提交,然后是 Kafka 事务。
如果您希望以相反的顺序执行提交(首先是 Kafka),请使用 嵌套@Transactional
方法,外部方法配置为使用DataSourceTransactionManager
,并且配置为使用KafkaTransactionManager
.
有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅 Kafka 事务与其他事务管理器的示例。
从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果在同步事务上提交失败(在主事务提交之后),则将向调用者抛出异常。 以前,这被静默忽略(在调试时记录)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。 |
使用使用者发起的事务
这ChainedKafkaTransactionManager
现在已弃用,从 2.7 版开始;请参阅 JavaDocs 了解其超类ChainedTransactionManager
了解更多信息。
相反,请使用KafkaTransactionManager
启动 Kafka 事务,并使用@Transactional
以启动另一笔事务。
有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅 Kafka 事务与其他事务管理器的示例。
KafkaTemplate
本地交易
您可以使用KafkaTemplate
在本地事务中执行一系列作。以下示例显示了如何执行此作:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身(this
). 如果回调正常退出,则事务被提交。如果抛出异常,则事务将回滚。
如果有KafkaTransactionManager (或同步)事务,则不使用它。相反,使用新的“嵌套”事务。 |
transactionIdPrefix
如概述中所述,生产者工厂配置了此属性来构建生产者transactional.id
财产。 指定此属性时存在二分法,即当使用EOSMode.ALPHA
,在侦听器容器线程上生成记录时,它必须在所有实例上都相同,以满足围栏僵尸(在概述中也提到)。但是,当使用不是由侦听器容器启动的事务生成记录时,每个实例上的前缀必须不同。版本 2.3 使配置更简单,尤其是在 Spring Boot 应用程序中。在以前的版本中,您必须创建两个生产者工厂和KafkaTemplate
s - 一个用于在侦听器容器线程上生成记录,另一个用于由kafkaTemplate.executeInTransaction()
或由事务拦截器在@Transactional
方法。
现在,您可以覆盖工厂的transactionalIdPrefix
在KafkaTemplate
和KafkaTransactionManager
.
将事务管理器和模板用于侦听器容器时,通常会将其默认为生产者工厂的属性。
使用EOSMode.ALPHA
.
跟EOSMode.BETA
不再需要使用相同的transactional.id
,即使是消费者发起的交易;事实上,它在每个实例上必须是唯一的,就像生产者发起的事务一样。
对于由模板(或@Transaction
)应分别在模板和事务管理器上设置属性。
此属性在每个应用程序实例上必须具有不同的值。
这个问题(不同的规则transactional.id ) 在EOSMode.BETA 正在使用(代理版本 >= 2.5);请参阅 Exactly Once 语义。 |
KafkaTemplate
事务性和非事务性发布
通常,当KafkaTemplate
是事务性的(配置了支持事务的生产者工厂),则需要事务。
事务可以通过TransactionTemplate
一个@Transactional
方法, 调用executeInTransaction
,或通过侦听器容器,当配置了KafkaTransactionManager
.
任何在事务范围之外使用模板的尝试都会导致模板抛出IllegalStateException
.
从 2.4.3 版开始,您可以将模板的allowNonTransactional
属性设置为true
.
在这种情况下,模板将允许作在没有事务的情况下运行,方法是调用ProducerFactory
的createNonTransactionalProducer()
方法;生产者将像往常一样被缓存或线程绑定,以便重用。
看用DefaultKafkaProducerFactory
.
与批处理侦听器的交易
当侦听器在使用事务时失败时,将AfterRollbackProcessor
在回滚发生后调用以执行某些作。
使用默认值AfterRollbackProcessor
使用记录侦听器时,将执行搜索,以便重新传递失败的记录。
但是,使用批处理侦听器时,将重新传递整个批处理,因为框架不知道批处理中的哪条记录失败了。
有关详细信息,请参阅回滚后处理器。
使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时的故障;这BatchToRecordAdapter
.
当集装箱工厂batchListener
设置为 true 配置了BatchToRecordAdapter
,则一次调用一条记录的侦听器。这允许在批处理中进行错误处理,同时仍然可以停止处理整个批处理,具体取决于异常类型。默认BatchToRecordAdapter
提供,可以配置标准ConsumerRecordRecoverer
例如DeadLetterPublishingRecoverer
.
以下测试用例配置片段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}
4.1.14. 恰好一次语义
您可以为侦听器容器提供KafkaAwareTransactionManager
实例。
如此配置后,容器会在调用侦听器之前启动事务。
任何KafkaTemplate
侦听器执行的作参与事务。
如果侦听器成功处理了记录(或多条记录,当使用BatchMessageListener
),容器使用producer.sendOffsetsToTransaction()
),然后再提交事务。
如果侦听器抛出异常,则事务将回滚,使用者将重新定位,以便在下一次轮询时可以检索回滚的记录。
请参阅回滚后处理器,了解更多信息以及处理反复失败的记录。
使用事务可以启用 Exactly Once 语义 (EOS)。
这意味着,对于read→process-write
sequence,则保证序列恰好完成一次。
(读取和进程至少具有一次语义)。
Spring for Apache Kafka 版本 2.5 及更高版本支持两种 EOS 模式:
-
ALPHA
- 别名V1
(已弃用) -
BETA
- 别名V2
(已弃用) -
V1
-即transactional.id
围栏(自版本 0.11.0.0 起) -
V2
- 又名 fetch-offset-request fencing(自 2.5 版起)
带模式V1
,如果另一个实例具有相同的transactional.id
已启动。
Spring 通过使用Producer
对于每个group.id/topic/partition
;发生重新平衡时,新实例将使用相同的transactional.id
老生产者被围起来。
带模式V2
,则不必为每个group.id/topic/partition
因为消费者元数据与偏移量一起发送到事务,并且代理可以使用该信息来确定生产者是否被隔离。
从 2.6 版开始,默认的EOSMode
是V2
.
将容器配置为使用ALPHA
,将容器属性EOSMode
自ALPHA
,以恢复到以前的行为。
跟V2 (默认),您的代理必须是 2.5 或更高版本;kafka-clients 3.0 版本,生产者将不再回退到V1 ;如果经纪人不支持V2 ,则引发异常。
如果您的代理早于 2.5,则必须将EOSMode 自V1 ,将DefaultKafkaProducerFactory producerPerConsumerPartition 设置为true 而且,如果您使用的是批处理监听器,则应将subBatchPerPartition 自true . |
当您的代理升级到 2.5 或更高版本时,您应该将模式切换到V2
,但生产商数量将保持原样。
然后,您可以使用producerPerConsumerPartition
设置为false
减少生产商的数量;您也不应再将subBatchPerPartition
container 属性。
如果您的代理已经是 2.5 或更高版本,您应该将DefaultKafkaProducerFactory
producerPerConsumerPartition
属性设置为false
,以减少所需的生产者数量。
使用时EOSMode.V2 跟producerPerConsumerPartition=false 这transactional.id 在所有应用程序实例中必须是唯一的。 |
使用时V2
模式,则不再需要将subBatchPerPartition
自true
;它将默认为false
当EOSMode
是V2
.
有关更多信息,请参阅 KIP-447。
V1
和V2
以前是ALPHA
和BETA
; 它们已被更改以使框架与 KIP-732 保持一致。
4.1.15. 将 Spring Bean 连接到生产者/消费者拦截器
Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。这些对象由 Kafka 而不是 Spring 管理,因此正常的 Spring 依赖注入不适用于依赖 Spring Bean 中的连接。但是,您可以使用拦截器手动连接这些依赖项config()
方法。 以下 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖 bean 添加到配置属性中来执行此作。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}
}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
结果:
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test
4.1.16. 暂停和恢复侦听器容器
添加了 2.1.3 版本pause()
和resume()
方法到监听器容器。
以前,您可以在ConsumerAwareMessageListener
并通过监听ListenerContainerIdleEvent
,它提供了对Consumer
对象。
虽然可以使用事件侦听器暂停空闲容器中的使用者,但在某些情况下,这不是线程安全的,因为不能保证在使用者线程上调用事件侦听器。
要安全地暂停和恢复消费者,您应该使用pause
和resume
侦听器容器上的方法。
一个pause()
在下一个之前生效poll()
;一个resume()
在当前之后生效poll()
返回。
当容器暂停时,它将继续poll()
使用者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。
有关更多信息,请参阅 Kafka 文档。
从 2.1.5 版开始,您可以调用isPauseRequested()
看看是否pause()
被叫了。
然而,消费者可能还没有真正停下来。isConsumerPaused()
如果全部Consumer
实例实际上已经暂停了。
此外(也是从 2.1.5 开始),ConsumerPausedEvent
和ConsumerResumedEvent
实例与容器一起发布为source
属性和TopicPartition
涉及的实例partitions
财产。
以下简单的 Spring Boot 应用程序演示了使用容器注册表获取对@KafkaListener
方法的容器并暂停或恢复其消费者以及接收相应的事件:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下列表显示了上述示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
4.1.17. 暂停和恢复侦听器容器上的分区
从 2.7 版开始,您可以使用pausePartition(TopicPartition topicPartition)
和resumePartition(TopicPartition topicPartition)
监听器容器中的方法。
暂停和恢复分别发生在poll()
类似于pause()
和resume()
方法。
这isPartitionPauseRequested()
如果已请求该分区的暂停,则方法返回 true。
这isPartitionPaused()
如果该分区实际上已暂停,则方法返回 true。
同样从 2.7 版本开始ConsumerPartitionPausedEvent
和ConsumerPartitionResumedEvent
实例与容器一起发布为source
属性和TopicPartition
实例。
4.1.18. 序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。
它与org.apache.kafka.common.serialization.Serializer<T>
和org.apache.kafka.common.serialization.Deserializer<T>
抽象与一些内置实现。
同时,我们可以使用Producer
或Consumer
配置属性。
以下示例显示了如何执行此作:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或更特殊的情况,请执行KafkaConsumer
(因此,KafkaProducer
) 提供过载
要接受的构造函数Serializer
和Deserializer
实例keys
和values
分别。
使用此 API 时,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
还提供属性(通过构造函数或 setter 方法)来注入自定义Serializer
和Deserializer
实例到目标中Producer
或Consumer
.
此外,您可以传入Supplier<Serializer>
或Supplier<Deserializer>
实例通过构造函数 - 这些Supplier
s 在创建每个Producer
或Consumer
.
字符串序列化
从 2.5 版开始,Spring for Apache Kafka 提供了ToStringSerializer
和ParseStringDeserializer
使用实体的字符串表示的类。
他们依赖于方法toString
和一些Function<String>
或BiFunction<String, Headers>
以解析 String 并填充实例的属性。
通常,这会在类上调用一些静态方法,例如parse
:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer
配置为传达有关记录中序列化实体的类型信息Headers
.
您可以通过将addTypeInfo
属性设置为 false。
此信息可供ParseStringDeserializer
在接收方。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
要在ToStringSerializer
(将addTypeInfo
属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置Charset
用于转换String
往返byte[]
默认值为UTF-8
.
您可以使用解析器方法的名称配置反序列化器,方法是使用ConsumerConfig
性能:
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完全限定名称,后跟方法名称,用句点分隔.
.
该方法必须是静态的,并且签名为(String, Headers)
或(String)
.
一个ToFromStringSerde
还提供了用于 Kafka Streams 的。
JSON
Spring for Apache Kafka 还提供了JsonSerializer
和JsonDeserializer
基于
Jackson JSON 对象映射器。
这JsonSerializer
允许将任何 Java 对象写为 JSONbyte[]
.
这JsonDeserializer
需要额外的Class<?> targetType
参数,以允许对已使用的byte[]
到适当的目标对象。
以下示例演示如何创建JsonDeserializer
:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以自定义两者JsonSerializer
和JsonDeserializer
使用ObjectMapper
.
您还可以扩展它们以在configure(Map<String, ?> configs, boolean isKey)
方法。
从版本 2.3 开始,默认情况下,所有 JSON 感知组件都配置了JacksonUtils.enhancedObjectMapper()
实例,它附带了MapperFeature.DEFAULT_VIEW_INCLUSION
和DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
功能已禁用。
此外,此类实例还提供了用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。
看JacksonUtils.enhancedObjectMapper()
JavaDocs 了解更多信息。
此方法还注册一个org.springframework.kafka.support.JacksonMimeTypeModule
为org.springframework.util.MimeType
对象序列化为纯字符串,以实现网络上的平台间兼容性。
一个JacksonMimeTypeModule
可以在应用程序上下文中注册为 bean,它将自动配置到弹簧靴ObjectMapper
实例.
同样从 2.3 版本开始,JsonDeserializer
提供TypeReference
基于构造函数,以便更好地处理目标通用容器类型。
从 2.1 版开始,您可以在记录中传达类型信息Headers
,允许处理多种类型。
此外,您可以使用以下 Kafka 属性配置序列化器和反序列化程序。
如果您提供了Serializer
和Deserializer
实例KafkaConsumer
和KafkaProducer
分别。
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
要在JsonSerializer
(将addTypeInfo
属性)。 -
JsonSerializer.TYPE_MAPPINGS
(默认empty
):请参阅映射类型。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
忽略序列化程序设置的标头。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
以保留序列化程序设置的标头。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化键的回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化值的回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(默认java.util
,java.lang
):允许反序列化的包模式的逗号分隔列表。 意味着全部反序列化。*
-
JsonDeserializer.TYPE_MAPPINGS
(默认empty
):请参阅映射类型。 -
JsonDeserializer.KEY_TYPE_METHOD
(默认empty
):请参阅使用方法确定类型。 -
JsonDeserializer.VALUE_TYPE_METHOD
(默认empty
):请参阅使用方法确定类型。
从版本 2.2 开始,类型信息标头(如果由序列化程序添加)将由反序列化程序删除。
您可以通过将removeTypeHeaders
属性设置为false
,直接在反序列化程序上或使用前面所述的配置属性。
从版本 2.8 开始,如果您以编程方式构造序列化程序或反序列化程序,如编程构造中所示,只要您没有显式设置任何属性(使用set*() 方法或使用 Fluent API)。
以前,在以编程方式创建时,从未应用配置属性;如果直接在对象上显式设置属性,情况仍然如此。 |
映射类型
从版本 2.2 开始,使用 JSON 时,现在可以使用前面列表中的属性提供类型映射。
以前,必须在序列化程序和反序列化程序中自定义类型映射器。
映射由逗号分隔的列表组成token:className
对。
在出站时,有效负载的类名将映射到相应的Tokens。
入站时,类型标头中的Tokens映射到相应的类名。
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
相应的对象必须兼容。 |
如果您使用 Spring Boot,则可以在application.properties
(或 yaml)文件。
以下示例显示了如何执行此作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
只能使用属性执行简单配置。
对于更高级的配置(例如使用自定义
还提供了 setter,作为使用这些构造函数的替代方法。 |
从版本 2.2 开始,您可以使用具有布尔值的重载构造函数之一显式将反序列化程序配置为使用提供的目标类型并忽略标头中的类型信息useHeadersIfPresent
(即true
默认情况下)。
以下示例显示了如何执行此作:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从版本 2.5 开始,您现在可以通过属性配置反序列化程序,以调用方法来确定目标类型。
如果存在,这将覆盖上面讨论的任何其他技术。
如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这将很有用。
将这些属性设置为方法名称 - 完全限定的类名,后跟方法名称,用句点分隔.
.
该方法必须声明为public static
,具有三个签名之一(String topic, byte[] data, Headers headers)
,(byte[] data, Headers headers)
或(byte[] data)
并返回JacksonJavaType
.
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意标头或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用JsonPath
或类似,但是,确定类型的测试越简单,过程就越有效。
下面是以编程方式创建反序列化程序的示例(在构造函数中向使用者工厂提供反序列化程序时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化施工
从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/消费者工厂中使用时,您可以使用 fluent API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以编程方式提供类型映射,类似于使用方法确定类型,请使用typeFunction
财产。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要不使用 fluent API 来配置属性,或使用set*()
方法,工厂将使用配置属性配置序列化器/反序列化器;请参阅配置属性。
委派序列化器和解序列化器
使用标题
2.3 版引入了DelegatingSerializer
和DelegatingDeserializer
,允许生成和使用具有不同键和/或值类型的记录。
生产者必须设置标头DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
设置为用于选择要用于该值的序列化程序的选择器值,以及DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
钥匙;如果未找到匹配项,则IllegalStateException
被抛出。
对于传入记录,反序列化程序使用相同的标头来选择要使用的反序列化程序;如果未找到匹配项或标头不存在,则原始byte[]
被返回。
您可以将选择器的映射配置为Serializer
/ Deserializer
通过构造函数,或者您可以通过 Kafka 生产者/消费者属性使用密钥进行配置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
.
对于序列化程序,生产者属性可以是Map<String, Object>
其中键是选择器,值是Serializer
实例,一个序列化程序Class
或类名。
该属性也可以是逗号分隔的映射条目的字符串,如下所示。
对于反序列化程序,使用者属性可以是Map<String, Object>
其中键是选择器,值是Deserializer
实例,一个解序列化器Class
或类名。
该属性也可以是逗号分隔的映射条目的字符串,如下所示。
要配置使用属性,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,生产者将设置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
header 到thing1
或thing2
.
此技术支持将不同类型发送到同一主题(或不同主题)。
从版本 2.5.1 开始,如果类型(键或值)是Serdes (Long ,Integer ,等)。相反,序列化程序会将标头设置为类型的类名。无需为这些类型配置序列化程序或反序列化程序,它们将动态创建(一次)。 |
有关将不同类型发送到不同主题的另一种技术,请参阅用RoutingKafkaTemplate
.
按类型
2.8 版本引入了DelegatingByTypeSerializer
.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,您可以配置序列化器以检查映射键是否可从目标对象分配,这在委托序列化器可以序列化子类时很有用。在这种情况下,如果存在 amiguous 匹配项,则有序的Map
,例如LinkedHashMap
应该提供。
按主题
从 2.8 版本开始,DelegatingByTopicSerializer
和DelegatingByTopicDeserializer
允许根据主题名称选择序列化器/反序列化器。正则表达式Pattern
s 用于查找要使用的实例。可以使用构造函数或通过属性(逗号分隔的列表pattern:serializer
).
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
用KEY_SERIALIZATION_TOPIC_CONFIG
将其用于键时。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null,
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用以下命令指定在没有模式匹配时使用的默认序列化器/反序列化器DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
.
附加属性DelegatingByTopicSerialization.CASE_SENSITIVE
(默认true
),当设置为false
使主题查找不区分大小写。
重试解序列化程序
这RetryingDeserializer
使用委托Deserializer
和RetryTemplate
当委托在反序列化期间可能出现暂时性错误(例如网络问题)时重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
请参阅 spring-retry 项目以配置RetryTemplate
具有重试策略、退避策略等。
Spring Messaging 消息转换
尽管Serializer
和Deserializer
API 与低级 Kafka 相比非常简单灵活Consumer
和Producer
透视,当使用时,您可能需要在 Spring Messaging 级别上具有更大的灵活性@KafkaListener
或 Spring Integration 的 Apache Kafka 支持。
让您轻松地进行转换org.springframework.messaging.Message
,Spring for Apache Kafka 提供了一个MessageConverter
抽象与MessagingMessageConverter
实现及其JsonMessageConverter
(和子类)自定义。
您可以注入MessageConverter
变成一个KafkaTemplate
实例直接并使用AbstractKafkaListenerContainerFactory
bean 定义@KafkaListener.containerFactory()
财产。
以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为@Bean
Spring Boot 自动配置会将其连接到自动配置的模板和容器工厂中。
当您使用@KafkaListener
,则将参数类型提供给消息转换器以协助转换。
只有当 |
在消费者端,您可以配置 在生产者端,当您使用 Spring Integration 或
同样,使用 为方便起见,从 2.3 版本开始,该框架还提供了 |
从版本 2.7.1 开始,消息有效负载转换可以委托给spring-messaging
SmartMessageConverter
;例如,这使得转换可以基于MessageHeaders.CONTENT_TYPE
页眉。
这KafkaMessageConverter.fromMessage() 方法被调用以将出站转换为ProducerRecord 在ProducerRecord.value() 财产。
这KafkaMessageConverter.toMessage() 方法被调用用于从ConsumerRecord 有效载荷为ConsumerRecord.value() 财产。
这SmartMessageConverter.toMessage() 方法被调用来创建一个新的出站Message<?> 从Message 传递给'fromMessage()'(通常由KafkaTemplate.send(Message<?> msg) ).
同样,在KafkaMessageConverter.toMessage() 方法,在转换器创建新的Message<?> 从ConsumerRecord 这SmartMessageConverter.fromMessage() 方法,然后使用新转换的有效负载创建最终入站消息。
无论哪种情况,如果SmartMessageConverter 返回null ,则使用原始消息。 |
当默认转换器在KafkaTemplate
和侦听器容器工厂,则配置SmartMessageConverter
通过调用setMessagingConverter()
在模板上并通过contentMessageConverter
属性@KafkaListener
方法。
例子:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring 数据投影接口
从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。这允许对数据进行非常选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。例如,可以将以下接口定义为消息有效负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于将属性名称查找为接收的 JSON 文档中的字段。
这@JsonPath
expression 允许自定义值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用ProjectingMessageConverter
配置了适当的委托转换器(用于出站转换和转换非投影接口)。
您还必须添加spring-data:spring-data-commons
和com.jayway.jsonpath:json-path
到类路径。
当用作参数时@KafkaListener
方法,接口类型会正常自动传递给转换器。
用ErrorHandlingDeserializer
当反序列化程序无法反序列化消息时,Spring 无法处理该问题,因为它发生在poll()
返回。
为了解决这个问题,请ErrorHandlingDeserializer
已被引入。
此反序列化程序委托给实际的反序列化程序(键或值)。
如果委托未能反序列化记录内容,则ErrorHandlingDeserializer
返回一个null
value 和DeserializationException
在包含原因和原始字节的标头中。
当您使用记录级别MessageListener
,如果ConsumerRecord
包含一个DeserializationException
标头,容器的ErrorHandler
使用 failedConsumerRecord
.
记录不会传递给侦听器。
或者,您可以配置ErrorHandlingDeserializer
要通过提供failedDeserializationFunction
,这是一个Function<FailedDeserializationInfo, T>
.
调用此函数以创建T
,以通常的方式传递给听众。
类型FailedDeserializationInfo
,其中包含向函数提供的所有上下文信息。
您可以找到DeserializationException
(作为序列化的 Java 对象)在标头中。
请参阅 Javadoc 中的ErrorHandlingDeserializer
了解更多信息。
您可以使用DefaultKafkaConsumerFactory
接受键和值的构造函数Deserializer
适当的物体和电线ErrorHandlingDeserializer
已配置了适当委托的实例。
或者,您可以使用使用者配置属性(由ErrorHandlingDeserializer
) 来实例化委托。
属性名称是ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
.
属性值可以是类或类名称。
以下示例演示如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用failedDeserializationFunction
.
public class BadFoo extends Foo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
...
如果消费者配置了ErrorHandlingDeserializer 配置KafkaTemplate 及其生产者带有一个序列化器,可以处理普通对象和原始对象byte[] 值,这是由反序列化异常产生的。
模板的泛型值类型应为Object .
一种技术是使用DelegatingByTypeSerializer ;下面是一个例子: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
使用ErrorHandlingDeserializer
使用批处理侦听器时,必须检查消息标头中的反序列化异常。
当与DefaultBatchErrorHandler
,您可以使用该标头来确定异常失败的记录,并通过BatchListenerFailedException
.
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
(byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
ListenerUtils.byteArrayToDeserializationException()
可用于将标头转换为DeserializationException
.
食用时List<ConsumerRecord<?, ?>
,ListenerUtils.getExceptionFromHeader()
代替:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还使用DeadLetterPublishingRecoverer ,则为DeserializationException 将有一个record.value() 类型byte[] ;这不应该序列化。
考虑使用DelegatingByTypeSerializer 配置为使用ByteArraySerializer 为byte[] 以及所有其他类型的普通序列化程序(Json、Avro 等)。 |
使用批量侦听器的有效负载转换
您还可以使用JsonMessageConverter
在BatchMessagingMessageConverter
在使用批处理侦听器容器工厂时转换批处理消息。
有关更多信息,请参阅序列化,反序列化和消息转换和Spring Messaging Message Conversion。
默认情况下,转换的类型是从 listener 参数推断出来的。
如果将JsonMessageConverter
使用DefaultJackson2TypeMapper
它有它的TypePrecedence
设置为TYPE_ID
(而不是默认的INFERRED
),转换器会改用标头中的类型信息(如果存在)。
例如,这允许使用接口而不是具体类声明侦听器方法。
此外,类型转换器支持映射,因此反序列化可以与源不同(只要数据兼容)。
当您使用类级@KafkaListener
实例其中有效负载必须已转换,以确定要调用的方法。
以下示例创建使用此方法的 bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,要实现此目的,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理标头。
如果批处理转换器具有支持它的记录转换器,您还可以接收根据泛型类型转换有效负载的消息列表。 以下示例显示了如何执行此作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
...
}
ConversionService
定制
从 2.1.1 版本开始,org.springframework.core.convert.ConversionService
默认使用o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory
为了解析调用监听器的参数,方法随实现以下任何接口的所有 Bean 一起提供:
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这使您可以进一步自定义侦听器反序列化,而无需更改ConsumerFactory
和KafkaListenerContainerFactory
.
设置自定义MessageHandlerMethodFactory 在KafkaListenerEndpointRegistrar 通过KafkaListenerConfigurer bean 禁用此功能。 |
添加自定义HandlerMethodArgumentResolver
自@KafkaListener
从 2.4.2 版开始,您可以添加自己的HandlerMethodArgumentResolver
并解析自定义方法参数。
您所需要做的就是实现KafkaListenerConfigurer
和使用方法setCustomMethodArgumentResolvers()
从课堂上KafkaListenerEndpointRegistrar
.
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过添加自定义MessageHandlerMethodFactory
到KafkaListenerEndpointRegistrar
豆。
如果执行此作,并且应用程序需要处理逻辑删除记录,则使用null
value()
(例如,从压缩的主题),您应该添加一个KafkaNullAwarePayloadArgumentResolver
到工厂;它必须是最后一个解析器,因为它支持所有类型,并且可以在没有@Payload
注解。
如果您正在使用DefaultMessageHandlerMethodFactory
,将此解析器设置为最后一个自定义解析器;工厂将确保该旋转转换器在标准之前使用PayloadMethodArgumentResolver
,它不知道KafkaNull
负载。
另请参阅“逻辑删除”记录的空有效负载和日志压缩。
4.1.19. 消息头
0.11.0.0 客户端引入了对消息中标头的支持。
从 2.0 版开始,Spring for Apache Kafka 现在支持将这些标头映射到和映射spring-messaging
MessageHeaders
.
映射的先前版本ConsumerRecord 和ProducerRecord 到 spring-messagingMessage<?> ,其中 value 属性映射到payload 和其他属性 (topic ,partition ,依此类推)被映射到标头。情况仍然如此,但现在可以映射其他(任意)标头。 |
Apache Kafka 标头有一个简单的 API,如以下接口定义所示:
public interface Header {
String key();
byte[] value();
}
这KafkaHeaderMapper
提供策略来在 Kafka 之间映射标头条目Headers
和MessageHeaders
. 其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
这SimpleKafkaHeaderMapper
将原始标头映射为byte[]
,具有用于转换为String
值。
这DefaultKafkaHeaderMapper
将键映射到MessageHeaders
标头名称,为了支持出站消息的丰富标头类型,执行 JSON 转换。“特殊”标头(键为spring_json_header_types
) 包含<key>:<type>
. 此标头用于入站端,以提供每个标头值到原始类型的适当转换。
在入站方面,所有 KafkaHeader
实例映射到MessageHeaders
. 在出站端,默认情况下,所有MessageHeaders
映射,但id
,timestamp
,以及映射到ConsumerRecord
性能。
您可以通过向映射器提供模式来指定要为出站消息映射的标头。以下列表显示了一些示例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用默认的 JacksonObjectMapper 并映射大多数标头,如示例之前所述。 |
2 | 使用提供的JacksonObjectMapper 并映射大多数标头,如示例之前所述。 |
3 | 使用默认的 JacksonObjectMapper 并根据提供的模式映射标头。 |
4 | 使用提供的JacksonObjectMapper 并根据提供的模式映射标头。 |
模式相当简单,可以包含前导通配符 ()、尾随通配符或两者(例如,
.cat.*
). 您可以使用前导!
. 与标头名称匹配的第一个模式(无论是正数还是负数)获胜。
当您提供自己的模式时,我们建议将!id
和!timestamp
,因为这些标头在入站端是只读的。
默认情况下,映射器仅反序列化java.lang 和java.util . 您可以通过添加带有addTrustedPackages 方法。 如果您收到来自不受信任来源的消息,您可能希望仅添加您信任的包。要信任所有包,您可以使用mapper.addTrustedPackages("*") . |
映射String 原始形式的标头值在与不知道映射器的 JSON 格式的系统通信时非常有用。 |
从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 映射,而应映射到原始byte[]
.
这AbstractKafkaHeaderMapper
有新的属性;mapAllStringsOut
当设置为 true 时,所有字符串值标头都将转换为byte[]
使用charset
属性(默认UTF-8
). 此外,还有一个属性rawMappedHeaders
,这是header name : boolean
; 如果映射包含标头名称,并且标头包含String
值,它将被映射为 rawbyte[]
使用字符集。此映射还用于映射原始传入byte[]
headers 到String
使用字符集当且仅当 map 值中的布尔值为true
. 如果布尔值为false
,或者标头名称不在地图中,并带有true
值,则传入标头仅映射为原始未映射标头。
以下测试用例说明了此机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个标头映射器都映射所有入站标头。从版本 2.8.8 开始,这些模式也可以应用于入站映射。要为入站映射创建映射器,请在相应的映射器上使用静态方法之一:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以abc
并包括所有其他。
默认情况下,DefaultKafkaHeaderMapper
用于MessagingMessageConverter
和BatchMessagingMessageConverter
,只要Jackson在阶级道路上。
使用批处理转换器,转换后的标头可在KafkaHeaders.BATCH_CONVERTED_HEADERS
作为List<Map<String, Object>>
其中,列表中某个位置的映射对应于有效负载中的数据位置。
如果没有转换器(因为 Jackson 不存在,或者它被显式设置为null
),消费者记录中的标头在KafkaHeaders.NATIVE_HEADERS
页眉。
此标头是一个Headers
对象(或List<Headers>
在批处理转换器的情况下),其中列表中的位置对应于有效负载中的数据位置)。
某些类型不适合 JSON 序列化,并且简单的toString() 序列化可能是这些类型的首选。
这DefaultKafkaHeaderMapper 有一个名为addToStringClasses() 这使您可以提供应以这种方式处理的类的名称,以便进行出站映射。
在入站映射期间,它们映射为String .
默认情况下,只有org.springframework.util.MimeType 和org.springframework.http.MediaType 以这种方式映射。 |
从 2.3 版开始,简化了字符串值标头的处理。
默认情况下,此类标头不再采用 JSON 编码(即它们没有封闭"…" 添加)。
该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String(从byte[] ).
映射器可以处理(解码)旧版本生成的标头(它检查前导);这样,使用 2.3 的应用程序可以使用旧版本的记录。" |
要与早期版本兼容,请将encodeStrings 自true ,如果使用 2.3 版本生成的记录可能被使用早期版本的应用程序使用。
当所有应用程序都使用 2.3 或更高版本时,您可以将属性保留为默认值false . |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 bean 配置为 auto-configuredKafkaTemplate
;否则,您应该将此转换器添加到模板中。
4.1.20. “墓碑”记录的空有效负载和日志压缩
使用日志压缩时,可以使用null
有效负载来标识密钥的删除。
您还可以收到null
值,例如Deserializer
可能会返回null
当它无法反序列化值时。
要发送null
有效负载,使用KafkaTemplate
,您可以将 null 传递到send()
方法。
一个例外是send(Message<?> message)
变体。
因为spring-messaging
Message<?>
不能有null
有效负载,您可以使用名为KafkaNull
,框架发送null
.
为方便起见,静态KafkaNull.INSTANCE
被提供。
当您使用消息侦听器容器时,收到的ConsumerRecord
有一个null
value()
.
要配置@KafkaListener
处理null
有效负载,则必须使用@Payload
注释required = false
.
如果它是压缩日志的逻辑删除消息,则通常还需要该密钥,以便应用程序可以确定“删除”了哪个密钥。
以下示例显示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别@KafkaListener
与多个@KafkaHandler
方法,需要一些额外的配置。
具体来说,您需要一个@KafkaHandler
方法与KafkaNull
有效载荷。
以下示例演示如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
请注意,参数是null
不KafkaNull
.
请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver 框架在使用默认MessageHandlerMethodFactory .
使用自定义MessageHandlerMethodFactory 看添加自定义HandlerMethodArgumentResolver 自@KafkaListener . |
4.1.21. 处理异常
本节介绍如何处理使用 Spring for Apache Kafka 时可能出现的各种异常。
侦听器错误处理程序
从 2.0 版开始,@KafkaListener
annotation 有一个新属性:errorHandler
.
您可以使用errorHandler
提供KafkaListenerErrorHandler
实现。
此功能接口具有一种方法,如以下列表所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问 spring-messagingMessage<?>
消息转换器生成的对象和侦听器抛出的异常,该异常包装在ListenerExecutionFailedException
.
错误处理程序可以抛出原始异常或新异常,这些异常会抛出到容器。
错误处理程序返回的任何内容都将被忽略。
从 2.7 版开始,您可以将rawRecordHeader
属性MessagingMessageConverter
和BatchMessagingMessageConverter
这导致原始的ConsumerRecord
添加到转换后的Message<?>
在KafkaHeaders.RAW_DATA
页眉。
例如,如果您希望使用DeadLetterPublishingRecoverer
在侦听器错误处理程序中。
它可用于请求/回复方案,在该方案中,您希望在捕获死信主题中的失败记录后,在重试一定次数后将失败结果发送给发件人。
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口(ConsumerAwareListenerErrorHandler
) 通过以下方法访问消费者对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
例如,如果错误处理程序实现了此接口,则可以相应地调整偏移量。 例如,若要重置偏移量以重播失败的消息,可以执行如下作:
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
同样,您可以对批处理侦听器执行如下作:
@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
return (m, e, c) -> {
this.listen10Exception = e;
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
offsetsToReset.forEach((k, v) -> c.seek(k, v));
return null;
};
}
这会将批处理中的每个主题/分区重置为批处理中的最低偏移量。
前面两个示例是简单的实现,您可能需要在错误处理程序中进行更多检查。 |
容器错误处理程序
从 2.8 版开始,旧版ErrorHandler
和BatchErrorHandler
接口已被新的CommonErrorHandler
.
这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。CommonErrorHandler
提供了替换大多数旧版框架错误处理程序实现的实现,并且弃用了旧版错误处理程序。
监听器容器和监听器容器工厂仍然支持旧接口;它们将在将来的版本中被弃用。
看将自定义旧版错误处理程序实现迁移到CommonErrorHandler
有关将自定义错误处理程序迁移到CommonErrorHandler
.
使用事务时,默认情况下不配置错误处理程序,以便异常将回滚事务。
事务容器的错误处理由AfterRollbackProcessor
.
如果您在使用事务时提供自定义错误处理程序,则如果要回滚事务,它必须引发异常。
此接口具有默认方法isAckAfterHandle()
容器调用它来确定如果错误处理程序返回而不引发异常,是否应该提交偏移量;默认情况下,它返回 true。
通常,当错误未被“处理”时(例如,在执行查找作之后),框架提供的错误处理程序将抛出异常。
默认情况下,此类异常由容器记录在ERROR
水平。
所有框架错误处理程序都扩展了KafkaExceptionLogLevelAware
它允许您控制记录这些异常的级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一个全局错误处理程序,用于容器工厂中的所有侦听器。 以下示例显示了如何执行此作:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带 Comments 的监听器方法抛出异常,则将其抛给容器,并根据容器配置处理消息。
容器在调用错误处理程序之前提交任何挂起的偏移量提交。
如果您使用的是 Spring Boot,则只需将错误处理程序添加为@Bean
并且 Boot 会将其添加到自动配置的工厂中。
默认错误处理程序
这个新的错误处理程序将SeekToCurrentErrorHandler
和RecoveringBatchErrorHandler
,它们现在一直是多个版本的默认错误处理程序。
一个区别是批处理侦听器的回退行为(当BatchListenerFailedException
引发)等效于重试完整批次。
错误处理程序可以恢复(跳过)不断失败的记录。
默认情况下,在十次失败后,将记录失败的记录(在ERROR
水平)。
您可以使用自定义恢复器 (BiConsumer
) 和BackOff
控制每个之间的交付尝试和延迟。
使用FixedBackOff
跟FixedBackOff.UNLIMITED_ATTEMPTS
导致(实际上)无限重试。
以下示例配置三次尝试后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。
例如,使用@KafkaListener
集装箱工厂,可以添加DefaultErrorHandler
如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这将重试最多 2 次(3 次投放尝试),回退 1 秒,而不是默认配置(FixedBackOff(0L, 9)
).
重试用尽后,只会记录失败。
举个例子;如果poll
返回六条记录(每个分区 0、1、2 中的两条),侦听器在第四条记录上抛出异常,则容器通过提交前三条消息的偏移量来确认前三条消息。
这DefaultErrorHandler
寻求偏移分区 1 的偏移量 1 和分区 2 的偏移量 0。
下一个poll()
返回三条未处理的记录。
如果AckMode
是BATCH
,容器在调用错误处理程序之前提交前两个分区的偏移量。
对于批处理侦听器,侦听器必须抛出BatchListenerFailedException
指示批处理中的哪些记录失败。
事件的顺序为:
-
在索引之前提交记录的偏移量。
-
如果重试未用尽,请执行搜索,以便重新传递所有剩余记录(包括失败的记录)。
-
如果重试用尽,请尝试恢复失败的记录(仅限默认日志)并执行搜索,以便重新传递剩余的记录(不包括失败的记录)。 已提交已恢复记录的偏移量
-
如果重试用尽且恢复失败,则执行搜索,就像重试未用尽一样。
默认恢复程序在重试用尽后记录失败的记录。
您可以使用自定义恢复器,也可以使用框架提供的恢复器,例如DeadLetterPublishingRecoverer
.
当使用 POJO 批处理监听器时(例如List<Thing>
),并且您没有要添加到异常的完整使用者记录,您可以只添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为AckMode.MANUAL_IMMEDIATE
,可以将错误处理程序配置为提交恢复记录的偏移量;将commitRecovered
属性设置为true
.
另请参阅发布死信记录。
使用事务时,类似的功能由DefaultAfterRollbackProcessor
. 请参阅回滚后处理器。
这DefaultErrorHandler
将某些异常视为致命异常,并且跳过此类异常的重试;恢复器在第一次失败时调用。默认情况下,被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因为这些异常不太可能在重试投放时得到解决。
您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。请参阅 JavadocsDefaultErrorHandler.addNotRetryableException()
和DefaultErrorHandler.setClassifications()
更多信息,以及spring-retry
BinaryExceptionClassifier
.
这是一个示例,它添加了IllegalArgumentException
到不可重试的异常:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
从 2.8.10 版本开始,添加了批处理侦听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
如果恢复器失败(抛出异常),则失败的记录将包含在查找中。
如果恢复器失败,则BackOff 将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。
要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailure 自false . |
您可以为错误处理程序提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
以确定BackOff
使用,基于失败的记录和/或异常:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,则处理程序的默认值BackOff
将被使用。
设置resetStateOnExceptionChange
自true
重试序列将重新启动(包括选择新的BackOff
,如果已配置),如果异常类型在失败之间发生变化。
默认情况下,不考虑异常类型。
另请参阅 Delivery Attempts Header。
使用批处理错误处理程序的转换错误
从 2.8 版开始,批处理监听器现在可以正确处理转换错误,当使用MessageConverter
使用ByteArrayDeserializer
一个BytesDeserializer
或StringDeserializer
,以及DefaultErrorHandler
.
当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头中,类似于ErrorHandlingDeserializer
.
列表ConversionException
s 在侦听器中可用,因此侦听器可以抛出BatchListenerFailedException
指示发生转换异常的第一个索引。
例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批次
现在,这是DefaultErrorHandler
对于批处理侦听器,其中侦听器抛出的异常不是BatchListenerFailedException
.
无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录按相同的顺序。
因此,不可能轻松维护批处理的重试状态。
这FallbackBatchErrorHandler
采取以下方法。
如果批处理侦听器抛出的异常不是BatchListenerFailedException
,则从内存中的记录批次执行重试。
为了避免在扩展重试序列期间重新平衡,错误处理程序会暂停使用者,在休眠之前轮询它以进行回退,每次重试,然后再次调用侦听器。
如果/当重试用尽时,则ConsumerRecordRecoverer
为批处理中的每条记录调用。
如果恢复器抛出异常,或者线程在其睡眠期间中断,则将在下一次轮询时重新传递这批记录。
在退出之前,无论结果如何,消费者都会恢复。
此机制不能用于事务。 |
在等待BackOff
interval,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()
而不是造成延误。
容器停止错误处理程序
这CommonContainerStoppingErrorHandler
如果侦听器抛出异常,则停止容器。
对于记录侦听器,当AckMode
是RECORD
,则提交已处理记录的偏移量。
对于记录侦听器,当AckMode
是任何手动值,则提交已确认记录的偏移量。
对于唱片侦听器,w当AckMode
是BATCH
,或者对于批处理侦听器,当容器重新启动时,将重播整个批处理。
容器停止后,将ListenerExecutionFailedException
被抛出。
这是为了导致事务回滚(如果启用了事务)。
委托错误处理程序
这CommonDelegatingErrorHandler
可以委托给不同的错误处理程序,具体取决于异常类型。
例如,您可能希望调用DefaultErrorHandler
对于大多数例外情况,或CommonContainerStoppingErrorHandler
对于其他人来说。
对记录侦听器和批处理侦听器使用不同的常见错误处理程序
如果您希望对记录和批处理侦听器使用不同的错误处理策略,请CommonMixedErrorHandler
允许为每种侦听器类型配置特定的错误处理程序。
常见错误处理程序 Summery
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
旧版错误处理程序及其替换程序
旧版错误处理程序 | 更换 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无需更换,使用 |
|
|
|
无更换 - 使用 |
将自定义旧版错误处理程序实现迁移到CommonErrorHandler
请参阅中的 javadocsCommonErrorHandler
.
要将ErrorHandler
或ConsumerAwareErrorHandler
实现,你应该实现handleRecord()
并离开remainingRecords()
返回false
(默认值)。
您还应该实现handleOtherException()
- 处理记录处理范围之外发生的异常(例如消费者错误)。
要将RemainingRecordsErrorHandler
实现,你应该实现handleRemaining()
并覆盖remainingRecords()
返回true
.
您还应该实现handleOtherException()
- 处理记录处理范围之外发生的异常(例如消费者错误)。
要将任何BatchErrorHandler
实现,你应该实现handleBatch()
您还应该实现handleOtherException()
- 处理记录处理范围之外发生的异常(例如消费者错误)。
后回滚处理器
使用事务时,如果侦听器抛出异常(并且错误处理程序(如果存在)抛出异常),则事务将回滚。
默认情况下,任何未处理的记录(包括失败的记录)都会在下一次轮询时重新提取。
这是通过执行seek
作中的DefaultAfterRollbackProcessor
.
使用批处理侦听器,将重新处理整批记录(容器不知道批处理中的哪条记录失败)。
要修改此行为,您可以使用自定义AfterRollbackProcessor
.
例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试了几次后放弃,也许可以通过将其发布到死信主题。
从 2.2 版开始,DefaultAfterRollbackProcessor
现在可以恢复(跳过)不断失败的记录。
默认情况下,在十次失败后,将记录失败的记录(在ERROR
水平)。
您可以使用自定义恢复器 (BiConsumer
)和最大故障。
设置maxFailures
属性设置为负数会导致无限重试。
以下示例配置三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,您可以通过配置DefaultErrorHandler
.
请参阅容器错误处理程序。
使用批处理侦听器无法进行恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。 |
另请参阅发布死信记录。
从 2.2.5 版本开始,DefaultAfterRollbackProcessor
可以在新事务中调用(在失败的事务回滚后启动)。
然后,如果您使用DeadLetterPublishingRecoverer
要发布失败的记录,处理器会将恢复记录在原始主题/分区中的偏移量发送到事务。
要启用此功能,请将commitRecovered
和kafkaTemplate
属性DefaultAfterRollbackProcessor
.
如果恢复器失败(抛出异常),则失败的记录将包含在查找中。
从 2.5.5 版开始,如果恢复器出现故障,BackOff 将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。
对于早期版本,BackOff 未重置,并在下一次故障时重新尝试恢复。
要恢复到以前的行为,请将处理器的resetStateOnRecoveryFailure 属性设置为false . |
从 2.6 版开始,您现在可以为处理器提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
以确定BackOff
使用,基于失败的记录和/或异常:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回null
,处理器的默认值BackOff
将被使用。
从 2.6.3 版本开始,将resetStateOnExceptionChange
自true
重试序列将重新启动(包括选择新的BackOff
,如果已配置),如果异常类型在失败之间发生变化。
默认情况下,不考虑异常类型。
从 2.3.1 版本开始,类似于DefaultErrorHandler
这DefaultAfterRollbackProcessor
将某些异常视为致命异常,并且跳过此类异常的重试;恢复器在第一次失败时调用。默认情况下,被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因为这些异常不太可能在重试投放时得到解决。
您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。请参阅 JavadocsDefaultAfterRollbackProcessor.setClassifications()
更多信息,以及spring-retry
BinaryExceptionClassifier
.
这是一个示例,它添加了IllegalArgumentException
到不可重试的异常:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅 Delivery Attempts Header。
与电流kafka-clients ,则容器无法检测到ProducerFencedException 是由重新平衡引起的,或者如果生产者的transactional.id 由于超时或过期而被吊销。因为在大多数情况下,它是由重新平衡引起的,所以容器不会调用AfterRollbackProcessor (因为寻找分区是不合适的,因为我们不再被分配它们)。如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如,通过ListenerContainerIdleEvent ) 可以避免因超时和过期而导致的隔离。或者,您可以将stopContainerWhenFenced container 属性设置为true 并且容器将停止,避免记录丢失。您可以使用ConsumerStoppedEvent 并检查Reason 属性FENCED 以检测此条件。由于该事件还具有对容器的引用,因此您可以使用此事件重新启动容器。 |
从 2.7 版本开始,在等待BackOff
interval,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()
而不是造成延误。
从 2.7 版开始,处理器可以配置一个或多个RetryListener
s,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
Delivery Attempts 标头
以下内容仅适用于记录侦听器,不适用于批处理侦听器。
从 2.5 版开始,当使用ErrorHandler
或AfterRollbackProcessor
实现DeliveryAttemptAware
,则可以启用添加KafkaHeaders.DELIVERY_ATTEMPT
标头 (kafka_deliveryAttempt
) 到记录。
此标头的值是从 1 开始的递增整数。
接收原始数据时ConsumerRecord<?, ?>
整数位于byte[4]
.
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt()
使用时@KafkaListener
使用DefaultKafkaHeaderMapper
或SimpleKafkaHeaderMapper
,可以通过添加@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
作为监听器方法的参数。
要启用此标头的填充,请设置容器属性deliveryAttemptHeader
自true
.
默认情况下,它是禁用的,以避免查找每条记录的状态和添加标头的(小)开销。
这DefaultErrorHandler
和DefaultAfterRollbackProcessor
支持此功能。
侦听器信息标头
在某些情况下,能够知道侦听器在哪个容器中运行是很有用的。
从 2.8.4 版开始,您现在可以将listenerInfo
属性,或将info
属性@KafkaListener
注解。
然后,容器会在KafkaListener.LISTENER_INFO
所有传入消息的标头;然后它可以用于记录拦截器、过滤器等,或用于侦听器本身。
@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen2(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
当用于RecordInterceptor
或RecordFilterStrategy
实现时,标头作为字节数组在消费者记录中,使用KafkaListenerAnnotationBeanPostProcessor
的charSet
财产。
标头映射器还转换为String
创建时MessageHeaders
从使用者记录中,并且永远不要将此标头映射到出站记录上。
对于 POJO 批处理侦听器,从 2.8.6 版开始,标头被复制到批处理的每个成员中,并且也可以作为单个String
参数。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理侦听器具有筛选器,并且筛选器导致一个空批处理,则需要将required = false 到@Header 参数,因为该信息不适用于空批次。 |
如果您收到List<Message<Thing>>
信息在KafkaHeaders.LISTENER_INFO
每个的标题Message<?>
.
有关使用批处理的更多信息,请参阅批处理侦听器。
发布死信记录
您可以配置DefaultErrorHandler
和DefaultAfterRollbackProcessor
当达到记录的最大故障数时,与记录恢复器一起使用。
该框架提供了DeadLetterPublishingRecoverer
,它将失败的消息发布到另一个主题。
恢复器需要一个KafkaTemplate<Object, Object>
,用于发送记录。
您还可以选择使用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
,调用该函数来解析目标主题和分区。
默认情况下,死信记录被发送到名为<originalTopic>.DLT (原始主题名称后缀为.DLT ) 并连接到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。
|
如果返回的TopicPartition
有一个负分区,则该分区未在ProducerRecord
,因此分区由 Kafka 选择。从 2.2.4 版本开始,任何ListenerExecutionFailedException
(抛出,例如,当在@KafkaListener
方法)通过groupId
财产。 这允许目标解析器使用它,除了ConsumerRecord
以选择死信主题。
以下示例显示了如何连接自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录使用以下标头进行增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:异常类名称(通常是ListenerExecutionFailedException
,但可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:异常原因类名(如果存在)(自 2.8 版起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:异常类名称(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常堆栈跟踪(仅限密钥反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限密钥反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:无法处理记录的原始使用者组(自 2.8 版本起)。
关键异常仅由以下原因引起DeserializationException
s 所以没有DLT_KEY_EXCEPTION_CAUSE_FQCN
.
有两种机制可以添加更多标头。
-
对恢复器进行子类化并覆盖
createProducerRecord()
-叫super.createProducerRecord()
并添加更多标题。 -
提供一个
BiFunction
接收消费者记录和异常,返回一个Headers
对象;来自那里的标头将被复制到最终的制作人记录中;另请参阅管理死信记录头。 用setHeadersFunction()
将BiFunction
.
第二个更易于实现,但第一个有更多信息可用,包括已经组装的标准标头。
从 2.3 版开始,当与ErrorHandlingDeserializer
,发布者将恢复记录value()
,到无法反序列化的原始值。
以前,value()
为 null,用户代码必须解码DeserializationException
从邮件头。
此外,还可以提供多个KafkaTemplate
s 给出版商;例如,如果要发布byte[]
从DeserializationException
,以及使用与成功反序列化的记录不同的序列化程序的值。
下面是配置发布者的示例KafkaTemplate
的String
和byte[]
序列化器:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来查找适合value()
即将出版。 一个LinkedHashMap
建议按顺序检查密钥。
发布时null
值,当有多个模板时,恢复器将为Void
类; 如果不存在,则values().iterator()
将被使用。
从 2.7 开始,您可以使用setFailIfSendResultIsError
方法,以便在消息发布失败时引发异常。您还可以使用setWaitForSendResultTimeout
.
如果恢复器失败(抛出异常),则失败的记录将包含在查找中。
从 2.5.5 版开始,如果恢复器出现故障,BackOff 将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。
对于早期版本,BackOff 未重置,并在下一次故障时重新尝试恢复。
要恢复到以前的行为,请将错误处理程序的resetStateOnRecoveryFailure 属性设置为false . |
从 2.6.3 版本开始,将resetStateOnExceptionChange
自true
重试序列将重新启动(包括选择新的BackOff
,如果已配置),如果异常类型在失败之间发生变化。
默认情况下,不考虑异常类型。
从版本 2.3 开始,恢复器还可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复。
这ErrorHandlingDeserializer
在标头中添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
(使用 Java 序列化)。
默认情况下,这些标头不会保留在发布到死信主题的邮件中。
从 2.7 版开始,如果键和值都反序列化失败,则两者的原始值将填充到发送到 DLT 的记录中。
如果传入记录相互依赖,但可能无序到达,则将失败的记录重新发布到原始主题的尾部(一定次数)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题。
以下错误处理程序配置将完全执行此作:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从 V2.7 开始,恢复程序会检查目标解析器选择的分区是否实际存在。
如果分区不存在,则ProducerRecord
设置为null
,允许KafkaProducer
以选择分区。
您可以通过将verifyPartition
属性设置为false
.
管理死信记录标题
-
appendOriginalHeaders
(默认true
) -
stripPreviousExceptionHeaders
(默认true
从 2.8 版本开始)
Apache Kafka 支持多个同名的标头;要获取“最新”值,您可以使用headers.lastHeader(headerName)
;要获取多个标头的迭代器,请使用headers.headers(headerName).iterator()
.
当重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为RecordTooLargeException
);对于异常标头,尤其是堆栈跟踪标头,尤其如此。
使用这两个属性的原因是,虽然您可能只想保留最后一个异常信息,但您可能希望保留记录在每次失败中传递的主题的历史记录。
appendOriginalHeaders
应用于所有名为ORIGINAL
而stripPreviousExceptionHeaders
应用于所有名为EXCEPTION
.
从 2.8.4 版开始,您现在可以控制将哪些标准标头添加到输出记录中。
请参阅enum HeadersToAdd
对于默认添加的(当前)10 个标准标头的通用名称(这些不是实际的标头名称,只是一个抽象;实际的标头名称由getHeaderNames()
子类可以覆盖的方法。
要排除标头,请使用excludeHeaders()
方法;例如,若要禁止在标头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加ExceptionHeadersCreator
;这也会禁用所有标准异常标头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过addHeadersFunction
方法。
这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。
ExponentialBackOffWithMaxRetries
实现
Spring Framework 提供了许多BackOff
实现。 默认情况下,ExponentialBackOff
将无限期重试;在多次重试尝试后放弃需要计算maxElapsedTime
. 从版本 2.7.3 开始,Spring for Apache Kafka 提供了ExponentialBackOffWithMaxRetries
这是一个接收maxRetries
属性并自动计算maxElapsedTime
,这更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将在1, 2, 4, 8, 10, 10
秒,然后再调用恢复器。
4.1.22. JAAS 和 Kerberos
从 2.0 版开始,一个KafkaJaasLoginModuleInitializer
已添加类以协助 Kerberos 配置。您可以将此 Bean 与所需的配置一起添加到应用程序上下文中。以下示例配置了这样的 Bean:
@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
jaasConfig.setControlFlag("REQUIRED");
Map<String, String> options = new HashMap<>();
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
options.put("principal", "[email protected]");
jaasConfig.setOptions(options);
return jaasConfig;
}
4.2. Apache Kafka Streams 支持
从版本 1.1.4 开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。要从 Spring 应用程序使用它,请kafka-streams
jar 必须存在于类路径上。它是 Spring for Apache Kafka 项目的可选依赖项,不会传递下载。
4.2.1. 基础知识
参考 Apache Kafka Streams 文档建议使用该 API 的以下方式:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我们有两个主要组件:
-
StreamsBuilder
:使用要构建的 APIKStream
(或KTable
) 实例。 -
KafkaStreams
:管理这些实例的生命周期。
都KStream 暴露给KafkaStreams 实例由单个StreamsBuilder 同时启动和停止,即使它们具有不同的逻辑。
换句话说,由StreamsBuilder 与单个生命周期控制相关联。
一次KafkaStreams 实例已被streams.close() ,则无法重新启动。
相反,新的KafkaStreams 必须创建实例以重新启动流处理。 |
4.2.2. 弹簧管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean
.
这是一个AbstractFactoryBean
实现以公开StreamsBuilder
singleton 实例作为 bean。
以下示例创建这样的 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration 对象而不是StreamsConfig . |
这StreamsBuilderFactoryBean
还实现SmartLifecycle
管理内部KafkaStreams
实例。 与 Kafka Streams API 类似,您必须定义KStream
实例,然后启动KafkaStreams
. 这也适用于 Kafka Streams 的 Spring API。因此,当您使用 defaultautoStartup = true
在StreamsBuilderFactoryBean
,您必须声明KStream
实例StreamsBuilder
在刷新应用程序上下文之前。 例如KStream
可以是常规的 bean 定义,而使用 Kafka Streams API 时不会产生任何影响。以下示例显示了如何执行此作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手动控制生命周期(例如,通过某些条件停止和启动),您可以引用StreamsBuilderFactoryBean
直接使用工厂 bean () 前缀进行 bean。 因为&
StreamsBuilderFactoryBean
使用其内部KafkaStreams
实例,可以安全地停止并重新启动它。一个新的KafkaStreams
在每个start()
. 您还可以考虑使用不同的StreamsBuilderFactoryBean
实例,如果您想控制KStream
实例。
您还可以指定KafkaStreams.StateListener
,Thread.UncaughtExceptionHandler
和StateRestoreListener
选项StreamsBuilderFactoryBean
,这些KafkaStreams
实例。 此外,除了间接将这些选项设置为StreamsBuilderFactoryBean
,从 2.1.5 版开始,您可以使用KafkaStreamsCustomizer
callback 接口来配置内部KafkaStreams
实例。 请注意KafkaStreamsCustomizer
覆盖StreamsBuilderFactoryBean
. 如果您需要执行一些KafkaStreams
作,您可以直接访问该内部KafkaStreams
实例,使用StreamsBuilderFactoryBean.getKafkaStreams()
. 您可以自动布线StreamsBuilderFactoryBean
bean 的定义,但您应该确保在 bean 定义中使用完整类型,如以下示例所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,您可以添加@Qualifier
如果使用接口 bean 定义,则按名称注入。以下示例显示了如何执行此作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从 2.4.1 版开始,工厂 bean 有一个新属性infrastructureCustomizer
带类型KafkaStreamsInfrastructureCustomizer
; 这允许自定义StreamsBuilder
(例如,添加状态存储)和/或Topology
在创建流之前。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供了默认的无作实现,以避免在不需要时必须实现这两种方法。
一个CompositeKafkaStreamsInfrastructureCustomizer
,用于需要应用多个定制器时。
4.2.3. KafkaStreams 千分尺支持
在 2.5.3 版本中引入,您可以配置KafkaStreamsMicrometerListener
自动注册千分尺KafkaStreams
由工厂 Bean 管理的对象:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
4.2.4. Streams JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了一个JsonSerde
使用 JSON 的实现,委托给JsonSerializer
和JsonDeserializer
序列化、反序列化和消息转换中所述。
这JsonSerde
实现通过其构造函数(目标类型或ObjectMapper
).
在下面的示例中,我们使用JsonSerde
序列化和反序列化Cat
Kafka 流的有效负载(JsonSerde
可以在需要实例的地方以类似的方式使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/消费者工厂中使用时,您可以使用 fluent API,这简化了配置。
stream.through(new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
4.2.5. 使用KafkaStreamBrancher
这KafkaStreamBrancher
class 引入了一种更方便的方法来构建条件分支KStream
.
考虑以下不使用KafkaStreamBrancher
:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用KafkaStreamBrancher
:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
4.2.6. 配置
要配置 Kafka Streams 环境,请StreamsBuilderFactoryBean
需要一个KafkaStreamsConfiguration
实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration 对象,而不是作为StreamsConfig . |
为了避免在大多数情况下使用样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了@EnableKafkaStreams
注释,您应该将其放在@Configuration
类。
您只需要声明一个KafkaStreamsConfiguration
名为 beandefaultKafkaStreamsConfig
.
一个StreamsBuilderFactoryBean
bean,命名为defaultKafkaStreamsBuilder
,在应用程序上下文中自动声明。
您可以声明和使用任何额外的StreamsBuilderFactoryBean
Beans也是如此。
您可以通过提供实现StreamsBuilderFactoryBeanConfigurer
.
如果有多个这样的豆子,它们将根据它们的Ordered.order
财产。
默认情况下,当工厂 Bean 停止时,KafkaStreams.cleanUp()
方法被调用。
从 2.1.2 版开始,工厂 bean 具有额外的构造函数,将CleanupConfig
对象,该对象具有属性,可让您控制cleanUp()
方法在start()
或stop()
或者两者都不是。
从 2.7 版开始,默认值是从不清理本地状态。
4.2.7. 标头扩充器
2.3 版添加了HeaderEnricher
实现Transformer
.
这可用于在流处理中添加标头;标头值是 SpEL 表达式;表达式求值的根对象有 3 个属性:
-
context
-这ProcessorContext
,允许访问当前记录元数据 -
key
- 当前记录的键 -
value
- 当前记录的值
表达式必须返回byte[]
或String
(将转换为byte[]
用UTF-8
).
要在流中使用扩充器,请执行以下作:
.transform(() -> enricher)
转换器不会改变key
或value
;它只是添加标题。
如果您的流是多线程的,则需要为每条记录创建一个新实例。 |
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))
这是一个简单的示例,添加一个文字标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.transform(() -> enricher)
.to(OUTPUT);
4.2.8.MessagingTransformer
2.3 版添加了MessagingTransformer
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)交互。
转换器需要实现MessagingFunction
.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其GatewayProxyFactoryBean
.
它还需要一个MessagingMessageConverter
将键、值和元数据(包括标头)转换为 Spring Messaging 或从 Spring Messaging 转换Message<?>
.
看[从KStream
] 以获取更多信息。
4.2.9. 从反序列化异常中恢复
2.3 版引入了RecoveringDeserializationExceptionHandler
当发生反序列化异常时,它可以采取一些作。
请参阅 Kafka 文档DeserializationExceptionHandler
,其中RecoveringDeserializationExceptionHandler
是一个实现。
这RecoveringDeserializationExceptionHandler
配置了ConsumerRecordRecoverer
实现。
该框架提供了DeadLetterPublishingRecoverer
将失败的记录发送到死信主题。
有关此恢复器的更多信息,请参阅发布死信记录。
要配置恢复器,请将以下属性添加到流配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()
bean 可以是你自己的实现ConsumerRecordRecoverer
.
4.2.10. Kafka Streams 示例
以下示例结合了本章中涵盖的所有主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}
4.3. 测试应用程序
这spring-kafka-test
jar 包含一些有用的实用程序来帮助测试您的应用程序。
4.3.1. 卡夫卡测试实用程序
o.s.kafka.test.utils.KafkaTestUtils
提供了许多静态帮助程序方法来使用记录、检索各种记录偏移量等。
有关完整详细信息,请参阅其 Javadocs。
4.3.2. JUnit
o.s.kafka.test.utils.KafkaTestUtils
还提供了一些静态方法来设置生产者和消费者属性。
以下列表显示了这些方法签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
从 2.5 版开始, 使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。
如果由于某种原因无法做到这一点,请注意 |
一个 JUnit 4@Rule
包装器EmbeddedKafkaBroker
用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。
(有关使用 @EmbeddedKafka 的信息,请参阅 注释@EmbeddedKafka
与 JUnit 5)。
以下列表显示了这些方法的签名:
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
这EmbeddedKafkaBroker
class 有一个实用方法,可让您使用它创建的所有主题。
以下示例演示如何使用它:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
这KafkaTestUtils
有一些实用方法可以从消费者那里获取结果。
以下列表显示了这些方法签名:
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下示例演示如何使用KafkaTestUtils
:
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由EmbeddedKafkaBroker
,一个名为spring.embedded.kafka.brokers
设置为 Kafka 代理的地址和名为spring.embedded.zookeeper.connect
设置为 Zookeeper 的地址。
方便的常量 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS
和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT
) 为该物业提供。
使用EmbeddedKafkaBroker.brokerProperties(Map<String, String>)
,您可以为 Kafka 服务器提供其他属性。
有关可能的代理属性的更多信息,请参阅 Kafka Config。
4.3.3. 配置主题
以下示例配置创建名为cat
和hat
有五个分区,一个名为thing1
有 10 个分区,以及一个名为thing2
有 15 个分区:
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,addTopics
当出现问题时(例如添加已经存在的主题)将抛出异常。
版本 2.6 添加了该方法的新版本,该方法返回Map<String, Exception>
;键是主题名称,值是null
成功,或Exception
失败。
4.3.4. 对多个测试类使用相同的代理
没有内置支持这样做,但您可以对多个测试类使用相同的代理,类似于以下内容:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
这假设有一个 Spring Boot 环境,嵌入式代理替换了引导服务器属性。
然后,在每个测试类中,您可以使用类似于以下内容的内容:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果您不使用 Spring Boot,您可以使用以下命令获取引导服务器broker.getBrokersAsString()
.
前面的示例没有提供在所有测试完成后关闭代理的机制。
例如,如果您在 Gradle 守护进程中运行测试,这可能会成为问题。
在这种情况下,您不应该使用此技术,或者您应该使用一些东西来调用destroy() 在EmbeddedKafkaBroker 当您的测试完成时。 |
4.3.5. @EmbeddedKafka注释
我们通常建议您将该规则用作@ClassRule
以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。
从 2.0 版本开始,如果您使用 Spring 的测试应用程序上下文缓存,您还可以声明一个EmbeddedKafkaBroker
bean,因此单个代理可以跨多个测试类使用。
为方便起见,我们提供了一个名为@EmbeddedKafka
以注册EmbeddedKafkaBroker
豆。
以下示例演示如何使用它:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从 2.2.4 版开始,您还可以使用@EmbeddedKafka
注释来指定 Kafka 端口属性。
以下示例将topics
,brokerProperties
和brokerPropertiesLocation
属性@EmbeddedKafka
支持属性占位符解析:
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的示例中,属性占位符${kafka.topics.another-topic}
,${kafka.broker.logs-dir}
和${kafka.broker.port}
从 Spring 解析Environment
.
此外,代理属性是从broker.properties
classpath 资源由brokerPropertiesLocation
.
属性占位符已解析为brokerPropertiesLocation
URL 和资源中找到的任何属性占位符。
属性定义brokerProperties
覆盖在brokerPropertiesLocation
.
您可以使用@EmbeddedKafka
使用 JUnit 4 或 JUnit 5 进行注释。
4.3.6. 使用 JUnit5 @EmbeddedKafka注释
从 2.3 版开始,有两种方法可以使用@EmbeddedKafka
使用 JUnit5 进行注释。
当与@SpringJunitConfig
注释,则将嵌入式代理添加到测试应用程序上下文中。
您可以在类或方法级别自动将代理连接到测试中,以获取代理地址列表。
当不使用 spring test 上下文时,EmbdeddedKafkaCondition
创建经纪人;该条件包括一个参数解析器,因此您可以访问测试方法中的代理...
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
如果类注释为@EmbeddedBroker
也没有ExtendedWith(SpringExtension.class)
.@SpringJunitConfig
和@SpringBootTest
是如此元注释,并且当这些注释中的任何一个也存在时,将使用基于上下文的代理。
当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。 |
4.3.7. 嵌入式代理@SpringBootTest
附注
Spring Initializr 现在会自动添加spring-kafka-test
测试范围内对项目配置的依赖关系。
如果您的应用程序在
|
有几种方法可以在 Spring Boot 应用程序测试中使用嵌入式代理。
他们包括:
JUnit4 类规则
以下示例显示如何使用 JUnit4 类规则创建嵌入式代理:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖代理列表属性以设置 Boot 的属性。
@EmbeddedKafka
注释或EmbeddedKafkaBroker
豆
以下示例演示如何使用@EmbeddedKafka
用于创建嵌入式代理的注释:
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
4.3.8. Hamcrest 匹配器
这o.s.kafka.test.hamcrest.KafkaMatchers
提供以下匹配器:
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
4.3.9. AssertJ 条件
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
4.3.10. 示例
以下示例汇集了本章中涵盖的大多数主题:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的示例使用 Hamcrest 匹配器。
跟AssertJ
,最后一部分如下所示:
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
4.4. 非阻塞重试
这是一项实验性功能,在删除实验性指定之前,不进行重大 API 更改的通常规则不适用于此功能。 鼓励用户试用该功能并通过 GitHub 问题或 GitHub 讨论提供反馈。 这仅与 API 有关;该功能被认为是完整且可靠的。 |
使用 Kafka 实现非阻塞重试 / dlt 功能通常需要设置额外的主题并创建和配置相应的监听器。
从 2.7 开始,Spring for Apache Kafka 通过@RetryableTopic
annotation 和RetryTopicConfiguration
类来简化引导。
4.4.1. 模式的工作原理
如果消息处理失败,则消息将转发到具有回退时间戳的重试主题。 然后,重试主题使用者会检查时间戳,如果时间戳不是到期的,则会暂停该主题分区的消耗。 到期时,将恢复分区消耗,并再次使用消息。 如果消息处理再次失败,则消息将被转发到下一个重试主题,并重复该模式,直到成功处理发生,或者尝试用尽,并将消息发送到死信主题(如果已配置)。
举例来说,如果您有一个“main-topic”主题,并且想要设置指数退避为 1000 毫秒的非阻塞重试,乘数为 2 次和 4 次最大尝试,它将创建 main-topic-retry-1000、main-topic-retry-2000、main-topic-retry-4000 和 main-topic-dlt 主题并配置相应的使用者。 该框架还负责创建主题以及设置和配置侦听器。
使用此策略,您将失去 Kafka 对该主题的排序保证。 |
您可以将AckMode 您喜欢的模式,但是RECORD 建议。 |
目前,此功能不支持类级别@KafkaListener 附注 |
4.4.2. 回退延迟精度
概述和保证
所有消息处理和回退都由使用者线程处理,因此,延迟精度在尽最大努力的基础上得到保证。 如果一条消息的处理时间长于该使用者的下一条消息的回退期,则下一条消息的延迟将高于预期。 此外,对于短延迟(大约 1 秒或更短),线程必须执行的维护工作(例如提交偏移量)可能会延迟消息处理的执行。 如果重试主题的使用者正在处理多个分区,精度也会受到影响,因为我们依赖于从轮询中唤醒使用者并具有完整的 pollTimeouts 来进行计时调整。
话虽这么说,对于处理单个分区的使用者来说,在大多数情况下,消息的处理应该在确切的到期时间后 100 毫秒内进行。
可以保证消息永远不会在到期时间之前得到处理。 |
调整延迟精度
消息的处理延迟精度依赖于两个ContainerProperties
:ContainerProperties.pollTimeout
和ContainerProperties.idlePartitionEventInterval
.
这两个属性都将在重试主题中自动设置,而 dlt 的ListenerContainerFactory
到该主题的最小延迟值的四分之一,最小值为 250 毫秒,最大值为 5000 毫秒。
仅当属性具有默认值时才会设置这些值 - 如果您自己更改任一值,则不会覆盖您的更改。
这样,如果需要,可以调整重试主题的精度和性能。
您可以单独拥有ListenerContainerFactory 主主题和重试主题的实例 - 这样,您可以进行不同的设置以更好地满足您的需求,例如为主要主题设置较高的轮询超时,为重试主题设置较低的轮询超时。 |
4.4.3. 配置
使用@RetryableTopic
注解
配置重试主题和 dlt@KafkaListener
annotated 方法,您只需添加@RetryableTopic
注释,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
您可以在同一类中指定一个方法来处理 dlt 消息,方法是使用@DltHandler
注解。
如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录使用情况。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您没有指定 kafkaTemplate 名称retryTopicDefaultKafkaTemplate 将被查找。
如果未找到 bean,则抛出异常。 |
用RetryTopicConfiguration
豆
您还可以通过创建RetryTopicConfiguration
bean 中的@Configuration
带注释的类。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将使用默认配置为用“@KafkaListener”注释的方法中的所有主题创建重试主题和 dlt 以及相应的消费者。这KafkaTemplate
实例是消息转发所必需的。
为了实现对如何处理每个主题的非阻塞重审的更细粒度的控制,不止一个RetryTopicConfiguration
可以提供 bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 dlt 的消费者将被分配给一个消费者组,该组 ID 是您在groupId 参数@KafkaListener 带有主题后缀的注释。如果您不提供任何内容,它们都将属于同一组,并且重试主题的重新平衡将导致主要主题不必要的重新平衡。 |
如果消费者配置了ErrorHandlingDeserializer ,要处理反真实化异常,配置KafkaTemplate 及其生产者带有一个序列化器,可以处理普通对象和原始对象byte[] 值,这是由反序列化异常产生的。
模板的泛型值类型应为Object .
一种技术是使用DelegatingByTypeSerializer ;下面是一个例子: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener 注释可以用于同一主题,无论是否手动分配分区以及非阻塞重试,但给定主题将仅使用一种配置。
最好使用单个RetryTopicConfiguration 用于配置此类主题的 bean;如果多个@RetryableTopic 注解用于同一主题,它们都应该具有相同的值,否则其中一个注解将应用于该主题的所有侦听器,而其他注解的值将被忽略。 |
4.4.4. 特性
大多数功能都可用于@RetryableTopic
注释和RetryTopicConfiguration
豆。
回退配置
BackOff 配置依赖于BackOffPolicy
接口从Spring Retry
项目。
它包括:
-
固定后退
-
指数退缩
-
随机指数退避
-
均匀随机退缩
-
没有退缩
-
自定义退后
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(4)
.build();
}
您还可以提供 Spring Retry 的SleepingBackOffPolicy
接口:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.build();
}
默认回退策略为FixedBackOffPolicy 最多尝试 3 次,间隔为 1000 毫秒。 |
默认最大延迟为 30 秒ExponentialBackOffPolicy .
如果退避策略需要值大于该值的延迟,请相应地调整 maxDelay 属性。 |
第一次尝试计入maxAttempts ,因此如果您提供maxAttempts 值为 4,则将有原始尝试加上 3 次重试。 |
单主题固定延迟重试
如果您使用的是固定延迟策略,例如FixedBackOffPolicy
或NoBackOffPolicy
您可以使用单个主题来完成非阻塞重试。
本主题将以提供的或默认后缀为后缀,并且不会附加索引或延迟值。
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.build();
}
默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0、retry-1、... |
全局超时
您可以设置重试过程的全局超时。 如果达到该时间,则下次使用者抛出异常时,消息将直接转到 DLT,或者如果没有可用的 DLT,则仅结束处理。
@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2000)
.timeoutAfter(5000)
.build();
}
默认值是没有设置超时,这也可以通过提供 -1 作为 timout 值来实现。 |
异常分类器
您可以指定要重试的异常和不重试的异常。 您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认行为是重试所有异常,而不是遍历原因。 |
从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录无需任何重试即可发送到 DLT。 有关致命异常的默认列表,请参阅 DefaultErrorHandler。 您可以使用以下命令在此列表中添加或删除例外:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(MyFatalException.class);
ddtr.removeNotRetryableException(ConversionException.class);
return ddtr;
}
要禁用致命异常的分类,请使用setClassifications 方法DefaultDestinationTopicResolver . |
包含和排除主题
您可以决定哪些主题将由RetryTopicConfiguration
bean 通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
主题自动创建
除非另有说明,否则框架将使用NewTopic
由KafkaAdmin
豆。
您可以指定将用于创建主题的分区数和复制因子,并且可以关闭此功能。
请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,主题会自动创建一个分区和 1 的复制因子。 |
故障标头管理
在考虑如何管理故障标头(原始标头和异常标头)时,框架将委托给DeadLetterPublishingRecover
以决定是追加还是替换标头。
默认情况下,它显式地将appendOriginalHeaders
自false
和叶子stripPreviousExceptionHeaders
设置为DeadLetterPublishingRecover
.
这意味着默认配置中仅保留第一个“原始”和最后一个异常标头。这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。
有关详细信息,请参阅管理死信记录标题。
若要重新配置框架以对这些属性使用不同的设置,请将标准DeadLetterPublishingRecovererFactory
bean 通过添加一个recovererCustomizer
:
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
dlpr.appendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
return factory;
}
从 2.8.4 版本开始,如果您希望添加自定义标头(除了工厂添加的重试信息标头外,还可以添加headersFunction
到工厂 -factory.setHeadersFunction((rec, ex) → { … })
4.4.5. 结合阻塞和非阻塞重试
从 2.8.4 开始,您可以将框架配置为同时使用阻塞和非阻塞重试。
例如,您可以有一组异常,这些异常也可能会在下一个记录上触发错误,例如DatabaseAccessException
,因此您可以在将同一记录发送到重试主题之前重试几次,或者直接发送到 DLT。
要配置阻止重试,您只需通过addRetryableExceptions
方法ListenerContainerFactoryConfigurer
bean 如下。
默认策略为FixedBackOff
,有九次重试,它们之间没有延迟。
或者,您可以提供自己的退避策略。
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
如果需要进一步调整异常分类,可以设置自己的异常分类Map
的分类通过ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()
方法,例如:
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
结合全局可重试主题的致命异常分类,可以为所需的任何行为配置框架,例如让某些异常同时触发阻止和非阻止重试,仅触发一种或另一种类型,或者直接转到 DLT,而无需任何类型的重试。 |
下面是两种配置协同工作的示例:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
}
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
return ddtr;
}
在此示例中:
-
ShouldRetryOnlyBlockingException.class
将仅通过阻止重试,如果所有重试都失败,将直接转到 DLT。 -
ShouldRetryViaBothException.class
将通过阻止重试,如果所有阻止重试都失败,则将转发到下一个重试主题以进行另一组尝试。 -
ShouldSkipBothRetriesException.class
永远不会以任何方式重试,如果第一次处理尝试失败,将直接转到 DLT。
请注意,阻止重试行为是允许列表 - 您添加您确实想要以这种方式重试的异常;而非阻塞重试分类适用于致命异常,因此是拒绝列表 - 您添加您不想执行非阻塞重试的异常,而是直接发送到 DLT。 |
非阻塞异常分类行为还取决于特定主题的配置。 |
4.4.6. 主题命名
重试主题和 DLT 的命名方法是在主主题后缀为提供的或默认值,并附加该主题的延迟或索引。
例子:
“我的主题” → “我的主题重试-0”、“我的主题重试-1”, ..., “我的主题-dlt”
“my-other-topic” → “my-topic-myRetrySuffix-1000”、“my-topic-myRetrySuffix-2000”, ..., “my-topic-myDltSuffix”。
重试主题和 DLT 后缀
您可以指定重试和 dlt 主题将使用的后缀。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
默认后缀是“-retry”和“-dlt”,分别用于重试主题和 dlt。 |
附加主题的索引或延迟
可以在后缀后附加主题的索引或延迟值。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
默认行为是以延迟值为后缀,但具有多个主题的固定延迟配置除外,在这种情况下,主题以主题的索引为后缀。 |
自定义命名策略
更复杂的命名策略可以通过注册实现RetryTopicNamesProviderFactory
.默认实现是SuffixingRetryTopicNamesProviderFactory
可以通过以下方式注册不同的实现:
@Bean
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
例如,除了标准后缀之外,以下实现还为 retry/dl 主题名称添加了前缀:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if(properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}
4.4.7. Dlt策略
该框架提供了一些使用 DLT 的策略。可以提供 DLT 处理方法,使用默认日志记录方法,或者根本没有 DLT。此外,您还可以选择如果 DLT 处理失败会发生什么。
DLT处理方法
您可以指定用于处理主题的 Dlt 的方法,以及该处理失败时的行为。
为此,您可以使用@DltHandler
类的方法中的注释,其中包含@RetryableTopic
注释。
请注意,相同的方法将用于所有@RetryableTopic
该类中的 Commentted 方法。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 处理程序方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,将应处理 DLT 消息的 bean 名称和方法名称作为参数传递。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltProcessor("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果未提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod。 |
从版本 2.8 开始,如果您根本不想在此应用程序中使用 DLT,包括通过默认处理程序(或您希望延迟使用),您可以控制 DLT 容器是否启动,而与容器工厂的autoStartup
财产。
使用@RetryableTopic
注释,请将autoStartDltHandler
属性设置为false
;使用配置生成器时,请使用.autoStartDltHandler(false)
.
稍后可以通过KafkaListenerEndpointRegistry
.
DLT 故障行为
如果 DLT 处理失败,则有两种可能的行为可用:ALWAYS_RETRY_ON_ERROR
和FAIL_ON_ERROR
.
在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者在不转发消息的情况下结束执行。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是ALWAYS_RETRY_ON_ERROR . |
从 2.8.3 版本开始,ALWAYS_RETRY_ON_ERROR 如果记录导致引发致命异常,则不会将记录路由回 DLT,
例如DeserializationException 因为,通常,此类异常总是会被抛出。 |
被视为致命的例外情况是:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
您可以使用DestinationTopicResolver
豆。
有关详细信息,请参阅异常分类器。
配置无 DLT
该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,重审用尽后,处理就结束了。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}
4.4.8. 指定 ListenerContainerFactory
默认情况下,RetryTopic 配置将使用@KafkaListener
注释,但您可以指定一个不同的注释来创建重试主题和 DLT 侦听器容器。
对于@RetryableTopic
注释,您可以提供工厂的 bean 名称,并使用RetryTopicConfiguration
bean 中,您可以提供 bean 名称或实例本身。
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory(factory)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory("my-retry-topic-factory")
.create(template);
}
从 2.8.3 开始,您可以将相同的工厂用于可重试和不可重试的主题。 |
如果需要将出厂配置行为恢复到之前的 2.8.3,可以将标准RetryTopicConfigurer
豆子和套装useLegacyFactoryConfigurer
自true
如:
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
return retryTopicConfigurer;
}
4.4.9. 更改 KafkaBackOffException 日志记录级别
当重试主题中的消息不应使用时,将KafkaBackOffException
被抛出。默认情况下,此类异常记录在DEBUG
级别,但您可以通过在ListenerContainerFactoryConfigurer
在@Configuration
类。
例如,要将日志记录级别更改为 WARN,您可以添加:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
return configurer;
}