对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

邮件头

0.11.0.0 客户端引入了对消息中标头的支持。 从 2.0 版开始,Spring for Apache Kafka 现在支持将这些标头映射到和映射spring-messaging MessageHeaders.spring-doc.cadn.net.cn

映射的先前版本ConsumerRecordProducerRecord到 spring-messagingMessage<?>,其中 value 属性映射到payload和其他属性 (topic,partition,依此类推)映射到标头。 情况仍然如此,但现在可以映射其他(任意)标头。

Apache Kafka 标头有一个简单的 API,如以下接口定义所示:spring-doc.cadn.net.cn

public interface Header {

    String key();

    byte[] value();

}

KafkaHeaderMapper提供策略来在 Kafka 之间映射标头条目HeadersMessageHeaders. 其接口定义如下:spring-doc.cadn.net.cn

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper将原始标头映射为byte[],具有用于转换为String值。spring-doc.cadn.net.cn

DefaultKafkaHeaderMapper将键映射到MessageHeadersheader name,为了支持出站消息的丰富标头类型,执行了 JSON 转换。 A "special“ 标头(键为spring_json_header_types) 包含<key>:<type>. 此标头用于入站端,以提供每个标头值到原始类型的适当转换。spring-doc.cadn.net.cn

在入站方面,所有 KafkaHeader实例映射到MessageHeaders. 在出站端,默认情况下,所有MessageHeaders映射,但id,timestamp,以及映射到ConsumerRecord性能。spring-doc.cadn.net.cn

您可以通过向映射器提供模式来指定要为出站消息映射的标头。 以下列表显示了一些示例映射:spring-doc.cadn.net.cn

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用默认的 JacksonObjectMapper并映射大多数标头,如示例之前所述。
2 使用提供的JacksonObjectMapper并映射大多数标头,如示例之前所述。
3 使用默认的 JacksonObjectMapper并根据提供的模式映射标头。
4 使用提供的JacksonObjectMapper并根据提供的模式映射标头。

模式相当简单,可以包含前导通配符 ()、尾随通配符或两者兼而有之(例如,**.cat.*). 您可以使用前导!. 与标头名称匹配的第一个模式(无论是正数还是负数)获胜。spring-doc.cadn.net.cn

当您提供自己的模式时,我们建议将!id!timestamp,因为这些标头在入站端是只读的。spring-doc.cadn.net.cn

默认情况下,映射器仅反序列化java.langjava.util. 您可以通过添加带有addTrustedPackages方法。 如果您收到来自不受信任的来源的邮件,您可能希望只添加您信任的软件包。 要信任所有包,您可以使用mapper.addTrustedPackages("*").
映射String原始形式的标头值在与不知道映射器的 JSON 格式的系统通信时非常有用。

从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 映射,而应映射到原始byte[]. 这AbstractKafkaHeaderMapper有新的属性;mapAllStringsOut当设置为 true 时,所有字符串值标头都将转换为byte[]使用charset属性(默认UTF-8). 此外,还有一个属性rawMappedHeaders,这是header name : boolean;如果映射包含标头名称,并且标头包含String值,它将被映射为 rawbyte[]使用字符集。 此映射还用于映射原始传入byte[]headers 到String使用字符集当且仅当 map 值中的布尔值为true. 如果布尔值为false,或者标头名称不在地图中,并带有true值,则传入标头仅映射为原始未映射标头。spring-doc.cadn.net.cn

以下测试用例说明了此机制。spring-doc.cadn.net.cn

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

默认情况下,两个标头映射器都映射所有入站标头。 从 2.8.8 版开始,这些模式也可以应用于入站映射。 要为入站映射创建映射器,请在相应的映射器上使用静态方法之一:spring-doc.cadn.net.cn

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除所有以abc并包括所有其他。spring-doc.cadn.net.cn

默认情况下,DefaultKafkaHeaderMapper用于MessagingMessageConverterBatchMessagingMessageConverter,只要Jackson在班级道路上。spring-doc.cadn.net.cn

使用批处理转换器,转换后的标头可在KafkaHeaders.BATCH_CONVERTED_HEADERS作为List<Map<String, Object>>其中,列表中某个位置的映射对应于有效负载中的数据位置。spring-doc.cadn.net.cn

如果没有转换器(因为 Jackson 不存在,或者它被显式设置为null),消费者记录中的标头在KafkaHeaders.NATIVE_HEADERS页眉。 此标头是一个Headers对象(或List<Headers>在批处理转换器的情况下),其中列表中的位置对应于有效负载中的数据位置。spring-doc.cadn.net.cn

某些类型不适合 JSON 序列化,并且简单的toString()序列化可能是这些类型的首选。 这DefaultKafkaHeaderMapper有一个名为addToStringClasses()这使您可以提供应以这种方式处理的类的名称,以便进行出站映射。 在入站映射期间,它们映射为String. 默认情况下,只有org.springframework.util.MimeTypeorg.springframework.http.MediaType以这种方式映射。
从 2.3 版开始,简化了字符串值标头的处理。 默认情况下,此类标头不再采用 JSON 编码(即它们没有封闭"..."添加)。 该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String(从byte[]). 映射器可以处理(解码)旧版本生成的标头(它检查前导);这样,使用 2.3 的应用程序可以使用旧版本的记录。"
要与早期版本兼容,请将encodeStringstrue,如果使用 2.3 版本生成的记录可能被使用早期版本的应用程序使用。 当所有应用程序都使用 2.3 或更高版本时,您可以将属性保留为默认值false.
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它会自动将此转换器 bean 配置为 auto-configuredKafkaTemplate;否则,您应该将此转换器添加到模板中。spring-doc.cadn.net.cn