@@ -23,7 +23,6 @@ import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.TestInfo
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.AVRO
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_EMBEDDED
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_SCHEMA
@@ -45,20 +44,20 @@ abstract class Neo4jSinkIT {
[
CypherStrategy(
TOPIC,
"MERGE (p:Person {name: event.name, surname: event.surname, executionId: event.executionId})")])
"MERGE (p:Person {name: event.name, surname: event.surname})",
),
],
)
@Test
fun `writes messages to Neo4j via sink connector`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
-      testInfo: TestInfo
  ) {
-    val executionId = testInfo.displayName + System.currentTimeMillis()
-    val value = mapOf("name" to "Jane", "surname" to "Doe", "executionId" to executionId)
+    val value = mapOf("name" to "Jane", "surname" to "Doe")
val schema =
SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("surname", Schema.STRING_SCHEMA)
.field("executionId", Schema.STRING_SCHEMA)
.build()
val struct = Struct(schema)
schema.fields().forEach { struct.put(it, value[it.name()]) }
@@ -68,8 +67,9 @@ abstract class Neo4jSinkIT {
await().atMost(30.seconds.toJavaDuration()).until {
session
.run(
"MATCH (p:Person {name: \$name, surname: \$surname, executionId: \$executionId}) RETURN count(p) = 1 AS result",
mapOf("name" to "Jane", "surname" to "Doe", "executionId" to executionId))
"MATCH (p:Person {name: \$name, surname: \$surname}) RETURN count(p) = 1 AS result",
mapOf("name" to "Jane", "surname" to "Doe"),
)
.single()["result"]
.asBoolean()
}
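Note on the change above: dropping `executionId` leans on per-test topic isolation instead of tagging nodes per run. For orientation, a rough sketch of the sink property that a `CypherStrategy(TOPIC, query)` pair corresponds to; the `neo4j.cypher.topic.<topic>` key name is an assumption based on the 5.x connector's Cypher strategy, not taken from this PR:

```kotlin
// Sketch only, with the property key assumed; not the test framework's actual wiring.
val sinkConfig = mapOf(
    "topics" to "persons",
    "neo4j.cypher.topic.persons" to
        "MERGE (p:Person {name: event.name, surname: event.surname})",
)
```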
@@ -0,0 +1,201 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.sink

import io.kotest.matchers.shouldBe
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.testing.format.KafkaConverter.JSON_RAW
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
import org.neo4j.connectors.kafka.testing.kafka.ConvertingKafkaProducer
import org.neo4j.connectors.kafka.testing.sink.CypherStrategy
import org.neo4j.connectors.kafka.testing.sink.Neo4jSink
import org.neo4j.connectors.kafka.testing.sink.TopicProducer
import org.neo4j.driver.Session

@KeyValueConverter(key = JSON_RAW, value = JSON_RAW)
class Neo4jSinkRawJsonIT {
companion object {
private const val TOPIC = "persons"
}

@Neo4jSink(
cypher =
[
CypherStrategy(
TOPIC,
"WITH __value AS person MERGE (p:Person {name: person.name, surname: person.surname})",
),
],
)
@Test
fun `should support json map`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
) {
producer.publish(
value = mapOf("name" to "Jane", "surname" to "Doe"),
valueSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build(),
)

await().atMost(30.seconds.toJavaDuration()).until {
session
.run(
"MATCH (p:Person {name: \$name, surname: \$surname}) RETURN count(p) = 1 AS result",
mapOf("name" to "Jane", "surname" to "Doe"),
)
.single()["result"]
.asBoolean()
}
}

@Neo4jSink(
cypher =
[
CypherStrategy(
TOPIC,
"WITH __value AS persons UNWIND persons AS person MERGE (p:Person {name: person.name, surname: person.surname})",
),
],
)
@Test
fun `should support json list`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
) {
producer.publish(
value =
listOf(
mapOf("name" to "Jane", "surname" to "Doe"),
mapOf("name" to "John", "surname" to "Doe"),
),
valueSchema =
SchemaBuilder.array(
SchemaBuilder.map(
Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA,
)
.build(),
)
.build(),
)

await().atMost(30.seconds.toJavaDuration()).untilAsserted {
session
.run(
"MATCH (p:Person) WHERE [p.name, p.surname] IN ${'$'}names RETURN count(p) as result",
mapOf("names" to listOf(listOf("Jane", "Doe"), listOf("John", "Doe"))),
)
.single()["result"]
.asLong() shouldBe 2L
}
}

@Neo4jSink(
cypher =
[
CypherStrategy(
TOPIC,
"WITH __value AS name MERGE (p:Person {name: name})",
),
],
)
@Test
fun `should support raw string value`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
) {
producer.publish(
value = "John",
valueSchema = Schema.STRING_SCHEMA,
)

await().atMost(30.seconds.toJavaDuration()).untilAsserted {
session
.run(
"MATCH (p:Person {name: ${'$'}name}) RETURN count(p) as result",
mapOf("name" to "John"),
)
.single()["result"]
.asLong() shouldBe 1L
}
}

@Neo4jSink(
cypher =
[
CypherStrategy(
TOPIC,
"WITH __value AS age MERGE (p:Person {age: age})",
),
],
)
@Test
fun `should support raw numeric value`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
) {
producer.publish(
value = 25L,
valueSchema = Schema.INT64_SCHEMA,
)

await().atMost(30.seconds.toJavaDuration()).untilAsserted {
session
.run(
"MATCH (p:Person {age: ${'$'}age}) RETURN count(p) as result",
mapOf("age" to 25L),
)
.single()["result"]
.asLong() shouldBe 1L
}
}

@Neo4jSink(
cypher =
[
CypherStrategy(
TOPIC,
"WITH __value AS status MERGE (p:Person {single: status})",
),
],
)
@Test
fun `should support raw boolean value`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session,
) {
producer.publish(
value = true,
valueSchema = Schema.BOOLEAN_SCHEMA,
)

await().atMost(30.seconds.toJavaDuration()).untilAsserted {
session
.run(
"MATCH (p:Person {single: ${'$'}single}) RETURN count(p) as result",
mapOf("single" to true),
)
.single()["result"]
.asLong() shouldBe 1L
}
}
}
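These tests all hinge on what the stock `JsonConverter` emits once the schema envelope is disabled. A minimal, self-contained sketch of the difference between the raw and embedded modes; the topic name here is arbitrary:

```kotlin
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.json.JsonConverter

fun main() {
  val schema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build()
  val value = mapOf("name" to "Jane", "surname" to "Doe")

  // Raw mode: just the payload, which the Cypher strategy sees as __value.
  val raw = JsonConverter().apply { configure(mapOf("schemas.enable" to "false"), false) }
  println(String(raw.fromConnectData("persons", schema, value)))
  // {"name":"Jane","surname":"Doe"}

  // Embedded mode: the same data wrapped in a {"schema":...,"payload":...} envelope.
  val embedded = JsonConverter().apply { configure(mapOf("schemas.enable" to "true"), false) }
  println(String(embedded.fromConnectData("persons", schema, value)))
  // {"schema":{"type":"map",...},"payload":{"name":"Jane","surname":"Doe"}}
}
```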
@@ -89,7 +89,7 @@ data class SinkMessage(val record: SinkRecord) {
}
}
converted
-    }
+    } ?: value
}

override fun toString(): String {
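This one-line `?: value` addition is the enabling change on the sink side: when the conversion branch yields nothing, the raw deserialized value (a string, number, boolean, or plain JSON structure) now falls back to the original value rather than ending up null. A hypothetical sketch of the same Elvis-operator fallback, with `tryStructured` standing in for the elided conversion code:

```kotlin
// Hypothetical illustration only; tryStructured stands in for the elided
// conversion code and is not the connector's real API.
fun tryStructured(value: Any?): Map<String, Any?>? =
    (value as? Map<*, *>)?.entries?.associate { it.key.toString() to it.value }

fun normalize(value: Any?): Any? = tryStructured(value) ?: value

fun main() {
  println(normalize(mapOf("name" to "Jane"))) // {name=Jane}: structured conversion wins
  println(normalize("John"))                  // John: raw scalar falls through unchanged
  println(normalize(42L))                     // 42: raw number falls through unchanged
}
```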
@@ -31,13 +31,16 @@ import org.junit.jupiter.api.extension.ExtensionContext
import org.neo4j.connectors.kafka.testing.AnnotationSupport
import org.neo4j.connectors.kafka.testing.format.avro.AvroSerializer
import org.neo4j.connectors.kafka.testing.format.json.JsonEmbeddedSerializer
+import org.neo4j.connectors.kafka.testing.format.json.JsonRawSerializer
import org.neo4j.connectors.kafka.testing.format.json.JsonSchemaSerializer
import org.neo4j.connectors.kafka.testing.format.protobuf.ProtobufSerializer
import org.neo4j.connectors.kafka.testing.format.string.StringSerializer

private val PROTOBUF_OPTIONS =
mapOf("enhanced.protobuf.schema.support" to "true", "optional.for.nullables" to "true")

+private val JSON_RAW_OPTIONS = mapOf("schemas.enable" to "false")
+
enum class KafkaConverter(
val className: String,
val converterProvider: () -> Converter,
@@ -50,28 +53,40 @@
className = "io.confluent.connect.avro.AvroConverter",
converterProvider = { AvroConverter() },
serializerClass = KafkaAvroSerializer::class.java,
-      testShimSerializer = AvroSerializer),
+      testShimSerializer = AvroSerializer,
+  ),
JSON_SCHEMA(
className = "io.confluent.connect.json.JsonSchemaConverter",
converterProvider = { JsonSchemaConverter() },
serializerClass = KafkaJsonSchemaSerializer::class.java,
-      testShimSerializer = JsonSchemaSerializer),
+      testShimSerializer = JsonSchemaSerializer,
+  ),
JSON_EMBEDDED(
className = "org.apache.kafka.connect.json.JsonConverter",
converterProvider = { JsonConverter() },
serializerClass = KafkaJsonSerializer::class.java,
-      testShimSerializer = JsonEmbeddedSerializer),
+      testShimSerializer = JsonEmbeddedSerializer,
+  ),
+  JSON_RAW(
+      className = "org.apache.kafka.connect.json.JsonConverter",
+      converterProvider = { JsonConverter() },
+      serializerClass = KafkaJsonSerializer::class.java,
+      testShimSerializer = JsonRawSerializer,
+      additionalProperties = JSON_RAW_OPTIONS,
+  ),
PROTOBUF(
className = "io.confluent.connect.protobuf.ProtobufConverter",
converterProvider = { ProtobufConverter() },
serializerClass = KafkaProtobufSerializer::class.java,
testShimSerializer = ProtobufSerializer(PROTOBUF_OPTIONS),
-      additionalProperties = PROTOBUF_OPTIONS),
+      additionalProperties = PROTOBUF_OPTIONS,
+  ),
STRING(
className = "org.apache.kafka.connect.storage.StringConverter",
converterProvider = { StringConverter() },
serializerClass = org.apache.kafka.common.serialization.StringSerializer::class.java,
-      testShimSerializer = StringSerializer)
+      testShimSerializer = StringSerializer,
+  )
}

@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
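For reference, the new `JSON_RAW` entry is the stock `JsonConverter` with the schema envelope turned off; on a real connector deployment these would be the standard Kafka Connect keys (shown as a Kotlin map purely for illustration):

```kotlin
// Standard Kafka Connect settings that the JSON_RAW entry encodes for the value side.
val jsonRawValueConverter = mapOf(
    "value.converter" to "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" to "false",
)
```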
@@ -0,0 +1,34 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.testing.format.json

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.json.JsonConverter
import org.neo4j.connectors.kafka.testing.format.KafkaRecordSerializer

object JsonRawSerializer : KafkaRecordSerializer {

private val converter = JsonConverter()
private val objectMapper = ObjectMapper()

override fun serialize(value: Any, schema: Schema, isKey: Boolean): Any {
converter.configure(mapOf("schemas.enable" to false), isKey)
val data = converter.fromConnectData(null, schema, value)
return objectMapper.readTree(data)
}
}
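A quick usage sketch of the new serializer: returning a Jackson `JsonNode` means the test producer hands plain JSON to `KafkaJsonSerializer`, with no `{"schema":...,"payload":...}` envelope. This assumes it runs in a module where the testing classes are on the classpath:

```kotlin
import org.apache.kafka.connect.data.Schema
import org.neo4j.connectors.kafka.testing.format.json.JsonRawSerializer

fun main() {
  val node = JsonRawSerializer.serialize("John", Schema.STRING_SCHEMA, isKey = false)
  println(node) // "John" (a JSON text node, no schema envelope)
}
```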