Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<jackson.version>2.15.2</jackson.version>
<java.version>11</java.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
<kafka-avro-serializer.version>5.2.2</kafka-avro-serializer.version>
<kafka-schema-registry.version>6.2.12</kafka-schema-registry.version>
<kafka.version>2.6.3</kafka.version>
<kotest-assertions-core-jvm.version>5.6.2</kotest-assertions-core-jvm.version>
<kotlin.coroutines.version>1.7.3</kotlin.coroutines.version>
Expand Down Expand Up @@ -120,10 +120,20 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>testing</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka-avro-serializer.version}</version>
<version>${kafka-schema-registry.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${kafka-schema-registry.version}</version>
</dependency>
<dependency>
<groupId>io.kotest</groupId>
Expand Down
10 changes: 10 additions & 0 deletions sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.sink

import io.confluent.connect.avro.AvroData
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.neo4j.connectors.kafka.testing.sink.Neo4jSink
import org.neo4j.connectors.kafka.testing.sink.TopicProducer
import org.neo4j.driver.Session

class Neo4jSinkIT {

companion object {
const val TOPIC = "persons"
}

@Neo4jSink(
topics = [TOPIC],
queries =
[
"MERGE (p:Person {name: event.name, surname: event.surname, executionId: event.executionId})"])
@Test
fun `writes messages to Neo4j`(
@TopicProducer producer: KafkaProducer<String, GenericRecord>,
session: Session,
testInfo: TestInfo
) {
val executionId = testInfo.displayName + System.currentTimeMillis()
val avroRecord =
GenericData.Record(
AvroData(20)
.fromConnectSchema(
SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("surname", Schema.STRING_SCHEMA)
.field("executionId", Schema.STRING_SCHEMA)
.build()))
avroRecord.put("name", "Ali")
avroRecord.put("surname", "İnce")
avroRecord.put("executionId", executionId)
val record = ProducerRecord<String, GenericRecord>(TOPIC, avroRecord)

producer.send(record)

await().atMost(30.seconds.toJavaDuration()).until {
session
.run(
"MATCH (p:Person {name: \$name, surname: \$surname, executionId: \$executionId}) RETURN count(p) = 1 AS result",
mapOf("name" to "Ali", "surname" to "İnce", "executionId" to executionId))
.single()["result"]
.asBoolean()
}
}
}
11 changes: 5 additions & 6 deletions source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.kotest</groupId>
<artifactId>kotest-assertions-core-jvm</artifactId>
Expand Down Expand Up @@ -74,12 +79,6 @@
<artifactId>mockito-kotlin</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.neo4j.connectors.kafka</groupId>
<artifactId>testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.neo4j.connectors.kafka.testing.Neo4jSource
import org.neo4j.connectors.kafka.testing.TopicConsumer
import org.neo4j.connectors.kafka.testing.TopicVerifier
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
import org.neo4j.connectors.kafka.testing.source.Neo4jSource
import org.neo4j.connectors.kafka.testing.source.TopicConsumer
import org.neo4j.driver.Session

class Neo4jSourceIT {
Expand Down
1 change: 1 addition & 0 deletions testing/LICENSES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Apache Software License, Version 2.0
JetBrains Java Annotations
kafka-avro-serializer
kafka-schema-registry-client
Kotlin Reflect
Kotlin Stdlib
Kotlin Stdlib Common
LZ4 and xxHash
Expand Down
1 change: 1 addition & 0 deletions testing/NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Apache Software License, Version 2.0
JetBrains Java Annotations
kafka-avro-serializer
kafka-schema-registry-client
Kotlin Reflect
Kotlin Stdlib
Kotlin Stdlib Common
LZ4 and xxHash
Expand Down
14 changes: 14 additions & 0 deletions testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
<packaging>jar</packaging>
<name>testing</name>
<description>Neo4j Connector for Kafka - Testing Library</description>
<properties>
<licensing.skip>true</licensing.skip>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand All @@ -30,6 +33,13 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<exclusions>
<exclusion>
<!-- breaks dependency convergence for Confluent Schema Registry client -->
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -39,6 +49,10 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.testing

import kotlin.jvm.optionals.getOrNull
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.platform.commons.support.AnnotationSupport

internal object AnnotationSupport {

inline fun <reified T : Annotation> findAnnotation(context: ExtensionContext?): T? {
var current = context
while (current != null) {
val annotation =
AnnotationSupport.findAnnotation(
current.requiredTestMethod,
T::class.java,
)
if (annotation.isPresent) {
return annotation.get()
}
current = current.parent.getOrNull()
}
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.testing

import org.neo4j.connectors.kafka.testing.WordSupport.camelCaseToUpperSnakeCase

internal const val DEFAULT_TO_ENV = "___UNSET___"

/**
* AnnotationValueResolver resolves an annotation attribute value in at most two steps. If the value
* has been explicitly initialized on the annotation T, it is returned. If the value holds a
* specific default value, the resolver will look for the corresponding environment variable and
* return the value. The environment variable name results from the camel case to upper snake case
* conversion operated on the attribute name.
*/
internal class AnnotationValueResolver<T : Annotation>(
private val name: String,
private val envAccessor: (String) -> String?,
) {
private val fieldValueOf: (T) -> String = resolveFieldAccessor(name)

private val envVarName = camelCaseToUpperSnakeCase(name)

/** Determines whether the value is resolvable. */
fun isValid(annotation: T): Boolean {
return fieldValueOf(annotation) != DEFAULT_TO_ENV || envAccessor(envVarName) != null
}

/**
* Resolves the value of the provided annotation's attribute. This assumes [isValid] has been
* called first and returned true.
*/
fun resolve(annotation: T): String {
val fieldValue = fieldValueOf(annotation)
if (fieldValue != DEFAULT_TO_ENV) {
return fieldValue
}
return envAccessor(envVarName)!!
}

fun errorMessage(): String {
return "Both annotation field $name and environment variable $envVarName are unset. Please specify one"
}

override fun toString(): String {
return "EnvBackedSetting(name='$name', envVarName='$envVarName')"
}

private fun resolveFieldAccessor(name: String): (T) -> String {
return { annotation ->
annotation::class.members.first { member -> member.name == name }.call(annotation) as String
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.testing

import java.lang.IllegalStateException

internal object MapSupport {

@Suppress("UNCHECKED_CAST")
fun <K, Any> MutableMap<K, Any>.nestUnder(key: K, values: Map<K, Any>): MutableMap<K, Any> {
val map = this[key]
if (map !is Map<*, *>) {
throw IllegalStateException("entry at key $key is not a mutable map")
}
(map as MutableMap<K, Any>).putAll(values)
return this
}
}
Loading