此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
邮件头
0.11.0.0 客户端引入了对消息中标头的支持。
从 2.0 版开始,Spring for Apache Kafka 现在支持将这些标头映射到和映射spring-messaging
MessageHeaders
.
映射的先前版本ConsumerRecord 和ProducerRecord 到 spring-messagingMessage<?> ,其中 value 属性映射到payload 和其他属性 (topic ,partition ,依此类推)映射到标头。
情况仍然如此,但现在可以映射其他(任意)标头。 |
Apache Kafka 标头有一个简单的 API,如以下接口定义所示:
public interface Header {
String key();
byte[] value();
}
这KafkaHeaderMapper
提供策略来在 Kafka 之间映射标头条目Headers
和MessageHeaders
.
其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
这SimpleKafkaHeaderMapper
将原始标头映射为byte[]
,具有用于转换为String
值。
这JsonKafkaHeaderMapper
将键映射到MessageHeaders
header name,为了支持出站消息的丰富标头类型,执行了 JSON 转换。
A "special
“ 标头(键为spring_json_header_types
) 包含<key>:<type>
.
此标头用于入站端,以提供每个标头值到原始类型的适当转换。
在入站方面,所有 KafkaHeader
实例映射到MessageHeaders
.
在出站端,默认情况下,所有MessageHeaders
映射,但id
,timestamp
,以及映射到ConsumerRecord
性能。
您可以通过向映射器提供模式来指定要为出站消息映射的标头。 以下列表显示了一些示例映射:
public JsonKafkaHeaderMapper() { (1)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public JsonKafkaHeaderMapper(String... patterns) { (3)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用默认的 JacksonObjectMapper 并映射大多数标头,如示例之前所述。 |
2 | 使用提供的JacksonObjectMapper 并映射大多数标头,如示例之前所述。 |
3 | 使用默认的 JacksonObjectMapper 并根据提供的模式映射标头。 |
4 | 使用提供的JacksonObjectMapper 并根据提供的模式映射标头。 |
模式相当简单,可以包含前导通配符 ()、尾随通配符或两者兼而有之(例如,*
*.cat.*
).
您可以使用前导!
.
与标头名称匹配的第一个模式(无论是正数还是负数)获胜。
当您提供自己的模式时,我们建议将!id
和!timestamp
,因为这些标头在入站端是只读的。
默认情况下,映射器仅反序列化java.lang 和java.util .
您可以通过添加带有addTrustedPackages 方法。
如果您收到来自不受信任的来源的邮件,您可能希望只添加您信任的软件包。
要信任所有包,您可以使用mapper.addTrustedPackages("*") . |
映射String 原始形式的标头值在与不知道映射器的 JSON 格式的系统通信时非常有用。 |
从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 映射,而应映射到原始byte[]
.
这AbstractKafkaHeaderMapper
有新的属性;mapAllStringsOut
当设置为 true 时,所有字符串值标头都将转换为byte[]
使用charset
属性(默认UTF-8
).
此外,还有一个属性rawMappedHeaders
,这是header name : boolean
;如果映射包含标头名称,并且标头包含String
值,它将被映射为 rawbyte[]
使用字符集。
此映射还用于映射原始传入byte[]
headers 到String
使用字符集当且仅当 map 值中的布尔值为true
.
如果布尔值为false
,或者标头名称不在地图中,并带有true
值,则传入标头仅映射为原始未映射标头。
以下测试用例说明了此机制。
@Test
public void testSpecificStringConvert() {
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个标头映射器都映射所有入站标头。 从 2.8.8 版开始,这些模式也可以应用于入站映射。 要为入站映射创建映射器,请在相应的映射器上使用静态方法之一:
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
JsonKafkaHeaderMapper inboundMapper = JsonKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以abc
并包括所有其他。
默认情况下,JsonKafkaHeaderMapper
用于MessagingMessageConverter
和BatchMessagingMessageConverter
,只要Jackson在班级道路上。
使用批处理转换器,转换后的标头可在KafkaHeaders.BATCH_CONVERTED_HEADERS
作为List<Map<String, Object>>
其中,列表中某个位置的映射对应于有效负载中的数据位置。
如果没有转换器(因为 Jackson 不存在,或者它被显式设置为null
),消费者记录中的标头在KafkaHeaders.NATIVE_HEADERS
页眉。
此标头是一个Headers
对象(或List<Headers>
在批处理转换器的情况下),其中列表中的位置对应于有效负载中的数据位置。
某些类型不适合 JSON 序列化,并且简单的toString() 序列化可能是这些类型的首选。
这JsonKafkaHeaderMapper 有一个名为addToStringClasses() 这使您可以提供应以这种方式处理的类的名称,以便进行出站映射。
在入站映射期间,它们映射为String .
默认情况下,只有org.springframework.util.MimeType 和org.springframework.http.MediaType 以这种方式映射。 |
从 2.3 版开始,简化了字符串值标头的处理。
默认情况下,此类标头不再采用 JSON 编码(即它们没有封闭"..." 添加)。
该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String(从byte[] ).
映射器可以处理(解码)旧版本生成的标头(它检查前导);这样,使用 2.3 的应用程序可以使用旧版本的记录。" |
要与早期版本兼容,请将encodeStrings 自true ,如果使用 2.3 版本生成的记录可能被使用早期版本的应用程序使用。
当所有应用程序都使用 2.3 或更高版本时,您可以将属性保留为默认值false . |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 bean 配置为 auto-configuredKafkaTemplate
;否则,您应该将此转换器添加到模板中。
支持多值标头映射
从 4.0 开始,支持多值标头映射,其中相同的逻辑标头键在 Kafka 记录中出现多次。
默认情况下,HeaderMapper
不会创建多个具有相同名称的 Kafka 标头。
相反,当它遇到集合值(例如List<byte[]>
),它将整个集合序列化为一个 Kafka 标头,其值是 JSON 数组。
-
生产方:
JsonKafkaHeaderMapper
写入 JSON 字节,而SimpleKafkaHeaderMapper
忽略它。 -
消费者端:映射器将标头公开为单个值——最后出现的获胜;较早的重复项将以静默方式丢弃。
保留每个单独的标头需要显式注册将标头指定为多值的模式。
JsonKafkaHeaderMapper#setMultiValueHeaderPatterns(String… patterns)
接受模式列表,这些模式可以是通配符表达式或确切的标头名称。
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");
// Wildcard patterns for test-multi-value1, test-multi-value2
mapper.setMultiValueHeaderPatterns("test-multi-*");
名称与提供的模式之一匹配的任何标头都是
-
生产者端:写成单独的 Kafka 头,每个元素一个。
-
消费者端:收集到
List<?>
包含单个标头值;每个元素都返回给应用程序在配置的HeaderMapper
.
不支持正则表达式;在简单模式中只允许使用 * 通配符,支持直接相等和形式,例如:xxx*、*xxx、*xxx*、xxx*y。 |
在生产者方面,当 |