消息头
The 0.11.0.0 客户端引入了对消息中头信息的支持。
从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头信息映射到和从 spring-messaging MessageHeaders 映射。
之前的版本将 ConsumerRecord 和 ProducerRecord 映射到 spring-messaging 的 Message<?>,其中 value 属性映射到 payload,以及其他属性(topic、partition 等)映射到头(headers)。
这仍然适用,但现在还可以映射额外的(任意的)头(headers)。 |
Apache Kafka 头部的 API 非常简单,如下面的接口定义所示:
public interface Header {
String key();
byte[] value();
}
The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders.
Its interface definition is as follows:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
The SimpleKafkaHeaderMapper 映射原始头为 byte[],并提供转换为 String 值的配置选项。
The JsonKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
A "special" header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.
在入站侧,所有 Kafka Header 实例都映射到 MessageHeaders。
在出站侧,默认情况下,所有 MessageHeaders 都已映射,除了 id、timestamp 以及映射到 ConsumerRecord 属性的头信息。
您可以使用模式将要映射的出站消息头进行指定,通过将模式提供给映射器。 以下列表展示了多个示例映射:
public JsonKafkaHeaderMapper() { (1)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public JsonKafkaHeaderMapper(String... patterns) { (3)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
| 1 | 使用默认的 Jackson ObjectMapper,并映射大多数标头,如前所述在示例中讨论的那样。 |
| 2 | 使用提供的 Jackson ObjectMapper,并映射大多数头信息,正如在示例之前讨论的那样。 |
| 3 | 使用默认的 Jackson ObjectMapper,并根据提供的模式映射头。 |
| 4 | 使用提供的 Jackson ObjectMapper 并根据提供的模式映射标头。 |
模式相当简单,可以包含一个前缀通配符(*),一个后缀通配符,或两者兼有(例如,*.cat.*)。
你可以用带有前缀!的模式来否定模式。
与一个头名称匹配的第一个模式(无论是正向还是负向)获胜。
当您提供自己的模式时,我们建议包含!id和!timestamp,因为这些标头在入站侧是只读的。
默认情况下,mapper 仅反序列化编号为 java.lang 和 java.util 的类。
可以通过使用 addTrustedPackages 方法添加可信的包来信任其他(或所有)包。
如果收到来自不可信来源的消息,您可能只想添加您信任的那些包。
要信任所有包,可以使用 mapper.addTrustedPackages("*")。 |
将 String 个标头值以原始形式进行映射,在与不熟悉映射器 JSON 格式的系统通信时很有用。 |
从 2.2.5 版本开始,您可以指定某些字符串值的头不应使用 JSON 映射,但可/从原始 byte[] 映射。
AbstractKafkaHeaderMapper 有新属性;当设置为 true 时,所有字符串值的头将使用 charset 属性(默认 UTF-8)转换为 byte[]。
此外,还有一个属性 rawMappedHeaders,其值为 header name : boolean 的映射;如果该映射包含一个头名称,且该头包含 String 值,则将使用字符集直接映射为原始 byte[]。
该映射还用于在以下情况下将原始传入的 byte[] 头映射为 String:当且仅当映射值中的布尔值为 true。
如果布尔值为 false,或者头名称不在映射中且值为 true,则传入的头将直接作为原始未映射头处理。
以下测试用例说明了这种机制。
@Test
public void testSpecificStringConvert() {
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
两者头映射器默认都会映射所有入站头。 自 2.8.8 版本起,模式也可以应用于入站映射。 要创建入站映射的映射器,请使用相应映射器上的静态方法:
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
JsonKafkaHeaderMapper inboundMapper = JsonKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 abc 开头的标题,并包含其他所有标题。
默认情况下,在MessagingMessageConverter和BatchMessagingMessageConverter中使用JsonKafkaHeaderMapper,只要Jackson在类路径上。
使用批处理转换器时,转换后的标题在KafkaHeaders.BATCH_CONVERTED_HEADERS中作为List<Map<String, Object>>出现,其中列表中位置的映射对应于payload中数据的位置。
如果不存在转换器(要么是因为 Jackson 不在场,要么是被显式设置为 null),消费者记录中的头部信息将未经转换地提供在 KafkaHeaders.NATIVE_HEADERS 头部。 该头部是一个 Headers 对象(如果是批处理转换器,则是一个 List<Headers>),列表中的位置与有效载荷中的数据位置相对应。
某些类型不适合JSON序列化,对于这些类型,简单的toString()序列化可能更受青睐。
JsonKafkaHeaderMapper具有一种名为addToStringClasses()的方法,该方法让您可以为出站映射提供应以这种方式处理的类的名称。
在入站映射期间,它们被映射为String。
默认情况下,仅org.springframework.util.MimeType和org.springframework.http.MediaType被这样映射。 |
从 2.3 版本开始,String 类型的头部处理被简化。
此类头部不再默认进行 JSON 编码(即不再添加包裹的 "...")。
类型仍然添加到 JSON_TYPES 头部,接收系统可以将其转换回字符串(从 byte[])。
映射器可以处理由旧版本产生的头部(它会检查是否有前置的 ");因此使用 2.3 的应用程序可以消费来自旧版本的记录。 |
为与早期版本兼容,将 encodeStrings 设置为 true,如果由使用 2.3 的版本产生的记录需要被使用早期版本的应用消费。
当所有应用都使用 2.3 或更高版本时,可以将该属性保留默认值 false。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,將會自動配置該使用者Converter這個轉換器 bean 到自動配置的 KafkaTemplate;否則您需要將該使用者Converter 添加到模板中。
支持多值头映射
从 4.0 开始,支持多值头映射,其中同一逻辑头键可以在 Kafka 记录中出现多次。
默认情况下,HeaderMapper 不会为相同名称创建多个 Kafka 头。
相反,当它遇到集合值(例如一个 List<byte[]>)时,它会将整个集合序列化为一个 Kafka 头,其值为一个 JSON 数组。
-
生产者侧:
JsonKafkaHeaderMapper写入 JSON 字节,而SimpleKafkaHeaderMapper忽略它。 -
消费者侧: 映射器将标头作为单个值暴露出来——遵循后出现者胜出的规则;更早的重复会被静默丢弃。
保留每个单独的标头需要显式注册模式,该模式将该标头指定为多值标头。
JsonKafkaHeaderMapper#setMultiValueHeaderPatterns(String… patterns) 接受一组模式,这些模式可以是通配符表达式或精确的头名称。
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");
// Wildcard patterns for test-multi-value1, test-multi-value2
mapper.setMultiValueHeaderPatterns("test-multi-*");
任一名称匹配所提供模式的标题
-
生产者侧: 以单独的Kafka头形式书写,每个元素一个头。
-
消费者端: 收集到一个
List<?>中,该对象包含各个头信息值;每个元素在配置的HeaderMapper执行通常的反序列化或类型转换后返回给应用程序。
| 正则表达式不被支持;仅在简单模式中支持*通配符——支持直接相等和形式如:xxx*,*xxx,*xxx*,xxx*yyy。 |
|
在生产者端,当 |