这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

序列化,反序列化,和消息转换

概述

Spring Boot 提供了用于配置和管理应用程序的高级 API。 它在 org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T> 的抽象中提供了一些内置实现。 同时,我们可以使用 ProducerConsumer 配置属性来指定配置类和配置属性。 以下示例展示了如何进行配置:spring-doc.cadn.net.cn

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

对于更复杂或特定的情况,KafkaConsumer(因此也是KafkaProducer)提供了重载构造函数,可以接受SerializerDeserializer的实例,分别用于keysvaluesspring-doc.cadn.net.cn

当您使用此API时,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 也通过构造函数或setter方法提供属性,用于将自定义的 SerializerDeserializer 实例注入到目标 ProducerConsumer 中。 此外,您还可以通过构造函数传入 Supplier<Serializer>Supplier<Deserializer> 实例 - 这些 Supplier 在创建每个 ProducerConsumer 时被调用。spring-doc.cadn.net.cn

字符串序列化

自 2.5 版本起,Spring for Apache Kafka 提供了用于实体的字符串表示的 ToStringSerializerParseStringDeserializer 类。它们依赖于方法 toString 和某些 Function<String>BiFunction<String, Headers> 来解析字符串并填充实例的属性。通常,这会调用类上的静态方法,例如 parsespring-doc.cadn.net.cn

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

默认情况下,ToStringSerializer 被配置为在记录 Headers 中传达关于序列化实体的类型信息。 可以通过将 addTypeInfo 属性设置为 false 来禁用此功能。 此信息可以用于 ParseStringDeserializer 在接收端。spring-doc.cadn.net.cn

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS (默认 true): 您可以将其设置为 false 以在 ToStringSerializer (设置 addTypeInfo 属性) 时禁用此功能。spring-doc.cadn.net.cn

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

你可以通过默认值为UTF-8Charset来配置将String转换为/从byte[]转换的Charsetspring-doc.cadn.net.cn

你可以使用 ConsumerConfig 属性通过解析器方法的名称来配置反序列化器:spring-doc.cadn.net.cn

The properties must contain the fully qualified name of the class followed by the method name, separated by a period .. The method must be static and have a signature of either (String, Headers) or (String).spring-doc.cadn.net.cn

一个 ToFromStringSerde 也已提供,用于与 Kafka Streams 一起使用。spring-doc.cadn.net.cn

JSON

Spring for Apache Kafka 也提供了基于 Jackson JSON 对象映射器的 JacksonJsonSerializerJacksonJsonDeserializer 实现。 JacksonJsonSerializer 允许将任何 Java 对象写入为 JSON byte[]JacksonJsonDeserializer 需要额外的 Class<?> targetType 参数以允许将消费的 byte[] 反序列化到正确的目标对象。 以下示例展示了如何创建一个 JacksonJsonDeserializerspring-doc.cadn.net.cn

JacksonJsonDeserializer<Thing> thingDeserializer = new JacksonJsonDeserializer<>(Thing.class);

您可以自定义JacksonJsonSerializerJacksonJsonDeserializer,使用一个ObjectMapper。 您还可以扩展它们,在configure(Map<String, ?> configs, boolean isKey)方法中实现特定的配置逻辑。spring-doc.cadn.net.cn

从版本 2.3 开始,所有 JSON-aware 组件默认配置为带有 JacksonUtils.enhancedObjectMapper() 的实例,该实例会禁用 MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 特性。 此外,该实例还包含用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。 请参阅 JacksonUtils.enhancedObjectMapper() JavaDocs 以获取更多信息。 此方法还注册了将 org.springframework.util.MimeType 对象序列化为 plain string 的功能,以实现跨平台网络兼容性。 可以在应用程序上下文中注册一个 JacksonMimeTypeModule 作为 bean,并会自动配置到 Spring Boot ObjectMapper 实例。spring-doc.cadn.net.cn

也从2.3版本开始,JsonDeserializer提供了基于TypeReference的构造函数,以便更好地处理目标泛型容器类型。spring-doc.cadn.net.cn

从 2.1 版本开始,你可以在记录 Headers 中传达类型信息,允许处理多种类型。 此外,你可以通过以下 Kafka 属性配置序列化器和反序列化器。 如果你为 KafkaConsumerKafkaProducer 分别提供了 SerializerDeserializer 实例,则这些属性将不起作用。spring-doc.cadn.net.cn

配置属性

从 2.2 版本开始,反序列化器会移除由序列化器添加的类型信息头(如果存在)。可以通过将 removeTypeHeaders 属性设置为 false 来恢复之前的处理方式,既可以直接设置在反序列化器上,也可以使用前面描述的配置属性。spring-doc.cadn.net.cn

