对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

消息头

The 0.11.0.0 客户端引入了对消息中头信息的支持。 从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头信息映射到和从 spring-messaging MessageHeaders 映射。spring-doc.cadn.net.cn

之前的版本将 ConsumerRecordProducerRecord 映射到 spring-messaging 的 Message<?>,其中 value 属性映射到 payload,以及其他属性(topicpartition 等)映射到头(headers)。 这仍然适用,但现在还可以映射额外的(任意的)头(headers)。

Apache Kafka 头部的 API 非常简单,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface Header {

    String key();

    byte[] value();

}

The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders. Its interface definition is as follows:spring-doc.cadn.net.cn

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

The SimpleKafkaHeaderMapper 映射原始头为 byte[],并提供转换为 String 值的配置选项。spring-doc.cadn.net.cn

The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed. A "special" header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>. This header is used on the inbound side to provide appropriate conversion of each header value to the original type.spring-doc.cadn.net.cn

在入站侧,所有 Kafka Header 实例都映射到 MessageHeaders。 在出站侧,默认情况下,所有 MessageHeaders 都已映射,除了 idtimestamp 以及映射到 ConsumerRecord 属性的头信息。spring-doc.cadn.net.cn

您可以使用模式将要映射的出站消息头进行指定,通过将模式提供给映射器。 以下列表展示了多个示例映射:spring-doc.cadn.net.cn

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用默认的 Jackson ObjectMapper,并映射大多数标头,如前所述在示例中讨论的那样。
2 使用提供的 Jackson ObjectMapper,并映射大多数头信息,正如在示例之前讨论的那样。
3 使用默认的 Jackson ObjectMapper,并根据提供的模式映射头。
4 使用提供的 Jackson ObjectMapper 并根据提供的模式映射标头。

模式相当简单,可以包含一个前缀通配符(*),一个后缀通配符,或两者兼有(例如,*.cat.*)。 你可以用带有前缀!的模式来否定模式。 与一个头名称匹配的第一个模式(无论是正向还是负向)获胜。spring-doc.cadn.net.cn

当您提供自己的模式时,我们建议包含!id!timestamp,因为这些标头在入站侧是只读的。spring-doc.cadn.net.cn

默认情况下,mapper 仅反序列化编号为 java.langjava.util 的类。 可以通过使用 addTrustedPackages 方法添加可信的包来信任其他(或所有)包。 如果收到来自不可信来源的消息,您可能只想添加您信任的那些包。 要信任所有包,可以使用 mapper.addTrustedPackages("*")
String 个标头值以原始形式进行映射,在与不熟悉映射器 JSON 格式的系统通信时很有用。

从 2.2.5 版本开始,您可以指定某些字符串值的头不应使用 JSON 映射,但可/从原始 byte[] 映射。 AbstractKafkaHeaderMapper 有新属性;当设置为 true 时,所有字符串值的头将使用 charset 属性(默认 UTF-8)转换为 byte[]。 此外,还有一个属性 rawMappedHeaders,其值为 header name : boolean 的映射;如果该映射包含一个头名称,且该头包含 String 值,则将使用字符集直接映射为原始 byte[]。 该映射还用于在以下情况下将原始传入的 byte[] 头映射为 String:当且仅当映射值中的布尔值为 true。 如果布尔值为 false,或者头名称不在映射中且值为 true,则传入的头将直接作为原始未映射头处理。spring-doc.cadn.net.cn

以下测试用例说明了这种机制。spring-doc.cadn.net.cn

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

两者头映射器默认都会映射所有入站头。 自 2.8.8 版本起,模式也可以应用于入站映射。 要创建入站映射的映射器,请使用相应映射器上的静态方法:spring-doc.cadn.net.cn

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除所有以 abc 开头的标题,并包含其他所有标题。spring-doc.cadn.net.cn

默认情况下,在MessagingMessageConverterBatchMessagingMessageConverter中使用DefaultKafkaHeaderMapper,只要Jackson在类路径上。spring-doc.cadn.net.cn

使用批处理转换器时,转换后的标题在KafkaHeaders.BATCH_CONVERTED_HEADERS中作为List<Map<String, Object>>出现,其中列表中位置的映射对应于payload中数据的位置。spring-doc.cadn.net.cn

如果不存在转换器(要么是因为 Jackson 不在场,要么是被显式设置为 null),消费者记录中的头部信息将未经转换地提供在 KafkaHeaders.NATIVE_HEADERS 头部。 该头部是一个 Headers 对象(如果是批处理转换器,则是一个 List<Headers>),列表中的位置与有效载荷中的数据位置相对应。spring-doc.cadn.net.cn

某些类型不适合JSON序列化,对于这些类型,简单的toString()序列化可能更受青睐。 DefaultKafkaHeaderMapper具有一种名为addToStringClasses()的方法,该方法让您可以为出站映射提供应以这种方式处理的类的名称。 在入站映射期间,它们被映射为String。 默认情况下,仅org.springframework.util.MimeTypeorg.springframework.http.MediaType被这样映射。
从 2.3 版本开始,String 类型的头部处理被简化。 此类头部不再默认进行 JSON 编码(即不再添加包裹的 "...")。 类型仍然添加到 JSON_TYPES 头部,接收系统可以将其转换回字符串(从 byte[])。 映射器可以处理由旧版本产生的头部(它会检查是否有前置的 ");因此使用 2.3 的应用程序可以消费来自旧版本的记录。
为与早期版本兼容,将 encodeStrings 设置为 true,如果由使用 2.3 的版本产生的记录需要被使用早期版本的应用消费。 当所有应用都使用 2.3 或更高版本时,可以将该属性保留默认值 false
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,將會自動配置該使用者Converter這個轉換器 bean 到自動配置的 KafkaTemplate;否則您需要將該使用者Converter 添加到模板中。spring-doc.cadn.net.cn