|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
消息头
The 0.11.0.0 客户端引入了对消息中头信息的支持。
从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头信息映射到和从 spring-messaging MessageHeaders 映射。
之前的版本将 ConsumerRecord 和 ProducerRecord 映射到 spring-messaging 的 Message<?>,其中 value 属性映射到 payload,以及其他属性(topic、partition 等)映射到头(headers)。
这仍然适用,但现在还可以映射额外的(任意的)头(headers)。 |
Apache Kafka 头部的 API 非常简单,如下面的接口定义所示:
public interface Header {
String key();
byte[] value();
}
The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders.
Its interface definition is as follows:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
The SimpleKafkaHeaderMapper 映射原始头为 byte[],并提供转换为 String 值的配置选项。
The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
A "special" header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.
在入站侧,所有 Kafka Header 实例都映射到 MessageHeaders。
在出站侧,默认情况下,所有 MessageHeaders 都已映射,除了 id、timestamp 以及映射到 ConsumerRecord 属性的头信息。
您可以使用模式将要映射的出站消息头进行指定,通过将模式提供给映射器。 以下列表展示了多个示例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
| 1 | 使用默认的 Jackson ObjectMapper,并映射大多数标头,如前所述在示例中讨论的那样。 |
| 2 | 使用提供的 Jackson ObjectMapper,并映射大多数头信息,正如在示例之前讨论的那样。 |
| 3 | 使用默认的 Jackson ObjectMapper,并根据提供的模式映射头。 |
| 4 | 使用提供的 Jackson ObjectMapper 并根据提供的模式映射标头。 |
模式相当简单,可以包含一个前缀通配符(*),一个后缀通配符,或两者兼有(例如,*.cat.*)。
你可以用带有前缀!的模式来否定模式。
与一个头名称匹配的第一个模式(无论是正向还是负向)获胜。
当您提供自己的模式时,我们建议包含!id和!timestamp,因为这些标头在入站侧是只读的。
默认情况下,mapper 仅反序列化编号为 java.lang 和 java.util 的类。
可以通过使用 addTrustedPackages 方法添加可信的包来信任其他(或所有)包。
如果收到来自不可信来源的消息,您可能只想添加您信任的那些包。
要信任所有包,可以使用 mapper.addTrustedPackages("*")。 |
将 String 个标头值以原始形式进行映射,在与不熟悉映射器 JSON 格式的系统通信时很有用。 |
从 2.2.5 版本开始,您可以指定某些字符串值的头不应使用 JSON 映射,但可/从原始 byte[] 映射。
AbstractKafkaHeaderMapper 有新属性;当设置为 true 时,所有字符串值的头将使用 charset 属性(默认 UTF-8)转换为 byte[]。
此外,还有一个属性 rawMappedHeaders,其值为 header name : boolean 的映射;如果该映射包含一个头名称,且该头包含 String 值,则将使用字符集直接映射为原始 byte[]。
该映射还用于在以下情况下将原始传入的 byte[] 头映射为 String:当且仅当映射值中的布尔值为 true。
如果布尔值为 false,或者头名称不在映射中且值为 true,则传入的头将直接作为原始未映射头处理。
以下测试用例说明了这种机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
两者头映射器默认都会映射所有入站头。 自 2.8.8 版本起,模式也可以应用于入站映射。 要创建入站映射的映射器,请使用相应映射器上的静态方法:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 abc 开头的标题,并包含其他所有标题。
默认情况下,在MessagingMessageConverter和BatchMessagingMessageConverter中使用DefaultKafkaHeaderMapper,只要Jackson在类路径上。
使用批处理转换器时,转换后的标题在KafkaHeaders.BATCH_CONVERTED_HEADERS中作为List<Map<String, Object>>出现,其中列表中位置的映射对应于payload中数据的位置。
如果不存在转换器(要么是因为 Jackson 不在场,要么是被显式设置为 null),消费者记录中的头部信息将未经转换地提供在 KafkaHeaders.NATIVE_HEADERS 头部。 该头部是一个 Headers 对象(如果是批处理转换器,则是一个 List<Headers>),列表中的位置与有效载荷中的数据位置相对应。
某些类型不适合JSON序列化,对于这些类型,简单的toString()序列化可能更受青睐。
DefaultKafkaHeaderMapper具有一种名为addToStringClasses()的方法,该方法让您可以为出站映射提供应以这种方式处理的类的名称。
在入站映射期间,它们被映射为String。
默认情况下,仅org.springframework.util.MimeType和org.springframework.http.MediaType被这样映射。 |
从 2.3 版本开始,String 类型的头部处理被简化。
此类头部不再默认进行 JSON 编码(即不再添加包裹的 "...")。
类型仍然添加到 JSON_TYPES 头部,接收系统可以将其转换回字符串(从 byte[])。
映射器可以处理由旧版本产生的头部(它会检查是否有前置的 ");因此使用 2.3 的应用程序可以消费来自旧版本的记录。 |
为与早期版本兼容,将 encodeStrings 设置为 true,如果由使用 2.3 的版本产生的记录需要被使用早期版本的应用消费。
当所有应用都使用 2.3 或更高版本时,可以将该属性保留默认值 false。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,將會自動配置該使用者Converter這個轉換器 bean 到自動配置的 KafkaTemplate;否則您需要將該使用者Converter 添加到模板中。