从 2.8 版本开始,如果你像在 程序化构造 中所示那样程序化地构造序列化器或反序列化器,只要没有使用任何属性进行显式设置(使用 set*() 方法或使用流畅 API),上述属性将由工厂应用。此前,在程序化创建时,配置属性从未被应用;如果直接在对象上显式设置属性,则情况仍如此。

映射类型

从 2.2 版本开始,使用 JSON 时,可以通过前面列出的属性提供类型映射。 此前,您需要在序列化器和反序列化器中自定义类型映射器。 映射由前面列表中属性的以逗号分隔的 token:className 对组成。 在出站方向,负载的类名被映射到相应的 token。 在入站方向,类型头中的 token 被映射到相应的类名。spring-doc.cadn.net.cn

以下示例创建了一组映射:spring-doc.cadn.net.cn

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
对应的对象必须兼容。

如果使用 Spring Boot,你可以在 application.properties(或 yaml)文件中提供这些属性。 以下示例展示了如何操作:spring-doc.cadn.net.cn

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

spring-doc.cadn.net.cn

您只能使用属性执行简单的配置。spring-doc.cadn.net.cn

对于更高级的配置(例如,在序列化程序和反序列化程序中使用自定义的ObjectMapper),您应该使用接受预构建的序列化程序和反序列化程序的生产者和消费者工厂构造函数。以下Spring Boot示例会覆盖默认的工厂:spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

构造器也提供了 setter,作为使用这些构造器的替代方案。spring-doc.cadn.net.cn

When 使用 Spring Boot 并如上所示覆寫 ConsumerFactoryProducerFactory 時,wild card generic types 需要使用與 bean 方法返回類型。 If 具體的 generic types 被提供時,則 Spring Boot 會忽略這些 beans 並仍然會使用默認的 ones。

从 2.2 版本开始,您可以使用具有布尔参数的重载构造函数(该参数默认为 1,但为 0 时会显式配置)来显式配置反序列化器,使其使用指定的目标类型,并忽略标头中的类型信息。以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JacksonJsonDeserializer<>(Cat1.class, false));

使用方法确定类型

从2.5版本开始,您可以使用属性配置反序列化器,通过调用方法来确定目标类型。 如果存在该配置,则会覆盖上述讨论的其他技术。 这在数据由不使用Spring序列化的应用程序发布,且需要根据数据或其他头信息反序列化到不同类型时很有用。 将这些属性设置为方法名:完全限定类名后跟方法名,用点 . 分隔。 该方法必须声明为 public static,并具有以下三种签名之一:(String topic, byte[] data, Headers headers)(byte[] data, Headers headers)(byte[] data),并且返回一个 Jackson JavaTypespring-doc.cadn.net.cn

你可以使用任意的头信息或检查数据以确定类型。spring-doc.cadn.net.cn

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

对于更复杂的数据检查,可以考虑使用JsonPath或类似方法,但确定类型的测试越简单,处理过程就越高效。spring-doc.cadn.net.cn

以下是一个通过程序方式创建反序列化器的示例(当在提供消费者工厂的构造函数中传入反序列化器时):spring-doc.cadn.net.cn

JacksonJsonDeserializer<Object> deser = new JacksonJsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

程序化构建

在使用2.3版本之后,当程序化地构建用于生产者/消费者的序列化器/反序列化器时,您可以使用流畅API(fluent API),这简化了配置。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JacksonJsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JacksonJsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JacksonJsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JacksonJsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

为了程序化地提供类型映射,类似于 使用方法确定类型,使用 typeFunction 属性。spring-doc.cadn.net.cn

JacksonJsonDeserializer<Object> deser = new JacksonJsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要不使用流畅API来配置属性,或者使用set*()方法进行设置,工厂将使用配置属性来配置序列化器/反序列化器;参见配置属性spring-doc.cadn.net.cn

委托式序列化器和反序列化器

使用标题

版本 2.3 引入了 DelegatingSerializerDelegatingDeserializer,这允许生产者产生并消费者用不同类型的键和/或值记录。 生产者必须设置一个头 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 到一个选择器值,该值用于选择用于值的序列化器和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 用于键;如果未找到匹配项,将抛出 IllegalStateExceptionspring-doc.cadn.net.cn

对于输入记录,反序列化器使用相同的头来选择要使用的反序列化器;如果未找到匹配项或该头不存在,则返回原始byte[]spring-doc.cadn.net.cn

