此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

序列化、反序列化和消息转换

概述

Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。 它与org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T>抽象与一些内置实现。 同时,我们可以使用ProducerConsumer配置属性。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

对于更复杂或更特殊的情况,请执行KafkaConsumer(因此,KafkaProducer) 提供过载 要接受的构造函数SerializerDeserializer实例keysvalues分别。spring-doc.cadn.net.cn

使用此 API 时,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory还提供属性(通过构造函数或 setter 方法)来注入自定义SerializerDeserializer实例到目标中ProducerConsumer. 此外,您可以传入Supplier<Serializer>Supplier<Deserializer>实例通过构造函数 - 这些Suppliers 在创建每个ProducerConsumer.spring-doc.cadn.net.cn

字符串序列化

从 2.5 版开始,Spring for Apache Kafka 提供了ToStringSerializerParseStringDeserializer使用实体的字符串表示的类。 他们依赖于方法toString和一些Function<String>BiFunction<String, Headers>以解析 String 并填充实例的属性。 通常,这会在类上调用一些静态方法,例如parse:spring-doc.cadn.net.cn

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

默认情况下,ToStringSerializer配置为传达有关记录中序列化实体的类型信息Headers. 您可以通过将addTypeInfo属性设置为false. 此信息可供ParseStringDeserializer在接收方。spring-doc.cadn.net.cn

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true):您可以将其设置为false要在ToStringSerializer(将addTypeInfo属性)。spring-doc.cadn.net.cn

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

您可以配置Charset用于转换String往返byte[]默认值为UTF-8.spring-doc.cadn.net.cn

您可以使用解析器方法的名称配置反序列化器,方法是使用ConsumerConfig性能:spring-doc.cadn.net.cn

属性必须包含类的完全限定名称,后跟方法名称,用句点分隔.. 该方法必须是静态的,并且签名为(String, Headers)(String).spring-doc.cadn.net.cn

一个ToFromStringSerde还提供了用于 Kafka Streams 的。spring-doc.cadn.net.cn

JSON

Spring for Apache Kafka 还提供了JacksonJsonSerializerJacksonJsonDeserializer基于 Jackson JSON 对象映射器。 这JacksonJsonSerializer允许将任何 Java 对象写为 JSONbyte[]. 这JacksonJsonDeserializer需要额外的Class<?> targetType参数,以允许对已使用的byte[]到适当的目标对象。 以下示例演示如何创建JacksonJsonDeserializer:spring-doc.cadn.net.cn

JacksonJsonDeserializer<Thing> thingDeserializer = new JacksonJsonDeserializer<>(Thing.class);

您可以自定义两者JacksonJsonSerializerJacksonJsonDeserializer使用ObjectMapper. 您还可以扩展它们以在configure(Map<String, ?> configs, boolean isKey)方法。spring-doc.cadn.net.cn

从版本 2.3 开始,默认情况下,所有 JSON 感知组件都配置了JacksonUtils.enhancedObjectMapper()实例,它附带了MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES功能已禁用。 此外,此类实例还提供了用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。 看JacksonUtils.enhancedObjectMapper()JavaDocs 了解更多信息。 此方法还注册一个org.springframework.kafka.support.JacksonMimeTypeModuleorg.springframework.util.MimeType对象序列化为纯字符串,以实现网络上的平台间兼容性。 一个JacksonMimeTypeModule可以在应用程序上下文中注册为 bean,它将自动配置到弹簧靴ObjectMapper实例.spring-doc.cadn.net.cn

同样从 2.3 版本开始,JsonDeserializer提供TypeReference基于构造函数,以便更好地处理目标通用容器类型。spring-doc.cadn.net.cn

从 2.1 版开始,您可以在记录中传达类型信息Headers,允许处理多种类型。 此外,您可以使用以下 Kafka 属性配置序列化器和反序列化程序。 如果您提供了SerializerDeserializer实例KafkaConsumerKafkaProducer分别。spring-doc.cadn.net.cn

配置属性

从版本 2.2 开始,类型信息标头(如果由序列化程序添加)将由反序列化程序删除。 您可以通过将removeTypeHeaders属性设置为false,直接在反序列化程序上或使用前面所述的配置属性。spring-doc.cadn.net.cn

从版本 2.8 开始,如果您以编程方式构造序列化程序或反序列化程序,如编程构造中所示,只要您没有显式设置任何属性(使用set*()方法或使用 Fluent API)。 以前,在以编程方式创建时,从未应用配置属性;如果直接在对象上显式设置属性,情况仍然如此。

