Skip to content

Commit 460d994

Browse files
authored
Spring kafka autoconfiguration (#2013)
* Add auto-configuration for spring kafka * Add auto-configuration for spring kafka * Make spring-kafka instrumentation autoconfig conditional on missing brave Co-authored-by: flavium31 <w7og3bljlmmn742puqw2zwzu7ycuwmflr2nb4swrwzivccx6arpq>
1 parent a541d6c commit 460d994

File tree

7 files changed

+305
-0
lines changed

7 files changed

+305
-0
lines changed

docs/src/main/asciidoc/integrations.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ KafkaReceiver<String, String> reactiveKafkaReceiver(TracingKafkaConsumerFactory
2424
}
2525
----
2626

27+
Additionally, we decorate any https://docs.spring.io/spring-kafka/docs/current/reference/html/[Spring Kafka] `ProducerFactory` and `ConsumerFactory` available in the context. However, this is disabled if Brave instrumentation is on the classpath.
28+
2729
[[sleuth-async-integration]]
2830
== Asynchronous Communication
2931

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.autoconfig.instrument.kafka;
18+
19+
import org.springframework.beans.factory.BeanFactory;
20+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
21+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
22+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
23+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
24+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
25+
import org.springframework.cloud.sleuth.Tracer;
26+
import org.springframework.cloud.sleuth.autoconfig.brave.BraveAutoConfiguration;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.kafka.core.ProducerFactory;
30+
31+
/**
32+
* {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration
33+
* Auto-configuration} that registers instrumentation for Spring Kafka.
34+
*
35+
* @author Anders Clausen
36+
* @author Flaviu Muresan
37+
* @since 3.1.0
38+
*/
39+
@Configuration(proxyBeanMethods = false)
40+
@ConditionalOnClass(ProducerFactory.class)
41+
@ConditionalOnMissingClass("brave.kafka.clients.KafkaTracing")
42+
@ConditionalOnBean(Tracer.class)
43+
@AutoConfigureAfter(BraveAutoConfiguration.class)
44+
@ConditionalOnProperty(value = "spring.sleuth.kafka.enabled", matchIfMissing = true)
45+
public class SpringKafkaAutoConfiguration {
46+
47+
@Bean
48+
static SpringKafkaFactoryBeanPostProcessor springKafkaFactoryBeanPostProcessor(BeanFactory beanFactory) {
49+
return new SpringKafkaFactoryBeanPostProcessor(beanFactory);
50+
}
51+
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.autoconfig.instrument.kafka;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
21+
import org.springframework.beans.factory.BeanFactory;
22+
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaConsumer;
23+
import org.springframework.kafka.core.ConsumerPostProcessor;
24+
25+
class SpringKafkaConsumerPostProcessor<K, V> implements ConsumerPostProcessor<K, V> {
26+
27+
private final BeanFactory beanFactory;
28+
29+
SpringKafkaConsumerPostProcessor(BeanFactory beanFactory) {
30+
this.beanFactory = beanFactory;
31+
}
32+
33+
@Override
34+
public Consumer<K, V> apply(Consumer<K, V> kvConsumer) {
35+
return new TracingKafkaConsumer<>(kvConsumer, beanFactory);
36+
}
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.autoconfig.instrument.kafka;
18+
19+
import org.springframework.beans.BeansException;
20+
import org.springframework.beans.factory.BeanFactory;
21+
import org.springframework.beans.factory.config.BeanPostProcessor;
22+
import org.springframework.kafka.core.ConsumerFactory;
23+
import org.springframework.kafka.core.ProducerFactory;
24+
25+
/**
26+
* Bean post processor for {@link org.springframework.kafka.core.ProducerFactory} and
27+
* {@link org.springframework.kafka.core.ConsumerFactory}.
28+
*
29+
* @author Anders Clausen
30+
* @author Flaviu Muresan
31+
* @since 3.1.0
32+
*/
33+
public class SpringKafkaFactoryBeanPostProcessor implements BeanPostProcessor {
34+
35+
private final BeanFactory beanFactory;
36+
37+
public SpringKafkaFactoryBeanPostProcessor(BeanFactory beanFactory) {
38+
this.beanFactory = beanFactory;
39+
}
40+
41+
@Override
42+
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
43+
if (bean instanceof ConsumerFactory) {
44+
ConsumerFactory factory = (ConsumerFactory) bean;
45+
if (factory.getPostProcessors().stream().noneMatch(o -> o instanceof SpringKafkaConsumerPostProcessor)) {
46+
factory.addPostProcessor(new SpringKafkaConsumerPostProcessor(this.beanFactory));
47+
}
48+
}
49+
else if (bean instanceof ProducerFactory) {
50+
ProducerFactory factory = (ProducerFactory) bean;
51+
if (factory.getPostProcessors().stream().noneMatch(o -> o instanceof SpringKafkaProducerPostProcessor)) {
52+
factory.addPostProcessor(new SpringKafkaProducerPostProcessor(this.beanFactory));
53+
}
54+
}
55+
return bean;
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.autoconfig.instrument.kafka;
18+
19+
import org.apache.kafka.clients.producer.Producer;
20+
21+
import org.springframework.beans.factory.BeanFactory;
22+
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaProducer;
23+
import org.springframework.kafka.core.ProducerPostProcessor;
24+
25+
class SpringKafkaProducerPostProcessor<K, V> implements ProducerPostProcessor<K, V> {
26+
27+
private final BeanFactory beanFactory;
28+
29+
SpringKafkaProducerPostProcessor(BeanFactory beanFactory) {
30+
this.beanFactory = beanFactory;
31+
}
32+
33+
@Override
34+
public Producer<K, V> apply(Producer<K, V> kvProducer) {
35+
return new TracingKafkaProducer<>(kvProducer, beanFactory);
36+
}
37+
38+
}

spring-cloud-sleuth-autoconfigure/src/main/resources/META-INF/spring.factories

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
33
org.springframework.cloud.sleuth.autoconfig.actuate.TraceSleuthActuatorAutoConfiguration,\
44
org.springframework.cloud.sleuth.autoconfig.instrument.kafka.TracingKafkaAutoConfiguration,\
55
org.springframework.cloud.sleuth.autoconfig.instrument.kafka.TracingReactorKafkaAutoConfiguration,\
6+
org.springframework.cloud.sleuth.autoconfig.instrument.kafka.SpringKafkaAutoConfiguration,\
67
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncAutoConfiguration,\
78
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncCustomAutoConfiguration,\
89
org.springframework.cloud.sleuth.autoconfig.instrument.async.TraceAsyncDefaultAutoConfiguration,\
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.autoconfig.instrument.kafka;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import brave.kafka.clients.KafkaTracing;
23+
import org.apache.kafka.clients.consumer.Consumer;
24+
import org.apache.kafka.clients.producer.Producer;
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.boot.autoconfigure.AutoConfigurations;
28+
import org.springframework.boot.test.context.FilteredClassLoader;
29+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
30+
import org.springframework.cloud.sleuth.autoconfig.TraceNoOpAutoConfiguration;
31+
import org.springframework.kafka.core.ConsumerFactory;
32+
import org.springframework.kafka.core.ConsumerPostProcessor;
33+
import org.springframework.kafka.core.ProducerFactory;
34+
import org.springframework.kafka.core.ProducerPostProcessor;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class SpringKafkaAutoConfigurationTests {
39+
40+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
41+
.withPropertyValues("spring.sleuth.noop.enabled=true").withConfiguration(
42+
AutoConfigurations.of(TraceNoOpAutoConfiguration.class, TracingKafkaAutoConfiguration.class,
43+
TracingReactorKafkaAutoConfiguration.class, SpringKafkaAutoConfiguration.class));
44+
45+
@Test
46+
void should_be_disabled_when_brave_on_classpath() {
47+
this.contextRunner
48+
.run((context) -> assertThat(context).doesNotHaveBean(SpringKafkaFactoryBeanPostProcessor.class));
49+
}
50+
51+
@Test
52+
void should_decorate_spring_kafka_producer_factory() {
53+
this.contextRunner.withClassLoader(new FilteredClassLoader(KafkaTracing.class))
54+
.withBean(ProducerFactory.class, TestProducerFactory::new)
55+
.run(context -> assertThat(context).getBean(ProducerFactory.class)
56+
.extracting(ProducerFactory::getPostProcessors).matches(postProcessors -> postProcessors
57+
.stream().filter(p -> p instanceof SpringKafkaProducerPostProcessor).count() == 1));
58+
}
59+
60+
@Test
61+
void should_decorate_spring_kafka_consumer_factory() {
62+
this.contextRunner.withClassLoader(new FilteredClassLoader(KafkaTracing.class))
63+
.withBean(ConsumerFactory.class, TestConsumerFactory::new)
64+
.run(context -> assertThat(context).getBean(ConsumerFactory.class)
65+
.extracting(ConsumerFactory::getPostProcessors).matches(postProcessors -> postProcessors
66+
.stream().filter(p -> p instanceof SpringKafkaConsumerPostProcessor).count() == 1));
67+
}
68+
69+
class TestConsumerFactory implements ConsumerFactory {
70+
71+
List<ConsumerPostProcessor> postProcessors = new ArrayList<>();
72+
73+
@Override
74+
public Consumer createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
75+
return null;
76+
}
77+
78+
@Override
79+
public boolean isAutoCommit() {
80+
return false;
81+
}
82+
83+
@Override
84+
public void addPostProcessor(ConsumerPostProcessor postProcessor) {
85+
this.postProcessors.add(postProcessor);
86+
}
87+
88+
@Override
89+
public List<ConsumerPostProcessor> getPostProcessors() {
90+
return this.postProcessors;
91+
}
92+
93+
}
94+
95+
class TestProducerFactory implements ProducerFactory {
96+
97+
List<ProducerPostProcessor> postProcessors = new ArrayList<>();
98+
99+
@Override
100+
public Producer createProducer() {
101+
return null;
102+
}
103+
104+
@Override
105+
public void addPostProcessor(ProducerPostProcessor postProcessor) {
106+
this.postProcessors.add(postProcessor);
107+
}
108+
109+
@Override
110+
public List<ProducerPostProcessor> getPostProcessors() {
111+
return this.postProcessors;
112+
}
113+
114+
}
115+
116+
}

0 commit comments

Comments
 (0)