|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
自定义Kafka活页夹健康指示器
覆盖默认Kafka Binder健康指示器
当Spring Boot执行器出现在类路径上时,Kafka绑定器会激活默认的健康指示器。
该健康指示器检查活页夹的健康状况及与Kafka经纪人的任何沟通问题。
如果应用程序希望禁用该默认健康检查实现并包含自定义实现,则可以提供卡夫卡·宾德健康接口。卡夫卡·宾德健康是一个从健康指标.
在自定义实现中,必须提供健康()方法。
自定义实现必须以豆子的形式存在于应用配置中。
当绑定器发现自定义实现时,它会用该实现代替默认实现。
这里有一个应用程序中自定义实现豆的示例。
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
自定义Kafka Binder Health Indicator示例
这是编写自定义 Kafka 活页夹 HealthIndicator 的伪代码。 在这个例子中,我们尝试通过先检查集群连接性,然后检查主题相关问题,来覆盖 Kafka HealthIndicator 提供的绑定器。
首先,我们需要创建一个自定义的实现卡夫卡·宾德健康接口。
public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
@Value("${spring.cloud.bus.destination}")
private String topic;
private final AdminClient client;
public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
// More about configuring Kafka
// https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
this.client = AdminClient.create(admin.getConfigurationProperties());
}
@Override
public Health health() {
if (!checkBrokersConnection()) {
logger.error("Error when connect brokers");
return Health.down().withDetail("BrokersConnectionError", "Error message").build();
}
if (!checkTopicConnection()) {
logger.error("Error when trying to connect with specific topic");
return Health.down().withDetail("TopicError", "Error message with topic name").build();
}
return Health.up().build();
}
public boolean checkBrokersConnection() {
// Your implementation
}
public boolean checkTopicConnection() {
// Your implementation
}
}
然后我们需要为自定义实现创建一个豆子。
@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
return new KafkaBinderHealthImplementation(admin);
}
}