映射类型

从版本 2.2 开始,使用 JSON 时,现在可以使用前面列表中的属性提供类型映射。 以前,必须在序列化程序和反序列化程序中自定义类型映射器。 映射由逗号分隔的列表组成token:className对。 在出站时,有效负载的类名将映射到相应的Tokens。 入站时,类型标头中的Tokens映射到相应的类名。spring-doc.cadn.net.cn

以下示例创建一组映射:spring-doc.cadn.net.cn

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相应的对象必须兼容。

如果您使用 Spring Boot,则可以在application.properties(或 yaml)文件。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

只能使用属性执行简单配置。 对于更高级的配置(例如使用自定义ObjectMapper在序列化程序和反序列化程序中),应使用接受预生成序列化程序和反序列化程序的生产者和使用者工厂构造函数。 以下 Spring Boot 示例覆盖了默认工厂:spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

还提供了 setter,作为使用这些构造函数的替代方法。spring-doc.cadn.net.cn

当使用 Spring Boot 并覆盖ConsumerFactoryProducerFactory如上所示,通配符泛型类型需要与 Bean 方法返回类型一起使用。 如果提供了具体的泛型类型,那么 Spring Boot 将忽略这些 bean 并仍然使用默认的 bean。

从版本 2.2 开始,您可以使用具有布尔值的重载构造函数之一显式将反序列化程序配置为使用提供的目标类型并忽略标头中的类型信息useHeadersIfPresent参数(即true默认情况下)。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

使用方法确定类型

从版本 2.5 开始,您现在可以通过属性配置反序列化器以调用方法来确定目标类型。如果存在,这将覆盖上面讨论的任何其他技术。如果数据是由不使用 Spring 序列化程序的应用程序发布的,并且您需要根据数据或其他标头反序列化为不同的类型,这将很有用。将这些属性设置为方法名称 - 一个完全限定的类名,后跟方法名称,用句点分隔.. 该方法必须声明为public static,具有三个签名之一(String topic, byte[] data, Headers headers),(byte[] data, Headers headers)(byte[] data)并返回JacksonJavaType.spring-doc.cadn.net.cn

您可以使用任意标头或检查数据以确定类型。spring-doc.cadn.net.cn

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

对于更复杂的数据检查,请考虑使用JsonPath或类似,但是,确定类型的测试越简单,过程就越有效。spring-doc.cadn.net.cn

下面是以编程方式创建反序列化程序的示例(在构造函数中向使用者工厂提供反序列化程序时):spring-doc.cadn.net.cn

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

程序化施工

从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/消费者工厂中使用时,您可以使用 fluent API,这简化了配置。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

要以编程方式提供类型映射,类似于使用方法确定类型,请使用typeFunction财产。spring-doc.cadn.net.cn

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要不使用 fluent API 来配置属性,或使用set*()方法,工厂将使用配置属性配置序列化器/反序列化器;请参阅配置属性spring-doc.cadn.net.cn

委派序列化器和解序列化器

使用标题

2.3 版引入了DelegatingSerializerDelegatingDeserializer,允许生成和使用具有不同键和/或值类型的记录。 生产者必须设置标头DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR设置为用于选择要用于该值的序列化程序的选择器值,以及DelegatingSerializer.KEY_SERIALIZATION_SELECTOR钥匙;如果未找到匹配项,则IllegalStateException被抛出。spring-doc.cadn.net.cn

对于传入记录,反序列化程序使用相同的标头来选择要使用的反序列化程序;如果未找到匹配项或标头不存在,则原始byte[]被返回。spring-doc.cadn.net.cn

您可以将选择器的映射配置为Serializer / Deserializer通过构造函数,或者您可以通过 Kafka 生产者/消费者属性使用密钥进行配置DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG. 对于序列化程序,生产者属性可以是Map<String, Object>其中键是选择器,值是Serializer实例,一个序列化程序Class或类名。 该属性也可以是逗号分隔的映射条目的字符串,如下所示。spring-doc.cadn.net.cn

对于反序列化程序,使用者属性可以是Map<String, Object>其中键是选择器,值是Deserializer实例,一个解序列化器Class或类名。 该属性也可以是逗号分隔的映射条目的字符串,如下所示。spring-doc.cadn.net.cn

