Add Kafka health indicator #11515

Closed
Jcamilorada wants to merge 7 commits

Conversation

Jcamilorada
Contributor

@Jcamilorada Jcamilorada commented Jan 5, 2018

Solves #11435

  • added kafka health indicator
  • added kafka health indicator auto-configuration

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Jan 5, 2018
Member

@snicoll snicoll left a comment

Thanks for the PR, it looks quite complete! I am a bit nervous about adding the AdminClient as part of this change. Is that a general purpose bean? If so, I guess we probably need a separate PR for that.

/**
* Test for {@link KafkaHealthIndicator}
*/
public class KafkaHealthIndicatorTest {
Member

This should end with Tests.

@@ -138,4 +140,10 @@ public KafkaAdmin kafkaAdmin() {
return kafkaAdmin;
}

@Bean
@ConditionalOnMissingBean(AdminClient.class)
public AdminClient adminClient() {
Member

This is a separate feature that shouldn't be part of this change. Can you please create a separate PR for this?

}

@Configuration
@AutoConfigureBefore(KafkaHealthIndicatorAutoConfiguration.class)
Member

You can't use @AutoConfigureBefore on something that's not an auto-configuration class. User configuration is processed before auto-configuration anyway, so that's not needed at all.

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* External configuration properties for {@link KafkaHealthIndicator}.
Member

Not sure what the word "external" is supposed to mean here. You can take inspiration from the other *Properties types in the same module.

@snicoll snicoll added priority: normal status: waiting-for-feedback We need additional information before we can continue type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged labels Jan 5, 2018
@Jcamilorada
Contributor Author

Jcamilorada commented Jan 5, 2018

@snicoll

Thanks! I added the AdminClient bean in an independent PR and issue #11517.

Is there anything I can do to fix the build? It looks like the error is not related to the PR changes.

@snicoll
Member

snicoll commented Jan 5, 2018

@Jcamilorada thanks for asking. We have a general issue with our CI pipeline and there is nothing you can do, I am afraid. I'll review and build locally.

@snicoll snicoll changed the title GH-11435: Add Kafka health indicator Add Kafka health indicator Jan 5, 2018
@garyrussell
Contributor

BTW, one observation - if the producer is configured to support transactions (transactionIdPrefix), the broker will not work if there are fewer than 3 instances running (maybe more, depending on broker configuration).

I think this would incorrectly report UP.

@snicoll
Member

snicoll commented Jan 7, 2018

@garyrussell is there a way to avoid that? Or maybe we should implement the "ping" feature differently then?

@garyrussell
Contributor

@snicoll @Jcamilorada

I just ran this test...

import java.util.Collection;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;

@SpringBootApplication
public class Boot11515Application {

	public static void main(String[] args) {
		SpringApplication.run(Boot11515Application.class, args);
	}

	@Bean
	public ApplicationRunner runner(KafkaAdmin admin, KafkaProperties kafkaProps) {
		return args -> {
			AdminClient client = AdminClient.create(admin.getConfig());
			try {
				DescribeClusterOptions options = new DescribeClusterOptions().timeoutMs(60_000);
				DescribeClusterResult cluster = client.describeCluster(options);
				System.out.println(cluster.clusterId().get());
				// Only relevant when transactions are enabled
				if (StringUtils.hasText(kafkaProps.getProducer().getTransactionIdPrefix())) {
					KafkaFuture<Collection<Node>> nodesFuture = cluster.nodes();
					Collection<Node> nodes = nodesFuture.get();
					System.out.println(nodes.size());
					if (nodes.size() < 3) {
						System.out.println("Insufficient active nodes for transactions");
					}
				}
			}
			finally {
				client.close();
			}
		};
	}

}

with

spring.kafka.bootstrap-servers=10.0.0.6:9092,10.0.0.6:9093,10.0.0.6:9094
spring.kafka.producer.transaction-id-prefix=myTxId

And it works (detects < 3 active brokers).

However, 3 is just the default value for the broker's transaction.state.log.replication.factor property. I don't believe there is any way for the client to query the actual value of that property on the broker; I suppose it could be made a Boot property, and the user would be responsible for making sure it matches the actual broker setting, if they want the health check to report UP only when there are enough broker instances while transactions are in use.

That said, it could be a transient issue...

When I run the spring-kafka transaction test cases, with only one broker, I get this error...

16:10:25.862 ERROR [kafka-request-handler-3][kafka.server.KafkaApis] [KafkaApi-0] Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
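
A hedged aside: on a single-broker development setup, this error can typically be avoided by lowering the broker-side defaults in the broker's server.properties (these are Kafka broker properties, not Spring Boot ones; the values below are illustrative):

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1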

- refactor code to create and destroy admin client when health monitoring
@Jcamilorada
Contributor Author

Jcamilorada commented Jan 8, 2018

@snicoll @garyrussell I just updated the PR according to your suggestions. Regarding health monitoring when transaction support is enabled, I did some testing and the replication factor can be obtained using:

String controllerId = adminClient.describeCluster().controller().get().idString();
ConfigResource controllerBroker = new ConfigResource(Type.BROKER, controllerId);
Map<ConfigResource, Config> configs = adminClient
		.describeConfigs(Collections.singletonList(controllerBroker)).all().get();
String replicationFactor = configs.get(controllerBroker)
		.get("transaction.state.log.replication.factor").value();

So if you think it is appropriate, I can add support for transactions either as part of this issue or in another one.
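
A hedged sketch of how an indicator could combine that lookup with the active node count; the class and method names below are illustrative, not the PR's actual code.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.boot.actuate.health.Status;

class ReplicationFactorCheck {

	Status check(AdminClient adminClient) throws Exception {
		DescribeClusterResult cluster = adminClient.describeCluster();
		String brokerId = cluster.controller().get().idString();
		ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
		Map<ConfigResource, Config> configs = adminClient
				.describeConfigs(Collections.singletonList(resource)).all().get();
		// Minimum broker count required by the transaction state log
		int replicationFactor = Integer.parseInt(configs.get(resource)
				.get("transaction.state.log.replication.factor").value());
		int nodes = cluster.nodes().get().size();
		return (nodes >= replicationFactor) ? Status.UP : Status.DOWN;
	}

}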

@garyrussell
Contributor

Cool. I'll leave it to @snicoll as to whether you should add it to this or create another.

@snicoll
Member

snicoll commented Jan 8, 2018

Let's do this here. If there aren't enough brokers, it would be nice to state so in the details then.

@snicoll
Member

snicoll commented Jan 10, 2018

@Jcamilorada Am I right to expect an update of this PR for the thing we've discussed above? Thanks!

@Jcamilorada
Contributor Author

@snicoll Really sorry for the late reply, I just came back from holidays. I am working on the changes starting today, so I will update the PR asap. Thanks!

*
* @author Juan Rada
*/
public class KafkaHealthIndicatorTests {
Contributor Author

One can argue that this is an IntegrationTest.

@Jcamilorada
Contributor Author

@snicoll @garyrussell the PR has been updated to validate the replication factor. Thanks!

@snicoll snicoll removed the status: waiting-for-feedback We need additional information before we can continue label Feb 6, 2018
@snicoll snicoll added this to the 2.0.0.RC2 milestone Feb 6, 2018
@snicoll snicoll self-assigned this Feb 6, 2018
snicoll pushed a commit that referenced this pull request Feb 8, 2018
@snicoll snicoll closed this in 7cd1982 Feb 8, 2018
snicoll added a commit that referenced this pull request Feb 8, 2018
* pr/11515:
  Polish "Add Kafka health indicator"
  Add Kafka health indicator
@snicoll
Member

snicoll commented Feb 8, 2018

@Jcamilorada this is now merged with a polish commit, thank you.

@kiview

kiview commented Feb 23, 2018

Just as a small heads-up, this change led to our app not booting anymore with RC2 (we are using spring-kafka and kafka-streams), creating a new KafkaAdmin object multiple times and basically hanging. I'll raise an issue once I have more insights; right now I've disabled the health endpoint as a workaround.

@snicoll
Member

snicoll commented Feb 23, 2018

ping @garyrussell

@garyrussell
Contributor

FYI, I just ran a test (with a 1.0.0 broker) and it worked as expected...

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class Bgh11515Application {

	public static void main(String[] args) {
		SpringApplication.run(Bgh11515Application.class, args);
	}

	@KafkaListener(id = "bgh11515", topics = "bgh11515")
	public void listen(String in) {
		System.out.println(in);
	}

}

[screenshot: actuator health endpoint response]

However, running with an OLD broker (0.10.2.0), I get DOWN and

2018-02-23 10:25:05.636  WARN 72705 --- [nio-8080-exec-5] o.s.b.a.kafka.KafkaHealthIndicator       : Health check failed

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_CONFIGS

I am not sure what the "right" thing to do is there, since the application is functional. Maybe the health check should detect the UnsupportedVersionException for the config check?

@snicoll
Member

snicoll commented Feb 23, 2018

@garyrussell it depends on whether we are supposed to support that broker or not. But we can catch that exception and switch the status to UNKNOWN. If you feel we should do something about this, could you please raise a separate issue?

@garyrussell
Contributor

@snicoll We use the 1.0.0 client but it can talk to older brokers, so we might not want to restrict their use.

I was thinking of a try/catch in getReplicationFactor that returns 1 (perhaps with a debug log saying the replication factor could not be determined).
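
A minimal sketch of that suggestion, assuming a getReplicationFactor method like the indicator's; the wrapper method name and the logger are illustrative, not actual PR code.

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.UnsupportedVersionException;

	// Illustrative fallback: delegates to an assumed getReplicationFactor(...)
	private int getReplicationFactorOrDefault(String brokerId, AdminClient adminClient)
			throws InterruptedException {
		try {
			return getReplicationFactor(brokerId, adminClient);
		}
		catch (ExecutionException ex) {
			if (ex.getCause() instanceof UnsupportedVersionException) {
				// Older brokers (e.g. 0.10.2.0) do not support DESCRIBE_CONFIGS
				logger.debug("Replication factor could not be determined", ex);
				return 1;
			}
			throw new IllegalStateException(ex);
		}
	}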

BTW, when I killed Kafka, I got...

[screenshot: actuator health endpoint response after killing Kafka]

@kiview

kiview commented Feb 23, 2018

We are using Confluent 4.0.0 (which is Kafka 1.0.0) and are getting the following log output in kind of a loop (no errors; also note that we are running this with containers in a docker-compose setup):

2018-02-23 16:30:35.444  INFO 1 --- [      elastic-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
2018-02-23 16:30:35.444  INFO 1 --- [      elastic-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
2018-02-23 16:30:40.610  INFO 1 --- [      elastic-2] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [kafka:9092]
	client.id = 
	connections.max.idle.ms = 300000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 120000
	retries = 5
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2018-02-23 16:30:40.611  INFO 1 --- [      elastic-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
2018-02-23 16:30:40.612  INFO 1 --- [      elastic-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
2018-02-23 16:30:45.787  INFO 1 --- [      elastic-3] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	[identical to the AdminClientConfig values above]

2018-02-23 16:30:45.788  INFO 1 --- [      elastic-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
2018-02-23 16:30:45.788  INFO 1 --- [      elastic-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

We are basically using the following Kafka dependencies:

dependencies {
        compile "org.codehaus.groovy:groovy-all:$groovyVersion"
        compile 'org.springframework.boot:spring-boot-starter'
        compile 'org.springframework.boot:spring-boot-starter-validation'
        compile 'org.springframework.boot:spring-boot-starter-webflux'
        compile 'io.github.http-builder-ng:http-builder-ng-core:1.0.3'
        compile 'org.springframework.kafka:spring-kafka'
        compile 'org.springframework.boot:spring-boot-starter-actuator'
        compile 'io.micrometer:micrometer-registry-prometheus'
        compile 'io.dropwizard.metrics:metrics-core:4.0.2'

        testCompile 'com.github.testcontainers:testcontainers-spock:e81d6fe9e4'
        testCompile 'org.springframework.boot:spring-boot-starter-test'
        testCompile 'org.spockframework:spock-spring:1.1-groovy-2.4'
        testCompile 'org.jetbrains:annotations:13.0'
        testCompile 'org.springframework.kafka:spring-kafka-test'
        testCompile 'com.anotherchrisberry:spock-retry:0.6.2'

        optional "org.springframework.boot:spring-boot-configuration-processor"
}

@snicoll
Member

snicoll commented Feb 23, 2018

@kiview we'd really need a sample for this one. I am not sure I understand why several admins are created and how it relates to the health check. Perhaps you have an aggressive invocation of the health endpoint somewhere?

@garyrussell
Contributor

Looks like a health check is being performed every 5 seconds; who/what is doing that?

I see the same in my logs each time I hit the endpoint (Kafka is a bit verbose at INFO level).

What do you see if you hit the health actuator in a browser?

@garyrussell
Contributor

@snicoll a new AdminClient is created/closed for each health check.

@garyrussell
Contributor

@kiview Can you explain exactly what you mean by "hanging"? A couple of the polls were done on the same thread, so it doesn't look like a "hang" in the indicator.

It looks like the indicator is working OK (although we don't know if it's reporting UP or DOWN) - is there something in your app that's polling the indicator and preventing the app from starting if it reports DOWN?

@kiview

kiview commented Feb 23, 2018

Okay, now some things make more sense; thanks for your explanations @garyrussell and @snicoll.
We use the endpoint as a healthcheck in our Docker image, like this:

HEALTHCHECK --start-period=10s --interval=5s --timeout=3s \
    CMD curl -f http://localhost:$SERVER_PORT/actuator/health/ | grep '"status":"UP"' || exit 1

Since the endpoint is reporting DOWN, Docker is constantly polling the endpoint. How can I enable additional output for the DOWN reason, like you did in your example above?
By hanging I mean the application never reaches status UP and is never started correctly (this does not happen if I disable the Kafka health endpoint).

I'll try to come up with a small example next week, if I can reproduce it easily.
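
A hedged aside: Spring Boot 2.0 hides health details by default, so assuming a standard actuator setup, the DOWN reason can be exposed with the property below.

management.endpoint.health.show-details=always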

@garyrussell
Contributor

See my comment above - if you log the output of the curl command, you should see the reason.

I just submitted a PR to use a single AdminClient instead of connecting a new one each time.
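
A minimal sketch of that idea, assuming the indicator owns the client's lifecycle; the holder class below is illustrative, not the actual PR code.

import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;

// Create the AdminClient once and reuse it for every health check,
// closing it only when the holder itself is closed on shutdown.
class SharedAdminClient implements AutoCloseable {

	private final AdminClient adminClient;

	SharedAdminClient(Map<String, Object> config) {
		this.adminClient = AdminClient.create(config);
	}

	AdminClient get() {
		return this.adminClient;
	}

	@Override
	public void close() {
		this.adminClient.close();
	}

}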

@garyrussell
Contributor

Here are the results from a test I ran; the broker is not considered "UP" until there are enough nodes to support the configured replication factor...

DOWN {error=java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.}
DOWN {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=1}
DOWN {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=1}
DOWN {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=1}
DOWN {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=2}
DOWN {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=2}
DOWN {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=2}
UP {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=3}
UP {clusterId=UMVTlmOvQ6uXnpdOcxwEBQ, brokerId=0, nodes=3}

@kiview

kiview commented Feb 24, 2018

@garyrussell Thanks for getting back to me; my curl of the health endpoint is missing the details key, I only see DOWN (I assume a missing endpoint configuration on my side).

Okay, your explanation makes sense. Which topic do you use for determining the needed replication factor? I assume this is where the problem with our app lies (I don't have the project in front of me right now; I'll look into it after the weekend).

@garyrussell
Contributor

Here's another occurrence: https://stackoverflow.com/questions/48965775/spring-boot-2-0-0-rc2-kafkahealthindicator-actuator-status-down/48966640#48966640 - the workaround in my answer fixed it for me.

It's the transaction.state.log.replication.factor.

We added it because, if you are using transactions, connections hang until there are enough brokers.

@snicoll I wonder if we should either disable the replication check, or somehow only do it if the producer config enables transactions?
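
A minimal sketch of that guard, assuming the indicator has access to KafkaProperties; the method name is illustrative.

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.util.StringUtils;

	// Only run the replication-factor check when transactions are enabled,
	// i.e. spring.kafka.producer.transaction-id-prefix is set
	private boolean transactionsEnabled(KafkaProperties properties) {
		return StringUtils.hasText(properties.getProducer().getTransactionIdPrefix());
	}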

@Artgit

Artgit commented Feb 24, 2018

Hi all, unfortunately, as you may see from the SO topic, I was unable to fix the issue with transaction.state.log.replication.factor=1.

Maybe I'm doing something wrong or maybe the issue is somewhere else.

@kiview

kiview commented Feb 24, 2018

@garyrussell We aren't using transactions, so yes, in our case this doesn't really make sense. I think it would be better to have more defensive defaults for the auto-configured endpoint.

@garyrussell
Contributor

I just added another commit to the PR to only check if transactions are enabled; let's see what @snicoll thinks.

@snicoll
Member

snicoll commented Feb 25, 2018

I agree we should be more defensive and was about to ask @garyrussell about that actually.

@garyrussell
Contributor

Yeah. FYI, the reason we need something if you are using transactions is that the Kafka client (currently) simply hangs if there are not enough brokers available (KAFKA-6446); no timeouts, nothing.

@snicoll
Member

snicoll commented Feb 26, 2018

Thanks all for the feedback. Looking at the issues and how to fix them properly, it became quite apparent that the feature isn't mature enough for us to consider adding a health indicator for Kafka out of the box. We've decided to revert what we have for 2.0 (see #12225).
