Skip to content

Commit e9ca54a

Browse files
committed
test: add tests for configuration & implemented serializers
1 parent a43a0a8 commit e9ca54a

File tree

14 files changed

+268
-53
lines changed

14 files changed

+268
-53
lines changed

legacy-connectors/src/test/kotlin/streams/kafka/connect/source/LegacyNeo4jSourceIT.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package streams.kafka.connect.source
1919
import java.time.Duration
2020
import org.apache.avro.generic.GenericRecord
2121
import org.apache.kafka.clients.consumer.KafkaConsumer
22+
import org.apache.kafka.common.serialization.StringDeserializer
2223
import org.junit.jupiter.api.Test
2324
import org.junit.jupiter.api.TestInfo
2425
import org.neo4j.connectors.kafka.testing.GenericRecordSupport.asMap
@@ -43,7 +44,8 @@ class LegacyNeo4jSourceIT {
4344
@Test
4445
fun `reads latest changes from legacy Neo4j source`(
4546
testInfo: TestInfo,
46-
@TopicConsumer(topic = TOPIC, offset = "earliest")
47+
@TopicConsumer(
48+
topic = TOPIC, offset = "earliest", keyDeserializer = StringDeserializer::class)
4749
consumer: KafkaConsumer<String, GenericRecord>,
4850
session: Session
4951
) {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.source
18+
19+
import io.confluent.kafka.serializers.KafkaAvroDeserializer
20+
import java.time.Duration
21+
import org.apache.avro.generic.GenericRecord
22+
import org.apache.kafka.clients.consumer.KafkaConsumer
23+
import org.apache.kafka.common.serialization.StringDeserializer
24+
import org.junit.jupiter.api.Test
25+
import org.junit.jupiter.api.TestInfo
26+
import org.neo4j.connectors.kafka.testing.assertions.AvroCdcRecordAssert
27+
import org.neo4j.connectors.kafka.testing.assertions.EventType
28+
import org.neo4j.connectors.kafka.testing.assertions.Operation
29+
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
30+
import org.neo4j.connectors.kafka.testing.source.CdcSource
31+
import org.neo4j.connectors.kafka.testing.source.CdcSourceParam
32+
import org.neo4j.connectors.kafka.testing.source.CdcSourceTopic
33+
import org.neo4j.connectors.kafka.testing.source.Neo4jSource
34+
import org.neo4j.connectors.kafka.testing.source.SourceStrategy
35+
import org.neo4j.connectors.kafka.testing.source.TopicConsumer
36+
import org.neo4j.driver.Session
37+
38+
class Neo4jCdcSourceKeySerializationIT {
39+
40+
@Neo4jSource(
41+
startFrom = "EARLIEST",
42+
strategy = SourceStrategy.CDC,
43+
cdc =
44+
CdcSource(
45+
topics =
46+
arrayOf(
47+
CdcSourceTopic(
48+
topic = "neo4j-cdc-topic-key-serialization-none",
49+
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
50+
keySerialization = "SKIP"),
51+
),
52+
),
53+
)
54+
@Test
55+
fun `supports skipping serialization of keys`(
56+
testInfo: TestInfo,
57+
@TopicConsumer(
58+
topic = "neo4j-cdc-topic-key-serialization-none",
59+
offset = "earliest",
60+
keyDeserializer = StringDeserializer::class)
61+
consumer: KafkaConsumer<Any, GenericRecord>,
62+
session: Session
63+
) {
64+
val executionId = testInfo.displayName + System.currentTimeMillis()
65+
session
66+
.run(
67+
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
68+
mapOf("execId" to executionId),
69+
)
70+
.consume()
71+
72+
TopicVerifier.create(consumer).assertNoMessageKey().verifyWithin(Duration.ofSeconds(30))
73+
}
74+
75+
@Neo4jSource(
76+
startFrom = "EARLIEST",
77+
strategy = SourceStrategy.CDC,
78+
cdc =
79+
CdcSource(
80+
topics =
81+
arrayOf(
82+
CdcSourceTopic(
83+
topic = "neo4j-cdc-topic-key-serialization-whole",
84+
patterns = arrayOf(CdcSourceParam("(:TestSource{name,+execId})")),
85+
keySerialization = "WHOLE_VALUE"),
86+
),
87+
),
88+
)
89+
@Test
90+
fun `supports serialization of keys as whole values`(
91+
testInfo: TestInfo,
92+
@TopicConsumer(
93+
topic = "neo4j-cdc-topic-key-serialization-whole",
94+
offset = "earliest",
95+
keyDeserializer = KafkaAvroDeserializer::class)
96+
consumer: KafkaConsumer<GenericRecord, GenericRecord>,
97+
session: Session
98+
) {
99+
val executionId = testInfo.displayName + System.currentTimeMillis()
100+
session
101+
.run(
102+
"CREATE (:TestSource {name: 'Jane', execId: \$execId})",
103+
mapOf("execId" to executionId),
104+
)
105+
.consume()
106+
107+
TopicVerifier.create(consumer)
108+
.assertMessageKey { key ->
109+
AvroCdcRecordAssert.assertThat(key)
110+
.hasEventType(EventType.NODE)
111+
.hasOperation(Operation.CREATE)
112+
.labelledAs("TestSource")
113+
.hasNoBeforeState()
114+
.hasAfterStateProperties(mapOf("name" to "Jane", "execId" to executionId))
115+
}
116+
.verifyWithin(Duration.ofSeconds(30))
117+
}
118+
}

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceNodesIT.kt

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.neo4j.connectors.kafka.source
2020
import java.time.Duration
2121
import org.apache.avro.generic.GenericRecord
2222
import org.apache.kafka.clients.consumer.KafkaConsumer
23+
import org.apache.kafka.common.serialization.StringDeserializer
2324
import org.junit.jupiter.api.Disabled
2425
import org.junit.jupiter.api.Test
2526
import org.junit.jupiter.api.TestInfo
@@ -55,7 +56,10 @@ class Neo4jCdcSourceNodesIT {
5556
@Test
5657
fun `should read changes caught by patterns`(
5758
testInfo: TestInfo,
58-
@TopicConsumer(topic = "neo4j-cdc-topic", offset = "earliest")
59+
@TopicConsumer(
60+
topic = "neo4j-cdc-topic",
61+
offset = "earliest",
62+
keyDeserializer = StringDeserializer::class)
5963
consumer: KafkaConsumer<String, GenericRecord>,
6064
session: Session
6165
) {
@@ -119,7 +123,10 @@ class Neo4jCdcSourceNodesIT {
119123
@Test
120124
fun `should read only specified field changes on update`(
121125
testInfo: TestInfo,
122-
@TopicConsumer(topic = "neo4j-cdc-update-topic", offset = "earliest")
126+
@TopicConsumer(
127+
topic = "neo4j-cdc-update-topic",
128+
offset = "earliest",
129+
keyDeserializer = StringDeserializer::class)
123130
consumer: KafkaConsumer<String, GenericRecord>,
124131
session: Session
125132
) {
@@ -192,7 +199,10 @@ class Neo4jCdcSourceNodesIT {
192199
@Test
193200
fun `should read changes with different properties using the default topic compatibility mode`(
194201
testInfo: TestInfo,
195-
@TopicConsumer(topic = "neo4j-cdc-create-inc", offset = "earliest")
202+
@TopicConsumer(
203+
topic = "neo4j-cdc-create-inc",
204+
offset = "earliest",
205+
keyDeserializer = StringDeserializer::class)
196206
consumer: KafkaConsumer<String, GenericRecord>,
197207
session: Session
198208
) {
@@ -250,11 +260,14 @@ class Neo4jCdcSourceNodesIT {
250260
@Test
251261
fun `should read each operation to a separate topic`(
252262
testInfo: TestInfo,
253-
@TopicConsumer(topic = "cdc-creates", offset = "earliest")
263+
@TopicConsumer(
264+
topic = "cdc-creates", offset = "earliest", keyDeserializer = StringDeserializer::class)
254265
createsConsumer: KafkaConsumer<String, GenericRecord>,
255-
@TopicConsumer(topic = "cdc-updates", offset = "earliest")
266+
@TopicConsumer(
267+
topic = "cdc-updates", offset = "earliest", keyDeserializer = StringDeserializer::class)
256268
updatesConsumer: KafkaConsumer<String, GenericRecord>,
257-
@TopicConsumer(topic = "cdc-deletes", offset = "earliest")
269+
@TopicConsumer(
270+
topic = "cdc-deletes", offset = "earliest", keyDeserializer = StringDeserializer::class)
258271
deletesConsumer: KafkaConsumer<String, GenericRecord>,
259272
session: Session
260273
) {
@@ -329,7 +342,10 @@ class Neo4jCdcSourceNodesIT {
329342
@Test
330343
fun `should read changes marked with specific transaction metadata attribute`(
331344
testInfo: TestInfo,
332-
@TopicConsumer(topic = "neo4j-cdc-metadata", offset = "earliest")
345+
@TopicConsumer(
346+
topic = "neo4j-cdc-metadata",
347+
offset = "earliest",
348+
keyDeserializer = StringDeserializer::class)
333349
consumer: KafkaConsumer<String, GenericRecord>,
334350
session: Session
335351
) {
@@ -379,7 +395,10 @@ class Neo4jCdcSourceNodesIT {
379395
@Test
380396
fun `should read changes containing node keys`(
381397
testInfo: TestInfo,
382-
@TopicConsumer(topic = "neo4j-cdc-keys", offset = "earliest")
398+
@TopicConsumer(
399+
topic = "neo4j-cdc-keys",
400+
offset = "earliest",
401+
keyDeserializer = StringDeserializer::class)
383402
consumer: KafkaConsumer<String, GenericRecord>,
384403
session: Session
385404
) {

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jCdcSourceRelationshipsIT.kt

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.neo4j.connectors.kafka.source
2020
import java.time.Duration
2121
import org.apache.avro.generic.GenericRecord
2222
import org.apache.kafka.clients.consumer.KafkaConsumer
23+
import org.apache.kafka.common.serialization.StringDeserializer
2324
import org.junit.jupiter.api.Disabled
2425
import org.junit.jupiter.api.Test
2526
import org.junit.jupiter.api.TestInfo
@@ -56,7 +57,10 @@ class Neo4jCdcSourceRelationshipsIT {
5657
@Test
5758
fun `should read changes caught by patterns`(
5859
testInfo: TestInfo,
59-
@TopicConsumer(topic = "neo4j-cdc-rels", offset = "earliest")
60+
@TopicConsumer(
61+
topic = "neo4j-cdc-rels",
62+
offset = "earliest",
63+
keyDeserializer = StringDeserializer::class)
6064
consumer: KafkaConsumer<String, GenericRecord>,
6165
session: Session
6266
) {
@@ -131,7 +135,10 @@ class Neo4jCdcSourceRelationshipsIT {
131135
@Test
132136
fun `should read only specified field changes on update`(
133137
testInfo: TestInfo,
134-
@TopicConsumer(topic = "neo4j-cdc-update-rel", offset = "earliest")
138+
@TopicConsumer(
139+
topic = "neo4j-cdc-update-rel",
140+
offset = "earliest",
141+
keyDeserializer = StringDeserializer::class)
135142
consumer: KafkaConsumer<String, GenericRecord>,
136143
session: Session
137144
) {
@@ -188,7 +195,10 @@ class Neo4jCdcSourceRelationshipsIT {
188195
@Test
189196
fun `should read changes with different properties using the default topic compatibility mode`(
190197
testInfo: TestInfo,
191-
@TopicConsumer(topic = "neo4j-cdc-create-inc-rel", offset = "earliest")
198+
@TopicConsumer(
199+
topic = "neo4j-cdc-create-inc-rel",
200+
offset = "earliest",
201+
keyDeserializer = StringDeserializer::class)
192202
consumer: KafkaConsumer<String, GenericRecord>,
193203
session: Session
194204
) {
@@ -250,11 +260,20 @@ class Neo4jCdcSourceRelationshipsIT {
250260
@Test
251261
fun `should read each operation to a separate topic`(
252262
testInfo: TestInfo,
253-
@TopicConsumer(topic = "cdc-creates-rel", offset = "earliest")
263+
@TopicConsumer(
264+
topic = "cdc-creates-rel",
265+
offset = "earliest",
266+
keyDeserializer = StringDeserializer::class)
254267
createsConsumer: KafkaConsumer<String, GenericRecord>,
255-
@TopicConsumer(topic = "cdc-updates-rel", offset = "earliest")
268+
@TopicConsumer(
269+
topic = "cdc-updates-rel",
270+
offset = "earliest",
271+
keyDeserializer = StringDeserializer::class)
256272
updatesConsumer: KafkaConsumer<String, GenericRecord>,
257-
@TopicConsumer(topic = "cdc-deletes-rel", offset = "earliest")
273+
@TopicConsumer(
274+
topic = "cdc-deletes-rel",
275+
offset = "earliest",
276+
keyDeserializer = StringDeserializer::class)
258277
deletesConsumer: KafkaConsumer<String, GenericRecord>,
259278
session: Session
260279
) {
@@ -327,7 +346,10 @@ class Neo4jCdcSourceRelationshipsIT {
327346
@Test
328347
fun `should read changes marked with specific transaction metadata attribute`(
329348
testInfo: TestInfo,
330-
@TopicConsumer(topic = "neo4j-cdc-metadata-rel", offset = "earliest")
349+
@TopicConsumer(
350+
topic = "neo4j-cdc-metadata-rel",
351+
offset = "earliest",
352+
keyDeserializer = StringDeserializer::class)
331353
consumer: KafkaConsumer<String, GenericRecord>,
332354
session: Session
333355
) {
@@ -378,7 +400,10 @@ class Neo4jCdcSourceRelationshipsIT {
378400
@Test
379401
fun `should read changes containing relationship keys`(
380402
testInfo: TestInfo,
381-
@TopicConsumer(topic = "neo4j-cdc-keys-rel", offset = "earliest")
403+
@TopicConsumer(
404+
topic = "neo4j-cdc-keys-rel",
405+
offset = "earliest",
406+
keyDeserializer = StringDeserializer::class)
382407
consumer: KafkaConsumer<String, GenericRecord>,
383408
session: Session
384409
) {

source-connector/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.source
1919
import java.time.Duration
2020
import org.apache.avro.generic.GenericRecord
2121
import org.apache.kafka.clients.consumer.KafkaConsumer
22+
import org.apache.kafka.common.serialization.StringDeserializer
2223
import org.junit.jupiter.api.Test
2324
import org.junit.jupiter.api.TestInfo
2425
import org.neo4j.connectors.kafka.testing.GenericRecordSupport.asMap
@@ -43,7 +44,8 @@ class Neo4jSourceIT {
4344
@Test
4445
fun `reads latest changes from Neo4j source`(
4546
testInfo: TestInfo,
46-
@TopicConsumer(topic = TOPIC, offset = "earliest")
47+
@TopicConsumer(
48+
topic = TOPIC, offset = "earliest", keyDeserializer = StringDeserializer::class)
4749
consumer: KafkaConsumer<String, GenericRecord>,
4850
session: Session
4951
) {

source/src/main/kotlin/org/neo4j/connectors/kafka/source/SourceConfiguration.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,8 @@ class SourceConfiguration(originals: Map<*, *>) :
500500
}
501501
}
502502
if (CDC_KEY_STRATEGY_REGEX.matches(it.key)) {
503-
Validators.enum(Neo4jCdcKeySerializationStrategy::class.java).ensureValid(it.key, it.value)
503+
Validators.enum(Neo4jCdcKeySerializationStrategy::class.java)
504+
.ensureValid(it.key, it.value)
504505
}
505506
} catch (e: ConfigException) {
506507
strategy.addErrorMessage(e.message)

source/src/test/kotlin/org/neo4j/connectors/kafka/source/SourceConfigurationTest.kt

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -591,20 +591,19 @@ class SourceConfigurationTest {
591591
@Test
592592
fun `fail validation on invalid CDC key serialization strategy`() {
593593
assertFailsWith(ConfigException::class) {
594-
SourceConfiguration(
595-
mapOf(
596-
Neo4jConfiguration.URI to "neo4j://localhost",
597-
Neo4jConfiguration.AUTHENTICATION_TYPE to AuthenticationType.NONE.name,
598-
SourceConfiguration.STRATEGY to "CDC",
599-
SourceConfiguration.START_FROM to "EARLIEST",
600-
SourceConfiguration.BATCH_SIZE to "10000",
601-
SourceConfiguration.ENFORCE_SCHEMA to "true",
602-
SourceConfiguration.CDC_POLL_INTERVAL to "5s",
603-
"neo4j.cdc.topic.topic-1.patterns" to "(),()-[]-()",
604-
"neo4j.cdc.topic.topic-1.key-strategy" to "INVALID"
605-
)
606-
).validate()
607-
}
594+
SourceConfiguration(
595+
mapOf(
596+
Neo4jConfiguration.URI to "neo4j://localhost",
597+
Neo4jConfiguration.AUTHENTICATION_TYPE to AuthenticationType.NONE.name,
598+
SourceConfiguration.STRATEGY to "CDC",
599+
SourceConfiguration.START_FROM to "EARLIEST",
600+
SourceConfiguration.BATCH_SIZE to "10000",
601+
SourceConfiguration.ENFORCE_SCHEMA to "true",
602+
SourceConfiguration.CDC_POLL_INTERVAL to "5s",
603+
"neo4j.cdc.topic.topic-1.patterns" to "(),()-[]-()",
604+
"neo4j.cdc.topic.topic-1.key-strategy" to "INVALID"))
605+
.validate()
606+
}
608607
.also {
609608
it shouldHaveMessage
610609
"Invalid value INVALID for configuration neo4j.cdc.topic.topic-1.key-strategy: Must be one of: 'SKIP', 'ELEMENT_ID', 'ENTITY_KEYS', 'WHOLE_VALUE'."

0 commit comments

Comments
 (0)