要配置使用属性,请使用以下语法:spring-doc.cadn.net.cn

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

然后,生产者将设置DelegatingSerializer.VALUE_SERIALIZATION_SELECTORheader 到thing1thing2.spring-doc.cadn.net.cn

此技术支持将不同类型发送到同一主题(或不同主题)。spring-doc.cadn.net.cn

从版本 2.5.1 开始,如果类型(键或值)是Serdes (Long,Integer,等)。相反,序列化程序会将标头设置为类型的类名。无需为这些类型配置序列化程序或反序列化程序,它们将动态创建(一次)。

有关将不同类型发送到不同主题的另一种技术,请参阅RoutingKafkaTemplate.spring-doc.cadn.net.cn

按类型

2.8 版本引入了DelegatingByTypeSerializer.spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

从版本 2.8.3 开始,您可以配置序列化器以检查映射键是否可从目标对象分配,这在委托序列化器可以序列化子类时很有用。在这种情况下,如果存在不明确的匹配项,则有序的Map,例如LinkedHashMap应该提供。spring-doc.cadn.net.cn

按主题

从 2.8 版本开始,DelegatingByTopicSerializerDelegatingByTopicDeserializer允许根据主题名称选择序列化器/反序列化器。正则表达式Patterns 用于查找要使用的实例。可以使用构造函数或通过属性(逗号分隔的列表pattern:serializer).spring-doc.cadn.net.cn

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

KEY_SERIALIZATION_TOPIC_CONFIG将其用于键时。spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

您可以使用以下命令指定在没有模式匹配时使用的默认序列化器/反序列化器DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT.spring-doc.cadn.net.cn

附加属性DelegatingByTopicSerialization.CASE_SENSITIVE(默认true),当设置为false使主题查找不区分大小写。spring-doc.cadn.net.cn

重试解序列化程序

RetryingDeserializer使用委托DeserializerRetryTemplate当委托在反序列化期间可能出现暂时性错误(如网络问题)时重试反序列化。spring-doc.cadn.net.cn

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

从版本开始3.1.2一个RecoveryCallback可以设置在RetryingDeserializer选择。spring-doc.cadn.net.cn

请参阅 spring-retry 项目以配置RetryTemplate具有重试策略、退避策略等。spring-doc.cadn.net.cn

Spring Messaging 消息转换

尽管SerializerDeserializerAPI 与低级 Kafka 相比非常简单灵活ConsumerProducer透视,当使用时,您可能需要在 Spring Messaging 级别上具有更大的灵活性@KafkaListenerSpring Integration 的 Apache Kafka 支持。 让您轻松地进行转换org.springframework.messaging.Message,Spring for Apache Kafka 提供了一个MessageConverter抽象与MessagingMessageConverter实现及其JacksonJsonMessageConverter(和子类)自定义。 您可以注入MessageConverter变成一个KafkaTemplate实例直接并使用AbstractKafkaListenerContainerFactorybean 定义@KafkaListener.containerFactory()财产。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用 Spring Boot 时,只需将转换器定义为@BeanSpring Boot 自动配置会将其连接到自动配置的模板和容器工厂中。spring-doc.cadn.net.cn

当您使用@KafkaListener,则将参数类型提供给消息转换器以协助转换。spring-doc.cadn.net.cn

只有当@KafkaListener注释是在方法级别声明的。 具有类级@KafkaListener,则有效负载类型用于选择哪个@KafkaHandler方法调用,因此必须先转换后才能选择该方法。spring-doc.cadn.net.cn

在消费者端,您可以配置JacksonJsonMessageConverter;它可以处理ConsumerRecord类型的值byte[],BytesStringso 应与ByteArrayDeserializer,BytesDeserializerStringDeserializer. (byte[]Bytes效率更高,因为它们避免了不必要的byte[]String转换)。 您还可以配置JacksonJsonMessageConverter对应于解序列化器,如果你愿意的话。spring-doc.cadn.net.cn

在生产者端,当您使用 Spring Integration 或KafkaTemplate.send(Message<?> message)方法(参见KafkaTemplate),您必须配置与配置的 Kafka 兼容的消息转换器Serializer.spring-doc.cadn.net.cn

同样,使用byte[]Bytes效率更高,因为它们避免了Stringbyte[]转换。spring-doc.cadn.net.cn