You can configure the map of selector to Serializer / Deserializer via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG. For the serializer, the producer property can be a Map<String, Object> where the key is the selector and the value is a Serializer instance, a serializer Class or the class name. The property can also be a String of comma-delimited map entries, as shown below.spring-doc.cadn.net.cn

反序列化器的消费者属性可以是一个 Map<String, Object>,其中键是选择器,值是一个 Deserializer 实例,一个 deserializer Class 或类名。该属性也可以是一个用逗号分隔的映射条目的字符串,如下所示。spring-doc.cadn.net.cn

使用属性进行配置时,请使用以下语法:spring-doc.cadn.net.cn

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

生产者则会将DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR头设置为thing1thing2spring-doc.cadn.net.cn

这种技术支持将不同类型发送到同一主题(或不同主题)。spring-doc.cadn.net.cn

从 2.5.1 版本开始,如果类型(键或值)是 Serdes 所支持的标准类型之一(如 Long, Integer, 等),则无需设置 selector 头部;序列化器会将该头部设置为该类型的类名。对于这些类型无需配置序列化器或反序列化器,它们会在首次使用时动态创建。

另一种将不同类型发送到不同主题的技术,请参见 Using RoutingKafkaTemplatespring-doc.cadn.net.cn

By 类型

版本 2.8 引入了 DelegatingByTypeSerializerspring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

从版本 2.8.3 开始,您可以配置序列化器检查映射键是否可以分配给目标对象,这在当委托序列化器可以序列化子类时很有用。在这种情况下,如果有歧义匹配,应提供有序的Map,例如LinkedHashMap应提供。spring-doc.cadn.net.cn

By Topic

从 2.8 版本开始,DelegatingByTopicSerializerDelegatingByTopicDeserializer 允许根据主题名称选择序列化器/反序列化器。 使用正则表达式 Pattern 来查找要使用的实例。 可以通过构造函数或属性配置映射(以逗号分隔的 pattern:serializer 列表)。spring-doc.cadn.net.cn

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

使用 KEY_SERIALIZATION_TOPIC_CONFIG 时用于键的用途。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JacksonJsonSerializer<Object>());  // default
}

您可以在没有模式匹配时,使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定默认的序列化器/反序列化器。spring-doc.cadn.net.cn

一个额外的属性 DelegatingByTopicSerialization.CASE_SENSITIVE(默认值 true),当设置为 false 时会使主题查找不区分大小写。spring-doc.cadn.net.cn

重试反序列化器

The RetryingDeserializer 使用委托 DeserializerRetryTemplate 在反序列化过程中,当委托可能遇到瞬态错误(如网络问题)时进行重试。spring-doc.cadn.net.cn

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

一个恢复回调可以设置在RetryingDeserializer,如果所有重试都用尽,则返回回退对象。spring-doc.cadn.net.cn

参考 Spring Framework 项目中,为 RetryTemplate 配置重试策略、退避等策略。spring-doc.cadn.net.cn

Spring 消息 Messaging 消息转换

虽然从低级Kafka的ConsumerProducer视角来看,SerializerDeserializer API很简单且灵活,但当你在使用@KafkaListenerSpring Integration的Apache Kafka支持时,在Spring消息层可能需要更多灵活性。为了让你容易地转换到和从org.springframework.messaging.Message,Spring for Apache Kafka提供了一个MessageConverter抽象,其MessagingMessageConverter实现和JacksonJsonMessageConverter(及其子类)的定制化。 你可以直接将MessageConverter注入到KafkaTemplate实例中,并通过使用AbstractKafkaListenerContainerFactory定义的@KafkaListener.containerFactory()属性进行注入。以下示例展示了如何操作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JacksonJsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

当使用 Spring Boot 时,只需将转换器定义为一个 @Bean,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂。spring-doc.cadn.net.cn

当你使用一个 @KafkaListener,消息转换器会提供参数类型以协助进行转换。spring-doc.cadn.net.cn

这种类型推断只能在 @KafkaListener 注解在方法级别声明时实现。 使用类级别的 @KafkaListener 时,会使用payload类型来选择要调用的 @KafkaHandler 方法,因此在可以选择方法之前,该payload类型必须已经转换完成。spring-doc.cadn.net.cn

在消费者侧,您可以配置一个JacksonJsonMessageConverter;它可以处理类型为byte[]ConsumerRecord值,适用于BytesString,应与ByteArrayDeserializerBytesDeserializerStringDeserializer一起使用。 (byte[]Bytes更高效,因为它们避免了不必要的byte[]String转换) 您也可以配置与反序列化器对应的JacksonJsonMessageConverter的特定子类,如果您愿意的话。spring-doc.cadn.net.cn

