|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
测试应用程序
The spring-kafka-test jar 包含一些有用的功能,用于帮助您测试您的应用程序。
嵌入式Kafka Brokers
以下提供了两种实现方式:
-
EmbeddedKafkaZKBroker- 旧实现,启动一个嵌入式的Zookeeper实例(当使用EmbeddedKafka时,默认仍然如此)。 -
EmbeddedKafkaKraftBroker- 使用Kraft而不是Zookeeper在组合控制器和代理模式中(自 3.1 版本起)。
有几种技术可以配置经纪人,如后文各节所述。
Kafka测试工具
org.springframework.kafka.test.utils.KafkaTestUtils 提供了许多静态辅助方法来消费记录、检索各种记录偏移量等。
参见其 Javadocs 以获取完整详情。
JUnit
org.springframework.kafka.test.utils.KafkaTestUtils 也提供了一些静态方法来设置生产者和消费者的属性。
以下列表显示了这些方法的签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
|
从 2.5 版本开始, 在使用嵌入式消息代理时,最佳实践是为每个测试使用不同的主题,以防止不同测试之间互相干扰。
如果出于某些原因无法做到这一点,注意 |
JUnit 4 的 @Rule 提供了一个用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器的包装器。
(参见 @EmbeddedKafka 注解,了解如何使用 @EmbeddedKafka 配合 JUnit 5)。
以下列表显示了这些方法的签名:
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
The EmbeddedKafkaKraftBroker is not supported with JUnit4. |
The EmbeddedKafkaBroker 类有一个实用方法,可以消费它所创建的所有主题。
以下示例展示了如何使用它:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
的 KafkaTestUtils 包含一些用于从消费者获取结果的实用方法。
以下列表显示了这些方法签名:
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
下面的例子展示了如何使用KafkaTestUtils:
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当通过EmbeddedKafkaBroker启动嵌入式 Kafka 和嵌入式 Zookeeper 服务器时,会设置一个系统属性 spring.embedded.kafka.brokers,该属性的值为 Kafka 协调器的地址,并设置另一个系统属性 spring.embedded.zookeeper.connect,其值为 Zookeeper 的地址。
提供了一些方便的常量 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS 和 EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT) 用于这些属性。
代替默认的 spring.embedded.kafka.brokers 系统属性,Kafka 引导程序的 broker 地址可以暴露到任意方便的属性上。
为此,可以在启动嵌入式 Kafka 之前设置一个 spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系统属性。
例如,在 Spring Boot 中,需要设置一个 spring.kafka.bootstrap-servers 配置属性以自动配置 Kafka 客户端。
因此,在使用嵌入式 Kafka 在随机端口运行测试之前,可以将 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 作为系统属性设置,那么 EmbeddedKafkaBroker 将会使用它来暴露其 broker 地址。
从 3.0.10 版本开始,这是该属性的默认值。
使用 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供额外的属性。
见 Kafka 配置 以了解有关可能的 broker 属性的更多信息。
配置主题
以下示例配置创建了名为cat和hat的主题,每个有五个分区,名为thing1的主题有10个分区,以及名为thing2的主题有15个分区:
public class MyTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,当出现问题(例如添加已存在的主题)时,addTopics 会抛出异常。
版本 2.6 增加了该方法的新版本,返回值为 Map<String, Exception>;键是主题名称,值为 null 表示成功,或 Exception 表示失败。
使用相同的经纪人(S)为多个测试类
你可以使用类似的以下内容,为多个测试类使用同一个 broker:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static volatile boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
synchronized (EmbeddedKafkaBroker.class) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
}
这假设使用的是 Spring Boot 环境,并且嵌入式的消息中间件替换了 bootstrap.servers 属性。
然后,在每个测试类中,您可以使用类似以下内容:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果未使用 Spring Boot,可以通过 broker.getBrokersAsString() 获取 bootstrap servers。
| 上面的例子在所有测试完成后没有提供关闭经纪人(broker)的机制。 这可能在所有测试完成后关闭经纪人(broker)时出现问题。 例如,如果你在使用 Gradle daemon 运行测试,这就会成为一个问题。 你不应该在这种情况下使用这种技术,或者你应该使用某种方式在测试完成后调用对 1 的 0 方法。 |
从 3.0 版本开始,框架暴露了 JUnit 平台的 GlobalEmbeddedKafkaTestExecutionListener;默认情况下是禁用的。
这需要 JUnit 平台 1.8 或更高版本。
该监听器的目的是在整个测试计划中启动一个全局的 EmbeddedKafkaBroker 并在计划结束时停止。
要启用此监听器,从而在整个项目的所有测试中使用一个单独的嵌入式 Kafka 集群,必须通过系统属性或 JUnit 平台配置将 spring.kafka.global.embedded.enabled 属性设置为 true。
此外,还可以提供以下属性:
-
spring.kafka.embedded.count- Kafka 需要管理的 broker 数量; -
spring.kafka.embedded.ports- 每个Kafka broker启动时的端口(逗号分隔值),0表示使用随机端口;该值的数量必须等于上面提到的count; -
spring.kafka.embedded.topics- 以逗号分隔的值,用于在启动的Kafka集群中创建的主题数量; -
spring.kafka.embedded.partitions- 为创建的主题分配的分区数量; -
spring.kafka.embedded.broker.properties.location- 文件的位置以供额外的 Kafka 中继器配置属性使用;此属性的值必须遵循 Spring 资源抽象模式; -
spring.kafka.embedded.kraft- 默认为false,当设置为true时,将使用EmbeddedKafkaKraftBroker代替EmbeddedKafkaZKBroker。
本质上这些属性模拟了一些@EmbeddedKafka属性。
见更多关于配置属性的信息及如何提供它们,请参阅 JUnit 5 用户指南。
例如,在测试类路径中的一个 spring.embedded.kafka.brokers.property=my.bootstrap-servers 文件中可以添加一条 junit-platform.properties 入口信息。从版本 3.0.10 开始,代理会默认将此设置为 spring.kafka.bootstrap-servers,以便与 Spring Boot 应用程序进行测试。
| 不建议在一个测试套件中同时包含一个全局嵌入的Kafka和每个测试类中的嵌入Kafka。 它们共享相同的系统属性,因此很可能会导致不可预见的行为。 |
spring-kafka-test 依赖于 junit-jupiter-api 和 junit-platform-launcher(后者用于支持全局内嵌代理)。
如果你希望使用内嵌代理,但又不使用 JUnit,可能希望排除这些依赖。 |
@EmbeddedKafka注解
我们通常建议您将规则设置为@ClassRule以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。
自2.0版本起,如果您使用Spring的测试应用程序上下文缓存功能,还可以声明一个EmbeddedKafkaBroker bean,因此单个代理可以在多个测试类之间共享。
为了方便起见,我们提供了一个名为@EmbeddedKafka的测试类级别的注解来注册EmbeddedKafkaBroker bean。
以下示例展示了如何使用它:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class TestKafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从 2.2.4 版本开始,你也可以使用 @EmbeddedKafka 注解来指定 Kafka 端口属性。
使用 3.2 版本后,将 kraft 属性设置为 true 以使用 EmbeddedKafkaKraftBroker 替代 EmbeddedKafkaZKBroker。
以下示例设置了对 @EmbeddedKafka 的属性占位解析的 topics、brokerProperties 和 brokerPropertiesLocation 属性:
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的例子中,属性占位符${kafka.topics.another-topic}、${kafka.broker.logs-dir}和${kafka.broker.port}从Spring Environment中解析。
此外,经纪人属性从由brokerPropertiesLocation指定的类路径资源broker.properties加载。
属性占位符对于brokerPropertiesLocationURL以及在该资源中发现的任何属性占位符进行解析。
由brokerProperties定义的属性会覆盖在brokerPropertiesLocation中找到的属性。
您可以使用 @EmbeddedKafka 注解与 JUnit 4 或 JUnit 5。
@EmbeddedKafka注解与 JUnit5
从2.3版本开始,使用@EmbeddedKafka注解与JUnit5结合有两条途径。
当与@SpringJunitConfig注解一起使用时,内嵌的代理会被添加到测试应用上下文中。
您可以在类级别或方法级别自动注入代理以获取代理地址列表。
当不使用 spring 测试上下文时,EmbdeddedKafkaCondition 会创建一个经纪人;该条件包括一个参数解析器,因此你可以在测试方法中访问该经纪人。
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
除非存在一个同时被@EmbeddedKafka注解和(或)ExtendWith(SpringExtension.class)注解(或通过元注解)注解的类,否则将创建一个独立的代理(位于Spring的TestContext之外)。当@SpringJunitConfig和@SpringBootTest(它们本身也是元注解)存在时,将使用基于上下文的代理。
| 当可用Spring测试应用上下文存在时,主题和经纪人属性可以包含属性占位符,只要某个属性在某处被定义,这些占位符将被解析。 如果不存在Spring上下文,这些占位符将不会被解析。 |
嵌入式 Broker in@SpringBootTest注解
Spring Initializr 现在会自动将 spring-kafka-test 依赖添加到项目配置的测试作用域中。
|
如果您的应用程序使用在
|
有几种方法可以在 Spring Boot 应用程序测试中使用嵌入式消息中间件。
它们包括:
JUnit4 类规则
The following example shows how to use a JUnit4 class rule to create an embedded broker:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
注意,由于这是一个Spring Boot应用程序,我们重写了broker列表属性以设置Spring Boot的属性。
@EmbeddedKafka使用@SpringJunitConfig
当使用 @EmbeddedKafka 与 @SpringJUnitConfig 时,建议在测试类上使用 @DirtiesContext。
这是为防止在运行测试套件后JVM关闭时出现潜在的竞态条件。
例如,如果没有使用 @DirtiesContext,EmbeddedKafkaBroker 可能会提前关闭,而应用上下文仍需要来自它的资源。
由于每个 EmbeddedKafka 测试运行都会创建自己的临时目录,当出现这种竞态条件时,会产生错误日志,提示其试图删除或清理的文件已不可用。
添加 @DirtiesContext 将确保每次测试后清理应用上下文且不进行缓存,从而减少出现此类资源竞态条件的脆弱性。
@EmbeddedKafka注解或EmbeddedKafkaBrokerBean
以下示例展示了如何使用一个 @EmbeddedKafka 注解来创建一个内嵌的 broker:
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
在版本 3.0.10 及以后,bootstrapServersProperty 会默认设置为 spring.kafka.bootstrap-servers。 |
Hamcrest 匹配器
The org.springframework.kafka.test.hamcrest.KafkaMatchers 提供以下匹配器:
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
AssertJ 条件判断
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
例举
以下示例将本章中讨论的大部分主题结合起来:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的例子使用了Hamcrest匹配器。
With AssertJ,最终部分看起来如下代码:
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
模拟消费者和生产者
该 kafka-clients 库提供了用于测试目的的 MockConsumer 和 MockProducer 类。
如果希望在您的测试中使用这些类,与监听器容器或KafkaTemplate分别使用,从3.0.7版本开始,框架现在提供了MockConsumerFactory和MockProducerFactory的实现。
这些工厂可以用于监听容器和模板中,代替需要运行中(或嵌入式)消息代理的默认工厂。
这里是单个消费者简单实现的例子:
@Bean
ConsumerFactory<String, String> consumerFactory() {
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
.toMap(Function.identity(), tp -> 0L));
consumer.updateBeginningOffsets(beginningOffsets);
consumer.schedulePollTask(() -> {
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
new RecordHeaders(), Optional.empty()));
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
new RecordHeaders(), Optional.empty()));
});
return new MockConsumerFactory(() -> consumer);
}
如果您希望进行并发测试,工厂构造函数中的0 lambda每次都需要创建一个新的实例。
在MockProducerFactory的情况下,有两种构造器;一种用于创建简单的工厂,另一种用于创建支持事务的工厂。
这里有示例:
@Bean
ProducerFactory<String, String> nonTransFactory() {
return new MockProducerFactory<>(() ->
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}
@Bean
ProducerFactory<String, String> transFactory() {
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
mockProducer.initTransactions();
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
注意在第二种情况下,lambda 是一个 BiFunction<Boolean, String>,其中第一个参数在调用方希望使用事务性生产者时为 true;可选的第二个参数包含事务性标识符。
该标识符可以作为构造函数中提供的默认值,也可以通过 KafkaTransactionManager(或 KafkaTemplate 用于本地事务)进行覆盖,前提是已进行相应配置。
在需要根据该值使用不同的 MockProducer 时,可以提供事务性标识符。
如果你在多线程环境中使用生产者,那么当参数为BiFunction时应该返回多个生产者(可能使用线程绑定的ThreadLocal)。
事务性 MockProducers 必须通过调用 initTransaction() 进行初始化以支持事务。 |
When 使用 the MockProducer,如果在不希望每次发送后都关闭生产者的情况下,可以提供一个自定义的 MockProducer 实现,覆盖 close 方法并且不调用超类的 close 方法。
这在测试中很有用,当你需要在同一个生产者上验证多次发布时,无需关闭它。
这是一个示例:
@Bean
MockProducer<String, String> mockProducer() {
return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
@Override
public void close() {
}
};
}
@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
return new MockProducerFactory<>(() -> mockProducer);
}