为方便起见,从 2.3 版本开始,该框架还提供了StringOrBytesSerializer它可以序列化所有三种值类型,以便它可以与任何消息转换器一起使用。spring-doc.cadn.net.cn

从版本 2.7.1 开始,消息有效负载转换可以委托给spring-messaging SmartMessageConverter;例如,这使得转换可以基于MessageHeaders.CONTENT_TYPE页眉。spring-doc.cadn.net.cn

KafkaMessageConverter.fromMessage()方法被调用以将出站转换为ProducerRecordProducerRecord.value()财产。 这KafkaMessageConverter.toMessage()方法被调用用于从ConsumerRecord有效载荷为ConsumerRecord.value()财产。 这SmartMessageConverter.toMessage()方法被调用来创建一个新的出站Message<?>Message传递给fromMessage()(通常通过KafkaTemplate.send(Message<?> msg)). 同样,在KafkaMessageConverter.toMessage()方法,在转换器创建新的Message<?>ConsumerRecordSmartMessageConverter.fromMessage()方法,然后使用新转换的有效负载创建最终入站消息。 无论哪种情况,如果SmartMessageConverter返回null,则使用原始消息。

当默认转换器在KafkaTemplate和侦听器容器工厂,则配置SmartMessageConverter通过调用setMessagingConverter()在模板上并通过contentTypeConverter属性@KafkaListener方法。spring-doc.cadn.net.cn

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring 数据投影接口

从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。 这允许对数据进行非常有选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。 例如,可以将以下接口定义为消息有效负载类型:spring-doc.cadn.net.cn

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将用于将属性名称查找为接收的 JSON 文档中的字段。 这@JsonPathexpression 允许自定义值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。spring-doc.cadn.net.cn

要启用此功能,请使用JacksonProjectingMessageConverter配置了适当的委托转换器(用于出站转换和转换非投影接口)。 您还必须添加spring-data:spring-data-commonscom.jayway.jsonpath:json-path到类路径。spring-doc.cadn.net.cn

当用作参数时@KafkaListener方法,接口类型会正常自动传递给转换器。spring-doc.cadn.net.cn

ErrorHandlingDeserializer

当反序列化程序无法反序列化消息时,Spring 无法处理该问题,因为它发生在poll()返回。 为了解决这个问题,请ErrorHandlingDeserializer已被引入。 此反序列化程序委托给实际的反序列化程序(键或值)。 如果委托未能反序列化记录内容,则ErrorHandlingDeserializer返回一个nullvalue 和DeserializationException在包含原因和原始字节的标头中。 当您使用记录级别MessageListener,如果ConsumerRecord包含一个DeserializationException标头,容器的ErrorHandler使用 failedConsumerRecord. 记录不会传递给侦听器。spring-doc.cadn.net.cn

或者,您可以配置ErrorHandlingDeserializer要通过提供failedDeserializationFunction,这是一个Function<FailedDeserializationInfo, T>. 调用此函数以创建T,以通常的方式传递给听众。 类型FailedDeserializationInfo,其中包含向函数提供的所有上下文信息。 您可以找到DeserializationException(作为序列化的 Java 对象)在标头中。 请参阅 Javadoc 中的ErrorHandlingDeserializer了解更多信息。spring-doc.cadn.net.cn

您可以使用DefaultKafkaConsumerFactory接受键和值的构造函数Deserializer适当的物体和电线ErrorHandlingDeserializer已配置了适当委托的实例。 或者,您可以使用使用者配置属性(由ErrorHandlingDeserializer) 来实例化委托。 属性名称是ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. 属性值可以是类或类名称。 以下示例演示如何设置这些属性:spring-doc.cadn.net.cn

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用failedDeserializationFunction.spring-doc.cadn.net.cn

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前面的示例使用以下配置:spring-doc.cadn.net.cn

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置了ErrorHandlingDeserializer,配置KafkaTemplate及其生产者带有一个序列化器,可以处理普通对象和原始对象byte[]值,这是由反序列化异常产生的。 模板的泛型值类型应为Object. 一种技术是使用DelegatingByTypeSerializer;下面是一个例子:
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

使用ErrorHandlingDeserializer使用批处理侦听器时,必须检查消息标头中的反序列化异常。 当与DefaultBatchErrorHandler,您可以使用该标头来确定异常失败的记录,并通过BatchListenerFailedException.spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException()可用于将标头转换为DeserializationException.spring-doc.cadn.net.cn

