Skip to content

Commit 30bb3c4

Browse files
committed
refactor: hardcode key deserializer as before
1 parent f69c966 commit 30bb3c4

File tree

9 files changed

+35
-122
lines changed

9 files changed

+35
-122
lines changed

legacy-connectors/src/test/kotlin/streams/kafka/connect/source/LegacyNeo4jSourceIT.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package streams.kafka.connect.source
1919
import java.time.Duration
2020
import org.apache.avro.generic.GenericRecord
2121
import org.apache.kafka.clients.consumer.KafkaConsumer
22-
import org.apache.kafka.common.serialization.StringDeserializer
2322
import org.junit.jupiter.api.Test
2423
import org.junit.jupiter.api.TestInfo
2524
import org.neo4j.connectors.kafka.testing.GenericRecordSupport.asMap
@@ -44,8 +43,7 @@ class LegacyNeo4jSourceIT {
4443
@Test
4544
fun `reads latest changes from legacy Neo4j source`(
4645
testInfo: TestInfo,
47-
@TopicConsumer(
48-
topic = TOPIC, offset = "earliest", keyDeserializer = StringDeserializer::class)
46+
@TopicConsumer(topic = TOPIC, offset = "earliest")
4947
consumer: KafkaConsumer<String, GenericRecord>,
5048
session: Session
5149
) {

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceKeyStrategyIT.kt

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
*/
1717
package org.neo4j.connectors.kafka.source
1818

19-
import io.confluent.kafka.serializers.KafkaAvroDeserializer
2019
import java.time.Duration
2120
import org.apache.avro.generic.GenericData
2221
import org.apache.avro.generic.GenericRecord
2322
import org.apache.kafka.clients.consumer.KafkaConsumer
24-
import org.apache.kafka.common.serialization.StringDeserializer
2523
import org.junit.jupiter.api.Assertions.assertEquals
2624
import org.junit.jupiter.api.Assertions.assertTrue
2725
import org.junit.jupiter.api.Test
@@ -57,10 +55,7 @@ class Neo4jCdcSourceKeyStrategyIT {
5755
@Test
5856
fun `supports skipping serialization of keys`(
5957
testInfo: TestInfo,
60-
@TopicConsumer(
61-
topic = "neo4j-cdc-topic-key-serialization-none",
62-
offset = "earliest",
63-
keyDeserializer = StringDeserializer::class)
58+
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-none", offset = "earliest")
6459
consumer: KafkaConsumer<Any, GenericRecord>,
6560
session: Session
6661
) {
@@ -92,10 +87,7 @@ class Neo4jCdcSourceKeyStrategyIT {
9287
@Test
9388
fun `supports serialization of keys as whole values`(
9489
testInfo: TestInfo,
95-
@TopicConsumer(
96-
topic = "neo4j-cdc-topic-key-serialization-whole",
97-
offset = "earliest",
98-
keyDeserializer = KafkaAvroDeserializer::class)
90+
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-whole", offset = "earliest")
9991
consumer: KafkaConsumer<GenericRecord, GenericRecord>,
10092
session: Session
10193
) {
@@ -136,10 +128,7 @@ class Neo4jCdcSourceKeyStrategyIT {
136128
@Test
137129
fun `supports serialization of keys as element IDs`(
138130
testInfo: TestInfo,
139-
@TopicConsumer(
140-
topic = "neo4j-cdc-topic-key-serialization-element-ids",
141-
offset = "earliest",
142-
keyDeserializer = StringDeserializer::class)
131+
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-element-ids", offset = "earliest")
143132
consumer: KafkaConsumer<String, GenericRecord>,
144133
session: Session
145134
) {
@@ -174,9 +163,7 @@ class Neo4jCdcSourceKeyStrategyIT {
174163
fun `supports serialization of keys as (missing) node keys`(
175164
testInfo: TestInfo,
176165
@TopicConsumer(
177-
topic = "neo4j-cdc-topic-key-serialization-missing-node-keys",
178-
offset = "earliest",
179-
keyDeserializer = KafkaAvroDeserializer::class)
166+
topic = "neo4j-cdc-topic-key-serialization-missing-node-keys", offset = "earliest")
180167
consumer: KafkaConsumer<GenericRecord, GenericRecord>,
181168
session: Session
182169
) {
@@ -210,10 +197,7 @@ class Neo4jCdcSourceKeyStrategyIT {
210197
@Test
211198
fun `supports serialization of keys as node keys`(
212199
testInfo: TestInfo,
213-
@TopicConsumer(
214-
topic = "neo4j-cdc-topic-key-serialization-node-keys",
215-
offset = "earliest",
216-
keyDeserializer = KafkaAvroDeserializer::class)
200+
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-node-keys", offset = "earliest")
217201
consumer: KafkaConsumer<GenericRecord, GenericRecord>,
218202
session: Session
219203
) {
@@ -260,9 +244,7 @@ class Neo4jCdcSourceKeyStrategyIT {
260244
fun `supports serialization of keys as (missing) rel keys`(
261245
testInfo: TestInfo,
262246
@TopicConsumer(
263-
topic = "neo4j-cdc-topic-key-serialization-missing-rel-keys",
264-
offset = "earliest",
265-
keyDeserializer = KafkaAvroDeserializer::class)
247+
topic = "neo4j-cdc-topic-key-serialization-missing-rel-keys", offset = "earliest")
266248
consumer: KafkaConsumer<GenericData.Array<GenericRecord>, GenericRecord>,
267249
session: Session
268250
) {
@@ -296,10 +278,7 @@ class Neo4jCdcSourceKeyStrategyIT {
296278
@Test
297279
fun `supports serialization of keys as rel keys`(
298280
testInfo: TestInfo,
299-
@TopicConsumer(
300-
topic = "neo4j-cdc-topic-key-serialization-rel-keys",
301-
offset = "earliest",
302-
keyDeserializer = KafkaAvroDeserializer::class)
281+
@TopicConsumer(topic = "neo4j-cdc-topic-key-serialization-rel-keys", offset = "earliest")
303282
consumer: KafkaConsumer<GenericData.Array<GenericRecord>, GenericRecord>,
304283
session: Session
305284
) {

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.neo4j.connectors.kafka.source
2020
import java.time.Duration
2121
import org.apache.avro.generic.GenericRecord
2222
import org.apache.kafka.clients.consumer.KafkaConsumer
23-
import org.apache.kafka.common.serialization.StringDeserializer
2423
import org.junit.jupiter.api.Test
2524
import org.junit.jupiter.api.TestInfo
2625
import org.neo4j.connectors.kafka.testing.assertions.AvroCdcRecordAssert.Companion.assertThat
@@ -54,10 +53,7 @@ class Neo4jCdcSourceNodesIT {
5453
@Test
5554
fun `should read changes caught by patterns`(
5655
testInfo: TestInfo,
57-
@TopicConsumer(
58-
topic = "neo4j-cdc-topic",
59-
offset = "earliest",
60-
keyDeserializer = StringDeserializer::class)
56+
@TopicConsumer(topic = "neo4j-cdc-topic", offset = "earliest")
6157
consumer: KafkaConsumer<String, GenericRecord>,
6258
session: Session
6359
) {
@@ -115,10 +111,7 @@ class Neo4jCdcSourceNodesIT {
115111
@Test
116112
fun `should read property removal and additions`(
117113
testInfo: TestInfo,
118-
@TopicConsumer(
119-
topic = "neo4j-cdc-topic-prop-remove-add",
120-
offset = "earliest",
121-
keyDeserializer = StringDeserializer::class)
114+
@TopicConsumer(topic = "neo4j-cdc-topic-prop-remove-add", offset = "earliest")
122115
consumer: KafkaConsumer<String, GenericRecord>,
123116
session: Session
124117
) {
@@ -205,10 +198,7 @@ class Neo4jCdcSourceNodesIT {
205198
@Test
206199
fun `should read only specified field changes on update`(
207200
testInfo: TestInfo,
208-
@TopicConsumer(
209-
topic = "neo4j-cdc-update-topic",
210-
offset = "earliest",
211-
keyDeserializer = StringDeserializer::class)
201+
@TopicConsumer(topic = "neo4j-cdc-update-topic", offset = "earliest")
212202
consumer: KafkaConsumer<String, GenericRecord>,
213203
session: Session
214204
) {
@@ -281,10 +271,7 @@ class Neo4jCdcSourceNodesIT {
281271
@Test
282272
fun `should read changes with different properties using the default topic compatibility mode`(
283273
testInfo: TestInfo,
284-
@TopicConsumer(
285-
topic = "neo4j-cdc-create-inc",
286-
offset = "earliest",
287-
keyDeserializer = StringDeserializer::class)
274+
@TopicConsumer(topic = "neo4j-cdc-create-inc", offset = "earliest")
288275
consumer: KafkaConsumer<String, GenericRecord>,
289276
session: Session
290277
) {
@@ -342,14 +329,11 @@ class Neo4jCdcSourceNodesIT {
342329
@Test
343330
fun `should read each operation to a separate topic`(
344331
testInfo: TestInfo,
345-
@TopicConsumer(
346-
topic = "cdc-creates", offset = "earliest", keyDeserializer = StringDeserializer::class)
332+
@TopicConsumer(topic = "cdc-creates", offset = "earliest")
347333
createsConsumer: KafkaConsumer<String, GenericRecord>,
348-
@TopicConsumer(
349-
topic = "cdc-updates", offset = "earliest", keyDeserializer = StringDeserializer::class)
334+
@TopicConsumer(topic = "cdc-updates", offset = "earliest")
350335
updatesConsumer: KafkaConsumer<String, GenericRecord>,
351-
@TopicConsumer(
352-
topic = "cdc-deletes", offset = "earliest", keyDeserializer = StringDeserializer::class)
336+
@TopicConsumer(topic = "cdc-deletes", offset = "earliest")
353337
deletesConsumer: KafkaConsumer<String, GenericRecord>,
354338
session: Session
355339
) {
@@ -424,10 +408,7 @@ class Neo4jCdcSourceNodesIT {
424408
@Test
425409
fun `should read changes marked with specific transaction metadata attribute`(
426410
testInfo: TestInfo,
427-
@TopicConsumer(
428-
topic = "neo4j-cdc-metadata",
429-
offset = "earliest",
430-
keyDeserializer = StringDeserializer::class)
411+
@TopicConsumer(topic = "neo4j-cdc-metadata", offset = "earliest")
431412
consumer: KafkaConsumer<String, GenericRecord>,
432413
session: Session
433414
) {
@@ -477,10 +458,7 @@ class Neo4jCdcSourceNodesIT {
477458
@Test
478459
fun `should read changes containing node keys`(
479460
testInfo: TestInfo,
480-
@TopicConsumer(
481-
topic = "neo4j-cdc-keys",
482-
offset = "earliest",
483-
keyDeserializer = StringDeserializer::class)
461+
@TopicConsumer(topic = "neo4j-cdc-keys", offset = "earliest")
484462
consumer: KafkaConsumer<String, GenericRecord>,
485463
session: Session
486464
) {

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceRelationshipsIT.kt

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.neo4j.connectors.kafka.source
2020
import java.time.Duration
2121
import org.apache.avro.generic.GenericRecord
2222
import org.apache.kafka.clients.consumer.KafkaConsumer
23-
import org.apache.kafka.common.serialization.StringDeserializer
2423
import org.junit.jupiter.api.Test
2524
import org.junit.jupiter.api.TestInfo
2625
import org.neo4j.connectors.kafka.testing.assertions.AvroCdcRecordAssert.Companion.assertThat
@@ -55,10 +54,7 @@ class Neo4jCdcSourceRelationshipsIT {
5554
@Test
5655
fun `should read changes caught by patterns`(
5756
testInfo: TestInfo,
58-
@TopicConsumer(
59-
topic = "neo4j-cdc-rels",
60-
offset = "earliest",
61-
keyDeserializer = StringDeserializer::class)
57+
@TopicConsumer(topic = "neo4j-cdc-rels", offset = "earliest")
6258
consumer: KafkaConsumer<String, GenericRecord>,
6359
session: Session
6460
) {
@@ -129,10 +125,7 @@ class Neo4jCdcSourceRelationshipsIT {
129125
@Test
130126
fun `should read property removal and additions`(
131127
testInfo: TestInfo,
132-
@TopicConsumer(
133-
topic = "neo4j-cdc-rels-prop-remove-add",
134-
offset = "earliest",
135-
keyDeserializer = StringDeserializer::class)
128+
@TopicConsumer(topic = "neo4j-cdc-rels-prop-remove-add", offset = "earliest")
136129
consumer: KafkaConsumer<String, GenericRecord>,
137130
session: Session
138131
) {
@@ -226,10 +219,7 @@ class Neo4jCdcSourceRelationshipsIT {
226219
@Test
227220
fun `should read only specified field changes on update`(
228221
testInfo: TestInfo,
229-
@TopicConsumer(
230-
topic = "neo4j-cdc-update-rel",
231-
offset = "earliest",
232-
keyDeserializer = StringDeserializer::class)
222+
@TopicConsumer(topic = "neo4j-cdc-update-rel", offset = "earliest")
233223
consumer: KafkaConsumer<String, GenericRecord>,
234224
session: Session
235225
) {
@@ -286,10 +276,7 @@ class Neo4jCdcSourceRelationshipsIT {
286276
@Test
287277
fun `should read changes with different properties using the default topic compatibility mode`(
288278
testInfo: TestInfo,
289-
@TopicConsumer(
290-
topic = "neo4j-cdc-create-inc-rel",
291-
offset = "earliest",
292-
keyDeserializer = StringDeserializer::class)
279+
@TopicConsumer(topic = "neo4j-cdc-create-inc-rel", offset = "earliest")
293280
consumer: KafkaConsumer<String, GenericRecord>,
294281
session: Session
295282
) {
@@ -351,20 +338,11 @@ class Neo4jCdcSourceRelationshipsIT {
351338
@Test
352339
fun `should read each operation to a separate topic`(
353340
testInfo: TestInfo,
354-
@TopicConsumer(
355-
topic = "cdc-creates-rel",
356-
offset = "earliest",
357-
keyDeserializer = StringDeserializer::class)
341+
@TopicConsumer(topic = "cdc-creates-rel", offset = "earliest")
358342
createsConsumer: KafkaConsumer<String, GenericRecord>,
359-
@TopicConsumer(
360-
topic = "cdc-updates-rel",
361-
offset = "earliest",
362-
keyDeserializer = StringDeserializer::class)
343+
@TopicConsumer(topic = "cdc-updates-rel", offset = "earliest")
363344
updatesConsumer: KafkaConsumer<String, GenericRecord>,
364-
@TopicConsumer(
365-
topic = "cdc-deletes-rel",
366-
offset = "earliest",
367-
keyDeserializer = StringDeserializer::class)
345+
@TopicConsumer(topic = "cdc-deletes-rel", offset = "earliest")
368346
deletesConsumer: KafkaConsumer<String, GenericRecord>,
369347
session: Session
370348
) {
@@ -437,10 +415,7 @@ class Neo4jCdcSourceRelationshipsIT {
437415
@Test
438416
fun `should read changes marked with specific transaction metadata attribute`(
439417
testInfo: TestInfo,
440-
@TopicConsumer(
441-
topic = "neo4j-cdc-metadata-rel",
442-
offset = "earliest",
443-
keyDeserializer = StringDeserializer::class)
418+
@TopicConsumer(topic = "neo4j-cdc-metadata-rel", offset = "earliest")
444419
consumer: KafkaConsumer<String, GenericRecord>,
445420
session: Session
446421
) {
@@ -491,10 +466,7 @@ class Neo4jCdcSourceRelationshipsIT {
491466
@Test
492467
fun `should read changes containing relationship keys`(
493468
testInfo: TestInfo,
494-
@TopicConsumer(
495-
topic = "neo4j-cdc-keys-rel",
496-
offset = "earliest",
497-
keyDeserializer = StringDeserializer::class)
469+
@TopicConsumer(topic = "neo4j-cdc-keys-rel", offset = "earliest")
498470
consumer: KafkaConsumer<String, GenericRecord>,
499471
session: Session
500472
) {

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.neo4j.connectors.kafka.source
1919
import java.time.Duration
2020
import org.apache.avro.generic.GenericRecord
2121
import org.apache.kafka.clients.consumer.KafkaConsumer
22-
import org.apache.kafka.common.serialization.StringDeserializer
2322
import org.junit.jupiter.api.Test
2423
import org.junit.jupiter.api.TestInfo
2524
import org.neo4j.connectors.kafka.testing.GenericRecordSupport.asMap
@@ -44,8 +43,7 @@ class Neo4jSourceIT {
4443
@Test
4544
fun `reads latest changes from Neo4j source`(
4645
testInfo: TestInfo,
47-
@TopicConsumer(
48-
topic = TOPIC, offset = "earliest", keyDeserializer = StringDeserializer::class)
46+
@TopicConsumer(topic = TOPIC, offset = "earliest")
4947
consumer: KafkaConsumer<String, GenericRecord>,
5048
session: Session
5149
) {

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal class Neo4jSourceExtension(
5151
// visible for testing
5252
envAccessor: (String) -> String? = System::getenv,
5353
private val driverFactory: (String, AuthToken) -> Driver = GraphDatabase::driver,
54-
private val consumerFactory: (Properties, String) -> KafkaConsumer<String, GenericRecord> =
54+
private val consumerFactory: (Properties, String) -> KafkaConsumer<*, GenericRecord> =
5555
::getSubscribedConsumer,
5656
) : ExecutionCondition, BeforeEachCallback, AfterEachCallback, ParameterResolver {
5757

@@ -162,7 +162,7 @@ internal class Neo4jSourceExtension(
162162
private fun resolveConsumer(
163163
parameterContext: ParameterContext?,
164164
extensionContext: ExtensionContext?
165-
): KafkaConsumer<String, GenericRecord> {
165+
): KafkaConsumer<*, GenericRecord> {
166166
val consumerAnnotation = parameterContext?.parameter?.getAnnotation(TopicConsumer::class.java)!!
167167
val properties = Properties()
168168
properties.setProperty(
@@ -175,7 +175,7 @@ internal class Neo4jSourceExtension(
175175
)
176176
properties.setProperty(
177177
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
178-
consumerAnnotation.keyDeserializer.qualifiedName,
178+
KafkaAvroDeserializer::class.java.getName(),
179179
)
180180
properties.setProperty(
181181
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -232,8 +232,8 @@ internal class Neo4jSourceExtension(
232232
private fun getSubscribedConsumer(
233233
properties: Properties,
234234
topic: String
235-
): KafkaConsumer<String, GenericRecord> {
236-
val consumer = KafkaConsumer<String, GenericRecord>(properties)
235+
): KafkaConsumer<*, GenericRecord> {
236+
val consumer = KafkaConsumer<Any, GenericRecord>(properties)
237237
consumer.subscribe(listOf(topic))
238238
return consumer
239239
}

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/TopicConsumer.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@
1616
*/
1717
package org.neo4j.connectors.kafka.testing.source
1818

19-
import kotlin.reflect.KClass
20-
2119
@Target(AnnotationTarget.VALUE_PARAMETER)
2220
@Retention(AnnotationRetention.RUNTIME)
23-
annotation class TopicConsumer(
24-
val topic: String,
25-
val offset: String,
26-
val keyDeserializer: KClass<*>
27-
)
21+
annotation class TopicConsumer(val topic: String, val offset: String)

0 commit comments

Comments
 (0)