在生产者一侧,当你使用 Spring Integration 或 0 方法(参见 使用 3)时,必须配置一个与配置的 Kafka 4 兼容的消息转换器。spring-doc.cadn.net.cn

再次使用 byte[]Bytes 更高效,因为它们避免了 Stringbyte[] 的转换。spring-doc.cadn.net.cn

为了方便,在2.3版本开始,框架还提供了一个StringOrBytesSerializer,可以将三种值类型进行序列化,以便与任何消息转换器一起使用。spring-doc.cadn.net.cn

从 2.7.1 版本开始,消息有效载荷转换可以委托给一个 spring-messaging SmartMessageConverter; 这使得转换,例如,可以基于 MessageHeaders.CONTENT_TYPE 头部进行。spring-doc.cadn.net.cn

The KafkaMessageConverter.fromMessage() method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property. The KafkaMessageConverter.toMessage() method is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property. The SmartMessageConverter.toMessage() method is called to create a new outbound Message<?> from the Message passed to fromMessage() (usually by KafkaTemplate.send(Message<?> msg)). Similarly, in the KafkaMessageConverter.toMessage() method, after the converter has created a new Message<?> from the ConsumerRecord, the SmartMessageConverter.fromMessage() method is called and then the final inbound message is created with the newly converted payload. In either case, if the SmartMessageConverter returns null, the original message is used.

当使用默认的 converter 在 KafkaTemplate 和 listener 容器工厂中时,您通过在模板上调用 setMessagingConverter() 并通过在 @KafkaListener 的方法上的 contentTypeConverter 属性来配置 SmartMessageConverterspring-doc.cadn.net.cn

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring Data 投影接口

从 2.1.1 版本开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体的类型。 这允许非常选择性的、低耦合的数据绑定,包括在 JSON 文档中的多个位置查找值。 例如,可以定义以下接口作为消息负载类型:spring-doc.cadn.net.cn

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

访问器方法将默认用于通过接收到的 JSON 文档中的字段来查找属性名称。@JsonPath 表达式允许自定义值查找方式,甚至可以定义多个 JSON 路径表达式,以从多个位置查找值,直到某个表达式返回实际值。spring-doc.cadn.net.cn

要启用此功能,请使用一个配置了适当委托转换器的JacksonProjectingMessageConverter(用于出站转换和将非投影接口进行转换)。 您还必须将spring-data:spring-data-commonscom.jayway.jsonpath:json-path添加到类路径中。spring-doc.cadn.net.cn

当用作 @KafkaListener 方法的参数时,接口类型会像平常一样自动传递给转换器。spring-doc.cadn.net.cn

使用ErrorHandlingDeserializer

When 一个反序列化器无法反序列化一条消息时,Spring 没有办法处理这个问题,因为该问题发生在poll()返回之前。 为了解决这个问题,引入了ErrorHandlingDeserializer。 该反序列化器委托给实际的反序列化器(键或值)。 如果委托的反序列化器无法反序列化记录内容,ErrorHandlingDeserializer会返回一个null值,并在包含原因和原始字节的DeserializationException头中返回。 当你使用记录级的MessageListener时,如果ConsumerRecord包含针对键或值的DeserializationException头,容器的ErrorHandler将被调用,传入失败的ConsumerRecord。 该记录不会传递给监听器。spring-doc.cadn.net.cn

Alternatively, you can configure the ErrorHandlingDeserializer to create a custom value by providing a failedDeserializationFunction, which is a Function<FailedDeserializationInfo, T>. This function is invoked to create an instance of T, which is passed to the listener in the usual fashion. An object of type FailedDeserializationInfo, which contains all the contextual information is provided to the function. You can find the DeserializationException (as a serialized Java object) in headers. See the Javadoc for the ErrorHandlingDeserializer for more information。spring-doc.cadn.net.cn

您可以使用接受 key 和 value Deserializer 对象的 DefaultKafkaConsumerFactory 构造函数,并将通过使用适当的委托配置的 ErrorHandlingDeserializer 实例进行连接。 或者,您可以使用消费者配置属性(这些属性用于由 ErrorHandlingDeserializer 使用的实例化)来实例化委托。 属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。 属性值可以是类或类名。 以下示例展示了如何设置这些属性:spring-doc.cadn.net.cn

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用了一个 failedDeserializationFunctionspring-doc.cadn.net.cn

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

上述示例使用了以下配置:spring-doc.cadn.net.cn

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置为使用一个 ErrorHandlingDeserializer,则重要的是要为消费者配置 KafkaTemplate,并为其生产者配置一个能够处理普通对象以及由反序列化异常产生的原始 byte[] 值的序列化器。 模板的通用值类型应为 Object。 一种技术是使用 DelegatingByTypeSerializer;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JacksonJsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

