Skip to content

Commit 20c20a7

Browse files
authored
feat: introduce e2e test infrastructure for sink (#22)
Introduces testing infrastructure to test the Neo4j sink connector. This also improves the coverage of both source and sink test components. Finally, this commit moves the version the Kafka Connect/Kafka cluster to the minimum version supported by Confluent.
1 parent 62fbfee commit 20c20a7

33 files changed

+2074
-435
lines changed

docker/docker-compose.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ services:
2121
retries: 5
2222

2323
zookeeper:
24-
image: confluentinc/cp-zookeeper:7.5.0
24+
image: confluentinc/cp-zookeeper:6.1.13
2525
hostname: zookeeper
2626
container_name: zookeeper
2727
ports:
@@ -38,7 +38,7 @@ services:
3838
retries: 5
3939

4040
broker:
41-
image: confluentinc/cp-server:7.5.0
41+
image: confluentinc/cp-server:6.1.13
4242
hostname: broker
4343
container_name: broker
4444
depends_on:
@@ -74,7 +74,7 @@ services:
7474
retries: 5
7575

7676
schema-registry:
77-
image: confluentinc/cp-schema-registry:7.5.0
77+
image: confluentinc/cp-schema-registry:6.1.13
7878
hostname: schema-registry
7979
container_name: schema-registry
8080
depends_on:
@@ -94,7 +94,7 @@ services:
9494
retries: 5
9595

9696
connect:
97-
image: confluentinc/cp-server-connect:7.5.0
97+
image: confluentinc/cp-server-connect:6.1.13
9898
hostname: connect
9999
container_name: connect
100100
depends_on:
@@ -138,7 +138,7 @@ services:
138138
retries: 5
139139

140140
control-center:
141-
image: confluentinc/cp-enterprise-control-center:7.5.0
141+
image: confluentinc/cp-enterprise-control-center:6.1.13
142142
hostname: control-center
143143
container_name: control-center
144144
depends_on:

pom.xml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@
4747
<jackson.version>2.15.2</jackson.version>
4848
<java.version>11</java.version>
4949
<junit-jupiter.version>5.10.0</junit-jupiter.version>
50-
<kafka-avro-serializer.version>5.2.2</kafka-avro-serializer.version>
51-
<kafka.version>2.6.3</kafka.version>
50+
<!-- keep the next two in sync: https://docs.confluent.io/platform/current/installation/versions-interoperability.html -->
51+
<!-- as well as versions in the docker-compose.yml file -->
52+
<kafka-schema-registry.version>6.1.13</kafka-schema-registry.version>
53+
<kafka.version>2.7.2</kafka.version>
5254
<kotest-assertions-core-jvm.version>5.6.2</kotest-assertions-core-jvm.version>
5355
<kotlin.coroutines.version>1.7.3</kotlin.coroutines.version>
5456
<kotlin.version>1.9.0</kotlin.version>
@@ -120,10 +122,20 @@
120122
<type>pom</type>
121123
<scope>import</scope>
122124
</dependency>
125+
<dependency>
126+
<groupId>${project.groupId}</groupId>
127+
<artifactId>testing</artifactId>
128+
<version>${project.version}</version>
129+
</dependency>
123130
<dependency>
124131
<groupId>io.confluent</groupId>
125132
<artifactId>kafka-avro-serializer</artifactId>
126-
<version>${kafka-avro-serializer.version}</version>
133+
<version>${kafka-schema-registry.version}</version>
134+
</dependency>
135+
<dependency>
136+
<groupId>io.confluent</groupId>
137+
<artifactId>kafka-connect-avro-converter</artifactId>
138+
<version>${kafka-schema-registry.version}</version>
127139
</dependency>
128140
<dependency>
129141
<groupId>io.kotest</groupId>

sink/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@
2323
<version>${kafka.version}</version>
2424
<scope>provided</scope>
2525
</dependency>
26+
<dependency>
27+
<groupId>${project.groupId}</groupId>
28+
<artifactId>testing</artifactId>
29+
<scope>test</scope>
30+
</dependency>
31+
<dependency>
32+
<groupId>io.confluent</groupId>
33+
<artifactId>kafka-connect-avro-converter</artifactId>
34+
<scope>test</scope>
35+
</dependency>
2636
<dependency>
2737
<groupId>org.jetbrains.kotlin</groupId>
2838
<artifactId>kotlin-test</artifactId>
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.sink
18+
19+
import io.confluent.connect.avro.AvroData
20+
import kotlin.time.Duration.Companion.seconds
21+
import kotlin.time.toJavaDuration
22+
import org.apache.avro.generic.GenericData
23+
import org.apache.avro.generic.GenericRecord
24+
import org.apache.kafka.clients.producer.KafkaProducer
25+
import org.apache.kafka.clients.producer.ProducerRecord
26+
import org.apache.kafka.connect.data.Schema
27+
import org.apache.kafka.connect.data.SchemaBuilder
28+
import org.awaitility.Awaitility.await
29+
import org.junit.jupiter.api.Test
30+
import org.junit.jupiter.api.TestInfo
31+
import org.neo4j.connectors.kafka.testing.sink.Neo4jSink
32+
import org.neo4j.connectors.kafka.testing.sink.TopicProducer
33+
import org.neo4j.driver.Session
34+
35+
class Neo4jSinkIT {
36+
37+
companion object {
38+
const val TOPIC = "persons"
39+
}
40+
41+
@Neo4jSink(
42+
topics = [TOPIC],
43+
queries =
44+
[
45+
"MERGE (p:Person {name: event.name, surname: event.surname, executionId: event.executionId})"])
46+
@Test
47+
fun `writes messages to Neo4j`(
48+
@TopicProducer producer: KafkaProducer<String, GenericRecord>,
49+
session: Session,
50+
testInfo: TestInfo
51+
) {
52+
val executionId = testInfo.displayName + System.currentTimeMillis()
53+
val avroRecord =
54+
GenericData.Record(
55+
AvroData(20)
56+
.fromConnectSchema(
57+
SchemaBuilder.struct()
58+
.field("name", Schema.STRING_SCHEMA)
59+
.field("surname", Schema.STRING_SCHEMA)
60+
.field("executionId", Schema.STRING_SCHEMA)
61+
.build()))
62+
avroRecord.put("name", "Jane")
63+
avroRecord.put("surname", "Doe")
64+
avroRecord.put("executionId", executionId)
65+
val record = ProducerRecord<String, GenericRecord>(TOPIC, avroRecord)
66+
67+
producer.send(record)
68+
69+
await().atMost(30.seconds.toJavaDuration()).until {
70+
session
71+
.run(
72+
"MATCH (p:Person {name: \$name, surname: \$surname, executionId: \$executionId}) RETURN count(p) = 1 AS result",
73+
mapOf("name" to "Jane", "surname" to "Doe", "executionId" to executionId))
74+
.single()["result"]
75+
.asBoolean()
76+
}
77+
}
78+
}

source/pom.xml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@
3939
<version>${kafka.version}</version>
4040
<scope>provided</scope>
4141
</dependency>
42+
<dependency>
43+
<groupId>${project.groupId}</groupId>
44+
<artifactId>testing</artifactId>
45+
<scope>test</scope>
46+
</dependency>
4247
<dependency>
4348
<groupId>io.kotest</groupId>
4449
<artifactId>kotest-assertions-core-jvm</artifactId>
@@ -74,12 +79,6 @@
7479
<artifactId>mockito-kotlin</artifactId>
7580
<scope>test</scope>
7681
</dependency>
77-
<dependency>
78-
<groupId>org.neo4j.connectors.kafka</groupId>
79-
<artifactId>testing</artifactId>
80-
<version>${project.version}</version>
81-
<scope>test</scope>
82-
</dependency>
8382
<dependency>
8483
<groupId>org.slf4j</groupId>
8584
<artifactId>slf4j-nop</artifactId>

source/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import org.apache.avro.generic.GenericRecord
2121
import org.apache.kafka.clients.consumer.KafkaConsumer
2222
import org.junit.jupiter.api.Test
2323
import org.junit.jupiter.api.TestInfo
24-
import org.neo4j.connectors.kafka.testing.Neo4jSource
25-
import org.neo4j.connectors.kafka.testing.TopicConsumer
26-
import org.neo4j.connectors.kafka.testing.TopicVerifier
24+
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
25+
import org.neo4j.connectors.kafka.testing.source.Neo4jSource
26+
import org.neo4j.connectors.kafka.testing.source.TopicConsumer
2727
import org.neo4j.driver.Session
2828

2929
class Neo4jSourceIT {

testing/LICENSES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Apache Software License, Version 2.0
1616
JetBrains Java Annotations
1717
kafka-avro-serializer
1818
kafka-schema-registry-client
19+
Kotlin Reflect
1920
Kotlin Stdlib
2021
Kotlin Stdlib Common
2122
LZ4 and xxHash

testing/NOTICE.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Apache Software License, Version 2.0
3131
JetBrains Java Annotations
3232
kafka-avro-serializer
3333
kafka-schema-registry-client
34+
Kotlin Reflect
3435
Kotlin Stdlib
3536
Kotlin Stdlib Common
3637
LZ4 and xxHash

testing/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
<packaging>jar</packaging>
1212
<name>testing</name>
1313
<description>Neo4j Connector for Kafka - Testing Library</description>
14+
<properties>
15+
<licensing.skip>true</licensing.skip>
16+
</properties>
1417
<dependencies>
1518
<dependency>
1619
<groupId>com.fasterxml.jackson.core</groupId>
@@ -30,6 +33,13 @@
3033
<dependency>
3134
<groupId>org.apache.avro</groupId>
3235
<artifactId>avro</artifactId>
36+
<exclusions>
37+
<exclusion>
38+
<!-- breaks dependency convergence for Confluent Schema Registry client -->
39+
<groupId>org.apache.commons</groupId>
40+
<artifactId>commons-compress</artifactId>
41+
</exclusion>
42+
</exclusions>
3343
</dependency>
3444
<dependency>
3545
<groupId>org.apache.kafka</groupId>
@@ -39,6 +49,10 @@
3949
<groupId>org.awaitility</groupId>
4050
<artifactId>awaitility</artifactId>
4151
</dependency>
52+
<dependency>
53+
<groupId>org.jetbrains.kotlin</groupId>
54+
<artifactId>kotlin-reflect</artifactId>
55+
</dependency>
4256
<dependency>
4357
<groupId>org.jetbrains.kotlin</groupId>
4458
<artifactId>kotlin-stdlib</artifactId>
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.testing
18+
19+
import kotlin.jvm.optionals.getOrNull
20+
import org.junit.jupiter.api.extension.ExtensionContext
21+
import org.junit.platform.commons.support.AnnotationSupport
22+
23+
internal object AnnotationSupport {
24+
25+
inline fun <reified T : Annotation> findAnnotation(context: ExtensionContext?): T? {
26+
var current = context
27+
while (current != null) {
28+
val annotation =
29+
AnnotationSupport.findAnnotation(
30+
current.requiredTestMethod,
31+
T::class.java,
32+
)
33+
if (annotation.isPresent) {
34+
return annotation.get()
35+
}
36+
current = current.parent.getOrNull()
37+
}
38+
return null
39+
}
40+
}

0 commit comments

Comments
 (0)