食用时List<ConsumerRecord<?, ?>,SerializationUtils.getExceptionFromHeader()代替:spring-doc.cadn.net.cn

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果您还使用DeadLetterPublishingRecoverer,则为DeserializationException将有一个record.value()类型byte[];这不应该序列化。 考虑使用DelegatingByTypeSerializer配置为使用ByteArraySerializerbyte[]以及所有其他类型的普通序列化程序(Json、Avro 等)。

从 3.1 版开始,您可以添加ValidatorErrorHandlingDeserializer. 如果委托Deserializer成功反序列化对象,但该对象未通过验证,则引发异常,类似于发生反序列化异常。 这允许将原始原始数据传递给错误处理程序。 自己创建解序列化器时,只需调用setValidator;如果使用属性配置序列化程序,请设置使用者配置属性ErrorHandlingDeserializer.VALIDATOR_CLASS设置为您的类或完全限定的类名称Validator. 使用 Spring Boot 时,此属性名称为spring.kafka.consumer.properties.spring.deserializer.validator.class.spring-doc.cadn.net.cn

使用批量侦听器的有效负载转换

您还可以使用JacksonJsonMessageConverterBatchMessagingMessageConverter在使用批处理侦听器容器工厂时转换批处理消息。 有关更多信息,请参阅序列化,反序列化和消息转换Spring Messaging Message Conversionspring-doc.cadn.net.cn

默认情况下,转换的类型是从 listener 参数推断出来的。 如果将JacksonJsonMessageConverter使用DefaultJackson2TypeMapper它有它的TypePrecedence设置为TYPE_ID(而不是默认的INFERRED),转换器会改用标头中的类型信息(如果存在)。 例如,这允许使用接口而不是具体类声明侦听器方法。 此外,类型转换器支持映射,因此反序列化可以与源不同(只要数据兼容)。 当您使用类级@KafkaListener实例其中有效负载必须已转换,以确定要调用的方法。 以下示例创建使用此方法的 bean:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

请注意,要实现此目的,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

请注意,您仍然可以访问批处理标头。spring-doc.cadn.net.cn

如果批处理转换器具有支持它的记录转换器,您还可以接收根据泛型类型转换有效负载的消息列表。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

如果批处理中的记录无法转换,则其有效负载设置为null进入目标payloads列表。 转换异常记录为此记录的警告,并存储在KafkaHeaders.CONVERSION_FAILURESheader 作为List<ConversionException>. 目标@KafkaListener方法可以执行 JavaStreamAPI 来过滤掉这些null有效负载列表中的值或对 conversion exceptions 标头执行某些作:spring-doc.cadn.net.cn

@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
         @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

    for (int i = 0; i < list.size(); i++) {
        if (conversionFailures.get(i) != null) {
            throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
        }
    }
}

ConversionService定制

从 2.1.1 版本开始,org.springframework.core.convert.ConversionService默认使用org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory为了解析调用监听器的参数,方法随实现以下任何接口的所有 Bean 一起提供:spring-doc.cadn.net.cn

这使您可以进一步自定义侦听器反序列化,而无需更改ConsumerFactoryKafkaListenerContainerFactory.spring-doc.cadn.net.cn

设置自定义MessageHandlerMethodFactoryKafkaListenerEndpointRegistrar通过KafkaListenerConfigurerbean 禁用此功能。

添加自定义HandlerMethodArgumentResolver@KafkaListener

从 2.4.2 版开始,您可以添加自己的HandlerMethodArgumentResolver并解析自定义方法参数。 您所需要做的就是实现KafkaListenerConfigurer和使用方法setCustomMethodArgumentResolvers()从课堂上KafkaListenerEndpointRegistrar.spring-doc.cadn.net.cn

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

您还可以通过添加自定义MessageHandlerMethodFactoryKafkaListenerEndpointRegistrar豆。 如果执行此作,并且应用程序需要处理逻辑删除记录,则使用null value()(例如,从压缩的主题),您应该添加一个KafkaNullAwarePayloadArgumentResolver到工厂;它必须是最后一个解析器,因为它支持所有类型,并且可以在没有@Payload注解。 如果您正在使用DefaultMessageHandlerMethodFactory,将此解析器设置为最后一个自定义解析器;工厂将确保该旋转转换器在标准之前使用PayloadMethodArgumentResolver,它不知道KafkaNull负载。spring-doc.cadn.net.cn