当使用批处理监听器的ErrorHandlingDeserializer时,必须在消息头中检查反序列化异常。 当与DefaultBatchErrorHandler一起使用时,可以使用该头确定异常失败的记录,并通过BatchListenerFailedException将错误传递给错误处理程序。spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() 可用于将头信息转换为 DeserializationExceptionspring-doc.cadn.net.cn

当消费 List<ConsumerRecord<?, ?> 时,使用 SerializationUtils.getExceptionFromHeader() 代替:spring-doc.cadn.net.cn

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果同时使用一个DeadLetterPublishingRecoverer,则为1发布的记录将具有一个record.value()字段,其类型为byte[];这不应该被序列化。 考虑使用一个配置为对byte[]使用ByteArraySerializerDelegatingByTypeSerializer,并将所有其他类型的正常序列化器(如Json、Avro等)使用。

从3.1版本开始,你可以向ErrorHandlingDeserializer前添加一个Validator。 如果委托的Deserializer成功反序列化对象,但该对象验证失败,将抛出与反序列化异常类似的异常。 这允许原始原始数据传递给错误处理器。 当你自己创建反序列化器时,只需调用setValidator;如果你使用属性配置序列化器,将消费者配置属性ErrorHandlingDeserializer.VALIDATOR_CLASS设置为你的Validator的类或完整类名。 当你使用Spring Boot时,该属性名称是spring.kafka.consumer.properties.spring.deserializer.validator.classspring-doc.cadn.net.cn

负载转换与批处理监听器

You can also use a JacksonJsonMessageConverter within a BatchMessagingMessageConverter to convert batch messages when you use a batch listener container factory. See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.spring-doc.cadn.net.cn

默认情况下,转换的类型会从监听器参数推断。 如果你将JacksonJsonMessageConverter配置为带有其TypePrecedence设置为TYPE_ID(而不是默认的INFERRED)的DefaultJackson2TypeMapper,转换器会优先使用消息头中的类型信息(如果存在)。 这允许,例如,监听器方法声明为接口而不是具体类。 此外,类型转换器还支持映射,因此反序列化的目标类型可以与源类型不同(只要数据兼容)。 这在使用类级别@KafkaListener实例时也很有用,其中负载必须在确定要调用的方法之前已转换。 以下示例创建了使用此方法的 beans:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JacksonJsonMessageConverter converter() {
    return new JacksonJsonMessageConverter();
}

请注意,为此要生效,转换目标的方法签名必须是一个具有单个泛型参数类型的容器对象,例如以下所示:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

注意,您仍然可以访问批处理标题。spring-doc.cadn.net.cn

如果批处理转换器具有支持记录转换器,则也可以接收一个其payload根据泛型类型进行转换的消息列表。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

If record in the batch cannot be converted, its payload is set as null into the target payloads list. The conversion exception is logged as warning for this record and also stored into a KafkaHeaders.CONVERSION_FAILURES header as an item of the List<ConversionException>. The target @KafkaListener method may perform Java Stream API to filter out those null values from the payload list or do something with the conversion exceptions header:spring-doc.cadn.net.cn

@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
         @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

    for (int i = 0; i < list.size(); i++) {
        if (conversionFailures.get(i) != null) {
            throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
        }
    }
}

ConversionService自定义

从版本 2.1.1 开始,用于默认 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 解析监听器方法参数调用的 org.springframework.core.convert.ConversionService,会在实现以下任一接口的所有 bean 上提供:spring-doc.cadn.net.cn

这允许你在不更改默认配置用于 0 和 1 的情况下,进一步自定义监听器反序列化。spring-doc.cadn.net.cn

通过KafkaListenerConfigurer beans为KafkaListenerEndpointRegistrar设置的自定义MessageHandlerMethodFactory将禁用此功能。

添加自定义HandlerMethodArgumentResolver to @KafkaListener

从 2.4.2 版本开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。 您只需要实现 KafkaListenerConfigurer,并使用来自类 KafkaListenerEndpointRegistrar 的方法 setCustomMethodArgumentResolvers()spring-doc.cadn.net.cn

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory to the KafkaListenerEndpointRegistrar bean. If you do this, and your application needs to handle tombstone records, with a null value() (e.g. from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver to the factory; it must be the last resolver because it supports all types and can match arguments without a @Payload annotation. If you are using a DefaultMessageHandlerMethodFactory, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard PayloadMethodArgumentResolver, which has no knowledge of KafkaNull payloads.spring-doc.cadn.net.cn