|
31 | 31 | import org.junit.ClassRule;
|
32 | 32 | import org.junit.Test;
|
33 | 33 |
|
| 34 | +import org.springframework.beans.DirectFieldAccessor; |
34 | 35 | import org.springframework.boot.WebApplicationType;
|
35 | 36 | import org.springframework.boot.autoconfigure.SpringBootApplication;
|
36 | 37 | import org.springframework.boot.builder.SpringApplicationBuilder;
|
37 | 38 | import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
|
38 | 39 | import org.springframework.context.ConfigurableApplicationContext;
|
39 | 40 | import org.springframework.context.annotation.Bean;
|
40 | 41 | import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
| 42 | +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; |
41 | 43 | import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
42 | 44 |
|
43 | 45 | import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
@@ -141,19 +143,18 @@ public void testStreamConfigGlobalProperties_GH1149() {
|
141 | 143 | assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
|
142 | 144 | applicationContext.getBean(KeyValueSerdeResolver.class);
|
143 | 145 |
|
144 |
| - //TODO: In Kafka Streams 3.1, taskTopology field is removed. Re-evaluate this testing strategy. |
145 |
| -// String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) |
146 |
| -// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); |
147 |
| -// |
148 |
| -// assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); |
149 |
| -// |
150 |
| -// String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) |
151 |
| -// .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); |
152 |
| -// assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); |
153 |
| -// |
154 |
| -// String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) |
155 |
| -// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); |
156 |
| -// assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); |
| 146 | + String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) |
| 147 | + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); |
| 148 | + |
| 149 | + assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); |
| 150 | + |
| 151 | + String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) |
| 152 | + .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); |
| 153 | + assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); |
| 154 | + |
| 155 | + String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) |
| 156 | + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); |
| 157 | + assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); |
157 | 158 |
|
158 | 159 | applicationContext.close();
|
159 | 160 | }
|
|
0 commit comments