|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
序列化,反序列化,和消息转换
概述
Spring Boot 提供了用于配置和管理应用程序的高级 API。
它在 org.apache.kafka.common.serialization.Serializer<T> 和
org.apache.kafka.common.serialization.Deserializer<T> 的抽象中提供了一些内置实现。
同时,我们可以使用 Producer 或 Consumer 配置属性来指定配置类和配置属性。
以下示例展示了如何进行配置:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特定的情况,KafkaConsumer(因此也是KafkaProducer)提供了重载构造函数,可以接受Serializer和Deserializer的实例,分别用于keys和values。
当您使用此API时,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 也通过构造函数或setter方法提供属性,用于将自定义的 Serializer 和 Deserializer 实例注入到目标 Producer 或 Consumer 中。
此外,您还可以通过构造函数传入 Supplier<Serializer> 或 Supplier<Deserializer> 实例 - 这些 Supplier 在创建每个 Producer 或 Consumer 时被调用。
字符串序列化
自 2.5 版本起,Spring for Apache Kafka 提供了用于实体的字符串表示的 ToStringSerializer 和 ParseStringDeserializer 类。它们依赖于方法 toString 和某些 Function<String> 或 BiFunction<String, Headers> 来解析字符串并填充实例的属性。通常,这会调用类上的静态方法,例如 parse:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer 被配置为在记录 Headers 中传达关于序列化实体的类型信息。
可以通过将 addTypeInfo 属性设置为 false 来禁用此功能。
此信息可以用于 ParseStringDeserializer 在接收端。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true): 您可以将其设置为false以在ToStringSerializer(设置addTypeInfo属性) 时禁用此功能。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
你可以通过默认值为UTF-8的Charset来配置将String转换为/从byte[]转换的Charset。
你可以使用 ConsumerConfig 属性通过解析器方法的名称来配置反序列化器:
-
ParseStringDeserializer.KEY_PARSER -
ParseStringDeserializer.VALUE_PARSER
The properties must contain the fully qualified name of the class followed by the method name, separated by a period ..
The method must be static and have a signature of either (String, Headers) or (String).
一个 ToFromStringSerde 也已提供,用于与 Kafka Streams 一起使用。
JSON
Spring for Apache Kafka 也提供了基于 Jackson JSON 对象映射器的 JsonSerializer 和 JsonDeserializer 实现。
JsonSerializer 允许将任何 Java 对象写入为 JSON byte[]。
JsonDeserializer 需要额外的 Class<?> targetType 参数以允许将消费的 byte[] 反序列化到正确的目标对象。
以下示例展示了如何创建一个 JsonDeserializer:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以自定义JsonSerializer和JsonDeserializer,使用一个ObjectMapper。
您还可以扩展它们,在configure(Map<String, ?> configs, boolean isKey)方法中实现特定的配置逻辑。
从版本 2.3 开始,所有 JSON-aware 组件默认配置为带有 JacksonUtils.enhancedObjectMapper() 的实例,该实例会禁用 MapperFeature.DEFAULT_VIEW_INCLUSION 和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 特性。
此外,该实例还包含用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。
请参阅 JacksonUtils.enhancedObjectMapper() JavaDocs 以获取更多信息。
此方法还注册了将 org.springframework.util.MimeType 对象序列化为 plain string 的功能,以实现跨平台网络兼容性。
可以在应用程序上下文中注册一个 JacksonMimeTypeModule 作为 bean,并会自动配置到 Spring Boot ObjectMapper 实例。
也从2.3版本开始,JsonDeserializer提供了基于TypeReference的构造函数,以便更好地处理目标泛型容器类型。
从 2.1 版本开始,你可以在记录 Headers 中传达类型信息,允许处理多种类型。
此外,你可以通过以下 Kafka 属性配置序列化器和反序列化器。
如果你为 KafkaConsumer 和 KafkaProducer 分别提供了 Serializer 和 Deserializer 实例,则这些属性将不起作用。
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS(默认true): 您可以将其设置为false以在JsonSerializer(设置addTypeInfo属性) 时禁用此功能。 -
JsonSerializer.TYPE_MAPPINGS(默认empty): 请参见 Mapping Types。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以忽略序列化器设置的头信息。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以保留序列化器设置的标头。 -
JsonDeserializer.KEY_DEFAULT_TYPE: 无标题信息时键的反序列化回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE: 无头信息时的反序列化回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES(默认java.util,java.lang): 以逗号分隔的包模式列表,允许用于反序列化。*表示反序列化所有。 -
JsonDeserializer.TYPE_MAPPINGS(默认empty): 请参见 Mapping Types。 -
JsonDeserializer.KEY_TYPE_METHOD(默认empty): 请参见 使用方法确定类型。 -
JsonDeserializer.VALUE_TYPE_METHOD(默认empty): 请参见 使用方法确定类型。
从 2.2 版本开始,反序列化器会移除由序列化器添加的类型信息头(如果存在)。可以通过将 removeTypeHeaders 属性设置为 false 来恢复之前的处理方式,既可以直接设置在反序列化器上,也可以使用前面描述的配置属性。
从 2.8 版本开始,如果你像在 程序化构造 中所示那样程序化地构造序列化器或反序列化器,只要没有使用任何属性进行显式设置(使用 set*() 方法或使用流畅 API),上述属性将由工厂应用。此前,在程序化创建时,配置属性从未被应用;如果直接在对象上显式设置属性,则情况仍如此。 |
映射类型
从 2.2 版本开始,使用 JSON 时,可以通过前面列出的属性提供类型映射。
此前,您需要在序列化器和反序列化器中自定义类型映射器。
映射由前面列表中属性的以逗号分隔的 token:className 对组成。
在出站方向,负载的类名被映射到相应的 token。
在入站方向,类型头中的 token 被映射到相应的类名。
以下示例创建了一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
| 对应的对象必须兼容。 |
如果使用 Spring Boot,你可以在 application.properties(或 yaml)文件中提供这些属性。
以下示例展示了如何操作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
|
您只能使用属性执行简单的配置。 对于更高级的配置(例如,在序列化程序和反序列化程序中使用自定义的
构造器也提供了 setter,作为使用这些构造器的替代方案。 |
When 使用 Spring Boot 并如上所示覆寫 ConsumerFactory 和 ProducerFactory 時,wild card generic types 需要使用與 bean 方法返回類型。
If 具體的 generic types 被提供時,則 Spring Boot 會忽略這些 beans 並仍然會使用默認的 ones。 |
从 2.2 版本开始,您可以使用具有布尔参数的重载构造函数(该参数默认为 1,但为 0 时会显式配置)来显式配置反序列化器,使其使用指定的目标类型,并忽略标头中的类型信息。以下示例展示了如何实现这一点:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从2.5版本开始,您可以使用属性配置反序列化器,通过调用方法来确定目标类型。
如果存在该配置,则会覆盖上述讨论的其他技术。
这在数据由不使用Spring序列化的应用程序发布,且需要根据数据或其他头信息反序列化到不同类型时很有用。
将这些属性设置为方法名:完全限定类名后跟方法名,用点 . 分隔。
该方法必须声明为 public static,并具有以下三种签名之一:(String topic, byte[] data, Headers headers)、(byte[] data, Headers headers) 或 (byte[] data),并且返回一个 Jackson JavaType。
-
JsonDeserializer.KEY_TYPE_METHOD:spring.json.key.type.method -
JsonDeserializer.VALUE_TYPE_METHOD:spring.json.value.type.method
你可以使用任意的头信息或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,可以考虑使用JsonPath或类似方法,但确定类型的测试越简单,处理过程就越高效。
以下是一个通过程序方式创建反序列化器的示例(当在提供消费者工厂的构造函数中传入反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
在使用2.3版本之后,当程序化地构建用于生产者/消费者的序列化器/反序列化器时,您可以使用流畅API(fluent API),这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
为了程序化地提供类型映射,类似于 使用方法确定类型,使用 typeFunction 属性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要不使用流畅API来配置属性,或者使用set*()方法进行设置,工厂将使用配置属性来配置序列化器/反序列化器;参见配置属性。
委托式序列化器和反序列化器
使用标题
版本 2.3 引入了 DelegatingSerializer 和 DelegatingDeserializer,这允许生产者产生并消费者用不同类型的键和/或值记录。
生产者必须设置一个头 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 到一个选择器值,该值用于选择用于值的序列化器和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 用于键;如果未找到匹配项,将抛出 IllegalStateException。
对于输入记录,反序列化器使用相同的头来选择要使用的反序列化器;如果未找到匹配项或该头不存在,则返回原始byte[]。
You can configure the map of selector to Serializer / Deserializer via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG.
For the serializer, the producer property can be a Map<String, Object> where the key is the selector and the value is a Serializer instance, a serializer Class or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
反序列化器的消费者属性可以是一个 Map<String, Object>,其中键是选择器,值是一个 Deserializer 实例,一个 deserializer Class 或类名。该属性也可以是一个用逗号分隔的映射条目的字符串,如下所示。
使用属性进行配置时,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
生产者则会将DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR头设置为thing1或thing2。
这种技术支持将不同类型发送到同一主题(或不同主题)。
从 2.5.1 版本开始,如果类型(键或值)是 Serdes 所支持的标准类型之一(如 Long, Integer, 等),则无需设置 selector 头部;序列化器会将该头部设置为该类型的类名。对于这些类型无需配置序列化器或反序列化器,它们会在首次使用时动态创建。 |
另一种将不同类型发送到不同主题的技术,请参见 Using RoutingKafkaTemplate。
By 类型
版本 2.8 引入了 DelegatingByTypeSerializer。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,您可以配置序列化器检查映射键是否可以分配给目标对象,这在当委托序列化器可以序列化子类时很有用。在这种情况下,如果有歧义匹配,应提供有序的Map,例如LinkedHashMap应提供。
By Topic
从 2.8 版本开始,DelegatingByTopicSerializer 和 DelegatingByTopicDeserializer 允许根据主题名称选择序列化器/反序列化器。
使用正则表达式 Pattern 来查找要使用的实例。
可以通过构造函数或属性配置映射(以逗号分隔的 pattern:serializer 列表)。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
使用 KEY_SERIALIZATION_TOPIC_CONFIG 时用于键的用途。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以在没有模式匹配时,使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT 和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定默认的序列化器/反序列化器。
一个额外的属性 DelegatingByTopicSerialization.CASE_SENSITIVE(默认值 true),当设置为 false 时会使主题查找不区分大小写。
重试反序列化器
The RetryingDeserializer 使用委托 Deserializer 和 RetryTemplate 在反序列化过程中,当委托可能遇到瞬态错误(如网络问题)时进行重试。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
从版本 3.1.2 开始,可以在 RecoveryCallback 上可选地设置 RetryingDeserializer。
参考 spring-retry 项目了解如何配置 RetryTemplate 的重试策略、退避策略等。
Spring 消息 Messaging 消息转换
虽然从低级Kafka的Consumer和Producer视角来看,Serializer和Deserializer API很简单且灵活,但当你在使用@KafkaListener或Spring Integration的Apache Kafka支持时,在Spring消息层可能需要更多灵活性。为了让你容易地转换到和从org.springframework.messaging.Message,Spring for Apache Kafka提供了一个MessageConverter抽象,其MessagingMessageConverter实现和JsonMessageConverter(及其子类)的定制化。
你可以直接将MessageConverter注入到KafkaTemplate实例中,并通过使用AbstractKafkaListenerContainerFactory定义的@KafkaListener.containerFactory()属性进行注入。以下示例展示了如何操作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
当使用 Spring Boot 时,只需将转换器定义为一个 @Bean,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂。
当你使用一个 @KafkaListener,消息转换器会提供参数类型以协助进行转换。
|
这种类型推断只能在 |
|
在消费者侧,您可以配置一个 在生产者一侧,当你使用 Spring Integration 或 0 方法(参见 使用 3)时,必须配置一个与配置的 Kafka 4 兼容的消息转换器。
再次使用 为了方便,在2.3版本开始,框架还提供了一个 |
从 2.7.1 版本开始,消息有效载荷转换可以委托给一个 spring-messaging SmartMessageConverter; 这使得转换,例如,可以基于 MessageHeaders.CONTENT_TYPE 头部进行。
The KafkaMessageConverter.fromMessage() method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property.
The KafkaMessageConverter.toMessage() method is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property.
The SmartMessageConverter.toMessage() method is called to create a new outbound Message<?> from the Message passed to fromMessage() (usually by KafkaTemplate.send(Message<?> msg)).
Similarly, in the KafkaMessageConverter.toMessage() method, after the converter has created a new Message<?> from the ConsumerRecord, the SmartMessageConverter.fromMessage() method is called and then the final inbound message is created with the newly converted payload.
In either case, if the SmartMessageConverter returns null, the original message is used. |
当使用默认的 converter 在 KafkaTemplate 和 listener 容器工厂中时,您通过在模板上调用 setMessagingConverter() 并通过在 @KafkaListener 的方法上的 contentTypeConverter 属性来配置 SmartMessageConverter。
示例:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data 投影接口
从 2.1.1 版本开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体的类型。 这允许非常选择性的、低耦合的数据绑定,包括在 JSON 文档中的多个位置查找值。 例如,可以定义以下接口作为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认用于通过接收到的 JSON 文档中的字段来查找属性名称。@JsonPath 表达式允许自定义值查找方式,甚至可以定义多个 JSON 路径表达式,以从多个位置查找值,直到某个表达式返回实际值。
要启用此功能,请使用一个配置了适当委托转换器的ProjectingMessageConverter(用于出站转换和将非投影接口进行转换)。
您还必须将spring-data:spring-data-commons和com.jayway.jsonpath:json-path添加到类路径中。
当用作 @KafkaListener 方法的参数时,接口类型会像平常一样自动传递给转换器。
使用ErrorHandlingDeserializer
When 一个反序列化器无法反序列化一条消息时,Spring 没有办法处理这个问题,因为该问题发生在poll()返回之前。
为了解决这个问题,引入了ErrorHandlingDeserializer。
该反序列化器委托给实际的反序列化器(键或值)。
如果委托的反序列化器无法反序列化记录内容,ErrorHandlingDeserializer会返回一个null值,并在包含原因和原始字节的DeserializationException头中返回。
当你使用记录级的MessageListener时,如果ConsumerRecord包含针对键或值的DeserializationException头,容器的ErrorHandler将被调用,传入失败的ConsumerRecord。
该记录不会传递给监听器。
或者,你可以通过提供一个failedDeserializationFunction来配置ErrorHandlingDeserializer,从而创建一个自定义值。
该函数用于创建T的实例,该实例以 usual 的方式传递给监听器。
一个包含所有上下文信息的FailedDeserializationInfo类型的对象会提供给该函数。
你可以在头信息中找到DeserializationException(作为序列化的Java对象)。
有关ErrorHandlingDeserializer的更多信息,请参阅Javadoc。
您可以使用接受 key 和 value Deserializer 对象的 DefaultKafkaConsumerFactory 构造函数,并将通过使用适当的委托配置的 ErrorHandlingDeserializer 实例进行连接。
或者,您可以使用消费者配置属性(这些属性用于由 ErrorHandlingDeserializer 使用的实例化)来实例化委托。
属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。
属性值可以是类或类名。
以下示例展示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用了一个 failedDeserializationFunction。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
上述示例使用了以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置为使用一个 ErrorHandlingDeserializer,则重要的是要为消费者配置 KafkaTemplate,并为其生产者配置一个能够处理普通对象以及由反序列化异常产生的原始 byte[] 值的序列化器。
模板的通用值类型应为 Object。
一种技术是使用 DelegatingByTypeSerializer;以下是一个示例: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当使用批处理监听器的ErrorHandlingDeserializer时,必须在消息头中检查反序列化异常。
当与DefaultBatchErrorHandler一起使用时,可以使用该头确定异常失败的记录,并通过BatchListenerFailedException将错误传递给错误处理程序。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException() 可用于将头信息转换为 DeserializationException。
当消费 List<ConsumerRecord<?, ?> 时,使用 SerializationUtils.getExceptionFromHeader() 代替:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果同时使用一个DeadLetterPublishingRecoverer,则为1发布的记录将具有一个 |
从3.1版本开始,你可以向ErrorHandlingDeserializer前添加一个Validator。
如果委托的Deserializer成功反序列化对象,但该对象验证失败,将抛出与反序列化异常类似的异常。
这允许原始原始数据传递给错误处理器。
当你自己创建反序列化器时,只需调用setValidator;如果你使用属性配置序列化器,将消费者配置属性ErrorHandlingDeserializer.VALIDATOR_CLASS设置为你的Validator的类或完整类名。
当你使用Spring Boot时,该属性名称是spring.kafka.consumer.properties.spring.deserializer.validator.class。
负载转换与批处理监听器
You can also use a JsonMessageConverter within a BatchMessagingMessageConverter to convert batch messages when you use a batch listener container factory.
See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.
默认情况下,转换的类型会从监听器参数推断。
如果你将JsonMessageConverter配置为带有其TypePrecedence设置为TYPE_ID(而不是默认的INFERRED)的DefaultJackson2TypeMapper,转换器会优先使用消息头中的类型信息(如果存在)。
这允许,例如,监听器方法声明为接口而不是具体类。
此外,类型转换器还支持映射,因此反序列化的目标类型可以与源类型不同(只要数据兼容)。
这在使用类级别@KafkaListener实例时也很有用,其中负载必须在确定要调用的方法之前已转换。
以下示例创建了使用此方法的 beans:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,为此要生效,转换目标的方法签名必须是一个具有单个泛型参数类型的容器对象,例如以下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
注意,您仍然可以访问批处理标题。
如果批处理转换器具有支持记录转换器,则也可以接收一个其payload根据泛型类型进行转换的消息列表。 以下示例展示了如何实现:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
If record in the batch cannot be converted, its payload is set as null into the target payloads list.
The conversion exception is logged as warning for this record and also stored into a KafkaHeaders.CONVERSION_FAILURES header as an item of the List<ConversionException>.
The target @KafkaListener method may perform Java Stream API to filter out those null values from the payload list or do something with the conversion exceptions header:
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
ConversionService自定义
从版本 2.1.1 开始,用于默认 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 解析监听器方法参数调用的 org.springframework.core.convert.ConversionService,会在实现以下任一接口的所有 bean 上提供:
-
org.springframework.core.convert.converter.Converter -
org.springframework.core.convert.converter.GenericConverter -
org.springframework.format.Formatter
这允许你在不更改默认配置用于 0 和 1 的情况下,进一步自定义监听器反序列化。
通过KafkaListenerConfigurer beans为KafkaListenerEndpointRegistrar设置的自定义MessageHandlerMethodFactory将禁用此功能。 |
添加自定义HandlerMethodArgumentResolver to @KafkaListener
从 2.4.2 版本开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。
您只需要实现 KafkaListenerConfigurer,并使用来自类 KafkaListenerEndpointRegistrar 的方法 setCustomMethodArgumentResolvers()。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory to the KafkaListenerEndpointRegistrar bean.
If you do this, and your application needs to handle tombstone records, with a null value() (e.g. from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver to the factory; it must be the last resolver because it supports all types and can match arguments without a @Payload annotation.
If you are using a DefaultMessageHandlerMethodFactory, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard PayloadMethodArgumentResolver, which has no knowledge of KafkaNull payloads.